Browse Source

Add test for queue latency

tags/0.3.8-rc
haixuanTao 1 year ago
parent
commit
73bd73c12a
9 changed files with 133 additions and 2 deletions
  1. +11
    -2
      Cargo.lock
  2. +1
    -0
      Cargo.toml
  3. +14
    -0
      tests/queue_size_latest_data_python/dataflow.yaml
  4. +22
    -0
      tests/queue_size_latest_data_python/receive_data.py
  5. +10
    -0
      tests/queue_size_latest_data_python/send_data.py
  6. +15
    -0
      tests/queue_size_latest_data_rust/dataflow.yaml
  7. +14
    -0
      tests/queue_size_latest_data_rust/receive_data/Cargo.toml
  8. +3
    -0
      tests/queue_size_latest_data_rust/receive_data/README.md
  9. +43
    -0
      tests/queue_size_latest_data_rust/receive_data/src/main.rs

+ 11
- 2
Cargo.lock View File

@@ -1380,9 +1380,9 @@ checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce"

[[package]]
name = "bytemuck"
version = "1.19.0"
version = "1.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d"
checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a"
dependencies = [
"bytemuck_derive",
]
@@ -8603,6 +8603,15 @@ dependencies = [
"tungstenite 0.20.1",
]

[[package]]
name = "receive_data"
version = "0.3.7"
dependencies = [
"chrono",
"dora-node-api",
"eyre",
]

[[package]]
name = "redox_syscall"
version = "0.4.1"


+ 1
- 0
Cargo.toml View File

@@ -38,6 +38,7 @@ members = [
"libraries/extensions/ros2-bridge",
"libraries/extensions/ros2-bridge/msg-gen",
"libraries/extensions/ros2-bridge/python",
"tests/queue_size_latest_data_rust/receive_data",
]

[workspace.package]


+ 14
- 0
tests/queue_size_latest_data_python/dataflow.yaml View File

@@ -0,0 +1,14 @@
nodes:
- id: send_data
path: ./send_data.py
inputs:
tick: dora/timer/millis/20
outputs:
- data

- id: receive_data_with_sleep
path: ./receive_data.py
inputs:
tick:
source: send_data/data
queue_size: 1

+ 22
- 0
tests/queue_size_latest_data_python/receive_data.py View File

@@ -0,0 +1,22 @@
from dora import Node
import time


node = Node()

# Voluntarily sleep for 5 seconds to ensure that the node is dropping the oldest input
time.sleep(5)

for event in node:
event_type = event["type"]

if event_type == "INPUT":
event_id = event["id"]
send_time = event["value"][0].as_py()

duration = (time.clock_gettime_ns(0) - send_time) / 1_000_000_000
print("Duration: ", duration)
assert (
duration < 1.2
), f"Duration: {duration} should be less than 1 as we should always pull latest data."
time.sleep(1)

+ 10
- 0
tests/queue_size_latest_data_python/send_data.py View File

@@ -0,0 +1,10 @@
from dora import Node
import time
import pyarrow as pa
import numpy as np

node = Node()

for event in node:
now = time.clock_gettime_ns(0)
node.send_output("data", pa.array([np.uint64(now)]))

+ 15
- 0
tests/queue_size_latest_data_rust/dataflow.yaml View File

@@ -0,0 +1,15 @@
nodes:
- id: send_data
path: ../queue_size_latest_data_python/send_data.py
inputs:
tick: dora/timer/millis/1
outputs:
- data

- id: receive_data_with_sleep
path: ../../target/release/receive_data
build: cargo build -p receive_data --release
inputs:
tick:
source: send_data/data
queue_size: 1

+ 14
- 0
tests/queue_size_latest_data_rust/receive_data/Cargo.toml View File

@@ -0,0 +1,14 @@
[package]
name = "receive_data"
edition = "2021"
version.workspace = true
description.workspace = true
documentation.workspace = true
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
chrono = "0.4"

+ 3
- 0
tests/queue_size_latest_data_rust/receive_data/README.md View File

@@ -0,0 +1,3 @@
# Print received inputs in the terminal

Check example at [examples/speech-to-text](examples/speech-to-text)

+ 43
- 0
tests/queue_size_latest_data_rust/receive_data/src/main.rs View File

@@ -0,0 +1,43 @@
use std::{thread::sleep, time::Duration};

use chrono::Utc;
use dora_node_api::{
self,
arrow::{
array::{AsArray, PrimitiveArray},
datatypes::UInt64Type,
},
DoraNode,
};

fn main() -> eyre::Result<()> {
let (_node, mut events) = DoraNode::init_from_env()?;

// Voluntarily sleep for 5 seconds to ensure that the node is dropping the oldest input
sleep(Duration::from_secs(5));

while let Some(event) = events.recv() {
match event {
dora_node_api::Event::Input {
id: _,
metadata: _,
data,
} => {
let data: &PrimitiveArray<UInt64Type> = data.as_primitive();
let time: u64 = data.values()[0];
let time = time as f64;
let now = Utc::now();

let timestamp = now.timestamp_nanos_opt().unwrap() as f64;
let duration = (timestamp - time) / 1_000_000_000.;
println!("Time Difference: {:?}", duration);
assert!(
duration < 1.,
"Time difference should be less than 2 seconds as data is sent every seconds"
);
}
_ => {}
}
}
Ok(())
}

Loading…
Cancel
Save