Compare commits


6 Commits

Author             SHA1        Date         Message
Philipp Oppermann  6ab0defb81  3 years ago  Fix: Actually run the iceoryx dataflow
Philipp Oppermann  7bf362e4cd  3 years ago  Ignore late latency messages
Philipp Oppermann  fff4d27b57  3 years ago  Fix: wait a bit before switching to throughput test
Philipp Oppermann  740b526ea1  3 years ago  Remove summary printing
Philipp Oppermann  6dc0d58c8d  3 years ago  Rework benchmark example for more realistic latency results
Philipp Oppermann  fce3046412  3 years ago  Create a node API benchmark based on zenoh
9 changed files with 281 additions and 0 deletions
1. +21  -0  Cargo.lock
2.  +7  -0  Cargo.toml
3. +19  -0  examples/benchmark/dataflow-iceoryx.yml
4. +19  -0  examples/benchmark/dataflow-zenoh.yml
5. +13  -0  examples/benchmark/node/Cargo.toml
6. +58  -0  examples/benchmark/node/src/main.rs
7. +57  -0  examples/benchmark/run.rs
8. +10  -0  examples/benchmark/sink/Cargo.toml
9. +77  -0  examples/benchmark/sink/src/main.rs
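
Taken together, the change adds a small benchmark example: a node that publishes randomly generated payloads of increasing size on a latency output (paced) and a throughput output (back-to-back), a sink that reports per-size average latency and messages per second, one dataflow description per communication backend (iceoryx and zenoh), and a run.rs runner that builds and executes both dataflows through the dora coordinator.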

+21  -0  Cargo.lock

@@ -272,6 +272,25 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6b4d9b1225d28d360ec6a231d65af1fd99a2a095154c8040689617290569c5c"

[[package]]
name = "benchmark-example-node"
version = "0.1.2"
dependencies = [
 "dora-node-api",
 "eyre",
 "futures",
 "rand",
 "tokio",
]

[[package]]
name = "benchmark-example-sink"
version = "0.1.2"
dependencies = [
 "dora-node-api",
 "eyre",
]

[[package]]
name = "bincode"
version = "1.3.3"
@@ -971,6 +990,8 @@ dependencies = [
 "dunce",
 "eyre",
 "tokio",
 "tracing",
 "tracing-subscriber",
]

[[package]]

+7  -0  Cargo.toml

@@ -10,6 +10,7 @@ members = [
    "binaries/*",
    "examples/rust-dataflow/*",
    "examples/iceoryx/*",
    "examples/benchmark/*",
    "libraries/communication-layer/*",
    "libraries/core",
    "libraries/message",
@@ -38,6 +39,8 @@ eyre = "0.6.8"
tokio = "1.20.1"
dora-coordinator = { path = "binaries/coordinator" }
dunce = "1.0.2"
tracing = "0.1.36"
tracing-subscriber = "0.3.15"

[[example]]
name = "c-dataflow"
@@ -62,3 +65,7 @@ path = "examples/python-dataflow/run.rs"
[[example]]
name = "iceoryx"
path = "examples/iceoryx/run.rs"

[[example]]
name = "benchmark"
path = "examples/benchmark/run.rs"

+19  -0  examples/benchmark/dataflow-iceoryx.yml

@@ -0,0 +1,19 @@
communication:
  iceoryx:
    app_name_prefix: benchmark-example

nodes:
  - id: rust-node
    custom:
      build: cargo build -p benchmark-example-node --release
      source: ../../target/release/benchmark-example-node
      outputs:
        - latency
        - throughput
  - id: rust-sink
    custom:
      build: cargo build -p benchmark-example-sink --release
      source: ../../target/release/benchmark-example-sink
      inputs:
        latency: rust-node/latency
        throughput: rust-node/throughput

+19  -0  examples/benchmark/dataflow-zenoh.yml

@@ -0,0 +1,19 @@
communication:
  zenoh:
    prefix: /benchmark-example

nodes:
  - id: rust-node
    custom:
      build: cargo build -p benchmark-example-node --release
      source: ../../target/release/benchmark-example-node
      outputs:
        - latency
        - throughput
  - id: rust-sink
    custom:
      build: cargo build -p benchmark-example-sink --release
      source: ../../target/release/benchmark-example-sink
      inputs:
        latency: rust-node/latency
        throughput: rust-node/throughput
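
The two dataflow descriptions are identical apart from the communication section (iceoryx with an app_name_prefix versus zenoh with a key prefix), so any difference in the reported numbers comes from the communication layer rather than from the benchmark code.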

+13  -0  examples/benchmark/node/Cargo.toml

@@ -0,0 +1,13 @@
[package]
name = "benchmark-example-node"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["zenoh", "iceoryx"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.20.1", features = ["rt", "macros"] }

+58  -0  examples/benchmark/node/src/main.rs

@@ -0,0 +1,58 @@
use std::time::Duration;

use dora_node_api::{self, dora_core::config::DataId, DoraNode};
use rand::Rng;

fn main() -> eyre::Result<()> {
    let latency = DataId::from("latency".to_owned());
    let throughput = DataId::from("throughput".to_owned());

    let mut node = DoraNode::init_from_env()?;
    let sizes = [
        0,
        8,
        64,
        512,
        2048,
        4096,
        4 * 4096,
        10 * 4096,
        100 * 4096,
        // 1000 * 4096,
        // 10000 * 4096,
    ];

    // test latency first
    for size in sizes {
        for _ in 0..100 {
            let data: Vec<u8> = rand::thread_rng()
                .sample_iter(rand::distributions::Standard)
                .take(size)
                .collect();
            node.send_output(&latency, Default::default(), data.len(), |out| {
                out.copy_from_slice(&data);
            })?;

            // sleep a bit to avoid queue buildup
            std::thread::sleep(Duration::from_millis(10));
        }
    }

    // wait a bit to ensure that all latency messages reached their target
    std::thread::sleep(Duration::from_secs(2));

    // then throughput with full speed
    for size in sizes {
        for _ in 0..100 {
            let data: Vec<u8> = rand::thread_rng()
                .sample_iter(rand::distributions::Standard)
                .take(size)
                .collect();
            node.send_output(&throughput, Default::default(), data.len(), |out| {
                out.copy_from_slice(&data);
            })?;
        }
    }

    Ok(())
}
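
For each of the nine enabled payload sizes, the node first sends 100 messages on the latency output, paced 10 ms apart so that queues stay empty (roughly 9 × 100 × 10 ms ≈ 9 s for the latency phase). After a 2 s pause to let those messages drain, it sends 100 messages per size again on the throughput output, this time with no delay between them.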

+57  -0  examples/benchmark/run.rs

@@ -0,0 +1,57 @@
use eyre::{bail, Context};
use std::path::Path;
use tracing::metadata::LevelFilter;
use tracing_subscriber::Layer;

#[tokio::main]
async fn main() -> eyre::Result<()> {
    set_up_tracing().wrap_err("failed to set up tracing subscriber")?;

    let root = Path::new(env!("CARGO_MANIFEST_DIR"));
    std::env::set_current_dir(root.join(file!()).parent().unwrap())
        .wrap_err("failed to set working dir")?;

    let dataflow_zenoh = Path::new("dataflow-zenoh.yml");
    build_dataflow(dataflow_zenoh).await?;

    let dataflow_iceoryx = Path::new("dataflow-iceoryx.yml");
    build_dataflow(dataflow_iceoryx).await?;

    println!("ZENOH:");
    dora_coordinator::run(dora_coordinator::Args {
        run_dataflow: dataflow_zenoh.to_owned().into(),
        runtime: None,
    })
    .await?;
    println!("\n\nICEORYX:");
    dora_coordinator::run(dora_coordinator::Args {
        run_dataflow: dataflow_iceoryx.to_owned().into(),
        runtime: None,
    })
    .await?;

    Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
    let cargo = std::env::var("CARGO").unwrap();
    let mut cmd = tokio::process::Command::new(&cargo);
    cmd.arg("run");
    cmd.arg("--package").arg("dora-cli");
    cmd.arg("--").arg("build").arg(dataflow);
    if !cmd.status().await?.success() {
        bail!("failed to build dataflow");
    };
    Ok(())
}

fn set_up_tracing() -> eyre::Result<()> {
    use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;

    let stdout_log = tracing_subscriber::fmt::layer()
        .pretty()
        .with_filter(LevelFilter::DEBUG);
    let subscriber = tracing_subscriber::Registry::default().with(stdout_log);
    tracing::subscriber::set_global_default(subscriber)
        .context("failed to set tracing global subscriber")
}
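
The runner switches into examples/benchmark, builds each dataflow by invoking dora-cli build (which should also run the cargo build commands listed in the YAML files), and then runs the dora coordinator once per dataflow, zenoh first and iceoryx second, so the sink's measurements are printed under the ZENOH: and ICEORYX: headings.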

+10  -0  examples/benchmark/sink/Cargo.toml

@@ -0,0 +1,10 @@
[package]
name = "benchmark-example-sink"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["zenoh", "iceoryx"] }
eyre = "0.6.8"

+77  -0  examples/benchmark/sink/src/main.rs

@@ -0,0 +1,77 @@
use dora_node_api::{self, DoraNode};
use std::time::{Duration, Instant};

fn main() -> eyre::Result<()> {
    let mut node = DoraNode::init_from_env()?;
    let inputs = node.inputs()?;

    // latency is tested first
    let mut latency = true;

    let mut current_size = 0;
    let mut n = 0;
    let mut start = Instant::now();
    let mut latencies = Vec::new();

    println!("Latency:");

    while let Ok(input) = inputs.recv() {
        let data = input.data();

        // check if new size bracket
        if data.len() != current_size {
            if n > 0 {
                record_results(start, current_size, n, latencies, latency);
            }
            current_size = data.len();
            n = 0;
            start = Instant::now();
            latencies = Vec::new();
        }

        match input.id.as_str() {
            "latency" if latency => {}
            "throughput" if latency => {
                latency = false;
                println!("Throughput:");
            }
            "throughput" => {}
            other => {
                eprintln!("Ignoring unexpected input `{other}`");
                continue;
            }
        }

        n += 1;
        latencies.push(
            input
                .metadata
                .timestamp()
                .get_time()
                .to_system_time()
                .elapsed()?,
        );
    }

    record_results(start, current_size, n, latencies, latency);

    Ok(())
}

fn record_results(
    start: Instant,
    current_size: usize,
    n: u32,
    latencies: Vec<Duration>,
    latency: bool,
) {
    let msg = if latency {
        let avg_latency = latencies.iter().sum::<Duration>() / n;
        format!("size {current_size:<#8x}: {avg_latency:?}")
    } else {
        let duration = start.elapsed();
        let msg_per_sec = n as f64 / duration.as_secs_f64();
        format!("size {current_size:<#8x}: {msg_per_sec:.0} messages per second")
    };
    println!("{msg}");
}
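
The latency figures come from comparing the sender-side timestamp carried in each message's metadata with the sink's system clock at arrival; the throughput figures divide a size bracket's message count by the wall-clock time since the bracket's first message arrived. Sizes are printed in hexadecimal (the {current_size:<#8x} format), so 0x1000 is the 4096-byte bracket. A minimal standalone sketch of the latency measurement principle, not part of the diff and with a made-up function name:

use std::time::{Duration, SystemTime};

// One-way latency as the sink computes it: time elapsed on the local system
// clock since the sender stamped the message. elapsed() returns an error if
// the local clock is behind the timestamp; the sink propagates that error
// with `?`, while this sketch simply falls back to zero.
fn one_way_latency(sent_at: SystemTime) -> Duration {
    sent_at.elapsed().unwrap_or_default()
}

fn main() {
    let sent_at = SystemTime::now(); // stand-in for the metadata timestamp
    std::thread::sleep(Duration::from_millis(5));
    println!("measured latency: {:?}", one_way_latency(sent_at));
}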
