diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index 7e8f92fb..ce7408df 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -1,32 +1,36 @@ -use dora_node_api::{self, dora_core::config::DataId, DoraNode}; +use dora_node_api::{ + self, + dora_core::{config::DataId, daemon_messages::NodeEvent}, + DoraNode, +}; fn main() -> eyre::Result<()> { let output = DataId::from("random".to_owned()); - let mut operator = DoraNode::init_from_env()?; - - let inputs = operator.inputs()?; + let (mut node, events) = DoraNode::init_from_env()?; for _ in 0..20 { - let input = match inputs.recv() { + let event = match events.recv() { Ok(input) => input, Err(_) => break, }; - match input.id.as_str() { - "tick" => { - let random: u64 = rand::random(); - let data: &[u8] = &random.to_le_bytes(); - operator.send_output( - &output, - input.metadata().parameters.clone(), - data.len(), - |out| { + match event { + NodeEvent::Stop => break, + NodeEvent::Input { + id, + metadata, + data: _, + } => match id.as_str() { + "tick" => { + let random: u64 = rand::random(); + let data: &[u8] = &random.to_le_bytes(); + node.send_output(output.clone(), metadata.parameters, data.len(), |out| { out.copy_from_slice(data); - }, - )?; - } - other => eprintln!("Ignoring unexpected input `{other}`"), + })?; + } + other => eprintln!("Ignoring unexpected input `{other}`"), + }, } } diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index f9c932a5..16091882 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -1,26 +1,30 @@ -use dora_node_api::{self, DoraNode}; +use dora_node_api::{self, dora_core::daemon_messages::NodeEvent, DoraNode}; use eyre::{bail, Context}; fn main() -> eyre::Result<()> { - let mut operator = DoraNode::init_from_env()?; + let (_node, events) = DoraNode::init_from_env()?; - let inputs = operator.inputs()?; - - while let Ok(input) = inputs.recv() { - match input.id.as_str() { - "message" => { - let data = input.data(); - let received_string = - std::str::from_utf8(&data).wrap_err("received message was not utf8-encoded")?; - println!("received message: {}", received_string); - if !received_string.starts_with("operator received random value ") { - bail!("unexpected message format (should start with 'operator received random value')") - } - if !received_string.ends_with(" ticks") { - bail!("unexpected message format (should end with 'ticks')") + while let Ok(event) = events.recv() { + match event { + NodeEvent::Stop => break, + NodeEvent::Input { + id, + metadata: _, + data, + } => match id.as_str() { + "message" => { + let received_string = std::str::from_utf8(data.get()) + .wrap_err("received message was not utf8-encoded")?; + println!("received message: {}", received_string); + if !received_string.starts_with("operator received random value ") { + bail!("unexpected message format (should start with 'operator received random value')") + } + if !received_string.ends_with(" ticks") { + bail!("unexpected message format (should end with 'ticks')") + } } - } - other => eprintln!("Ignoring unexpected input `{other}`"), + other => eprintln!("Ignoring unexpected input `{other}`"), + }, } }