| @@ -0,0 +1,19 @@ | |||
| communication: | |||
| iceoryx: | |||
| app_name_prefix: benchmark-example | |||
| nodes: | |||
| - id: rust-node | |||
| custom: | |||
| build: cargo build -p benchmark-example-node --release | |||
| source: ../../target/release/benchmark-example-node | |||
| outputs: | |||
| - latency | |||
| - throughput | |||
| - id: rust-sink | |||
| custom: | |||
| build: cargo build -p benchmark-example-sink --release | |||
| source: ../../target/release/benchmark-example-sink | |||
| inputs: | |||
| latency: rust-node/latency | |||
| throughput: rust-node/throughput | |||
| @@ -8,11 +8,12 @@ nodes: | |||
| build: cargo build -p benchmark-example-node --release | |||
| source: ../../target/release/benchmark-example-node | |||
| outputs: | |||
| - random | |||
| - latency | |||
| - throughput | |||
| - id: rust-sink | |||
| custom: | |||
| build: cargo build -p benchmark-example-sink --release | |||
| source: ../../target/release/benchmark-example-sink | |||
| inputs: | |||
| # message: runtime-node/rust-operator/status | |||
| message: rust-node/random | |||
| latency: rust-node/latency | |||
| throughput: rust-node/throughput | |||
| @@ -1,8 +1,11 @@ | |||
| use std::time::Duration; | |||
| use dora_node_api::{self, dora_core::config::DataId, DoraNode}; | |||
| use rand::Rng; | |||
| fn main() -> eyre::Result<()> { | |||
| let output = DataId::from("random".to_owned()); | |||
| let latency = DataId::from("latency".to_owned()); | |||
| let throughput = DataId::from("throughput".to_owned()); | |||
| let mut node = DoraNode::init_from_env()?; | |||
| let sizes = [ | |||
| @@ -18,13 +21,31 @@ fn main() -> eyre::Result<()> { | |||
| 1000 * 4096, | |||
| 10000 * 4096, | |||
| ]; | |||
| // test latency first | |||
| for size in sizes { | |||
| for _ in 0..100 { | |||
| let data: Vec<u8> = rand::thread_rng() | |||
| .sample_iter(rand::distributions::Standard) | |||
| .take(size) | |||
| .collect(); | |||
| node.send_output(&latency, Default::default(), data.len(), |out| { | |||
| out.copy_from_slice(&data); | |||
| })?; | |||
| // sleep a bit to avoid queue buildup | |||
| std::thread::sleep(Duration::from_millis(10)); | |||
| } | |||
| } | |||
| // then throughput with full speed | |||
| for size in sizes { | |||
| for _ in 0..100 { | |||
| let data: Vec<u8> = rand::thread_rng() | |||
| .sample_iter(rand::distributions::Standard) | |||
| .take(size) | |||
| .collect(); | |||
| node.send_output(&output, Default::default(), data.len(), |out| { | |||
| node.send_output(&throughput, Default::default(), data.len(), |out| { | |||
| out.copy_from_slice(&data); | |||
| })?; | |||
| } | |||
| @@ -11,11 +11,21 @@ async fn main() -> eyre::Result<()> { | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| let dataflow = Path::new("dataflow.yml"); | |||
| build_dataflow(dataflow).await?; | |||
| let dataflow_zenoh = Path::new("dataflow-zenoh.yml"); | |||
| build_dataflow(dataflow_zenoh).await?; | |||
| let dataflow_iceoryx = Path::new("dataflow-iceoryx.yml"); | |||
| build_dataflow(dataflow_iceoryx).await?; | |||
| println!("ZENOH:"); | |||
| dora_coordinator::run(dora_coordinator::Args { | |||
| run_dataflow: dataflow_zenoh.to_owned().into(), | |||
| runtime: None, | |||
| }) | |||
| .await?; | |||
| println!("\n\nICEORYX:"); | |||
| dora_coordinator::run(dora_coordinator::Args { | |||
| run_dataflow: dataflow.to_owned().into(), | |||
| run_dataflow: dataflow_zenoh.to_owned().into(), | |||
| runtime: None, | |||
| }) | |||
| .await?; | |||
| @@ -8,42 +8,56 @@ fn main() -> eyre::Result<()> { | |||
| let mut node = DoraNode::init_from_env()?; | |||
| let inputs = node.inputs()?; | |||
| // latency is tested first | |||
| let mut latency = true; | |||
| let mut current_size = 0; | |||
| let mut n = 0; | |||
| let mut start = Instant::now(); | |||
| let mut latencies = Vec::new(); | |||
| let mut summary = String::new(); | |||
| println!("Latency:"); | |||
| writeln!(summary, "Latency:")?; | |||
| while let Ok(input) = inputs.recv() { | |||
| match input.id.as_str() { | |||
| "message" => { | |||
| let data = input.data(); | |||
| let data = input.data(); | |||
| if data.len() != current_size { | |||
| if n > 0 { | |||
| record_results(start, current_size, n, latencies, &mut summary); | |||
| } | |||
| current_size = data.len(); | |||
| n = 0; | |||
| start = Instant::now(); | |||
| latencies = Vec::new(); | |||
| // check if new size bracket | |||
| if data.len() != current_size { | |||
| if n > 0 { | |||
| record_results(start, current_size, n, latencies, &mut summary, latency); | |||
| } | |||
| current_size = data.len(); | |||
| n = 0; | |||
| start = Instant::now(); | |||
| latencies = Vec::new(); | |||
| } | |||
| match input.id.as_str() { | |||
| "latency" => {} | |||
| "throughput" => { | |||
| if latency { | |||
| latency = false; | |||
| println!("Throughput:"); | |||
| writeln!(summary, "Throughput:")?; | |||
| } | |||
| n += 1; | |||
| latencies.push( | |||
| input | |||
| .metadata | |||
| .timestamp() | |||
| .get_time() | |||
| .to_system_time() | |||
| .elapsed()?, | |||
| ); | |||
| } | |||
| other => eprintln!("Ignoring unexpected input `{other}`"), | |||
| } | |||
| n += 1; | |||
| latencies.push( | |||
| input | |||
| .metadata | |||
| .timestamp() | |||
| .get_time() | |||
| .to_system_time() | |||
| .elapsed()?, | |||
| ); | |||
| } | |||
| record_results(start, current_size, n, latencies, &mut summary); | |||
| record_results(start, current_size, n, latencies, &mut summary, latency); | |||
| println!("\nSummary:\n{summary}"); | |||
| @@ -56,12 +70,16 @@ fn record_results( | |||
| n: u32, | |||
| latencies: Vec<Duration>, | |||
| summary: &mut String, | |||
| latency: bool, | |||
| ) { | |||
| let duration = start.elapsed(); | |||
| let per_message = duration / n; | |||
| let avg_latency = latencies.iter().sum::<Duration>() / n; | |||
| let msg = | |||
| format!("size {current_size:<#8x}: {per_message:?} per message (latency: {avg_latency:?})"); | |||
| let msg = if latency { | |||
| let avg_latency = latencies.iter().sum::<Duration>() / n; | |||
| format!("size {current_size:<#8x}: {avg_latency:?}") | |||
| } else { | |||
| let duration = start.elapsed(); | |||
| let msg_per_sec = n as f64 / duration.as_secs_f64(); | |||
| format!("size {current_size:<#8x}: {msg_per_sec:.0} messages per second") | |||
| }; | |||
| println!("{msg}"); | |||
| writeln!(summary, "{msg}").unwrap(); | |||
| } | |||