From 73bd73c12ae0be15eb01a14fd1d9196ff8df7b79 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 5 Dec 2024 13:35:11 +0100 Subject: [PATCH] Add test for queue latency --- Cargo.lock | 13 +++++- Cargo.toml | 1 + .../dataflow.yaml | 14 ++++++ .../receive_data.py | 22 ++++++++++ .../send_data.py | 10 +++++ .../queue_size_latest_data_rust/dataflow.yaml | 15 +++++++ .../receive_data/Cargo.toml | 14 ++++++ .../receive_data/README.md | 3 ++ .../receive_data/src/main.rs | 43 +++++++++++++++++++ 9 files changed, 133 insertions(+), 2 deletions(-) create mode 100644 tests/queue_size_latest_data_python/dataflow.yaml create mode 100644 tests/queue_size_latest_data_python/receive_data.py create mode 100644 tests/queue_size_latest_data_python/send_data.py create mode 100644 tests/queue_size_latest_data_rust/dataflow.yaml create mode 100644 tests/queue_size_latest_data_rust/receive_data/Cargo.toml create mode 100644 tests/queue_size_latest_data_rust/receive_data/README.md create mode 100644 tests/queue_size_latest_data_rust/receive_data/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 6e8de4dc..e907e789 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1380,9 +1380,9 @@ checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" [[package]] name = "bytemuck" -version = "1.19.0" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" +checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a" dependencies = [ "bytemuck_derive", ] @@ -8603,6 +8603,15 @@ dependencies = [ "tungstenite 0.20.1", ] +[[package]] +name = "receive_data" +version = "0.3.7" +dependencies = [ + "chrono", + "dora-node-api", + "eyre", +] + [[package]] name = "redox_syscall" version = "0.4.1" diff --git a/Cargo.toml b/Cargo.toml index e10dbfee..2c5baf4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ members = [ "libraries/extensions/ros2-bridge", "libraries/extensions/ros2-bridge/msg-gen", "libraries/extensions/ros2-bridge/python", + "tests/queue_size_latest_data_rust/receive_data", ] [workspace.package] diff --git a/tests/queue_size_latest_data_python/dataflow.yaml b/tests/queue_size_latest_data_python/dataflow.yaml new file mode 100644 index 00000000..fa44bdc4 --- /dev/null +++ b/tests/queue_size_latest_data_python/dataflow.yaml @@ -0,0 +1,14 @@ +nodes: + - id: send_data + path: ./send_data.py + inputs: + tick: dora/timer/millis/20 + outputs: + - data + + - id: receive_data_with_sleep + path: ./receive_data.py + inputs: + tick: + source: send_data/data + queue_size: 1 diff --git a/tests/queue_size_latest_data_python/receive_data.py b/tests/queue_size_latest_data_python/receive_data.py new file mode 100644 index 00000000..78b4346b --- /dev/null +++ b/tests/queue_size_latest_data_python/receive_data.py @@ -0,0 +1,22 @@ +from dora import Node +import time + + +node = Node() + +# Voluntarily sleep for 5 seconds to ensure that the node is dropping the oldest input +time.sleep(5) + +for event in node: + event_type = event["type"] + + if event_type == "INPUT": + event_id = event["id"] + send_time = event["value"][0].as_py() + + duration = (time.clock_gettime_ns(0) - send_time) / 1_000_000_000 + print("Duration: ", duration) + assert ( + duration < 1.2 + ), f"Duration: {duration} should be less than 1 as we should always pull latest data." + time.sleep(1) diff --git a/tests/queue_size_latest_data_python/send_data.py b/tests/queue_size_latest_data_python/send_data.py new file mode 100644 index 00000000..b46b3fa6 --- /dev/null +++ b/tests/queue_size_latest_data_python/send_data.py @@ -0,0 +1,10 @@ +from dora import Node +import time +import pyarrow as pa +import numpy as np + +node = Node() + +for event in node: + now = time.clock_gettime_ns(0) + node.send_output("data", pa.array([np.uint64(now)])) diff --git a/tests/queue_size_latest_data_rust/dataflow.yaml b/tests/queue_size_latest_data_rust/dataflow.yaml new file mode 100644 index 00000000..33154c64 --- /dev/null +++ b/tests/queue_size_latest_data_rust/dataflow.yaml @@ -0,0 +1,15 @@ +nodes: + - id: send_data + path: ../queue_size_latest_data_python/send_data.py + inputs: + tick: dora/timer/millis/1 + outputs: + - data + + - id: receive_data_with_sleep + path: ../../target/release/receive_data + build: cargo build -p receive_data --release + inputs: + tick: + source: send_data/data + queue_size: 1 diff --git a/tests/queue_size_latest_data_rust/receive_data/Cargo.toml b/tests/queue_size_latest_data_rust/receive_data/Cargo.toml new file mode 100644 index 00000000..a1910514 --- /dev/null +++ b/tests/queue_size_latest_data_rust/receive_data/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "receive_data" +edition = "2021" +version.workspace = true +description.workspace = true +documentation.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { workspace = true, features = ["tracing"] } +eyre = "0.6.8" +chrono = "0.4" diff --git a/tests/queue_size_latest_data_rust/receive_data/README.md b/tests/queue_size_latest_data_rust/receive_data/README.md new file mode 100644 index 00000000..9aa46d90 --- /dev/null +++ b/tests/queue_size_latest_data_rust/receive_data/README.md @@ -0,0 +1,3 @@ +# Print received inputs in the terminal + +Check example at [examples/speech-to-text](examples/speech-to-text) diff --git a/tests/queue_size_latest_data_rust/receive_data/src/main.rs b/tests/queue_size_latest_data_rust/receive_data/src/main.rs new file mode 100644 index 00000000..401fcdd5 --- /dev/null +++ b/tests/queue_size_latest_data_rust/receive_data/src/main.rs @@ -0,0 +1,43 @@ +use std::{thread::sleep, time::Duration}; + +use chrono::Utc; +use dora_node_api::{ + self, + arrow::{ + array::{AsArray, PrimitiveArray}, + datatypes::UInt64Type, + }, + DoraNode, +}; + +fn main() -> eyre::Result<()> { + let (_node, mut events) = DoraNode::init_from_env()?; + + // Voluntarily sleep for 5 seconds to ensure that the node is dropping the oldest input + sleep(Duration::from_secs(5)); + + while let Some(event) = events.recv() { + match event { + dora_node_api::Event::Input { + id: _, + metadata: _, + data, + } => { + let data: &PrimitiveArray = data.as_primitive(); + let time: u64 = data.values()[0]; + let time = time as f64; + let now = Utc::now(); + + let timestamp = now.timestamp_nanos_opt().unwrap() as f64; + let duration = (timestamp - time) / 1_000_000_000.; + println!("Time Difference: {:?}", duration); + assert!( + duration < 1., + "Time difference should be less than 2 seconds as data is sent every seconds" + ); + } + _ => {} + } + } + Ok(()) +}