From f33ee1e811dffcd8bcb551d6072fb98e6d8da80a Mon Sep 17 00:00:00 2001 From: haixuantao Date: Sun, 23 Feb 2025 15:33:09 +0100 Subject: [PATCH] Fix scheduler not being used when called through node timeout --- apis/rust/node/src/event_stream/mod.rs | 11 ++++++----- tests/queue_size_and_timeout_python/dataflow.yaml | 4 +--- tests/queue_size_and_timeout_python/receive_data.py | 12 ++++++------ tests/queue_size_and_timeout_python/send_data.py | 4 +++- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 4856cbb6..7276b6bf 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -1,5 +1,6 @@ use std::{ collections::{BTreeMap, HashMap, VecDeque}, + pin::pin, sync::Arc, time::Duration, }; @@ -187,13 +188,13 @@ impl EventStream { } pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option { - let next_event = match select(Delay::new(dur), self.receiver.next()).await { - Either::Left((_elapsed, _)) => { - Some(EventItem::TimeoutError(eyre!("Receiver timed out"))) - } + let next_event = match select(Delay::new(dur), pin!(self.recv_async())).await { + Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError( + eyre!("Receiver timed out"), + ))), Either::Right((event, _)) => event, }; - next_event.map(Self::convert_event_item) + next_event } fn convert_event_item(item: EventItem) -> Event { diff --git a/tests/queue_size_and_timeout_python/dataflow.yaml b/tests/queue_size_and_timeout_python/dataflow.yaml index c900c292..b53f7d47 100644 --- a/tests/queue_size_and_timeout_python/dataflow.yaml +++ b/tests/queue_size_and_timeout_python/dataflow.yaml @@ -9,6 +9,4 @@ nodes: - id: receive_data_with_sleep path: ./receive_data.py inputs: - ts: - source: send_data/ts - queue_size: 1 + ts: send_data/ts diff --git a/tests/queue_size_and_timeout_python/receive_data.py b/tests/queue_size_and_timeout_python/receive_data.py index 6cdf0c55..b01463ba 100644 --- a/tests/queue_size_and_timeout_python/receive_data.py +++ b/tests/queue_size_and_timeout_python/receive_data.py @@ -8,23 +8,23 @@ def main() -> None: i = 0 while True: - message = dora_node.next(timeout=0.001) - + message = dora_node.next(timeout=0.05) if message is None: break if message["type"] != "INPUT": continue sent = message["value"][0].as_py() - sent_in_s = sent / 1_000_000 + j = message["value"][1].as_py() + sent_in_s = sent / 1_000_000_000 received = time.perf_counter_ns() - received_in_s = received / 1_000_000 + received_in_s = received / 1_000_000_000 i += 1 print( - f"[{i}] Sent: {sent_in_s}, Received: {received_in_s}, Difference: {received_in_s - sent_in_s}" + f"[{i}, {j}] Sent: {sent_in_s}, Received: {received_in_s}, Difference: {received_in_s - sent_in_s}" ) - assert received_in_s - sent_in_s < 0.1 + assert received_in_s - sent_in_s < 1.0 time.sleep(0.1) diff --git a/tests/queue_size_and_timeout_python/send_data.py b/tests/queue_size_and_timeout_python/send_data.py index cce91e1e..2ae6c465 100644 --- a/tests/queue_size_and_timeout_python/send_data.py +++ b/tests/queue_size_and_timeout_python/send_data.py @@ -8,10 +8,12 @@ def main() -> None: dora_node = Node() i = 0 while True: - dora_node.send_output("ts", pa.array([time.perf_counter_ns()])) + dora_node.send_output("ts", pa.array([time.perf_counter_ns(), i])) i += 1 # print(f"Sent {i} times", flush=True) time.sleep(0.001) + if dora_node.next(timeout=0.001) is None: + break if __name__ == "__main__":