diff --git a/Cargo.lock b/Cargo.lock index 8d6586c4..55caa521 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -798,13 +798,6 @@ version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" -[[package]] -name = "example-operator" -version = "0.1.0" -dependencies = [ - "dora-operator-api", -] - [[package]] name = "eyre" version = "0.6.8" @@ -2279,6 +2272,34 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rust-dataflow-example-node" +version = "0.1.0" +dependencies = [ + "dora-node-api", + "eyre", + "futures", + "rand", + "tokio", +] + +[[package]] +name = "rust-dataflow-example-operator" +version = "0.1.0" +dependencies = [ + "dora-operator-api", +] + +[[package]] +name = "rust-dataflow-example-sink" +version = "0.1.0" +dependencies = [ + "dora-node-api", + "eyre", + "futures", + "tokio", +] + [[package]] name = "rustc_version" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index a11b9489..2a656c60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,9 @@ members = [ "apis/rust/*", "apis/rust/operator/macros", "binaries/*", - "examples/example-operator", + "examples/rust-dataflow/operator", + "examples/rust-dataflow/node", + "examples/rust-dataflow/sink", "libraries/core", "libraries/extensions/message", "libraries/extensions/telemetry/*", @@ -27,3 +29,7 @@ dora-coordinator = { path = "binaries/coordinator" } [[example]] name = "c-dataflow" path = "examples/c-dataflow/run.rs" + +[[example]] +name = "rust-dataflow" +path = "examples/rust-dataflow/run.rs" diff --git a/apis/rust/operator/src/lib.rs b/apis/rust/operator/src/lib.rs index efcf6ac4..0284f9ea 100644 --- a/apis/rust/operator/src/lib.rs +++ b/apis/rust/operator/src/lib.rs @@ -30,7 +30,6 @@ pub struct DoraOutputSender { impl DoraOutputSender { pub fn send(&mut self, id: &str, data: &[u8]) -> Result<(), isize> { - println!("operator sending output.."); let result = unsafe { (self.output_fn_raw)( id.as_ptr(), diff --git a/binaries/coordinator/examples/nodes/rust/sink_logger.rs b/binaries/coordinator/examples/nodes/rust/sink_logger.rs deleted file mode 100644 index cb499ba1..00000000 --- a/binaries/coordinator/examples/nodes/rust/sink_logger.rs +++ /dev/null @@ -1,55 +0,0 @@ -use dora_node_api::{self, DoraNode}; -use eyre::bail; -use futures::StreamExt; -use std::time::Duration; - -#[tokio::main] -async fn main() -> eyre::Result<()> { - let operator = DoraNode::init_from_env().await?; - - let mut inputs = operator.inputs().await?; - - let mut last_timestamp = None; - - loop { - let timeout = Duration::from_secs(5); - let input = match tokio::time::timeout(timeout, inputs.next()).await { - Ok(Some(input)) => input, - Ok(None) => break, - Err(_) => bail!("timeout while waiting for input"), - }; - - match input.id.as_str() { - "time" => { - // only record it, but don't print anything - last_timestamp = Some(String::from_utf8_lossy(&input.data).into_owned()); - } - "random" => { - let number = match input.data.try_into() { - Ok(bytes) => u64::from_le_bytes(bytes), - Err(_) => { - eprintln!("Malformed `random` message"); - continue; - } - }; - if let Some(timestamp) = &last_timestamp { - println!("random at {}: {}", timestamp, number); - } - } - "timestamped-random" => { - let data = String::from_utf8(input.data)?; - println!("received timestamped random value: {data}"); - } - "c-counter" => { - println!("received C counter value: {:?}", input.data); - } - "python-counter" => { - println!("received PYTHON counter value: {:?}", input.data); - } - - other => eprintln!("Ignoring unexpected input `{other}`"), - } - } - - Ok(()) -} diff --git a/examples/example-operator/src/lib.rs b/examples/example-operator/src/lib.rs deleted file mode 100644 index e0b1a4ba..00000000 --- a/examples/example-operator/src/lib.rs +++ /dev/null @@ -1,40 +0,0 @@ -#![warn(unsafe_op_in_unsafe_fn)] - -use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus}; - -register_operator!(ExampleOperator); - -#[derive(Debug, Default)] -struct ExampleOperator { - time: Option, -} - -impl DoraOperator for ExampleOperator { - fn on_input( - &mut self, - id: &str, - data: &[u8], - output_sender: &mut DoraOutputSender, - ) -> Result { - match id { - "time" => { - let parsed = std::str::from_utf8(data).map_err(|_| ())?; - self.time = Some(parsed.to_owned()); - } - "random" => { - let parsed = { - let data: [u8; 8] = data.try_into().map_err(|_| ())?; - u64::from_le_bytes(data) - }; - if let Some(time) = &self.time { - let output = format!("state operator random value {parsed} at {time}"); - output_sender - .send("timestamped-random", output.as_bytes()) - .map_err(|_| ())?; - } - } - other => eprintln!("ignoring unexpected input {other}"), - } - Ok(DoraStatus::Continue) - } -} diff --git a/examples/rust-dataflow/dataflow.yml b/examples/rust-dataflow/dataflow.yml new file mode 100644 index 00000000..f53131fd --- /dev/null +++ b/examples/rust-dataflow/dataflow.yml @@ -0,0 +1,26 @@ +communication: + zenoh: + prefix: /example-rust-dataflow + +nodes: + - id: rust-node + custom: + run: ../../target/release/rust-dataflow-example-node + inputs: + tick: dora/timer/millis/300 + outputs: + - random + - id: runtime-node + operators: + - id: rust-operator + shared-library: ../../target/release/librust_dataflow_example_operator.so + inputs: + tick: dora/timer/millis/100 + random: rust-node/random + outputs: + - status + - id: rust-sink + custom: + run: ../../target/release/rust-dataflow-example-sink + inputs: + message: runtime-node/rust-operator/status diff --git a/examples/rust-dataflow/node/Cargo.toml b/examples/rust-dataflow/node/Cargo.toml new file mode 100644 index 00000000..d7bcd656 --- /dev/null +++ b/examples/rust-dataflow/node/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "rust-dataflow-example-node" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +eyre = "0.6.8" +futures = "0.3.21" +rand = "0.8.5" +tokio = { version = "1.20.1", features = ["rt", "macros"] } diff --git a/binaries/coordinator/examples/nodes/rust/random_number.rs b/examples/rust-dataflow/node/src/main.rs similarity index 89% rename from binaries/coordinator/examples/nodes/rust/random_number.rs rename to examples/rust-dataflow/node/src/main.rs index 5d0f97a1..edb349f7 100644 --- a/binaries/coordinator/examples/nodes/rust/random_number.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -5,13 +5,13 @@ use std::time::Duration; #[tokio::main] async fn main() -> eyre::Result<()> { - let output = DataId::from("number".to_owned()); + let output = DataId::from("random".to_owned()); let operator = DoraNode::init_from_env().await?; let mut inputs = operator.inputs().await?; - loop { + for _ in 0..20 { let timeout = Duration::from_secs(3); let input = match tokio::time::timeout(timeout, inputs.next()).await { Ok(Some(input)) => input, @@ -20,7 +20,7 @@ async fn main() -> eyre::Result<()> { }; match input.id.as_str() { - "timestamp" => { + "tick" => { let random: u64 = rand::random(); operator.send_output(&output, &random.to_le_bytes()).await?; } diff --git a/examples/example-operator/Cargo.toml b/examples/rust-dataflow/operator/Cargo.toml similarity index 67% rename from examples/example-operator/Cargo.toml rename to examples/rust-dataflow/operator/Cargo.toml index 3c6d872a..edcac2b8 100644 --- a/examples/example-operator/Cargo.toml +++ b/examples/rust-dataflow/operator/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "example-operator" +name = "rust-dataflow-example-operator" version = "0.1.0" edition = "2021" license = "Apache-2.0" @@ -10,4 +10,4 @@ license = "Apache-2.0" crate-type = ["cdylib"] [dependencies] -dora-operator-api = { path = "../../apis/rust/operator" } +dora-operator-api = { path = "../../../apis/rust/operator" } diff --git a/examples/rust-dataflow/operator/src/lib.rs b/examples/rust-dataflow/operator/src/lib.rs new file mode 100644 index 00000000..211b2cc3 --- /dev/null +++ b/examples/rust-dataflow/operator/src/lib.rs @@ -0,0 +1,49 @@ +#![warn(unsafe_op_in_unsafe_fn)] + +use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus}; +use std::time::{Duration, Instant}; + +register_operator!(ExampleOperator); + +#[derive(Debug, Default)] +struct ExampleOperator { + ticks: usize, + last_random_at: Option, +} + +impl DoraOperator for ExampleOperator { + fn on_input( + &mut self, + id: &str, + data: &[u8], + output_sender: &mut DoraOutputSender, + ) -> Result { + match id { + "tick" => { + self.ticks += 1; + } + "random" => { + let parsed = { + let data: [u8; 8] = data.try_into().map_err(|_| ())?; + u64::from_le_bytes(data) + }; + let output = format!( + "operator received random value {parsed} after {} ticks", + self.ticks + ); + output_sender + .send("status", output.as_bytes()) + .map_err(|_| ())?; + self.last_random_at = Some(Instant::now()); + } + other => eprintln!("ignoring unexpected input {other}"), + } + if let Some(last_random_at) = self.last_random_at { + if last_random_at.elapsed() > Duration::from_secs(1) { + // looks like the node sending the random values finished -> exit too + return Ok(DoraStatus::Stop); + } + } + Ok(DoraStatus::Continue) + } +} diff --git a/examples/rust-dataflow/run.rs b/examples/rust-dataflow/run.rs new file mode 100644 index 00000000..1f24373e --- /dev/null +++ b/examples/rust-dataflow/run.rs @@ -0,0 +1,33 @@ +use eyre::{bail, Context}; +use std::path::Path; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + build_package("rust-dataflow-example-node").await?; + build_package("rust-dataflow-example-operator").await?; + build_package("rust-dataflow-example-sink").await?; + build_package("dora-runtime").await?; + + dora_coordinator::run(dora_coordinator::Command::Run { + dataflow: Path::new("dataflow.yml").to_owned(), + runtime: Some(root.join("target").join("release").join("dora-runtime")), + }) + .await?; + + Ok(()) +} + +async fn build_package(package: &str) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("build").arg("--release"); + cmd.arg("--package").arg(package); + if !cmd.status().await?.success() { + bail!("failed to build {package}"); + }; + Ok(()) +} diff --git a/examples/rust-dataflow/sink/Cargo.toml b/examples/rust-dataflow/sink/Cargo.toml new file mode 100644 index 00000000..284ed20a --- /dev/null +++ b/examples/rust-dataflow/sink/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "rust-dataflow-example-sink" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +eyre = "0.6.8" +futures = "0.3.21" +tokio = { version = "1.20.1", features = ["macros"] } diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs new file mode 100644 index 00000000..6c827a25 --- /dev/null +++ b/examples/rust-dataflow/sink/src/main.rs @@ -0,0 +1,29 @@ +use dora_node_api::{self, DoraNode}; +use eyre::bail; +use futures::StreamExt; +use std::time::Duration; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let operator = DoraNode::init_from_env().await?; + + let mut inputs = operator.inputs().await?; + + loop { + let timeout = Duration::from_secs(5); + let input = match tokio::time::timeout(timeout, inputs.next()).await { + Ok(Some(input)) => input, + Ok(None) => break, + Err(_) => bail!("timeout while waiting for input"), + }; + + match input.id.as_str() { + "message" => { + println!("received message: {}", String::from_utf8_lossy(&input.data)); + } + other => eprintln!("Ignoring unexpected input `{other}`"), + } + } + + Ok(()) +}