Browse Source

Create a node API benchmark based on zenoh

benchmark
Philipp Oppermann 3 years ago
parent
commit
fce3046412
Failed to extract signature
8 changed files with 217 additions and 0 deletions
  1. +21
    -0
      Cargo.lock
  2. +7
    -0
      Cargo.toml
  3. +18
    -0
      examples/benchmark/dataflow.yml
  4. +13
    -0
      examples/benchmark/node/Cargo.toml
  5. +34
    -0
      examples/benchmark/node/src/main.rs
  6. +47
    -0
      examples/benchmark/run.rs
  7. +10
    -0
      examples/benchmark/sink/Cargo.toml
  8. +67
    -0
      examples/benchmark/sink/src/main.rs

+ 21
- 0
Cargo.lock View File

@@ -272,6 +272,25 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6b4d9b1225d28d360ec6a231d65af1fd99a2a095154c8040689617290569c5c"

[[package]]
name = "benchmark-example-node"
version = "0.1.2"
dependencies = [
"dora-node-api",
"eyre",
"futures",
"rand",
"tokio",
]

[[package]]
name = "benchmark-example-sink"
version = "0.1.2"
dependencies = [
"dora-node-api",
"eyre",
]

[[package]]
name = "bincode"
version = "1.3.3"
@@ -971,6 +990,8 @@ dependencies = [
"dunce",
"eyre",
"tokio",
"tracing",
"tracing-subscriber",
]

[[package]]


+ 7
- 0
Cargo.toml View File

@@ -10,6 +10,7 @@ members = [
"binaries/*",
"examples/rust-dataflow/*",
"examples/iceoryx/*",
"examples/benchmark/*",
"libraries/communication-layer/*",
"libraries/core",
"libraries/message",
@@ -38,6 +39,8 @@ eyre = "0.6.8"
tokio = "1.20.1"
dora-coordinator = { path = "binaries/coordinator" }
dunce = "1.0.2"
tracing = "0.1.36"
tracing-subscriber = "0.3.15"

[[example]]
name = "c-dataflow"
@@ -62,3 +65,7 @@ path = "examples/python-dataflow/run.rs"
[[example]]
name = "iceoryx"
path = "examples/iceoryx/run.rs"

[[example]]
name = "benchmark"
path = "examples/benchmark/run.rs"

+ 18
- 0
examples/benchmark/dataflow.yml View File

@@ -0,0 +1,18 @@
communication:
zenoh:
prefix: /benchmark-example

nodes:
- id: rust-node
custom:
build: cargo build -p benchmark-example-node --release
source: ../../target/release/benchmark-example-node
outputs:
- random
- id: rust-sink
custom:
build: cargo build -p benchmark-example-sink --release
source: ../../target/release/benchmark-example-sink
inputs:
# message: runtime-node/rust-operator/status
message: rust-node/random

+ 13
- 0
examples/benchmark/node/Cargo.toml View File

@@ -0,0 +1,13 @@
[package]
name = "benchmark-example-node"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["zenoh", "iceoryx"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.20.1", features = ["rt", "macros"] }

+ 34
- 0
examples/benchmark/node/src/main.rs View File

@@ -0,0 +1,34 @@
use dora_node_api::{self, dora_core::config::DataId, DoraNode};
use rand::Rng;

fn main() -> eyre::Result<()> {
let output = DataId::from("random".to_owned());

let mut node = DoraNode::init_from_env()?;
let sizes = [
0,
8,
64,
512,
2048,
4096,
4 * 4096,
10 * 4096,
100 * 4096,
1000 * 4096,
10000 * 4096,
];
for size in sizes {
for _ in 0..100 {
let data: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
.take(size)
.collect();
node.send_output(&output, Default::default(), data.len(), |out| {
out.copy_from_slice(&data);
})?;
}
}

Ok(())
}

+ 47
- 0
examples/benchmark/run.rs View File

@@ -0,0 +1,47 @@
use eyre::{bail, Context};
use std::path::Path;
use tracing::metadata::LevelFilter;
use tracing_subscriber::Layer;

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing().wrap_err("failed to set up tracing subscriber")?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

dora_coordinator::run(dora_coordinator::Args {
run_dataflow: dataflow.to_owned().into(),
runtime: None,
})
.await?;

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
}

fn set_up_tracing() -> eyre::Result<()> {
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;

let stdout_log = tracing_subscriber::fmt::layer()
.pretty()
.with_filter(LevelFilter::DEBUG);
let subscriber = tracing_subscriber::Registry::default().with(stdout_log);
tracing::subscriber::set_global_default(subscriber)
.context("failed to set tracing global subscriber")
}

+ 10
- 0
examples/benchmark/sink/Cargo.toml View File

@@ -0,0 +1,10 @@
[package]
name = "benchmark-example-sink"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["zenoh", "iceoryx"] }
eyre = "0.6.8"

+ 67
- 0
examples/benchmark/sink/src/main.rs View File

@@ -0,0 +1,67 @@
use dora_node_api::{self, DoraNode};
use std::{
fmt::Write as _,
time::{Duration, Instant},
};

fn main() -> eyre::Result<()> {
let mut node = DoraNode::init_from_env()?;
let inputs = node.inputs()?;

let mut current_size = 0;
let mut n = 0;
let mut start = Instant::now();
let mut latencies = Vec::new();

let mut summary = String::new();

while let Ok(input) = inputs.recv() {
match input.id.as_str() {
"message" => {
let data = input.data();

if data.len() != current_size {
if n > 0 {
record_results(start, current_size, n, latencies, &mut summary);
}
current_size = data.len();
n = 0;
start = Instant::now();
latencies = Vec::new();
}
n += 1;
latencies.push(
input
.metadata
.timestamp()
.get_time()
.to_system_time()
.elapsed()?,
);
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}
}

record_results(start, current_size, n, latencies, &mut summary);

println!("\nSummary:\n{summary}");

Ok(())
}

fn record_results(
start: Instant,
current_size: usize,
n: u32,
latencies: Vec<Duration>,
summary: &mut String,
) {
let duration = start.elapsed();
let per_message = duration / n;
let avg_latency = latencies.iter().sum::<Duration>() / n;
let msg =
format!("size {current_size:<#8x}: {per_message:?} per message (latency: {avg_latency:?})");
println!("{msg}");
writeln!(summary, "{msg}").unwrap();
}

Loading…
Cancel
Save