Browse Source

Create a Rust dataflow example

The example shows how to use the dora node and operator APIs.
tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
8e21a990d0
Failed to extract signature
13 changed files with 202 additions and 109 deletions
  1. +28
    -7
      Cargo.lock
  2. +7
    -1
      Cargo.toml
  3. +0
    -1
      apis/rust/operator/src/lib.rs
  4. +0
    -55
      binaries/coordinator/examples/nodes/rust/sink_logger.rs
  5. +0
    -40
      examples/example-operator/src/lib.rs
  6. +26
    -0
      examples/rust-dataflow/dataflow.yml
  7. +13
    -0
      examples/rust-dataflow/node/Cargo.toml
  8. +3
    -3
      examples/rust-dataflow/node/src/main.rs
  9. +2
    -2
      examples/rust-dataflow/operator/Cargo.toml
  10. +49
    -0
      examples/rust-dataflow/operator/src/lib.rs
  11. +33
    -0
      examples/rust-dataflow/run.rs
  12. +12
    -0
      examples/rust-dataflow/sink/Cargo.toml
  13. +29
    -0
      examples/rust-dataflow/sink/src/main.rs

+ 28
- 7
Cargo.lock View File

@@ -798,13 +798,6 @@ version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"

[[package]]
name = "example-operator"
version = "0.1.0"
dependencies = [
"dora-operator-api",
]

[[package]]
name = "eyre"
version = "0.6.8"
@@ -2279,6 +2272,34 @@ dependencies = [
"zeroize",
]

[[package]]
name = "rust-dataflow-example-node"
version = "0.1.0"
dependencies = [
"dora-node-api",
"eyre",
"futures",
"rand",
"tokio",
]

[[package]]
name = "rust-dataflow-example-operator"
version = "0.1.0"
dependencies = [
"dora-operator-api",
]

[[package]]
name = "rust-dataflow-example-sink"
version = "0.1.0"
dependencies = [
"dora-node-api",
"eyre",
"futures",
"tokio",
]

[[package]]
name = "rustc_version"
version = "0.4.0"


+ 7
- 1
Cargo.toml View File

