From 6f598ebb4478482491dd0206cca0f4850aaa4005 Mon Sep 17 00:00:00 2001 From: haixuantao Date: Thu, 13 Feb 2025 22:39:37 +0100 Subject: [PATCH 1/4] Adding a test for checking on the latency when used timeout and queue at the same time --- .../dataflow.yaml | 14 ++++++++ .../receive_data.py | 32 +++++++++++++++++++ .../send_data.py | 18 +++++++++++ 3 files changed, 64 insertions(+) create mode 100644 tests/queue_size_and_timeout_test_python/dataflow.yaml create mode 100644 tests/queue_size_and_timeout_test_python/receive_data.py create mode 100644 tests/queue_size_and_timeout_test_python/send_data.py diff --git a/tests/queue_size_and_timeout_test_python/dataflow.yaml b/tests/queue_size_and_timeout_test_python/dataflow.yaml new file mode 100644 index 00000000..c900c292 --- /dev/null +++ b/tests/queue_size_and_timeout_test_python/dataflow.yaml @@ -0,0 +1,14 @@ +nodes: + - id: send_data + path: ./send_data.py + inputs: + keepalive: dora/timer/millis/100000 + outputs: + - ts + + - id: receive_data_with_sleep + path: ./receive_data.py + inputs: + ts: + source: send_data/ts + queue_size: 1 diff --git a/tests/queue_size_and_timeout_test_python/receive_data.py b/tests/queue_size_and_timeout_test_python/receive_data.py new file mode 100644 index 00000000..6cdf0c55 --- /dev/null +++ b/tests/queue_size_and_timeout_test_python/receive_data.py @@ -0,0 +1,32 @@ +import time + +from dora import Node + + +def main() -> None: + dora_node = Node() + + i = 0 + while True: + message = dora_node.next(timeout=0.001) + + if message is None: + break + + if message["type"] != "INPUT": + continue + sent = message["value"][0].as_py() + sent_in_s = sent / 1_000_000 + received = time.perf_counter_ns() + received_in_s = received / 1_000_000 + + i += 1 + print( + f"[{i}] Sent: {sent_in_s}, Received: {received_in_s}, Difference: {received_in_s - sent_in_s}" + ) + assert received_in_s - sent_in_s < 0.1 + time.sleep(0.1) + + +if __name__ == "__main__": + main() diff --git a/tests/queue_size_and_timeout_test_python/send_data.py b/tests/queue_size_and_timeout_test_python/send_data.py new file mode 100644 index 00000000..cce91e1e --- /dev/null +++ b/tests/queue_size_and_timeout_test_python/send_data.py @@ -0,0 +1,18 @@ +import time + +import pyarrow as pa +from dora import Node + + +def main() -> None: + dora_node = Node() + i = 0 + while True: + dora_node.send_output("ts", pa.array([time.perf_counter_ns()])) + i += 1 + # print(f"Sent {i} times", flush=True) + time.sleep(0.001) + + +if __name__ == "__main__": + main() From a2910b907d3a57701d977aa6e14903d89361c613 Mon Sep 17 00:00:00 2001 From: haixuantao Date: Mon, 17 Feb 2025 10:43:06 +0100 Subject: [PATCH 2/4] Rename timeout test --- .github/workflows/ci.yml | 3 +++ .../dataflow.yaml | 0 .../receive_data.py | 0 .../send_data.py | 0 4 files changed, 3 insertions(+) rename tests/{queue_size_and_timeout_test_python => queue_size_and_timeout_python}/dataflow.yaml (100%) rename tests/{queue_size_and_timeout_test_python => queue_size_and_timeout_python}/receive_data.py (100%) rename tests/{queue_size_and_timeout_test_python => queue_size_and_timeout_python}/send_data.py (100%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4cee1692..afbe95ed 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -357,6 +357,9 @@ jobs: # Run Python queue latency test dora run tests/queue_size_latest_data_python/dataflow.yaml + # Run Python queue latency test + timeout + dora run tests/queue_size_and_timeout_python/dataflow.yaml + # Run Rust queue latency test dora build tests/queue_size_latest_data_rust/dataflow.yaml dora run tests/queue_size_latest_data_rust/dataflow.yaml diff --git a/tests/queue_size_and_timeout_test_python/dataflow.yaml b/tests/queue_size_and_timeout_python/dataflow.yaml similarity index 100% rename from tests/queue_size_and_timeout_test_python/dataflow.yaml rename to tests/queue_size_and_timeout_python/dataflow.yaml diff --git a/tests/queue_size_and_timeout_test_python/receive_data.py b/tests/queue_size_and_timeout_python/receive_data.py similarity index 100% rename from tests/queue_size_and_timeout_test_python/receive_data.py rename to tests/queue_size_and_timeout_python/receive_data.py diff --git a/tests/queue_size_and_timeout_test_python/send_data.py b/tests/queue_size_and_timeout_python/send_data.py similarity index 100% rename from tests/queue_size_and_timeout_test_python/send_data.py rename to tests/queue_size_and_timeout_python/send_data.py From f33ee1e811dffcd8bcb551d6072fb98e6d8da80a Mon Sep 17 00:00:00 2001 From: haixuantao Date: Sun, 23 Feb 2025 15:33:09 +0100 Subject: [PATCH 3/4] Fix scheduler not being used when called through node timeout --- apis/rust/node/src/event_stream/mod.rs | 11 ++++++----- tests/queue_size_and_timeout_python/dataflow.yaml | 4 +--- tests/queue_size_and_timeout_python/receive_data.py | 12 ++++++------ tests/queue_size_and_timeout_python/send_data.py | 4 +++- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 4856cbb6..7276b6bf 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -1,5 +1,6 @@ use std::{ collections::{BTreeMap, HashMap, VecDeque}, + pin::pin, sync::Arc, time::Duration, }; @@ -187,13 +188,13 @@ impl EventStream { } pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option { - let next_event = match select(Delay::new(dur), self.receiver.next()).await { - Either::Left((_elapsed, _)) => { - Some(EventItem::TimeoutError(eyre!("Receiver timed out"))) - } + let next_event = match select(Delay::new(dur), pin!(self.recv_async())).await { + Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError( + eyre!("Receiver timed out"), + ))), Either::Right((event, _)) => event, }; - next_event.map(Self::convert_event_item) + next_event } fn convert_event_item(item: EventItem) -> Event { diff --git a/tests/queue_size_and_timeout_python/dataflow.yaml b/tests/queue_size_and_timeout_python/dataflow.yaml index c900c292..b53f7d47 100644 --- a/tests/queue_size_and_timeout_python/dataflow.yaml +++ b/tests/queue_size_and_timeout_python/dataflow.yaml @@ -9,6 +9,4 @@ nodes: - id: receive_data_with_sleep path: ./receive_data.py inputs: - ts: - source: send_data/ts - queue_size: 1 + ts: send_data/ts diff --git a/tests/queue_size_and_timeout_python/receive_data.py b/tests/queue_size_and_timeout_python/receive_data.py index 6cdf0c55..b01463ba 100644 --- a/tests/queue_size_and_timeout_python/receive_data.py +++ b/tests/queue_size_and_timeout_python/receive_data.py @@ -8,23 +8,23 @@ def main() -> None: i = 0 while True: - message = dora_node.next(timeout=0.001) - + message = dora_node.next(timeout=0.05) if message is None: break if message["type"] != "INPUT": continue sent = message["value"][0].as_py() - sent_in_s = sent / 1_000_000 + j = message["value"][1].as_py() + sent_in_s = sent / 1_000_000_000 received = time.perf_counter_ns() - received_in_s = received / 1_000_000 + received_in_s = received / 1_000_000_000 i += 1 print( - f"[{i}] Sent: {sent_in_s}, Received: {received_in_s}, Difference: {received_in_s - sent_in_s}" + f"[{i}, {j}] Sent: {sent_in_s}, Received: {received_in_s}, Difference: {received_in_s - sent_in_s}" ) - assert received_in_s - sent_in_s < 0.1 + assert received_in_s - sent_in_s < 1.0 time.sleep(0.1) diff --git a/tests/queue_size_and_timeout_python/send_data.py b/tests/queue_size_and_timeout_python/send_data.py index cce91e1e..2ae6c465 100644 --- a/tests/queue_size_and_timeout_python/send_data.py +++ b/tests/queue_size_and_timeout_python/send_data.py @@ -8,10 +8,12 @@ def main() -> None: dora_node = Node() i = 0 while True: - dora_node.send_output("ts", pa.array([time.perf_counter_ns()])) + dora_node.send_output("ts", pa.array([time.perf_counter_ns(), i])) i += 1 # print(f"Sent {i} times", flush=True) time.sleep(0.001) + if dora_node.next(timeout=0.001) is None: + break if __name__ == "__main__": From 3fca6f9e2cd35f362b84cc86602c0718f2762b00 Mon Sep 17 00:00:00 2001 From: haixuantao Date: Sun, 23 Feb 2025 16:14:02 +0100 Subject: [PATCH 4/4] Add time limit for test of 10s --- tests/queue_size_and_timeout_python/send_data.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/queue_size_and_timeout_python/send_data.py b/tests/queue_size_and_timeout_python/send_data.py index 2ae6c465..a29e1abb 100644 --- a/tests/queue_size_and_timeout_python/send_data.py +++ b/tests/queue_size_and_timeout_python/send_data.py @@ -3,11 +3,13 @@ import time import pyarrow as pa from dora import Node +start = time.time() + def main() -> None: dora_node = Node() i = 0 - while True: + while time.time() - start < 10: dora_node.send_output("ts", pa.array([time.perf_counter_ns(), i])) i += 1 # print(f"Sent {i} times", flush=True)