Browse Source

Adjust multiple-daemons example

tags/v0.3.0-rc
Philipp Oppermann 2 years ago
parent
commit
ddf8035d2f
Failed to extract signature
3 changed files with 33 additions and 20 deletions
  1. +10
    -5
      examples/multiple-daemons/node/src/main.rs
  2. +17
    -10
      examples/multiple-daemons/operator/src/lib.rs
  3. +6
    -5
      examples/multiple-daemons/sink/src/main.rs

+ 10
- 5
examples/multiple-daemons/node/src/main.rs View File

@@ -1,4 +1,11 @@
use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event};
use std::iter;

use dora_node_api::{
self,
arrow::{array::PrimitiveArray, datatypes::UInt64Type},
dora_core::config::DataId,
DoraNode, Event,
};

fn main() -> eyre::Result<()> {
println!("hello");
@@ -22,10 +29,8 @@ fn main() -> eyre::Result<()> {
"tick" => {
let random: u64 = rand::random();
println!("tick {i}, sending {random:#x}");
let data: &[u8] = &random.to_le_bytes();
node.send_output_raw(output.clone(), metadata.parameters, data.len(), |out| {
out.copy_from_slice(data);
})?;
let data: PrimitiveArray<UInt64Type> = iter::once(random).collect();
node.send_output(output.clone(), metadata.parameters, data)?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},


+ 17
- 10
examples/multiple-daemons/operator/src/lib.rs View File

@@ -1,6 +1,13 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};
use dora_operator_api::{
register_operator,
types::arrow::{
array::{AsArray, StringArray},
datatypes::UInt64Type,
},
DoraOperator, DoraOutputSender, DoraStatus, Event,
};

register_operator!(ExampleOperator);

@@ -21,16 +28,16 @@ impl DoraOperator for ExampleOperator {
self.ticks += 1;
}
"random" => {
let parsed = {
let data: [u8; 8] =
(*data).try_into().map_err(|_| "unexpected random data")?;
u64::from_le_bytes(data)
};
let output = format!(
"operator received random value {parsed:#x} after {} ticks",
let data: u64 = data
.as_primitive_opt::<UInt64Type>()
.ok_or_else(|| "expected u64 value".to_owned())?
.value(0);
let output = StringArray::from_iter(std::iter::once(Some(format!(
"operator received random value {data:#x} after {} ticks",
self.ticks
);
output_sender.send("status".into(), output.into_bytes())?;
))));
output_sender.send("status".into(), output)?;
}
other => eprintln!("ignoring unexpected input {other}"),
},


+ 6
- 5
examples/multiple-daemons/sink/src/main.rs View File

@@ -1,5 +1,5 @@
use dora_node_api::{self, DoraNode, Event};
use eyre::{bail, Context, ContextCompat};
use dora_node_api::{self, arrow::array::AsArray, DoraNode, Event};
use eyre::{bail, ContextCompat};

fn main() -> eyre::Result<()> {
let (_node, mut events) = DoraNode::init_from_env()?;
@@ -12,9 +12,10 @@ fn main() -> eyre::Result<()> {
data,
} => match id.as_str() {
"message" => {
let data = data.wrap_err("no data")?;
let received_string = std::str::from_utf8(&data)
.wrap_err("received message was not utf8-encoded")?;
let received_string = &data
.as_string_opt::<i32>()
.context("expected string message")?
.value(0);
println!("sink received message: {}", received_string);
if !received_string.starts_with("operator received random value ") {
bail!("unexpected message format (should start with 'operator received random value')")


Loading…
Cancel
Save