diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 3ae03a65..3b5d13dd 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -60,6 +60,11 @@ jobs:
       - name: "Rust Dataflow example"
         timeout-minutes: 30
         run: cargo run --example rust-dataflow
+
+      - name: "Benchmark example"
+        timeout-minutes: 30
+        run: cargo run --example benchmark --release
+
       - name: "C Dataflow example"
         timeout-minutes: 15
         run: cargo run --example c-dataflow
diff --git a/Cargo.lock b/Cargo.lock
index b08f01c5..dc121df2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -272,6 +272,25 @@ version = "1.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e6b4d9b1225d28d360ec6a231d65af1fd99a2a095154c8040689617290569c5c"
 
+[[package]]
+name = "benchmark-example-node"
+version = "0.1.2"
+dependencies = [
+ "dora-node-api",
+ "eyre",
+ "futures",
+ "rand",
+ "tokio",
+]
+
+[[package]]
+name = "benchmark-example-sink"
+version = "0.1.2"
+dependencies = [
+ "dora-node-api",
+ "eyre",
+]
+
 [[package]]
 name = "bincode"
 version = "1.3.3"
diff --git a/Cargo.toml b/Cargo.toml
index 2de4e586..595fdc34 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,7 @@ members = [
     "binaries/daemon",
     # "binaries/runtime",
     "examples/rust-dataflow/*",
+    "examples/benchmark/*",
     "libraries/communication-layer/*",
     "libraries/core",
     "libraries/message",
@@ -65,3 +66,7 @@ path = "examples/c++-dataflow/run.rs"
 [[example]]
 name = "python-dataflow"
 path = "examples/python-dataflow/run.rs"
+
+[[example]]
+name = "benchmark"
+path = "examples/benchmark/run.rs"
diff --git a/examples/benchmark/dataflow.yml b/examples/benchmark/dataflow.yml
new file mode 100644
index 00000000..87452f6c
--- /dev/null
+++ b/examples/benchmark/dataflow.yml
@@ -0,0 +1,18 @@
+communication:
+  zenoh:
+    prefix: /benchmark-example
+
+nodes:
+  - id: rust-node
+    custom:
+      build: cargo build -p benchmark-example-node --release
+      source: ../../target/release/benchmark-example-node
+      outputs:
+        - random
+  - id: rust-sink
+    custom:
+      build: cargo build -p benchmark-example-sink --release
+      source: ../../target/release/benchmark-example-sink
+      inputs:
+        # message: runtime-node/rust-operator/status
+        message: rust-node/random
diff --git a/examples/benchmark/node/Cargo.toml b/examples/benchmark/node/Cargo.toml
new file mode 100644
index 00000000..35e582a7
--- /dev/null
+++ b/examples/benchmark/node/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "benchmark-example-node"
+version.workspace = true
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+dora-node-api = { workspace = true }
+eyre = "0.6.8"
+futures = "0.3.21"
+rand = "0.8.5"
+tokio = { version = "1.20.1", features = ["rt", "macros"] }
diff --git a/examples/benchmark/node/src/main.rs b/examples/benchmark/node/src/main.rs
new file mode 100644
index 00000000..53644f8b
--- /dev/null
+++ b/examples/benchmark/node/src/main.rs
@@ -0,0 +1,24 @@
+use dora_node_api::{self, dora_core::config::DataId, DoraNode};
+use rand::Rng;
+
+/// Benchmark source node: for each payload size in the list below, sends 100
+/// messages of freshly generated random bytes on the `random` output.
+fn main() -> eyre::Result<()> {
+    let output = DataId::from("random".to_owned());
+
+    let (mut node, _events) = DoraNode::init_from_env()?;
+    for size in [0, 8, 64, 512, 2048, 4096, 4 * 4096, 10 * 4096] {
+        for _ in 0..100 {
+            let data: Vec<u8> = rand::thread_rng()
+                .sample_iter(rand::distributions::Standard)
+                .take(size)
+                .collect();
+            node.send_output(output.clone(), Default::default(), data.len(), |out| {
+                // `copy_from_slice` panics unless `out` is exactly `data.len()` bytes long.
+                out.copy_from_slice(&data);
+            })?;
+        }
+    }
+
+    Ok(())
+}
diff --git a/examples/benchmark/run.rs b/examples/benchmark/run.rs
new file mode 100644
index 00000000..37f26473
--- /dev/null
+++ b/examples/benchmark/run.rs
@@ -0,0 +1,51 @@
+use eyre::{bail, Context};
+use std::path::Path;
+use tracing::metadata::LevelFilter;
+use tracing_subscriber::Layer;
+
+/// Builds the benchmark dataflow via `dora-cli` and then runs it with the daemon.
+///
+/// The working directory is switched to this file's directory first, so that the
+/// relative `dataflow.yml` path resolves.
+#[tokio::main]
+async fn main() -> eyre::Result<()> {
+    set_up_tracing().wrap_err("failed to set up tracing subscriber")?;
+
+    let root = Path::new(env!("CARGO_MANIFEST_DIR"));
+    std::env::set_current_dir(root.join(file!()).parent().unwrap())
+        .wrap_err("failed to set working dir")?;
+
+    let dataflow = Path::new("dataflow.yml");
+    build_dataflow(dataflow).await?;
+
+    dora_daemon::Daemon::run_dataflow(dataflow).await?;
+
+    Ok(())
+}
+
+/// Runs `cargo run --package dora-cli -- build <dataflow>` and fails if the
+/// command exits unsuccessfully.
+async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
+    // Report a missing `CARGO` variable as an error instead of panicking.
+    let cargo = std::env::var("CARGO").wrap_err("CARGO environment variable not set")?;
+    let mut cmd = tokio::process::Command::new(&cargo);
+    cmd.arg("run");
+    cmd.arg("--package").arg("dora-cli");
+    cmd.arg("--").arg("build").arg(dataflow);
+    if !cmd.status().await?.success() {
+        bail!("failed to build dataflow");
+    };
+    Ok(())
+}
+
+/// Installs a global tracing subscriber that pretty-prints events at `DEBUG` level and above.
+fn set_up_tracing() -> eyre::Result<()> {
+    use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
+
+    let stdout_log = tracing_subscriber::fmt::layer()
+        .pretty()
+        .with_filter(LevelFilter::DEBUG);
+    let subscriber = tracing_subscriber::Registry::default().with(stdout_log);
+    tracing::subscriber::set_global_default(subscriber)
+        .context("failed to set tracing global subscriber")
+}
diff --git a/examples/benchmark/sink/Cargo.toml b/examples/benchmark/sink/Cargo.toml
new file mode 100644
index 00000000..58545c97
--- /dev/null
+++ b/examples/benchmark/sink/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "benchmark-example-sink"
+version.workspace = true
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+dora-node-api = { workspace = true }
+eyre = "0.6.8"
diff --git a/examples/benchmark/sink/src/main.rs b/examples/benchmark/sink/src/main.rs
new file mode 100644
index 00000000..5ad29240
--- /dev/null
+++ b/examples/benchmark/sink/src/main.rs
@@ -0,0 +1,82 @@
+use dora_node_api::{self, daemon::Event, DoraNode};
+use eyre::ContextCompat;
+use std::{
+    fmt::Write as _,
+    time::{Duration, Instant},
+};
+
+/// Benchmark sink node: groups incoming `message` inputs by payload size and
+/// reports, per size, the average time per message and the average latency.
+fn main() -> eyre::Result<()> {
+    let (_node, mut events) = DoraNode::init_from_env()?;
+
+    let mut current_size = 0;
+    let mut n = 0;
+    let mut start = Instant::now();
+    let mut latencies = Vec::new();
+
+    let mut summary = String::new();
+
+    while let Some(event) = events.recv() {
+        match event {
+            Event::Stop => break,
+            Event::Input { id, metadata, data } => match id.as_str() {
+                "message" => {
+                    let data = data.as_deref().unwrap_or_default();
+
+                    // A change in payload size marks the start of the next batch.
+                    if data.len() != current_size {
+                        if n > 0 {
+                            record_results(start, current_size, n, latencies, &mut summary);
+                        }
+                        current_size = data.len();
+                        n = 0;
+                        start = Instant::now();
+                        latencies = Vec::new();
+                    }
+                    n += 1;
+                    latencies.push(metadata.timestamp().get_time().to_system_time().elapsed()?);
+                }
+                other => eprintln!("Ignoring unexpected input `{other}`"),
+            },
+            Event::InputClosed { id } => {
+                println!("Input `{id}` was closed -> exiting");
+                break;
+            }
+            other => eprintln!("Received unexpected input: {other:?}"),
+        }
+    }
+
+    record_results(start, current_size, n, latencies, &mut summary);
+
+    println!("\nSummary:\n{summary}");
+
+    Ok(())
+}
+
+/// Prints the statistics for one batch of equally sized messages and appends
+/// the same line to `summary`.
+///
+/// `start` is when the batch began, `n` the number of messages received and
+/// `latencies` their individual latencies. Does nothing when `n` is zero.
+fn record_results(
+    start: Instant,
+    current_size: usize,
+    n: u32,
+    latencies: Vec<Duration>,
+    summary: &mut String,
+) {
+    // Dividing a `Duration` by zero panics. `n` is zero when the input is closed
+    // (or the node is stopped) before any message arrived, so there is nothing
+    // to report in that case.
+    if n == 0 {
+        return;
+    }
+    let duration = start.elapsed();
+    let per_message = duration / n;
+    let avg_latency = latencies.iter().sum::<Duration>() / n;
+    let msg =
+        format!("size {current_size:<#8x}: {per_message:?} per message (latency: {avg_latency:?})");
+    println!("{msg}");
+    writeln!(summary, "{msg}").unwrap();
+}