From 2cc35cc117c8a6378cc84d692dfed70748805509 Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Tue, 30 Apr 2024 09:06:54 -0500 Subject: [PATCH] convert rust-dataflow example to node api Minimal conversion from previous operator api to node api. Ref #474 --- Cargo.lock | 6 ++- examples/rust-dataflow/dataflow.yml | 11 ++--- examples/rust-dataflow/operator/Cargo.toml | 8 ++-- examples/rust-dataflow/operator/src/lib.rs | 52 --------------------- examples/rust-dataflow/operator/src/main.rs | 47 +++++++++++++++++++ 5 files changed, 62 insertions(+), 62 deletions(-) delete mode 100644 examples/rust-dataflow/operator/src/lib.rs create mode 100644 examples/rust-dataflow/operator/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 918c83fe..8da13f2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7878,7 +7878,11 @@ dependencies = [ name = "rust-dataflow-example-operator" version = "0.3.3" dependencies = [ - "dora-operator-api", + "dora-node-api", + "eyre", + "futures", + "rand", + "tokio", ] [[package]] diff --git a/examples/rust-dataflow/dataflow.yml b/examples/rust-dataflow/dataflow.yml index 92d60734..f937a49b 100644 --- a/examples/rust-dataflow/dataflow.yml +++ b/examples/rust-dataflow/dataflow.yml @@ -7,11 +7,10 @@ nodes: tick: dora/timer/millis/10 outputs: - random - - id: runtime-node - operators: - - id: rust-operator + - id: rust-operator + custom: build: cargo build -p rust-dataflow-example-operator - shared-library: ../../target/debug/rust_dataflow_example_operator + source: ../../target/debug/rust-dataflow-example-operator inputs: tick: dora/timer/millis/100 random: rust-node/random @@ -22,11 +21,11 @@ nodes: build: cargo build -p rust-dataflow-example-sink source: ../../target/debug/rust-dataflow-example-sink inputs: - message: runtime-node/rust-operator/status + message: rust-operator/status - id: dora-record custom: build: cargo build -p dora-record source: ../../target/debug/dora-record inputs: - message: runtime-node/rust-operator/status + message: rust-operator/status random: rust-node/random \ No newline at end of file diff --git a/examples/rust-dataflow/operator/Cargo.toml b/examples/rust-dataflow/operator/Cargo.toml index 2c4ee3f9..d57b79d0 100644 --- a/examples/rust-dataflow/operator/Cargo.toml +++ b/examples/rust-dataflow/operator/Cargo.toml @@ -6,8 +6,10 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -[lib] -crate-type = ["cdylib"] [dependencies] -dora-operator-api = { workspace = true } +dora-node-api = { workspace = true, features = ["tracing"] } +eyre = "0.6.8" +futures = "0.3.21" +rand = "0.8.5" +tokio = { version = "1.24.2", features = ["rt", "macros"] } diff --git a/examples/rust-dataflow/operator/src/lib.rs b/examples/rust-dataflow/operator/src/lib.rs deleted file mode 100644 index 5a131a15..00000000 --- a/examples/rust-dataflow/operator/src/lib.rs +++ /dev/null @@ -1,52 +0,0 @@ -#![warn(unsafe_op_in_unsafe_fn)] - -use dora_operator_api::{ - register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow, -}; - -register_operator!(ExampleOperator); - -#[derive(Debug, Default)] -struct ExampleOperator { - ticks: usize, -} - -impl DoraOperator for ExampleOperator { - fn on_event( - &mut self, - event: &Event, - output_sender: &mut DoraOutputSender, - ) -> Result { - match event { - Event::Input { id, data } => match *id { - "tick" => { - self.ticks += 1; - } - "random" => { - let value = u64::try_from(data) - .map_err(|err| format!("unexpected data type: {err}"))?; - - let output = format!( - "operator received random value {value:#x} after {} ticks", - self.ticks - ); - output_sender.send("status".into(), output.into_arrow())?; - } - other => eprintln!("ignoring unexpected input {other}"), - }, - Event::Stop => {} - Event::InputClosed { id } => { - println!("input `{id}` was closed"); - if *id == "random" { - println!("`random` input was closed -> exiting"); - return Ok(DoraStatus::Stop); - } - } - other => { - println!("received unknown event {other:?}"); - } - } - - Ok(DoraStatus::Continue) - } -} diff --git a/examples/rust-dataflow/operator/src/main.rs b/examples/rust-dataflow/operator/src/main.rs new file mode 100644 index 00000000..5eceb244 --- /dev/null +++ b/examples/rust-dataflow/operator/src/main.rs @@ -0,0 +1,47 @@ +use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event, IntoArrow}; +use eyre::Context; + +fn main() -> eyre::Result<()> { + println!("hello"); + + let status_output = DataId::from("status".to_owned()); + let (mut node, mut events) = DoraNode::init_from_env()?; + + let mut ticks = 0; + while let Some(event) = events.recv() { + match event { + Event::Input { id, metadata, data } => match id.as_ref() { + "tick" => { + ticks += 1; + } + "random" => { + let value = u64::try_from(&data).context("unexpected data type")?; + + let output = format!( + "operator received random value {value:#x} after {} ticks", + ticks + ); + node.send_output( + status_output.clone(), + metadata.parameters, + output.into_arrow(), + )?; + } + other => eprintln!("ignoring unexpected input {other}"), + }, + Event::Stop => {} // TODO: should we stop the node here? + Event::InputClosed { id } => { + println!("input `{id}` was closed"); + if *id == "random" { + println!("`random` input was closed -> exiting"); + break; + } + } + other => { + println!("received unknown event {other:?}"); + } + } + } + + Ok(()) +}