From 6dc0d58c8df167c0c65d7f84422eadc634b486e9 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 5 Jan 2023 14:08:21 +0100 Subject: [PATCH] Reword benchmark example for more realistic latency results --- examples/benchmark/dataflow-iceoryx.yml | 19 +++++ .../{dataflow.yml => dataflow-zenoh.yml} | 7 +- examples/benchmark/node/src/main.rs | 25 ++++++- examples/benchmark/run.rs | 16 ++++- examples/benchmark/sink/src/main.rs | 70 ++++++++++++------- 5 files changed, 103 insertions(+), 34 deletions(-) create mode 100644 examples/benchmark/dataflow-iceoryx.yml rename examples/benchmark/{dataflow.yml => dataflow-zenoh.yml} (77%) diff --git a/examples/benchmark/dataflow-iceoryx.yml b/examples/benchmark/dataflow-iceoryx.yml new file mode 100644 index 00000000..d3404e04 --- /dev/null +++ b/examples/benchmark/dataflow-iceoryx.yml @@ -0,0 +1,19 @@ +communication: + iceoryx: + app_name_prefix: benchmark-example + +nodes: + - id: rust-node + custom: + build: cargo build -p benchmark-example-node --release + source: ../../target/release/benchmark-example-node + outputs: + - latency + - throughput + - id: rust-sink + custom: + build: cargo build -p benchmark-example-sink --release + source: ../../target/release/benchmark-example-sink + inputs: + latency: rust-node/latency + throughput: rust-node/throughput diff --git a/examples/benchmark/dataflow.yml b/examples/benchmark/dataflow-zenoh.yml similarity index 77% rename from examples/benchmark/dataflow.yml rename to examples/benchmark/dataflow-zenoh.yml index 87452f6c..382a6909 100644 --- a/examples/benchmark/dataflow.yml +++ b/examples/benchmark/dataflow-zenoh.yml @@ -8,11 +8,12 @@ nodes: build: cargo build -p benchmark-example-node --release source: ../../target/release/benchmark-example-node outputs: - - random + - latency + - throughput - id: rust-sink custom: build: cargo build -p benchmark-example-sink --release source: ../../target/release/benchmark-example-sink inputs: - # message: runtime-node/rust-operator/status - message: rust-node/random + latency: rust-node/latency + throughput: rust-node/throughput diff --git a/examples/benchmark/node/src/main.rs b/examples/benchmark/node/src/main.rs index 15ca2992..acf5332f 100644 --- a/examples/benchmark/node/src/main.rs +++ b/examples/benchmark/node/src/main.rs @@ -1,8 +1,11 @@ +use std::time::Duration; + use dora_node_api::{self, dora_core::config::DataId, DoraNode}; use rand::Rng; fn main() -> eyre::Result<()> { - let output = DataId::from("random".to_owned()); + let latency = DataId::from("latency".to_owned()); + let throughput = DataId::from("throughput".to_owned()); let mut node = DoraNode::init_from_env()?; let sizes = [ @@ -18,13 +21,31 @@ fn main() -> eyre::Result<()> { 1000 * 4096, 10000 * 4096, ]; + + // test latency first + for size in sizes { + for _ in 0..100 { + let data: Vec = rand::thread_rng() + .sample_iter(rand::distributions::Standard) + .take(size) + .collect(); + node.send_output(&latency, Default::default(), data.len(), |out| { + out.copy_from_slice(&data); + })?; + + // sleep a bit to avoid queue buildup + std::thread::sleep(Duration::from_millis(10)); + } + } + + // then throughput with full speed for size in sizes { for _ in 0..100 { let data: Vec = rand::thread_rng() .sample_iter(rand::distributions::Standard) .take(size) .collect(); - node.send_output(&output, Default::default(), data.len(), |out| { + node.send_output(&throughput, Default::default(), data.len(), |out| { out.copy_from_slice(&data); })?; } diff --git a/examples/benchmark/run.rs b/examples/benchmark/run.rs index d9790f52..b1b5ee2f 100644 --- a/examples/benchmark/run.rs +++ b/examples/benchmark/run.rs @@ -11,11 +11,21 @@ async fn main() -> eyre::Result<()> { std::env::set_current_dir(root.join(file!()).parent().unwrap()) .wrap_err("failed to set working dir")?; - let dataflow = Path::new("dataflow.yml"); - build_dataflow(dataflow).await?; + let dataflow_zenoh = Path::new("dataflow-zenoh.yml"); + build_dataflow(dataflow_zenoh).await?; + let dataflow_iceoryx = Path::new("dataflow-iceoryx.yml"); + build_dataflow(dataflow_iceoryx).await?; + + println!("ZENOH:"); + dora_coordinator::run(dora_coordinator::Args { + run_dataflow: dataflow_zenoh.to_owned().into(), + runtime: None, + }) + .await?; + println!("\n\nICEORYX:"); dora_coordinator::run(dora_coordinator::Args { - run_dataflow: dataflow.to_owned().into(), + run_dataflow: dataflow_zenoh.to_owned().into(), runtime: None, }) .await?; diff --git a/examples/benchmark/sink/src/main.rs b/examples/benchmark/sink/src/main.rs index 58561705..9ba7f857 100644 --- a/examples/benchmark/sink/src/main.rs +++ b/examples/benchmark/sink/src/main.rs @@ -8,42 +8,56 @@ fn main() -> eyre::Result<()> { let mut node = DoraNode::init_from_env()?; let inputs = node.inputs()?; + // latency is tested first + let mut latency = true; + let mut current_size = 0; let mut n = 0; let mut start = Instant::now(); let mut latencies = Vec::new(); let mut summary = String::new(); + println!("Latency:"); + writeln!(summary, "Latency:")?; while let Ok(input) = inputs.recv() { - match input.id.as_str() { - "message" => { - let data = input.data(); + let data = input.data(); - if data.len() != current_size { - if n > 0 { - record_results(start, current_size, n, latencies, &mut summary); - } - current_size = data.len(); - n = 0; - start = Instant::now(); - latencies = Vec::new(); + // check if new size bracket + if data.len() != current_size { + if n > 0 { + record_results(start, current_size, n, latencies, &mut summary, latency); + } + current_size = data.len(); + n = 0; + start = Instant::now(); + latencies = Vec::new(); + } + + match input.id.as_str() { + "latency" => {} + "throughput" => { + if latency { + latency = false; + println!("Throughput:"); + writeln!(summary, "Throughput:")?; } - n += 1; - latencies.push( - input - .metadata - .timestamp() - .get_time() - .to_system_time() - .elapsed()?, - ); } other => eprintln!("Ignoring unexpected input `{other}`"), } + + n += 1; + latencies.push( + input + .metadata + .timestamp() + .get_time() + .to_system_time() + .elapsed()?, + ); } - record_results(start, current_size, n, latencies, &mut summary); + record_results(start, current_size, n, latencies, &mut summary, latency); println!("\nSummary:\n{summary}"); @@ -56,12 +70,16 @@ fn record_results( n: u32, latencies: Vec, summary: &mut String, + latency: bool, ) { - let duration = start.elapsed(); - let per_message = duration / n; - let avg_latency = latencies.iter().sum::() / n; - let msg = - format!("size {current_size:<#8x}: {per_message:?} per message (latency: {avg_latency:?})"); + let msg = if latency { + let avg_latency = latencies.iter().sum::() / n; + format!("size {current_size:<#8x}: {avg_latency:?}") + } else { + let duration = start.elapsed(); + let msg_per_sec = n as f64 / duration.as_secs_f64(); + format!("size {current_size:<#8x}: {msg_per_sec:.0} messages per second") + }; println!("{msg}"); writeln!(summary, "{msg}").unwrap(); }