diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 4e4c1452..54ebff5c 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -10,9 +10,10 @@ repository.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["tracing", "metrics", "telemetry"] +default = ["tracing", "metrics", "telemetry", "async"] tracing = ["dora-node-api/tracing"] metrics = ["dora-node-api/metrics"] +async = ["pyo3/experimental-async"] telemetry = ["dora-runtime/telemetry"] [dependencies] diff --git a/apis/python/node/pyproject.toml b/apis/python/node/pyproject.toml index 49e4ba32..33048a3f 100644 --- a/apis/python/node/pyproject.toml +++ b/apis/python/node/pyproject.toml @@ -12,13 +12,13 @@ readme = "README.md" dependencies = ['pyarrow'] [dependency-groups] -dev = ["pytest >=8.1.1", "ruff >=0.9.1"] +dev = ["pytest >=7.1.1", "ruff >=0.9.1"] [tool.maturin] features = ["pyo3/extension-module"] [tool.ruff.lint] extend-select = [ - "D", # pydocstyle - "UP" + "D", # pydocstyle + "UP", ] diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index a54a17ed..e2a249a9 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -57,14 +57,14 @@ impl Node { let dataflow_id = *node.dataflow_id(); let node_id = node.id().clone(); let node = DelayedCleanup::new(node); - let events = DelayedCleanup::new(events); + let events = events; let cleanup_handle = NodeCleanupHandle { - _handles: Arc::new((node.handle(), events.handle())), + _handles: Arc::new(node.handle()), }; Ok(Node { events: Events { inner: EventsInner::Dora(events), - cleanup_handle, + _cleanup_handle: cleanup_handle, }, dataflow_id, node_id, @@ -107,6 +107,43 @@ impl Node { } } + /// `.recv_async()` gives you the next input that the node has received asynchronously. + /// It does not blocks until the next event becomes available. + /// You can use timeout in seconds to return if no input is available. + /// It will return an Error if the timeout is reached. + /// It will return `None` when all senders has been dropped. + /// + /// warning:: + /// This feature is experimental as pyo3 async (rust-python FFI) is still in development. + /// + /// ```python + /// event = await node.recv_async() + /// ``` + /// + /// You can also iterate over the event stream with a loop + /// + /// :type timeout: float, optional + /// :rtype: dict + #[pyo3(signature = (timeout=None))] + #[allow(clippy::should_implement_trait)] + pub async fn recv_async(&mut self, timeout: Option) -> PyResult>> { + let event = self + .events + .recv_async_timeout(timeout.map(Duration::from_secs_f32)) + .await; + if let Some(event) = event { + // Get python + Python::with_gil(|py| { + let dict = event + .to_py_dict(py) + .context("Could not convert event into a dict")?; + Ok(Some(dict)) + }) + } else { + Ok(None) + } + } + /// You can iterate over the event stream with a loop /// /// ```python @@ -254,30 +291,38 @@ fn err_to_pyany(err: eyre::Report, gil: Python<'_>) -> Py { struct Events { inner: EventsInner, - cleanup_handle: NodeCleanupHandle, + _cleanup_handle: NodeCleanupHandle, } impl Events { fn recv(&mut self, timeout: Option) -> Option { + let event = match &mut self.inner { + EventsInner::Dora(events) => match timeout { + Some(timeout) => events.recv_timeout(timeout).map(MergedEvent::Dora), + None => events.recv().map(MergedEvent::Dora), + }, + EventsInner::Merged(events) => futures::executor::block_on(events.next()), + }; + event.map(|event| PyEvent { event }) + } + + async fn recv_async_timeout(&mut self, timeout: Option) -> Option { let event = match &mut self.inner { EventsInner::Dora(events) => match timeout { Some(timeout) => events - .get_mut() - .recv_timeout(timeout) + .recv_async_timeout(timeout) + .await .map(MergedEvent::Dora), - None => events.get_mut().recv().map(MergedEvent::Dora), + None => events.recv_async().await.map(MergedEvent::Dora), }, - EventsInner::Merged(events) => futures::executor::block_on(events.next()), + EventsInner::Merged(events) => events.next().await, }; - event.map(|event| PyEvent { - event, - _cleanup: Some(self.cleanup_handle.clone()), - }) + event.map(|event| PyEvent { event }) } } enum EventsInner { - Dora(DelayedCleanup), + Dora(EventStream), Merged(Box> + Unpin + Send + Sync>), } diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 2e3031e3..cfc1e2bd 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -19,14 +19,13 @@ use pyo3::{ /// Dora Event pub struct PyEvent { pub event: MergedEvent, - pub _cleanup: Option, } /// Keeps the dora node alive until all event objects have been dropped. #[derive(Clone)] #[pyclass] pub struct NodeCleanupHandle { - pub _handles: Arc<(CleanupHandle, CleanupHandle)>, + pub _handles: Arc>, } /// Owned type with delayed cleanup (using `handle` method). @@ -139,17 +138,6 @@ impl PyEvent { } } - if let Some(cleanup) = self._cleanup.clone() { - pydict.insert( - "_cleanup", - cleanup - .into_pyobject(py) - .context("failed to convert cleanup handle to pyobject")? - .into_any() - .unbind(), - ); - } - Ok(pydict .into_py_dict(py) .context("Failed to create py_dict")? diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index b6895bda..6a250579 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -217,7 +217,6 @@ pub fn run( let py_event = PyEvent { event: MergedEvent::Dora(event), - _cleanup: None, } .to_py_dict(py) .context("Could not convert event to pydict bound")?; diff --git a/examples/python-async/README.md b/examples/python-async/README.md new file mode 100644 index 00000000..4d0aebba --- /dev/null +++ b/examples/python-async/README.md @@ -0,0 +1,10 @@ +# Python Async Example + +To get started: + +```bash +uv venv --seed -p 3.11 +uv pip install -e ../../apis/python/node +dora build dataflow.yaml --uv +dora run dataflow.yaml --uv +``` diff --git a/examples/python-async/dataflow.yaml b/examples/python-async/dataflow.yaml new file mode 100644 index 00000000..1cef415f --- /dev/null +++ b/examples/python-async/dataflow.yaml @@ -0,0 +1,14 @@ +nodes: + - id: send_data + build: pip install asyncio + path: ./send_data.py + inputs: + tick: dora/timer/millis/10 + outputs: + - data + + - id: receive_data_with_sleep + build: pip install numpy pyarrow + path: ./receive_data.py + inputs: + tick: send_data/data diff --git a/examples/python-async/receive_data.py b/examples/python-async/receive_data.py new file mode 100644 index 00000000..14064314 --- /dev/null +++ b/examples/python-async/receive_data.py @@ -0,0 +1,17 @@ +import asyncio + +from dora import Node + + +async def main(): + node = Node() + for _ in range(100): + event = await node.recv_async() + print(event) + # del event + print('done!') + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) diff --git a/examples/python-async/send_data.py b/examples/python-async/send_data.py new file mode 100644 index 00000000..f7e717a0 --- /dev/null +++ b/examples/python-async/send_data.py @@ -0,0 +1,18 @@ +"""TODO: Add docstring.""" + +import time + +import numpy as np +import pyarrow as pa +from dora import Node + +node = Node() + +i = 0 +for event in node: + if i == 100: + break + else: + i += 1 + now = time.perf_counter_ns() + node.send_output("data", pa.array([np.uint64(now)]))