Browse Source

Adding `recv_async` dora method to retrieve data in python async (#909)

Make it possible to retrieve data asynchronously within python asyncio.

This is possible due to `experimental-async` from pyo3.

Note that this feature should be considered experimental as long as pyo3
async is marked as experimental.

This feature required removing the `DelayedCleanup` wrapper around the
event stream, as it was no longer used.
tags/v0.3.11-rc1
Haixuan Xavier Tao GitHub 10 months ago
parent
commit
3d6df72e14
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
9 changed files with 123 additions and 31 deletions
  1. +2
    -1
      apis/python/node/Cargo.toml
  2. +3
    -3
      apis/python/node/pyproject.toml
  3. +58
    -13
      apis/python/node/src/lib.rs
  4. +1
    -13
      apis/python/operator/src/lib.rs
  5. +0
    -1
      binaries/runtime/src/operator/python.rs
  6. +10
    -0
      examples/python-async/README.md
  7. +14
    -0
      examples/python-async/dataflow.yaml
  8. +17
    -0
      examples/python-async/receive_data.py
  9. +18
    -0
      examples/python-async/send_data.py

+ 2
- 1
apis/python/node/Cargo.toml View File

@@ -10,9 +10,10 @@ repository.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["tracing", "metrics", "telemetry"]
default = ["tracing", "metrics", "telemetry", "async"]
tracing = ["dora-node-api/tracing"]
metrics = ["dora-node-api/metrics"]
async = ["pyo3/experimental-async"]
telemetry = ["dora-runtime/telemetry"]

[dependencies]


+ 3
- 3
apis/python/node/pyproject.toml View File

@@ -12,13 +12,13 @@ readme = "README.md"
dependencies = ['pyarrow']

[dependency-groups]
dev = ["pytest >=8.1.1", "ruff >=0.9.1"]
dev = ["pytest >=7.1.1", "ruff >=0.9.1"]

[tool.maturin]
features = ["pyo3/extension-module"]

[tool.ruff.lint]
extend-select = [
"D", # pydocstyle
"UP"
"D", # pydocstyle
"UP",
]

+ 58
- 13
apis/python/node/src/lib.rs View File

@@ -57,14 +57,14 @@ impl Node {
let dataflow_id = *node.dataflow_id();
let node_id = node.id().clone();
let node = DelayedCleanup::new(node);
let events = DelayedCleanup::new(events);
let events = events;
let cleanup_handle = NodeCleanupHandle {
_handles: Arc::new((node.handle(), events.handle())),
_handles: Arc::new(node.handle()),
};
Ok(Node {
events: Events {
inner: EventsInner::Dora(events),
cleanup_handle,
_cleanup_handle: cleanup_handle,
},
dataflow_id,
node_id,
@@ -107,6 +107,43 @@ impl Node {
}
}

/// `.recv_async()` gives you the next input that the node has received asynchronously.
/// It does not blocks until the next event becomes available.
/// You can use timeout in seconds to return if no input is available.
/// It will return an Error if the timeout is reached.
/// It will return `None` when all senders has been dropped.
///
/// warning::
/// This feature is experimental as pyo3 async (rust-python FFI) is still in development.
///
/// ```python
/// event = await node.recv_async()
/// ```
///
/// You can also iterate over the event stream with a loop
///
/// :type timeout: float, optional
/// :rtype: dict
#[pyo3(signature = (timeout=None))]
#[allow(clippy::should_implement_trait)]
pub async fn recv_async(&mut self, timeout: Option<f32>) -> PyResult<Option<Py<PyDict>>> {
let event = self
.events
.recv_async_timeout(timeout.map(Duration::from_secs_f32))
.await;
if let Some(event) = event {
// Get python
Python::with_gil(|py| {
let dict = event
.to_py_dict(py)
.context("Could not convert event into a dict")?;
Ok(Some(dict))
})
} else {
Ok(None)
}
}

/// You can iterate over the event stream with a loop
///
/// ```python
@@ -254,30 +291,38 @@ fn err_to_pyany(err: eyre::Report, gil: Python<'_>) -> Py<PyAny> {

struct Events {
inner: EventsInner,
cleanup_handle: NodeCleanupHandle,
_cleanup_handle: NodeCleanupHandle,
}

impl Events {
fn recv(&mut self, timeout: Option<Duration>) -> Option<PyEvent> {
let event = match &mut self.inner {
EventsInner::Dora(events) => match timeout {
Some(timeout) => events.recv_timeout(timeout).map(MergedEvent::Dora),
None => events.recv().map(MergedEvent::Dora),
},
EventsInner::Merged(events) => futures::executor::block_on(events.next()),
};
event.map(|event| PyEvent { event })
}

async fn recv_async_timeout(&mut self, timeout: Option<Duration>) -> Option<PyEvent> {
let event = match &mut self.inner {
EventsInner::Dora(events) => match timeout {
Some(timeout) => events
.get_mut()
.recv_timeout(timeout)
.recv_async_timeout(timeout)
.await
.map(MergedEvent::Dora),
None => events.get_mut().recv().map(MergedEvent::Dora),
None => events.recv_async().await.map(MergedEvent::Dora),
},
EventsInner::Merged(events) => futures::executor::block_on(events.next()),
EventsInner::Merged(events) => events.next().await,
};
event.map(|event| PyEvent {
event,
_cleanup: Some(self.cleanup_handle.clone()),
})
event.map(|event| PyEvent { event })
}
}

enum EventsInner {
Dora(DelayedCleanup<EventStream>),
Dora(EventStream),
Merged(Box<dyn Stream<Item = MergedEvent<PyObject>> + Unpin + Send + Sync>),
}



+ 1
- 13
apis/python/operator/src/lib.rs View File

@@ -19,14 +19,13 @@ use pyo3::{
/// Dora Event
pub struct PyEvent {
pub event: MergedEvent<PyObject>,
pub _cleanup: Option<NodeCleanupHandle>,
}

/// Keeps the dora node alive until all event objects have been dropped.
#[derive(Clone)]
#[pyclass]
pub struct NodeCleanupHandle {
pub _handles: Arc<(CleanupHandle<DoraNode>, CleanupHandle<EventStream>)>,
pub _handles: Arc<CleanupHandle<DoraNode>>,
}

/// Owned type with delayed cleanup (using `handle` method).
@@ -139,17 +138,6 @@ impl PyEvent {
}
}

if let Some(cleanup) = self._cleanup.clone() {
pydict.insert(
"_cleanup",
cleanup
.into_pyobject(py)
.context("failed to convert cleanup handle to pyobject")?
.into_any()
.unbind(),
);
}

Ok(pydict
.into_py_dict(py)
.context("Failed to create py_dict")?


+ 0
- 1
binaries/runtime/src/operator/python.rs View File

@@ -217,7 +217,6 @@ pub fn run(

let py_event = PyEvent {
event: MergedEvent::Dora(event),
_cleanup: None,
}
.to_py_dict(py)
.context("Could not convert event to pydict bound")?;


+ 10
- 0
examples/python-async/README.md View File

@@ -0,0 +1,10 @@
# Python Async Example

To get started:

```bash
uv venv --seed -p 3.11
uv pip install -e ../../apis/python/node
dora build dataflow.yaml --uv
dora run dataflow.yaml --uv
```

+ 14
- 0
examples/python-async/dataflow.yaml View File

@@ -0,0 +1,14 @@
nodes:
  - id: send_data
    # send_data.py imports numpy and pyarrow. asyncio ships with Python and
    # must not be installed from PyPI (that package is an obsolete backport).
    build: pip install numpy pyarrow
    path: ./send_data.py
    inputs:
      tick: dora/timer/millis/10
    outputs:
      - data

  - id: receive_data_with_sleep
    build: pip install numpy pyarrow
    path: ./receive_data.py
    inputs:
      tick: send_data/data

+ 17
- 0
examples/python-async/receive_data.py View File

@@ -0,0 +1,17 @@
import asyncio

from dora import Node


async def main():
    """Receive up to 100 events asynchronously and print each one."""
    node = Node()
    for _ in range(100):
        event = await node.recv_async()
        if event is None:
            # recv_async resolves to None once all senders have been dropped,
            # so there is nothing left to wait for.
            break
        print(event)
    print('done!')


if __name__ == '__main__':
    # asyncio.run creates a fresh event loop, runs the coroutine to completion
    # and closes the loop; asyncio.get_event_loop() without a running loop is
    # deprecated and leaves the loop open.
    asyncio.run(main())

+ 18
- 0
examples/python-async/send_data.py View File

@@ -0,0 +1,18 @@
"""Send a nanosecond timestamp on the `data` output for each of the first 100 events."""

import time

import numpy as np
import pyarrow as pa
from dora import Node

node = Node()

sent = 0
for _event in node:
    # Stop on the first event that arrives after 100 outputs have been sent.
    if sent == 100:
        break
    sent += 1
    # perf_counter_ns is a monotonic clock reading in nanoseconds.
    timestamp_ns = time.perf_counter_ns()
    node.send_output("data", pa.array([np.uint64(timestamp_ns)]))

Loading…
Cancel
Save