diff --git a/tests/queue_size_and_timeout_test_python/dataflow.yaml b/tests/queue_size_and_timeout_test_python/dataflow.yaml new file mode 100644 index 00000000..c900c292 --- /dev/null +++ b/tests/queue_size_and_timeout_test_python/dataflow.yaml @@ -0,0 +1,14 @@ +nodes: + - id: send_data + path: ./send_data.py + inputs: + keepalive: dora/timer/millis/100000 + outputs: + - ts + + - id: receive_data_with_sleep + path: ./receive_data.py + inputs: + ts: + source: send_data/ts + queue_size: 1 diff --git a/tests/queue_size_and_timeout_test_python/receive_data.py b/tests/queue_size_and_timeout_test_python/receive_data.py new file mode 100644 index 00000000..6cdf0c55 --- /dev/null +++ b/tests/queue_size_and_timeout_test_python/receive_data.py @@ -0,0 +1,32 @@ +import time + +from dora import Node + + +def main() -> None: + dora_node = Node() + + i = 0 + while True: + message = dora_node.next(timeout=0.001) + + if message is None: + break + + if message["type"] != "INPUT": + continue + sent = message["value"][0].as_py() + sent_in_s = sent / 1_000_000 + received = time.perf_counter_ns() + received_in_s = received / 1_000_000 + + i += 1 + print( + f"[{i}] Sent: {sent_in_s}, Received: {received_in_s}, Difference: {received_in_s - sent_in_s}" + ) + assert received_in_s - sent_in_s < 0.1 + time.sleep(0.1) + + +if __name__ == "__main__": + main() diff --git a/tests/queue_size_and_timeout_test_python/send_data.py b/tests/queue_size_and_timeout_test_python/send_data.py new file mode 100644 index 00000000..cce91e1e --- /dev/null +++ b/tests/queue_size_and_timeout_test_python/send_data.py @@ -0,0 +1,18 @@ +import time + +import pyarrow as pa +from dora import Node + + +def main() -> None: + dora_node = Node() + i = 0 + while True: + dora_node.send_output("ts", pa.array([time.perf_counter_ns()])) + i += 1 + # print(f"Sent {i} times", flush=True) + time.sleep(0.001) + + +if __name__ == "__main__": + main()