Browse Source

Update nodes of rust example to new API

tags/v0.2.0-candidate
Philipp Oppermann 3 years ago
parent
commit
d01dc38134
Failed to extract signature
2 changed files with 44 additions and 36 deletions
  1. +22
    -18
      examples/rust-dataflow/node/src/main.rs
  2. +22
    -18
      examples/rust-dataflow/sink/src/main.rs

+ 22
- 18
examples/rust-dataflow/node/src/main.rs View File

@@ -1,32 +1,36 @@
use dora_node_api::{self, dora_core::config::DataId, DoraNode};
use dora_node_api::{
self,
dora_core::{config::DataId, daemon_messages::NodeEvent},
DoraNode,
};

fn main() -> eyre::Result<()> {
let output = DataId::from("random".to_owned());

let mut operator = DoraNode::init_from_env()?;

let inputs = operator.inputs()?;
let (mut node, events) = DoraNode::init_from_env()?;

for _ in 0..20 {
let input = match inputs.recv() {
let event = match events.recv() {
Ok(input) => input,
Err(_) => break,
};

match input.id.as_str() {
"tick" => {
let random: u64 = rand::random();
let data: &[u8] = &random.to_le_bytes();
operator.send_output(
&output,
input.metadata().parameters.clone(),
data.len(),
|out| {
match event {
NodeEvent::Stop => break,
NodeEvent::Input {
id,
metadata,
data: _,
} => match id.as_str() {
"tick" => {
let random: u64 = rand::random();
let data: &[u8] = &random.to_le_bytes();
node.send_output(output.clone(), metadata.parameters, data.len(), |out| {
out.copy_from_slice(data);
},
)?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
})?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
}
}



+ 22
- 18
examples/rust-dataflow/sink/src/main.rs View File

@@ -1,26 +1,30 @@
use dora_node_api::{self, DoraNode};
use dora_node_api::{self, dora_core::daemon_messages::NodeEvent, DoraNode};
use eyre::{bail, Context};

fn main() -> eyre::Result<()> {
let mut operator = DoraNode::init_from_env()?;
let (_node, events) = DoraNode::init_from_env()?;

let inputs = operator.inputs()?;

while let Ok(input) = inputs.recv() {
match input.id.as_str() {
"message" => {
let data = input.data();
let received_string =
std::str::from_utf8(&data).wrap_err("received message was not utf8-encoded")?;
println!("received message: {}", received_string);
if !received_string.starts_with("operator received random value ") {
bail!("unexpected message format (should start with 'operator received random value')")
}
if !received_string.ends_with(" ticks") {
bail!("unexpected message format (should end with 'ticks')")
while let Ok(event) = events.recv() {
match event {
NodeEvent::Stop => break,
NodeEvent::Input {
id,
metadata: _,
data,
} => match id.as_str() {
"message" => {
let received_string = std::str::from_utf8(data.get())
.wrap_err("received message was not utf8-encoded")?;
println!("received message: {}", received_string);
if !received_string.starts_with("operator received random value ") {
bail!("unexpected message format (should start with 'operator received random value')")
}
if !received_string.ends_with(" ticks") {
bail!("unexpected message format (should end with 'ticks')")
}
}
}
other => eprintln!("Ignoring unexpected input `{other}`"),
other => eprintln!("Ignoring unexpected input `{other}`"),
},
}
}



Loading…
Cancel
Save