@@ -5,7 +5,9 @@ members = [
"apis/rust/*",
"apis/rust/operator/macros",
"binaries/*",
"examples/example-operator",
"examples/rust-dataflow/operator",
"examples/rust-dataflow/node",
"examples/rust-dataflow/sink",
"libraries/core",
"libraries/extensions/message",
"libraries/extensions/telemetry/*",
@@ -27,3 +29,7 @@ dora-coordinator = { path = "binaries/coordinator" }
[[example]]
name = "c-dataflow"
path = "examples/c-dataflow/run.rs"

[[example]]
name = "rust-dataflow"
path = "examples/rust-dataflow/run.rs"

+ 0
- 1
apis/rust/operator/src/lib.rs View File

@@ -30,7 +30,6 @@ pub struct DoraOutputSender {

impl DoraOutputSender {
pub fn send(&mut self, id: &str, data: &[u8]) -> Result<(), isize> {
println!("operator sending output..");
let result = unsafe {
(self.output_fn_raw)(
id.as_ptr(),


+ 0
- 55
binaries/coordinator/examples/nodes/rust/sink_logger.rs View File

@@ -1,55 +0,0 @@
use dora_node_api::{self, DoraNode};
use eyre::bail;
use futures::StreamExt;
use std::time::Duration;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let operator = DoraNode::init_from_env().await?;

let mut inputs = operator.inputs().await?;

let mut last_timestamp = None;

loop {
let timeout = Duration::from_secs(5);
let input = match tokio::time::timeout(timeout, inputs.next()).await {
Ok(Some(input)) => input,
Ok(None) => break,
Err(_) => bail!("timeout while waiting for input"),
};

match input.id.as_str() {
"time" => {
// only record it, but don't print anything
last_timestamp = Some(String::from_utf8_lossy(&input.data).into_owned());
}
"random" => {
let number = match input.data.try_into() {
Ok(bytes) => u64::from_le_bytes(bytes),
Err(_) => {
eprintln!("Malformed `random` message");
continue;
}
};
if let Some(timestamp) = &last_timestamp {
println!("random at {}: {}", timestamp, number);
}
}
"timestamped-random" => {
let data = String::from_utf8(input.data)?;
println!("received timestamped random value: {data}");
}
"c-counter" => {
println!("received C counter value: {:?}", input.data);
}
"python-counter" => {
println!("received PYTHON counter value: {:?}", input.data);
}

other => eprintln!("Ignoring unexpected input `{other}`"),
}
}

Ok(())
}

+ 0
- 40
examples/example-operator/src/lib.rs View File

@@ -1,40 +0,0 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
time: Option<String>,
}

impl DoraOperator for ExampleOperator {
fn on_input(
&mut self,
id: &str,
data: &[u8],
output_sender: &mut DoraOutputSender,
) -> Result<DoraStatus, ()> {
match id {
"time" => {
let parsed = std::str::from_utf8(data).map_err(|_| ())?;
self.time = Some(parsed.to_owned());
}
"random" => {
let parsed = {
let data: [u8; 8] = data.try_into().map_err(|_| ())?;
u64::from_le_bytes(data)
};
if let Some(time) = &self.time {
let output = format!("state operator random value {parsed} at {time}");
output_sender
.send("timestamped-random", output.as_bytes())
.map_err(|_| ())?;
}
}
other => eprintln!("ignoring unexpected input {other}"),
}
Ok(DoraStatus::Continue)
}
}

+ 26
- 0
examples/rust-dataflow/dataflow.yml View File

@@ -0,0 +1,26 @@
communication:
zenoh:
prefix: /example-rust-dataflow

nodes:
- id: rust-node
custom:
run: ../../target/release/rust-dataflow-example-node
inputs:
tick: dora/timer/millis/300
outputs:
- random
- id: runtime-node
operators:
- id: rust-operator
shared-library: ../../target/release/librust_dataflow_example_operator.so
inputs:
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- status
- id: rust-sink
custom:
run: ../../target/release/rust-dataflow-example-sink
inputs:
message: runtime-node/rust-operator/status

+ 13
- 0
examples/rust-dataflow/node/Cargo.toml View File

@@ -0,0 +1,13 @@
[package]
name = "rust-dataflow-example-node"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.20.1", features = ["rt", "macros"] }

binaries/coordinator/examples/nodes/rust/random_number.rs → examples/rust-dataflow/node/src/main.rs View File

@@ -5,13 +5,13 @@ use std::time::Duration;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let output = DataId::from("number".to_owned());
let output = DataId::from("random".to_owned());

let operator = DoraNode::init_from_env().await?;

let mut inputs = operator.inputs().await?;

loop {
for _ in 0..20 {
let timeout = Duration::from_secs(3);
let input = match tokio::time::timeout(timeout, inputs.next()).await {
Ok(Some(input)) => input,
@@ -20,7 +20,7 @@ async fn main() -> eyre::Result<()> {
};

match input.id.as_str() {
"timestamp" => {
"tick" => {
let random: u64 = rand::random();
operator.send_output(&output, &random.to_le_bytes()).await?;
}

examples/example-operator/Cargo.toml → examples/rust-dataflow/operator/Cargo.toml View File

@@ -1,5 +1,5 @@
[package]
name = "example-operator"
name = "rust-dataflow-example-operator"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
@@ -10,4 +10,4 @@ license = "Apache-2.0"
crate-type = ["cdylib"]

[dependencies]
dora-operator-api = { path = "../../apis/rust/operator" }
dora-operator-api = { path = "../../../apis/rust/operator" }

+ 49
- 0
examples/rust-dataflow/operator/src/lib.rs View File

@@ -0,0 +1,49 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus};
use std::time::{Duration, Instant};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
ticks: usize,
last_random_at: Option<Instant>,
}

impl DoraOperator for ExampleOperator {
fn on_input(
&mut self,
id: &str,
data: &[u8],
output_sender: &mut DoraOutputSender,
) -> Result<DoraStatus, ()> {
match id {
"tick" => {
self.ticks += 1;
}
"random" => {
let parsed = {
let data: [u8; 8] = data.try_into().map_err(|_| ())?;
u64::from_le_bytes(data)
};
let output = format!(
"operator received random value {parsed} after {} ticks",
self.ticks
);
output_sender
.send("status", output.as_bytes())
.map_err(|_| ())?;
self.last_random_at = Some(Instant::now());
}
other => eprintln!("ignoring unexpected input {other}"),
}
if let Some(last_random_at) = self.last_random_at {
if last_random_at.elapsed() > Duration::from_secs(1) {
// looks like the node sending the random values finished -> exit too
return Ok(DoraStatus::Stop);
}
}
Ok(DoraStatus::Continue)
}
}

+ 33
- 0
examples/rust-dataflow/run.rs View File

@@ -0,0 +1,33 @@
use eyre::{bail, Context};
use std::path::Path;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

build_package("rust-dataflow-example-node").await?;
build_package("rust-dataflow-example-operator").await?;
build_package("rust-dataflow-example-sink").await?;
build_package("dora-runtime").await?;

dora_coordinator::run(dora_coordinator::Command::Run {
dataflow: Path::new("dataflow.yml").to_owned(),
runtime: Some(root.join("target").join("release").join("dora-runtime")),
})
.await?;

Ok(())
}

async fn build_package(package: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("build").arg("--release");
cmd.arg("--package").arg(package);
if !cmd.status().await?.success() {
bail!("failed to build {package}");
};
Ok(())
}

+ 12
- 0
examples/rust-dataflow/sink/Cargo.toml View File

@@ -0,0 +1,12 @@
[package]
name = "rust-dataflow-example-sink"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" }
eyre = "0.6.8"
futures = "0.3.21"
tokio = { version = "1.20.1", features = ["macros"] }

+ 29
- 0
examples/rust-dataflow/sink/src/main.rs View File

@@ -0,0 +1,29 @@
use dora_node_api::{self, DoraNode};
use eyre::bail;
use futures::StreamExt;
use std::time::Duration;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let operator = DoraNode::init_from_env().await?;

let mut inputs = operator.inputs().await?;

loop {
let timeout = Duration::from_secs(5);
let input = match tokio::time::timeout(timeout, inputs.next()).await {
Ok(Some(input)) => input,
Ok(None) => break,
Err(_) => bail!("timeout while waiting for input"),
};

match input.id.as_str() {
"message" => {
println!("received message: {}", String::from_utf8_lossy(&input.data));
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}
}

Ok(())
}

Loading…
Cancel
Save