| @@ -8,22 +8,23 @@ nodes: | |||
| build: cargo build -p rust-dataflow-example-node | |||
| source: ../../target/debug/rust-dataflow-example-node | |||
| inputs: | |||
| tick: dora/timer/millis/300 | |||
| tick: dora/timer/millis/3 | |||
| outputs: | |||
| - random | |||
| - id: runtime-node | |||
| operators: | |||
| - id: rust-operator | |||
| build: cargo build -p rust-dataflow-example-operator | |||
| shared-library: ../../target/debug/rust_dataflow_example_operator | |||
| inputs: | |||
| tick: dora/timer/millis/100 | |||
| random: rust-node/random | |||
| outputs: | |||
| - status | |||
| # - id: runtime-node | |||
| # operators: | |||
| # - id: rust-operator | |||
| # build: cargo build -p rust-dataflow-example-operator | |||
| # shared-library: ../../target/debug/rust_dataflow_example_operator | |||
| # inputs: | |||
| # tick: dora/timer/millis/100 | |||
| # random: rust-node/random | |||
| # outputs: | |||
| # - status | |||
| - id: rust-sink | |||
| custom: | |||
| build: cargo build -p rust-dataflow-example-sink | |||
| source: ../../target/debug/rust-dataflow-example-sink | |||
| inputs: | |||
| message: runtime-node/rust-operator/status | |||
| # message: runtime-node/rust-operator/status | |||
| message: rust-node/random | |||
| @@ -1,29 +1,28 @@ | |||
| use dora_node_api::{ | |||
| self, | |||
| dora_core::{config::DataId, daemon_messages::NodeEvent}, | |||
| DoraNode, | |||
| }; | |||
| use dora_node_api::{self, daemon::Event, dora_core::config::DataId, DoraNode}; | |||
| fn main() -> eyre::Result<()> { | |||
| println!("hello"); | |||
| let output = DataId::from("random".to_owned()); | |||
| let (mut node, events) = DoraNode::init_from_env()?; | |||
| let (mut node, mut events) = DoraNode::init_from_env()?; | |||
| for _ in 0..20 { | |||
| for i in 0..100 { | |||
| let event = match events.recv() { | |||
| Ok(input) => input, | |||
| Err(_) => break, | |||
| Some(input) => input, | |||
| None => break, | |||
| }; | |||
| match event { | |||
| NodeEvent::Stop => break, | |||
| NodeEvent::Input { | |||
| Event::Stop => break, | |||
| Event::Input { | |||
| id, | |||
| metadata, | |||
| data: _, | |||
| } => match id.as_str() { | |||
| "tick" => { | |||
| let random: u64 = rand::random(); | |||
| println!("tick {i}, sending {random:#x}"); | |||
| let data: &[u8] = &random.to_le_bytes(); | |||
| node.send_output(output.clone(), metadata.parameters, data.len(), |out| { | |||
| out.copy_from_slice(data); | |||
| @@ -1,32 +1,26 @@ | |||
| use dora_node_api::{self, dora_core::daemon_messages::NodeEvent, DoraNode}; | |||
| use eyre::{bail, Context, ContextCompat}; | |||
| use dora_node_api::{self, daemon::Event, DoraNode}; | |||
| use eyre::ContextCompat; | |||
| fn main() -> eyre::Result<()> { | |||
| let (_node, events) = DoraNode::init_from_env()?; | |||
| let (_node, mut events) = DoraNode::init_from_env()?; | |||
| while let Ok(event) = events.recv() { | |||
| while let Some(event) = events.recv() { | |||
| match event { | |||
| NodeEvent::Stop => break, | |||
| NodeEvent::Input { | |||
| Event::Stop => break, | |||
| Event::Input { | |||
| id, | |||
| metadata: _, | |||
| data, | |||
| } => match id.as_str() { | |||
| "message" => { | |||
| let data = data.wrap_err("no data")?.map()?; | |||
| let received_string = std::str::from_utf8(&data) | |||
| .wrap_err("received message was not utf8-encoded")?; | |||
| println!("received message: {}", received_string); | |||
| if !received_string.starts_with("operator received random value ") { | |||
| bail!("unexpected message format (should start with 'operator received random value')") | |||
| } | |||
| if !received_string.ends_with(" ticks") { | |||
| bail!("unexpected message format (should end with 'ticks')") | |||
| } | |||
| let data = data.wrap_err("no data")?; | |||
| let raw = (&data[..]).try_into().unwrap(); | |||
| println!("received data: {:#x}", u64::from_le_bytes(raw)); | |||
| } | |||
| other => eprintln!("Ignoring unexpected input `{other}`"), | |||
| }, | |||
| NodeEvent::InputClosed { id } => { | |||
| Event::InputClosed { id } => { | |||
| println!("Input `{id}` was closed -> exiting"); | |||
| break; | |||
| } | |||