From c51565126494a1bfd18b78c86e8dda4cc63d37aa Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 17 Apr 2023 21:08:26 +0200 Subject: [PATCH] Start working on an example that uses multiple daemons --- Cargo.lock | 29 +++++++ Cargo.toml | 8 ++ binaries/coordinator/Cargo.toml | 1 + binaries/coordinator/src/lib.rs | 73 ++++++++++++---- binaries/coordinator/src/main.rs | 2 +- examples/multiple-daemons/dataflow.yml | 37 +++++++++ examples/multiple-daemons/node/Cargo.toml | 13 +++ examples/multiple-daemons/node/src/main.rs | 38 +++++++++ examples/multiple-daemons/operator/Cargo.toml | 13 +++ examples/multiple-daemons/operator/src/lib.rs | 52 ++++++++++++ examples/multiple-daemons/run.rs | 83 +++++++++++++++++++ examples/multiple-daemons/sink/Cargo.toml | 10 +++ examples/multiple-daemons/sink/src/main.rs | 39 +++++++++ 13 files changed, 379 insertions(+), 19 deletions(-) create mode 100644 examples/multiple-daemons/dataflow.yml create mode 100644 examples/multiple-daemons/node/Cargo.toml create mode 100644 examples/multiple-daemons/node/src/main.rs create mode 100644 examples/multiple-daemons/operator/Cargo.toml create mode 100644 examples/multiple-daemons/operator/src/lib.rs create mode 100644 examples/multiple-daemons/run.rs create mode 100644 examples/multiple-daemons/sink/Cargo.toml create mode 100644 examples/multiple-daemons/sink/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 979d5966..724e17e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1068,6 +1068,7 @@ dependencies = [ name = "dora-coordinator" version = "0.2.2" dependencies = [ + "clap 3.2.23", "ctrlc", "dora-core", "dora-tracing", @@ -1143,10 +1144,12 @@ dependencies = [ name = "dora-examples" version = "0.0.0" dependencies = [ + "dora-coordinator", "dora-core", "dora-daemon", "dunce", "eyre", + "futures", "serde_yaml 0.8.26", "tokio", "tracing", @@ -2375,6 +2378,32 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "multiple-daemons-example-node" +version = "0.2.2" +dependencies = [ + "dora-node-api", + "eyre", + "futures", + "rand", + "tokio", +] + +[[package]] +name = "multiple-daemons-example-operator" +version = "0.2.2" +dependencies = [ + "dora-operator-api", +] + +[[package]] +name = "multiple-daemons-example-sink" +version = "0.2.2" +dependencies = [ + "dora-node-api", + "eyre", +] + [[package]] name = "nanorand" version = "0.7.0" diff --git a/Cargo.toml b/Cargo.toml index ea907ef7..abdee5f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "binaries/runtime", "examples/rust-dataflow/*", "examples/benchmark/*", + "examples/multiple-daemons/*", "libraries/communication-layer/*", "libraries/core", "libraries/message", @@ -46,6 +47,7 @@ communication-layer-request-reply = { version = "0.2.2", path = "libraries/commu dora-message = { version = "0.2.2", path = "libraries/message" } dora-runtime = { version = "0.2.2", path = "binaries/runtime" } dora-daemon = { version = "0.2.2", path = "binaries/daemon" } +dora-coordinator = { version = "0.2.2", path = "binaries/coordinator" } [package] name = "dora-examples" @@ -58,12 +60,14 @@ license = "Apache-2.0" eyre = "0.6.8" tokio = "1.24.2" dora-daemon = { workspace = true } +dora-coordinator = { workspace = true } dora-core = { workspace = true } dunce = "1.0.2" serde_yaml = "0.8.23" uuid = { version = "1.2.1", features = ["v4", "serde"] } tracing = "0.1.36" tracing-subscriber = "0.3.15" +futures = "0.3.25" [[example]] name = "c-dataflow" @@ -92,3 +96,7 @@ path = "examples/python-operator-dataflow/run.rs" [[example]] name = "benchmark" path = "examples/benchmark/run.rs" + +[[example]] +name = "multiple-daemons" +path = "examples/multiple-daemons/run.rs" diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index 53e0bcfb..e07398cd 100644 --- a/binaries/coordinator/Cargo.toml +++ 
b/binaries/coordinator/Cargo.toml @@ -30,3 +30,4 @@ serde_json = "1.0.86" which = "4.3.0" thiserror = "1.0.37" ctrlc = "3.2.5" +clap = { version = "3.1.8", features = ["derive"] } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 949df240..b66cb37e 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -14,15 +14,19 @@ use dora_core::{ }, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; -use futures::{stream::FuturesUnordered, Stream, StreamExt}; +use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; use futures_concurrency::stream::Merge; use run::SpawnedDataflow; use std::{ collections::{BTreeSet, HashMap}, - path::Path, + path::{Path, PathBuf}, time::Duration, }; -use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::mpsc, + task::JoinHandle, +}; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; use uuid::Uuid; @@ -31,27 +35,60 @@ mod listener; mod run; mod tcp_utils; -pub async fn run() -> eyre::Result<()> { - let mut tasks = FuturesUnordered::new(); +#[derive(Debug, Clone, clap::Parser)] +#[clap(about = "Dora coordinator")] +pub struct Args { + #[clap(long)] + pub port: Option, - // start in daemon mode - start(&tasks).await?; + #[clap(long)] + pub run_dataflow: Option, - tracing::debug!("coordinator main loop finished, waiting on spawned tasks"); - while let Some(join_result) = tasks.next().await { - if let Err(err) = join_result { - tracing::error!("task panicked: {err}"); - } - } - tracing::debug!("all spawned tasks finished, exiting.."); + #[clap(long)] + pub dora_runtime_path: Option, +} + +pub async fn run(args: Args) -> eyre::Result<()> { + let ctrlc_events = set_up_ctrlc_handler()?; + + let (_, task) = start(args, ctrlc_events).await?; + + task.await?; Ok(()) } -async fn start(tasks: &FuturesUnordered>) -> eyre::Result<()> { - let ctrlc_events = set_up_ctrlc_handler()?; +pub async fn start( + args: 
Args, + external_events: impl Stream + Unpin, +) -> Result<(u16, impl Future>), eyre::ErrReport> { + let port = args.port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT); + let listener = listener::create_listener(port).await?; + let port = listener + .local_addr() + .wrap_err("failed to get local addr of listener")? + .port(); + let mut tasks = FuturesUnordered::new(); + let future = async move { + start_inner(listener, &tasks, external_events).await?; + + tracing::debug!("coordinator main loop finished, waiting on spawned tasks"); + while let Some(join_result) = tasks.next().await { + if let Err(err) = join_result { + tracing::error!("task panicked: {err}"); + } + } + tracing::debug!("all spawned tasks finished, exiting.."); + Ok(()) + }; + Ok((port, future)) +} - let listener = listener::create_listener(DORA_COORDINATOR_PORT_DEFAULT).await?; +async fn start_inner( + listener: TcpListener, + tasks: &FuturesUnordered>, + external_events: impl Stream + Unpin, +) -> eyre::Result<()> { let new_daemon_connections = TcpListenerStream::new(listener).map(|c| { c.map(Event::NewDaemonConnection) .wrap_err("failed to open connection") @@ -75,7 +112,7 @@ async fn start(tasks: &FuturesUnordered>) -> eyre::Result<()> { ( control_events, new_daemon_connections, - ctrlc_events, + external_events, daemon_watchdog_interval, ) .merge(), diff --git a/binaries/coordinator/src/main.rs b/binaries/coordinator/src/main.rs index 95f91e70..705b4942 100644 --- a/binaries/coordinator/src/main.rs +++ b/binaries/coordinator/src/main.rs @@ -8,5 +8,5 @@ async fn main() -> eyre::Result<()> { #[cfg(feature = "tracing")] set_up_tracing("dora-coordinator").context("failed to set up tracing subscriber")?; - dora_coordinator::run().await + dora_coordinator::run(clap::Parser::parse()).await } diff --git a/examples/multiple-daemons/dataflow.yml b/examples/multiple-daemons/dataflow.yml new file mode 100644 index 00000000..df6d89fe --- /dev/null +++ b/examples/multiple-daemons/dataflow.yml @@ -0,0 +1,37 @@ 
+communication: + zenoh: + prefix: example-multiple-daemons + +daemon_config: Tcp # or Shmem + +nodes: + - id: rust-node + deploy: + machine: A + custom: + build: cargo build -p multiple-daemons-example-node + source: ../../target/debug/multiple-daemons-example-node + inputs: + tick: dora/timer/millis/10 + outputs: + - random + - id: runtime-node + deploy: + machine: A + operators: + - id: rust-operator + build: cargo build -p multiple-daemons-example-operator + shared-library: ../../target/debug/multiple_daemons_example_operator + inputs: + tick: dora/timer/millis/100 + random: rust-node/random + outputs: + - status + - id: rust-sink + deploy: + machine: B + custom: + build: cargo build -p multiple-daemons-example-sink + source: ../../target/debug/multiple-daemons-example-sink + inputs: + message: runtime-node/rust-operator/status diff --git a/examples/multiple-daemons/node/Cargo.toml b/examples/multiple-daemons/node/Cargo.toml new file mode 100644 index 00000000..1175c80d --- /dev/null +++ b/examples/multiple-daemons/node/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "multiple-daemons-example-node" +version.workspace = true +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { workspace = true, features = ["tracing"] } +eyre = "0.6.8" +futures = "0.3.21" +rand = "0.8.5" +tokio = { version = "1.24.2", features = ["rt", "macros"] } diff --git a/examples/multiple-daemons/node/src/main.rs b/examples/multiple-daemons/node/src/main.rs new file mode 100644 index 00000000..c52e4618 --- /dev/null +++ b/examples/multiple-daemons/node/src/main.rs @@ -0,0 +1,38 @@ +use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event}; + +fn main() -> eyre::Result<()> { + println!("hello"); + + let output = DataId::from("random".to_owned()); + + let (mut node, mut events) = DoraNode::init_from_env()?; + + for i in 0..100 { + let event = match events.recv() { + Some(input) => 
input, + None => break, + }; + + match event { + Event::Input { + id, + metadata, + data: _, + } => match id.as_str() { + "tick" => { + let random: u64 = rand::random(); + println!("tick {i}, sending {random:#x}"); + let data: &[u8] = &random.to_le_bytes(); + node.send_output(output.clone(), metadata.parameters, data.len(), |out| { + out.copy_from_slice(data); + })?; + } + other => eprintln!("Ignoring unexpected input `{other}`"), + }, + Event::Stop => println!("Received manual stop"), + other => eprintln!("Received unexpected input: {other:?}"), + } + } + + Ok(()) +} diff --git a/examples/multiple-daemons/operator/Cargo.toml b/examples/multiple-daemons/operator/Cargo.toml new file mode 100644 index 00000000..f7ed41fb --- /dev/null +++ b/examples/multiple-daemons/operator/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "multiple-daemons-example-operator" +version.workspace = true +edition = "2021" +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +crate-type = ["cdylib"] + +[dependencies] +dora-operator-api = { workspace = true } diff --git a/examples/multiple-daemons/operator/src/lib.rs b/examples/multiple-daemons/operator/src/lib.rs new file mode 100644 index 00000000..81a1a1f0 --- /dev/null +++ b/examples/multiple-daemons/operator/src/lib.rs @@ -0,0 +1,52 @@ +#![warn(unsafe_op_in_unsafe_fn)] + +use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event}; + +register_operator!(ExampleOperator); + +#[derive(Debug, Default)] +struct ExampleOperator { + ticks: usize, +} + +impl DoraOperator for ExampleOperator { + fn on_event( + &mut self, + event: &Event, + output_sender: &mut DoraOutputSender, + ) -> Result { + match event { + Event::Input { id, data } => match *id { + "tick" => { + self.ticks += 1; + } + "random" => { + let parsed = { + let data: [u8; 8] = + (*data).try_into().map_err(|_| "unexpected random data")?; + u64::from_le_bytes(data) + }; 
+ let output = format!( + "operator received random value {parsed:#x} after {} ticks", + self.ticks + ); + output_sender.send("status".into(), output.into_bytes())?; + } + other => eprintln!("ignoring unexpected input {other}"), + }, + Event::Stop => {} + Event::InputClosed { id } => { + println!("input `{id}` was closed"); + if *id == "random" { + println!("`random` input was closed -> exiting"); + return Ok(DoraStatus::Stop); + } + } + other => { + println!("received unknown event {other:?}"); + } + } + + Ok(DoraStatus::Continue) + } +} diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs new file mode 100644 index 00000000..0280278f --- /dev/null +++ b/examples/multiple-daemons/run.rs @@ -0,0 +1,83 @@ +use eyre::{bail, Context}; +use futures::stream; +use std::{ + net::{Ipv4Addr, SocketAddr}, + path::Path, +}; +use tokio::task::JoinSet; +use tracing::metadata::LevelFilter; +use tracing_subscriber::Layer; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + set_up_tracing().wrap_err("failed to set up tracing subscriber")?; + + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + let dataflow = Path::new("dataflow.yml"); + build_dataflow(dataflow).await?; + + build_package("dora-runtime").await?; + let dora_runtime_path = Some(root.join("target").join("debug").join("dora-runtime")); + + let (coordinator_port, coordinator) = dora_coordinator::start( + dora_coordinator::Args { + port: Some(0), + run_dataflow: Some(dataflow.to_path_buf()), + dora_runtime_path: dora_runtime_path.clone(), + }, + stream::empty(), + ) + .await?; + let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); + let daemon_a = + dora_daemon::Daemon::run(coordinator_addr, "A".into(), dora_runtime_path.clone()); + let daemon_b = dora_daemon::Daemon::run(coordinator_addr, "B".into(), dora_runtime_path); + + let mut tasks = 
JoinSet::new(); + tasks.spawn(coordinator); + tasks.spawn(daemon_a); + tasks.spawn(daemon_b); + + while let Some(res) = tasks.join_next().await { + res.unwrap()?; + } + + Ok(()) +} + +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + +async fn build_package(package: &str) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("build"); + cmd.arg("--package").arg(package); + if !cmd.status().await?.success() { + bail!("failed to build {package}"); + }; + Ok(()) +} + +fn set_up_tracing() -> eyre::Result<()> { + use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; + + let stdout_log = tracing_subscriber::fmt::layer() + .pretty() + .with_filter(LevelFilter::DEBUG); + let subscriber = tracing_subscriber::Registry::default().with(stdout_log); + tracing::subscriber::set_global_default(subscriber) + .context("failed to set tracing global subscriber") +} diff --git a/examples/multiple-daemons/sink/Cargo.toml b/examples/multiple-daemons/sink/Cargo.toml new file mode 100644 index 00000000..ed0823bd --- /dev/null +++ b/examples/multiple-daemons/sink/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "multiple-daemons-example-sink" +version.workspace = true +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { workspace = true, features = ["tracing"] } +eyre = "0.6.8" diff --git a/examples/multiple-daemons/sink/src/main.rs b/examples/multiple-daemons/sink/src/main.rs new file mode 100644 index 00000000..18632c3f --- /dev/null +++ b/examples/multiple-daemons/sink/src/main.rs @@ -0,0 
+1,39 @@ +use dora_node_api::{self, DoraNode, Event}; +use eyre::{bail, Context, ContextCompat}; + +fn main() -> eyre::Result<()> { + let (_node, mut events) = DoraNode::init_from_env()?; + + while let Some(event) = events.recv() { + match event { + Event::Input { + id, + metadata: _, + data, + } => match id.as_str() { + "message" => { + let data = data.wrap_err("no data")?; + let received_string = std::str::from_utf8(&data) + .wrap_err("received message was not utf8-encoded")?; + println!("sink received message: {}", received_string); + if !received_string.starts_with("operator received random value ") { + bail!("unexpected message format (should start with 'operator received random value')") + } + if !received_string.ends_with(" ticks") { + bail!("unexpected message format (should end with 'ticks')") + } + } + other => eprintln!("Ignoring unexpected input `{other}`"), + }, + Event::Stop => { + println!("Received manual stop"); + } + Event::InputClosed { id } => { + println!("Input `{id}` was closed"); + } + other => eprintln!("Received unexpected input: {other:?}"), + } + } + + Ok(()) +}