diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4cee1692..afbe95ed 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -357,6 +357,9 @@ jobs: # Run Python queue latency test dora run tests/queue_size_latest_data_python/dataflow.yaml + # Run Python queue latency test + timeout + dora run tests/queue_size_and_timeout_python/dataflow.yaml + # Run Rust queue latency test dora build tests/queue_size_latest_data_rust/dataflow.yaml dora run tests/queue_size_latest_data_rust/dataflow.yaml diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 4856cbb6..7276b6bf 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -1,5 +1,6 @@ use std::{ collections::{BTreeMap, HashMap, VecDeque}, + pin::pin, sync::Arc, time::Duration, }; @@ -187,13 +188,13 @@ impl EventStream { } pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option { - let next_event = match select(Delay::new(dur), self.receiver.next()).await { - Either::Left((_elapsed, _)) => { - Some(EventItem::TimeoutError(eyre!("Receiver timed out"))) - } + let next_event = match select(Delay::new(dur), pin!(self.recv_async())).await { + Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError( + eyre!("Receiver timed out"), + ))), Either::Right((event, _)) => event, }; - next_event.map(Self::convert_event_item) + next_event } fn convert_event_item(item: EventItem) -> Event { diff --git a/tests/queue_size_and_timeout_python/dataflow.yaml b/tests/queue_size_and_timeout_python/dataflow.yaml new file mode 100644 index 00000000..b53f7d47 --- /dev/null +++ b/tests/queue_size_and_timeout_python/dataflow.yaml @@ -0,0 +1,12 @@ +nodes: + - id: send_data + path: ./send_data.py + inputs: + keepalive: dora/timer/millis/100000 + outputs: + - ts + + - id: receive_data_with_sleep + path: ./receive_data.py + inputs: + ts: send_data/ts diff --git a/tests/queue_size_and_timeout_python/receive_data.py b/tests/queue_size_and_timeout_python/receive_data.py new file mode 100644 index 00000000..b01463ba --- /dev/null +++ b/tests/queue_size_and_timeout_python/receive_data.py @@ -0,0 +1,32 @@ +import time + +from dora import Node + + +def main() -> None: + dora_node = Node() + + i = 0 + while True: + message = dora_node.next(timeout=0.05) + if message is None: + break + + if message["type"] != "INPUT": + continue + sent = message["value"][0].as_py() + j = message["value"][1].as_py() + sent_in_s = sent / 1_000_000_000 + received = time.perf_counter_ns() + received_in_s = received / 1_000_000_000 + + i += 1 + print( + f"[{i}, {j}] Sent: {sent_in_s}, Received: {received_in_s}, Difference: {received_in_s - sent_in_s}" + ) + assert received_in_s - sent_in_s < 1.0 + time.sleep(0.1) + + +if __name__ == "__main__": + main() diff --git a/tests/queue_size_and_timeout_python/send_data.py b/tests/queue_size_and_timeout_python/send_data.py new file mode 100644 index 00000000..a29e1abb --- /dev/null +++ b/tests/queue_size_and_timeout_python/send_data.py @@ -0,0 +1,22 @@ +import time + +import pyarrow as pa +from dora import Node + +start = time.time() + + +def main() -> None: + dora_node = Node() + i = 0 + while time.time() - start < 10: + dora_node.send_output("ts", pa.array([time.perf_counter_ns(), i])) + i += 1 + # print(f"Sent {i} times", flush=True) + time.sleep(0.001) + if dora_node.next(timeout=0.001) is None: + break + + +if __name__ == "__main__": + main()