Browse Source

Adding a test for checking on the latency when used timeout and queue at the same time (#783)

This PR is a follow of issue https://github.com/dora-rs/dora/issues/774
and indeed it seems that the latency is higher than what we would
expect.
tags/v0.3.10-rc0
Haixuan Xavier Tao GitHub 11 months ago
parent
commit
b9b8ced620
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
5 changed files with 75 additions and 5 deletions
  1. +3
    -0
      .github/workflows/ci.yml
  2. +6
    -5
      apis/rust/node/src/event_stream/mod.rs
  3. +12
    -0
      tests/queue_size_and_timeout_python/dataflow.yaml
  4. +32
    -0
      tests/queue_size_and_timeout_python/receive_data.py
  5. +22
    -0
      tests/queue_size_and_timeout_python/send_data.py

+ 3
- 0
.github/workflows/ci.yml View File

@@ -357,6 +357,9 @@ jobs:
# Run Python queue latency test
dora run tests/queue_size_latest_data_python/dataflow.yaml

# Run Python queue latency test + timeout
dora run tests/queue_size_and_timeout_python/dataflow.yaml

# Run Rust queue latency test
dora build tests/queue_size_latest_data_rust/dataflow.yaml
dora run tests/queue_size_latest_data_rust/dataflow.yaml


+ 6
- 5
apis/rust/node/src/event_stream/mod.rs View File

@@ -1,5 +1,6 @@
use std::{
collections::{BTreeMap, HashMap, VecDeque},
pin::pin,
sync::Arc,
time::Duration,
};
@@ -187,13 +188,13 @@ impl EventStream {
}

pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
let next_event = match select(Delay::new(dur), self.receiver.next()).await {
Either::Left((_elapsed, _)) => {
Some(EventItem::TimeoutError(eyre!("Receiver timed out")))
}
let next_event = match select(Delay::new(dur), pin!(self.recv_async())).await {
Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError(
eyre!("Receiver timed out"),
))),
Either::Right((event, _)) => event,
};
next_event.map(Self::convert_event_item)
next_event
}

fn convert_event_item(item: EventItem) -> Event {


+ 12
- 0
tests/queue_size_and_timeout_python/dataflow.yaml View File

@@ -0,0 +1,12 @@
nodes:
- id: send_data
path: ./send_data.py
inputs:
keepalive: dora/timer/millis/100000
outputs:
- ts

- id: receive_data_with_sleep
path: ./receive_data.py
inputs:
ts: send_data/ts

+ 32
- 0
tests/queue_size_and_timeout_python/receive_data.py View File

@@ -0,0 +1,32 @@
import time

from dora import Node


def main() -> None:
dora_node = Node()

i = 0
while True:
message = dora_node.next(timeout=0.05)
if message is None:
break

if message["type"] != "INPUT":
continue
sent = message["value"][0].as_py()
j = message["value"][1].as_py()
sent_in_s = sent / 1_000_000_000
received = time.perf_counter_ns()
received_in_s = received / 1_000_000_000

i += 1
print(
f"[{i}, {j}] Sent: {sent_in_s}, Received: {received_in_s}, Difference: {received_in_s - sent_in_s}"
)
assert received_in_s - sent_in_s < 1.0
time.sleep(0.1)


if __name__ == "__main__":
main()

+ 22
- 0
tests/queue_size_and_timeout_python/send_data.py View File

@@ -0,0 +1,22 @@
import time

import pyarrow as pa
from dora import Node

start = time.time()


def main() -> None:
dora_node = Node()
i = 0
while time.time() - start < 10:
dora_node.send_output("ts", pa.array([time.perf_counter_ns(), i]))
i += 1
# print(f"Sent {i} times", flush=True)
time.sleep(0.001)
if dora_node.next(timeout=0.001) is None:
break


if __name__ == "__main__":
main()

Loading…
Cancel
Save