Browse Source

Add a basic benchmark to test throughput and latency for different message sizes

tags/v0.2.0-candidate
Philipp Oppermann 3 years ago
parent
commit
19214c4829
Failed to extract signature
9 changed files with 202 additions and 0 deletions
  1. +5
    -0
      .github/workflows/ci.yml
  2. +19
    -0
      Cargo.lock
  3. +5
    -0
      Cargo.toml
  4. +18
    -0
      examples/benchmark/dataflow.yml
  5. +13
    -0
      examples/benchmark/node/Cargo.toml
  6. +21
    -0
      examples/benchmark/node/src/main.rs
  7. +43
    -0
      examples/benchmark/run.rs
  8. +10
    -0
      examples/benchmark/sink/Cargo.toml
  9. +68
    -0
      examples/benchmark/sink/src/main.rs

+ 5
- 0
.github/workflows/ci.yml View File

@@ -60,6 +60,11 @@ jobs:
- name: "Rust Dataflow example"
timeout-minutes: 30
run: cargo run --example rust-dataflow

- name: "Benchmark example"
timeout-minutes: 30
run: cargo run --example benchmark --release

- name: "C Dataflow example"
timeout-minutes: 15
run: cargo run --example c-dataflow


+ 19
- 0
Cargo.lock View File

@@ -272,6 +272,25 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6b4d9b1225d28d360ec6a231d65af1fd99a2a095154c8040689617290569c5c"

[[package]]
name = "benchmark-example-node"
version = "0.1.2"
dependencies = [
"dora-node-api",
"eyre",
"futures",
"rand",
"tokio",
]

[[package]]
name = "benchmark-example-sink"
version = "0.1.2"
dependencies = [
"dora-node-api",
"eyre",
]

[[package]]
name = "bincode"
version = "1.3.3"


+ 5
- 0
Cargo.toml View File

@@ -12,6 +12,7 @@ members = [
"binaries/daemon",
# "binaries/runtime",
"examples/rust-dataflow/*",
"examples/benchmark/*",
"libraries/communication-layer/*",
"libraries/core",
"libraries/message",
@@ -65,3 +66,7 @@ path = "examples/c++-dataflow/run.rs"
[[example]]
name = "python-dataflow"
path = "examples/python-dataflow/run.rs"

[[example]]
name = "benchmark"
path = "examples/benchmark/run.rs"

+ 18
- 0
examples/benchmark/dataflow.yml View File

@@ -0,0 +1,18 @@
communication:
zenoh:
prefix: /benchmark-example

nodes:
- id: rust-node
custom:
build: cargo build -p benchmark-example-node --release
source: ../../target/release/benchmark-example-node
outputs:
- random
- id: rust-sink
custom:
build: cargo build -p benchmark-example-sink --release
source: ../../target/release/benchmark-example-sink
inputs:
# message: runtime-node/rust-operator/status
message: rust-node/random

+ 13
- 0
examples/benchmark/node/Cargo.toml View File

@@ -0,0 +1,13 @@
[package]
name = "benchmark-example-node"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.20.1", features = ["rt", "macros"] }

+ 21
- 0
examples/benchmark/node/src/main.rs View File

@@ -0,0 +1,21 @@
use dora_node_api::{self, dora_core::config::DataId, DoraNode};
use rand::Rng;

fn main() -> eyre::Result<()> {
let output = DataId::from("random".to_owned());

let (mut node, _events) = DoraNode::init_from_env()?;
for size in [0, 8, 64, 512, 2048, 4096, 4 * 4096, 10 * 4096] {
for _ in 0..100 {
let data: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
.take(size)
.collect();
node.send_output(output.clone(), Default::default(), data.len(), |out| {
out.copy_from_slice(&data);
})?;
}
}

Ok(())
}

+ 43
- 0
examples/benchmark/run.rs View File

@@ -0,0 +1,43 @@
use eyre::{bail, Context};
use std::path::Path;
use tracing::metadata::LevelFilter;
use tracing_subscriber::Layer;

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing().wrap_err("failed to set up tracing subscriber")?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

dora_daemon::Daemon::run_dataflow(dataflow).await?;

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
}

fn set_up_tracing() -> eyre::Result<()> {
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;

let stdout_log = tracing_subscriber::fmt::layer()
.pretty()
.with_filter(LevelFilter::DEBUG);
let subscriber = tracing_subscriber::Registry::default().with(stdout_log);
tracing::subscriber::set_global_default(subscriber)
.context("failed to set tracing global subscriber")
}

+ 10
- 0
examples/benchmark/sink/Cargo.toml View File

@@ -0,0 +1,10 @@
[package]
name = "benchmark-example-sink"
version.workspace = true
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true }
eyre = "0.6.8"

+ 68
- 0
examples/benchmark/sink/src/main.rs View File

@@ -0,0 +1,68 @@
use dora_node_api::{self, daemon::Event, DoraNode};
use eyre::ContextCompat;
use std::{
fmt::Write as _,
time::{Duration, Instant},
};

fn main() -> eyre::Result<()> {
let (_node, mut events) = DoraNode::init_from_env()?;

let mut current_size = 0;
let mut n = 0;
let mut start = Instant::now();
let mut latencies = Vec::new();

let mut summary = String::new();

while let Some(event) = events.recv() {
match event {
Event::Stop => break,
Event::Input { id, metadata, data } => match id.as_str() {
"message" => {
let data = data.as_deref().unwrap_or_default();

if data.len() != current_size {
if n > 0 {
record_results(start, current_size, n, latencies, &mut summary);
}
current_size = data.len();
n = 0;
start = Instant::now();
latencies = Vec::new();
}
n += 1;
latencies.push(metadata.timestamp().get_time().to_system_time().elapsed()?);
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
Event::InputClosed { id } => {
println!("Input `{id}` was closed -> exiting");
break;
}
other => eprintln!("Received unexpected input: {other:?}"),
}
}

record_results(start, current_size, n, latencies, &mut summary);

println!("\nSummary:\n{summary}");

Ok(())
}

fn record_results(
start: Instant,
current_size: usize,
n: u32,
latencies: Vec<Duration>,
summary: &mut String,
) {
let duration = start.elapsed();
let per_message = duration / n;
let avg_latency = latencies.iter().sum::<Duration>() / n;
let msg =
format!("size {current_size:<#8x}: {per_message:?} per message (latency: {avg_latency:?})");
println!("{msg}");
writeln!(summary, "{msg}").unwrap();
}

Loading…
Cancel
Save