| Author | SHA1 | Message | Date |
|---|---|---|---|
|
|
6ab0defb81
|
Fix: Actually run the iceoryx dataflow | 3 years ago |
|
|
7bf362e4cd
|
Ignore late latency messages | 3 years ago |
|
|
fff4d27b57
|
Fix: wait a bit before switching to throughput test | 3 years ago |
|
|
740b526ea1
|
Remove summary printing | 3 years ago |
|
|
6dc0d58c8d
|
Reword benchmark example for more realistic latency results | 3 years ago |
|
|
fce3046412
|
Create a node API benchmark based on zenoh | 3 years ago |
| @@ -272,6 +272,25 @@ version = "1.1.1" | |||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "e6b4d9b1225d28d360ec6a231d65af1fd99a2a095154c8040689617290569c5c" | checksum = "e6b4d9b1225d28d360ec6a231d65af1fd99a2a095154c8040689617290569c5c" | ||||
| [[package]] | |||||
| name = "benchmark-example-node" | |||||
| version = "0.1.2" | |||||
| dependencies = [ | |||||
| "dora-node-api", | |||||
| "eyre", | |||||
| "futures", | |||||
| "rand", | |||||
| "tokio", | |||||
| ] | |||||
| [[package]] | |||||
| name = "benchmark-example-sink" | |||||
| version = "0.1.2" | |||||
| dependencies = [ | |||||
| "dora-node-api", | |||||
| "eyre", | |||||
| ] | |||||
| [[package]] | [[package]] | ||||
| name = "bincode" | name = "bincode" | ||||
| version = "1.3.3" | version = "1.3.3" | ||||
| @@ -971,6 +990,8 @@ dependencies = [ | |||||
| "dunce", | "dunce", | ||||
| "eyre", | "eyre", | ||||
| "tokio", | "tokio", | ||||
| "tracing", | |||||
| "tracing-subscriber", | |||||
| ] | ] | ||||
| [[package]] | [[package]] | ||||
| @@ -10,6 +10,7 @@ members = [ | |||||
| "binaries/*", | "binaries/*", | ||||
| "examples/rust-dataflow/*", | "examples/rust-dataflow/*", | ||||
| "examples/iceoryx/*", | "examples/iceoryx/*", | ||||
| "examples/benchmark/*", | |||||
| "libraries/communication-layer/*", | "libraries/communication-layer/*", | ||||
| "libraries/core", | "libraries/core", | ||||
| "libraries/message", | "libraries/message", | ||||
| @@ -38,6 +39,8 @@ eyre = "0.6.8" | |||||
| tokio = "1.20.1" | tokio = "1.20.1" | ||||
| dora-coordinator = { path = "binaries/coordinator" } | dora-coordinator = { path = "binaries/coordinator" } | ||||
| dunce = "1.0.2" | dunce = "1.0.2" | ||||
| tracing = "0.1.36" | |||||
| tracing-subscriber = "0.3.15" | |||||
| [[example]] | [[example]] | ||||
| name = "c-dataflow" | name = "c-dataflow" | ||||
| @@ -62,3 +65,7 @@ path = "examples/python-dataflow/run.rs" | |||||
| [[example]] | [[example]] | ||||
| name = "iceoryx" | name = "iceoryx" | ||||
| path = "examples/iceoryx/run.rs" | path = "examples/iceoryx/run.rs" | ||||
| [[example]] | |||||
| name = "benchmark" | |||||
| path = "examples/benchmark/run.rs" | |||||
| @@ -0,0 +1,19 @@ | |||||
| communication: | |||||
| iceoryx: | |||||
| app_name_prefix: benchmark-example | |||||
| nodes: | |||||
| - id: rust-node | |||||
| custom: | |||||
| build: cargo build -p benchmark-example-node --release | |||||
| source: ../../target/release/benchmark-example-node | |||||
| outputs: | |||||
| - latency | |||||
| - throughput | |||||
| - id: rust-sink | |||||
| custom: | |||||
| build: cargo build -p benchmark-example-sink --release | |||||
| source: ../../target/release/benchmark-example-sink | |||||
| inputs: | |||||
| latency: rust-node/latency | |||||
| throughput: rust-node/throughput | |||||
| @@ -0,0 +1,19 @@ | |||||
| communication: | |||||
| zenoh: | |||||
| prefix: /benchmark-example | |||||
| nodes: | |||||
| - id: rust-node | |||||
| custom: | |||||
| build: cargo build -p benchmark-example-node --release | |||||
| source: ../../target/release/benchmark-example-node | |||||
| outputs: | |||||
| - latency | |||||
| - throughput | |||||
| - id: rust-sink | |||||
| custom: | |||||
| build: cargo build -p benchmark-example-sink --release | |||||
| source: ../../target/release/benchmark-example-sink | |||||
| inputs: | |||||
| latency: rust-node/latency | |||||
| throughput: rust-node/throughput | |||||
| @@ -0,0 +1,13 @@ | |||||
| [package] | |||||
| name = "benchmark-example-node" | |||||
| version.workspace = true | |||||
| edition = "2021" | |||||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||||
| [dependencies] | |||||
| dora-node-api = { workspace = true, features = ["zenoh", "iceoryx"] } | |||||
| eyre = "0.6.8" | |||||
| futures = "0.3.21" | |||||
| rand = "0.8.5" | |||||
| tokio = { version = "1.20.1", features = ["rt", "macros"] } | |||||
| @@ -0,0 +1,58 @@ | |||||
| use std::time::Duration; | |||||
| use dora_node_api::{self, dora_core::config::DataId, DoraNode}; | |||||
| use rand::Rng; | |||||
| fn main() -> eyre::Result<()> { | |||||
| let latency = DataId::from("latency".to_owned()); | |||||
| let throughput = DataId::from("throughput".to_owned()); | |||||
| let mut node = DoraNode::init_from_env()?; | |||||
| let sizes = [ | |||||
| 0, | |||||
| 8, | |||||
| 64, | |||||
| 512, | |||||
| 2048, | |||||
| 4096, | |||||
| 4 * 4096, | |||||
| 10 * 4096, | |||||
| 100 * 4096, | |||||
| // 1000 * 4096, | |||||
| // 10000 * 4096, | |||||
| ]; | |||||
| // test latency first | |||||
| for size in sizes { | |||||
| for _ in 0..100 { | |||||
| let data: Vec<u8> = rand::thread_rng() | |||||
| .sample_iter(rand::distributions::Standard) | |||||
| .take(size) | |||||
| .collect(); | |||||
| node.send_output(&latency, Default::default(), data.len(), |out| { | |||||
| out.copy_from_slice(&data); | |||||
| })?; | |||||
| // sleep a bit to avoid queue buildup | |||||
| std::thread::sleep(Duration::from_millis(10)); | |||||
| } | |||||
| } | |||||
| // wait a bit to ensure that all throughput messages reached their target | |||||
| std::thread::sleep(Duration::from_secs(2)); | |||||
| // then throughput with full speed | |||||
| for size in sizes { | |||||
| for _ in 0..100 { | |||||
| let data: Vec<u8> = rand::thread_rng() | |||||
| .sample_iter(rand::distributions::Standard) | |||||
| .take(size) | |||||
| .collect(); | |||||
| node.send_output(&throughput, Default::default(), data.len(), |out| { | |||||
| out.copy_from_slice(&data); | |||||
| })?; | |||||
| } | |||||
| } | |||||
| Ok(()) | |||||
| } | |||||
| @@ -0,0 +1,57 @@ | |||||
| use eyre::{bail, Context}; | |||||
| use std::path::Path; | |||||
| use tracing::metadata::LevelFilter; | |||||
| use tracing_subscriber::Layer; | |||||
| #[tokio::main] | |||||
| async fn main() -> eyre::Result<()> { | |||||
| set_up_tracing().wrap_err("failed to set up tracing subscriber")?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||||
| .wrap_err("failed to set working dir")?; | |||||
| let dataflow_zenoh = Path::new("dataflow-zenoh.yml"); | |||||
| build_dataflow(dataflow_zenoh).await?; | |||||
| let dataflow_iceoryx = Path::new("dataflow-iceoryx.yml"); | |||||
| build_dataflow(dataflow_iceoryx).await?; | |||||
| println!("ZENOH:"); | |||||
| dora_coordinator::run(dora_coordinator::Args { | |||||
| run_dataflow: dataflow_zenoh.to_owned().into(), | |||||
| runtime: None, | |||||
| }) | |||||
| .await?; | |||||
| println!("\n\nICEORYX:"); | |||||
| dora_coordinator::run(dora_coordinator::Args { | |||||
| run_dataflow: dataflow_iceoryx.to_owned().into(), | |||||
| runtime: None, | |||||
| }) | |||||
| .await?; | |||||
| Ok(()) | |||||
| } | |||||
| async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--").arg("build").arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to build dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| } | |||||
| fn set_up_tracing() -> eyre::Result<()> { | |||||
| use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; | |||||
| let stdout_log = tracing_subscriber::fmt::layer() | |||||
| .pretty() | |||||
| .with_filter(LevelFilter::DEBUG); | |||||
| let subscriber = tracing_subscriber::Registry::default().with(stdout_log); | |||||
| tracing::subscriber::set_global_default(subscriber) | |||||
| .context("failed to set tracing global subscriber") | |||||
| } | |||||
| @@ -0,0 +1,10 @@ | |||||
| [package] | |||||
| name = "benchmark-example-sink" | |||||
| version.workspace = true | |||||
| edition = "2021" | |||||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||||
| [dependencies] | |||||
| dora-node-api = { workspace = true, features = ["zenoh", "iceoryx"] } | |||||
| eyre = "0.6.8" | |||||
| @@ -0,0 +1,77 @@ | |||||
| use dora_node_api::{self, DoraNode}; | |||||
| use std::time::{Duration, Instant}; | |||||
| fn main() -> eyre::Result<()> { | |||||
| let mut node = DoraNode::init_from_env()?; | |||||
| let inputs = node.inputs()?; | |||||
| // latency is tested first | |||||
| let mut latency = true; | |||||
| let mut current_size = 0; | |||||
| let mut n = 0; | |||||
| let mut start = Instant::now(); | |||||
| let mut latencies = Vec::new(); | |||||
| println!("Latency:"); | |||||
| while let Ok(input) = inputs.recv() { | |||||
| let data = input.data(); | |||||
| // check if new size bracket | |||||
| if data.len() != current_size { | |||||
| if n > 0 { | |||||
| record_results(start, current_size, n, latencies, latency); | |||||
| } | |||||
| current_size = data.len(); | |||||
| n = 0; | |||||
| start = Instant::now(); | |||||
| latencies = Vec::new(); | |||||
| } | |||||
| match input.id.as_str() { | |||||
| "latency" if latency => {} | |||||
| "throughput" if latency => { | |||||
| latency = false; | |||||
| println!("Throughput:"); | |||||
| } | |||||
| "throughput" => {} | |||||
| other => { | |||||
| eprintln!("Ignoring unexpected input `{other}`"); | |||||
| continue; | |||||
| } | |||||
| } | |||||
| n += 1; | |||||
| latencies.push( | |||||
| input | |||||
| .metadata | |||||
| .timestamp() | |||||
| .get_time() | |||||
| .to_system_time() | |||||
| .elapsed()?, | |||||
| ); | |||||
| } | |||||
| record_results(start, current_size, n, latencies, latency); | |||||
| Ok(()) | |||||
| } | |||||
| fn record_results( | |||||
| start: Instant, | |||||
| current_size: usize, | |||||
| n: u32, | |||||
| latencies: Vec<Duration>, | |||||
| latency: bool, | |||||
| ) { | |||||
| let msg = if latency { | |||||
| let avg_latency = latencies.iter().sum::<Duration>() / n; | |||||
| format!("size {current_size:<#8x}: {avg_latency:?}") | |||||
| } else { | |||||
| let duration = start.elapsed(); | |||||
| let msg_per_sec = n as f64 / duration.as_secs_f64(); | |||||
| format!("size {current_size:<#8x}: {msg_per_sec:.0} messages per second") | |||||
| }; | |||||
| println!("{msg}"); | |||||
| } | |||||