Browse Source

convert rust-dataflow example to node api

Minimal conversion from previous operator api to node api.

Ref #474
tags/v0.3.4-rc1
Michael-J-Ward 1 year ago
parent
commit
2cc35cc117
5 changed files with 62 additions and 62 deletions
  1. +5
    -1
      Cargo.lock
  2. +5
    -6
      examples/rust-dataflow/dataflow.yml
  3. +5
    -3
      examples/rust-dataflow/operator/Cargo.toml
  4. +0
    -52
      examples/rust-dataflow/operator/src/lib.rs
  5. +47
    -0
      examples/rust-dataflow/operator/src/main.rs

+ 5
- 1
Cargo.lock View File

@@ -7878,7 +7878,11 @@ dependencies = [
name = "rust-dataflow-example-operator"
version = "0.3.3"
dependencies = [
"dora-operator-api",
"dora-node-api",
"eyre",
"futures",
"rand",
"tokio",
]

[[package]]


+ 5
- 6
examples/rust-dataflow/dataflow.yml View File

@@ -7,11 +7,10 @@ nodes:
tick: dora/timer/millis/10
outputs:
- random
- id: runtime-node
operators:
- id: rust-operator
- id: rust-operator
custom:
build: cargo build -p rust-dataflow-example-operator
shared-library: ../../target/debug/rust_dataflow_example_operator
source: ../../target/debug/rust-dataflow-example-operator
inputs:
tick: dora/timer/millis/100
random: rust-node/random
@@ -22,11 +21,11 @@ nodes:
build: cargo build -p rust-dataflow-example-sink
source: ../../target/debug/rust-dataflow-example-sink
inputs:
message: runtime-node/rust-operator/status
message: rust-operator/status
- id: dora-record
custom:
build: cargo build -p dora-record
source: ../../target/debug/dora-record
inputs:
message: runtime-node/rust-operator/status
message: rust-operator/status
random: rust-node/random

+ 5
- 3
examples/rust-dataflow/operator/Cargo.toml View File

@@ -6,8 +6,10 @@ license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
crate-type = ["cdylib"]

[dependencies]
dora-operator-api = { workspace = true }
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.24.2", features = ["rt", "macros"] }

+ 0
- 52
examples/rust-dataflow/operator/src/lib.rs View File

@@ -1,52 +0,0 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{
register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow,
};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
ticks: usize,
}

impl DoraOperator for ExampleOperator {
fn on_event(
&mut self,
event: &Event,
output_sender: &mut DoraOutputSender,
) -> Result<DoraStatus, String> {
match event {
Event::Input { id, data } => match *id {
"tick" => {
self.ticks += 1;
}
"random" => {
let value = u64::try_from(data)
.map_err(|err| format!("unexpected data type: {err}"))?;

let output = format!(
"operator received random value {value:#x} after {} ticks",
self.ticks
);
output_sender.send("status".into(), output.into_arrow())?;
}
other => eprintln!("ignoring unexpected input {other}"),
},
Event::Stop => {}
Event::InputClosed { id } => {
println!("input `{id}` was closed");
if *id == "random" {
println!("`random` input was closed -> exiting");
return Ok(DoraStatus::Stop);
}
}
other => {
println!("received unknown event {other:?}");
}
}

Ok(DoraStatus::Continue)
}
}

+ 47
- 0
examples/rust-dataflow/operator/src/main.rs View File

@@ -0,0 +1,47 @@
use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event, IntoArrow};
use eyre::Context;

fn main() -> eyre::Result<()> {
println!("hello");

let status_output = DataId::from("status".to_owned());
let (mut node, mut events) = DoraNode::init_from_env()?;

let mut ticks = 0;
while let Some(event) = events.recv() {
match event {
Event::Input { id, metadata, data } => match id.as_ref() {
"tick" => {
ticks += 1;
}
"random" => {
let value = u64::try_from(&data).context("unexpected data type")?;

let output = format!(
"operator received random value {value:#x} after {} ticks",
ticks
);
node.send_output(
status_output.clone(),
metadata.parameters,
output.into_arrow(),
)?;
}
other => eprintln!("ignoring unexpected input {other}"),
},
Event::Stop => {} // TODO: should we stop the node here?
Event::InputClosed { id } => {
println!("input `{id}` was closed");
if *id == "random" {
println!("`random` input was closed -> exiting");
break;
}
}
other => {
println!("received unknown event {other:?}");
}
}
}

Ok(())
}

Loading…
Cancel
Save