diff --git a/examples/rust-dataflow/dataflow.yml b/examples/rust-dataflow/dataflow.yml index 78b8a6cb..eb5e5995 100644 --- a/examples/rust-dataflow/dataflow.yml +++ b/examples/rust-dataflow/dataflow.yml @@ -8,22 +8,23 @@ nodes: build: cargo build -p rust-dataflow-example-node source: ../../target/debug/rust-dataflow-example-node inputs: - tick: dora/timer/millis/300 + tick: dora/timer/millis/3 outputs: - random - - id: runtime-node - operators: - - id: rust-operator - build: cargo build -p rust-dataflow-example-operator - shared-library: ../../target/debug/rust_dataflow_example_operator - inputs: - tick: dora/timer/millis/100 - random: rust-node/random - outputs: - - status + # - id: runtime-node + # operators: + # - id: rust-operator + # build: cargo build -p rust-dataflow-example-operator + # shared-library: ../../target/debug/rust_dataflow_example_operator + # inputs: + # tick: dora/timer/millis/100 + # random: rust-node/random + # outputs: + # - status - id: rust-sink custom: build: cargo build -p rust-dataflow-example-sink source: ../../target/debug/rust-dataflow-example-sink inputs: - message: runtime-node/rust-operator/status + # message: runtime-node/rust-operator/status + message: rust-node/random diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index 442932e7..17f635b0 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -1,29 +1,28 @@ -use dora_node_api::{ - self, - dora_core::{config::DataId, daemon_messages::NodeEvent}, - DoraNode, -}; +use dora_node_api::{self, daemon::Event, dora_core::config::DataId, DoraNode}; fn main() -> eyre::Result<()> { + println!("hello"); + let output = DataId::from("random".to_owned()); - let (mut node, events) = DoraNode::init_from_env()?; + let (mut node, mut events) = DoraNode::init_from_env()?; - for _ in 0..20 { + for i in 0..100 { let event = match events.recv() { - Ok(input) => input, - Err(_) => break, + Some(input) => input, + None => break, }; match event { - NodeEvent::Stop => break, - NodeEvent::Input { + Event::Stop => break, + Event::Input { id, metadata, data: _, } => match id.as_str() { "tick" => { let random: u64 = rand::random(); + println!("tick {i}, sending {random:#x}"); let data: &[u8] = &random.to_le_bytes(); node.send_output(output.clone(), metadata.parameters, data.len(), |out| { out.copy_from_slice(data); diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index 57e1f026..20a0931f 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -1,32 +1,26 @@ -use dora_node_api::{self, dora_core::daemon_messages::NodeEvent, DoraNode}; -use eyre::{bail, Context, ContextCompat}; +use dora_node_api::{self, daemon::Event, DoraNode}; +use eyre::ContextCompat; fn main() -> eyre::Result<()> { - let (_node, events) = DoraNode::init_from_env()?; + let (_node, mut events) = DoraNode::init_from_env()?; - while let Ok(event) = events.recv() { + while let Some(event) = events.recv() { match event { - NodeEvent::Stop => break, - NodeEvent::Input { + Event::Stop => break, + Event::Input { id, metadata: _, data, } => match id.as_str() { "message" => { - let data = data.wrap_err("no data")?.map()?; - let received_string = std::str::from_utf8(&data) - .wrap_err("received message was not utf8-encoded")?; - println!("received message: {}", received_string); - if !received_string.starts_with("operator received random value ") { - bail!("unexpected message format (should start with 'operator received random value')") - } - if !received_string.ends_with(" ticks") { - bail!("unexpected message format (should end with 'ticks')") - } + let data = data.wrap_err("no data")?; + let raw = (&data[..]).try_into().unwrap(); + + println!("received data: {:#x}", u64::from_le_bytes(raw)); } other => eprintln!("Ignoring unexpected input `{other}`"), }, - NodeEvent::InputClosed { id } => { + Event::InputClosed { id } => { println!("Input `{id}` was closed -> exiting"); break; }