From fce304641254b893852f7acf83cf61e85e3991ca Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 3 Jan 2023 09:26:51 +0100 Subject: [PATCH] Create a node API benchmark based on zenoh --- Cargo.lock | 21 +++++++++ Cargo.toml | 7 +++ examples/benchmark/dataflow.yml | 18 ++++++++ examples/benchmark/node/Cargo.toml | 13 ++++++ examples/benchmark/node/src/main.rs | 34 +++++++++++++++ examples/benchmark/run.rs | 47 ++++++++++++++++++++ examples/benchmark/sink/Cargo.toml | 10 +++++ examples/benchmark/sink/src/main.rs | 67 +++++++++++++++++++++++++++++ 8 files changed, 217 insertions(+) create mode 100644 examples/benchmark/dataflow.yml create mode 100644 examples/benchmark/node/Cargo.toml create mode 100644 examples/benchmark/node/src/main.rs create mode 100644 examples/benchmark/run.rs create mode 100644 examples/benchmark/sink/Cargo.toml create mode 100644 examples/benchmark/sink/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 8e6c67ad..af97d097 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -272,6 +272,25 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6b4d9b1225d28d360ec6a231d65af1fd99a2a095154c8040689617290569c5c" +[[package]] +name = "benchmark-example-node" +version = "0.1.2" +dependencies = [ + "dora-node-api", + "eyre", + "futures", + "rand", + "tokio", +] + +[[package]] +name = "benchmark-example-sink" +version = "0.1.2" +dependencies = [ + "dora-node-api", + "eyre", +] + [[package]] name = "bincode" version = "1.3.3" @@ -971,6 +990,8 @@ dependencies = [ "dunce", "eyre", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d04d0faf..7c5be6d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "binaries/*", "examples/rust-dataflow/*", "examples/iceoryx/*", + "examples/benchmark/*", "libraries/communication-layer/*", "libraries/core", "libraries/message", @@ -38,6 +39,8 @@ eyre = "0.6.8" tokio = "1.20.1" dora-coordinator = { path = "binaries/coordinator" } dunce = "1.0.2" +tracing = "0.1.36" +tracing-subscriber = "0.3.15" [[example]] name = "c-dataflow" @@ -62,3 +65,7 @@ path = "examples/python-dataflow/run.rs" [[example]] name = "iceoryx" path = "examples/iceoryx/run.rs" + +[[example]] +name = "benchmark" +path = "examples/benchmark/run.rs" diff --git a/examples/benchmark/dataflow.yml b/examples/benchmark/dataflow.yml new file mode 100644 index 00000000..87452f6c --- /dev/null +++ b/examples/benchmark/dataflow.yml @@ -0,0 +1,18 @@ +communication: + zenoh: + prefix: /benchmark-example + +nodes: + - id: rust-node + custom: + build: cargo build -p benchmark-example-node --release + source: ../../target/release/benchmark-example-node + outputs: + - random + - id: rust-sink + custom: + build: cargo build -p benchmark-example-sink --release + source: ../../target/release/benchmark-example-sink + inputs: + # message: runtime-node/rust-operator/status + message: rust-node/random diff --git a/examples/benchmark/node/Cargo.toml b/examples/benchmark/node/Cargo.toml new file mode 100644 index 00000000..cf3a6b28 --- /dev/null +++ b/examples/benchmark/node/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "benchmark-example-node" +version.workspace = true +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { workspace = true, features = ["zenoh", "iceoryx"] } +eyre = "0.6.8" +futures = "0.3.21" +rand = "0.8.5" +tokio = { version = "1.20.1", features = ["rt", "macros"] } diff --git a/examples/benchmark/node/src/main.rs b/examples/benchmark/node/src/main.rs new file mode 100644 index 00000000..15ca2992 --- /dev/null +++ b/examples/benchmark/node/src/main.rs @@ -0,0 +1,34 @@ +use dora_node_api::{self, dora_core::config::DataId, DoraNode}; +use rand::Rng; + +fn main() -> eyre::Result<()> { + let output = DataId::from("random".to_owned()); + + let mut node = DoraNode::init_from_env()?; + let sizes = [ + 0, + 8, + 64, + 512, + 2048, + 4096, + 4 * 4096, + 10 * 4096, + 100 * 4096, + 1000 * 4096, + 10000 * 4096, + ]; + for size in sizes { + for _ in 0..100 { + let data: Vec = rand::thread_rng() + .sample_iter(rand::distributions::Standard) + .take(size) + .collect(); + node.send_output(&output, Default::default(), data.len(), |out| { + out.copy_from_slice(&data); + })?; + } + } + + Ok(()) +} diff --git a/examples/benchmark/run.rs b/examples/benchmark/run.rs new file mode 100644 index 00000000..d9790f52 --- /dev/null +++ b/examples/benchmark/run.rs @@ -0,0 +1,47 @@ +use eyre::{bail, Context}; +use std::path::Path; +use tracing::metadata::LevelFilter; +use tracing_subscriber::Layer; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + set_up_tracing().wrap_err("failed to set up tracing subscriber")?; + + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + let dataflow = Path::new("dataflow.yml"); + build_dataflow(dataflow).await?; + + dora_coordinator::run(dora_coordinator::Args { + run_dataflow: dataflow.to_owned().into(), + runtime: None, + }) + .await?; + + Ok(()) +} + +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + +fn set_up_tracing() -> eyre::Result<()> { + use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; + + let stdout_log = tracing_subscriber::fmt::layer() + .pretty() + .with_filter(LevelFilter::DEBUG); + let subscriber = tracing_subscriber::Registry::default().with(stdout_log); + tracing::subscriber::set_global_default(subscriber) + .context("failed to set tracing global subscriber") +} diff --git a/examples/benchmark/sink/Cargo.toml b/examples/benchmark/sink/Cargo.toml new file mode 100644 index 00000000..e84e7206 --- /dev/null +++ b/examples/benchmark/sink/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "benchmark-example-sink" +version.workspace = true +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { workspace = true, features = ["zenoh", "iceoryx"] } +eyre = "0.6.8" diff --git a/examples/benchmark/sink/src/main.rs b/examples/benchmark/sink/src/main.rs new file mode 100644 index 00000000..58561705 --- /dev/null +++ b/examples/benchmark/sink/src/main.rs @@ -0,0 +1,67 @@ +use dora_node_api::{self, DoraNode}; +use std::{ + fmt::Write as _, + time::{Duration, Instant}, +}; + +fn main() -> eyre::Result<()> { + let mut node = DoraNode::init_from_env()?; + let inputs = node.inputs()?; + + let mut current_size = 0; + let mut n = 0; + let mut start = Instant::now(); + let mut latencies = Vec::new(); + + let mut summary = String::new(); + + while let Ok(input) = inputs.recv() { + match input.id.as_str() { + "message" => { + let data = input.data(); + + if data.len() != current_size { + if n > 0 { + record_results(start, current_size, n, latencies, &mut summary); + } + current_size = data.len(); + n = 0; + start = Instant::now(); + latencies = Vec::new(); + } + n += 1; + latencies.push( + input + .metadata + .timestamp() + .get_time() + .to_system_time() + .elapsed()?, + ); + } + other => eprintln!("Ignoring unexpected input `{other}`"), + } + } + + record_results(start, current_size, n, latencies, &mut summary); + + println!("\nSummary:\n{summary}"); + + Ok(()) +} + +fn record_results( + start: Instant, + current_size: usize, + n: u32, + latencies: Vec, + summary: &mut String, +) { + let duration = start.elapsed(); + let per_message = duration / n; + let avg_latency = latencies.iter().sum::() / n; + let msg = + format!("size {current_size:<#8x}: {per_message:?} per message (latency: {avg_latency:?})"); + println!("{msg}"); + writeln!(summary, "{msg}").unwrap(); +}