Browse Source

Merge pull request #557 from dora-rs/convert-pyevent-into-dictionary

Transform custom PyEvent into standard python dictionary for easier d…
tags/v0.3.5-rc0
Haixuan Xavier Tao GitHub 1 year ago
parent
commit
42255f5a70
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
11 changed files with 68 additions and 92 deletions
  1. +1
    -1
      .github/workflows/ci.yml
  2. +0
    -1
      apis/python/node/dora/__init__.py
  3. +2
    -11
      apis/python/node/dora/__init__.pyi
  4. +17
    -9
      apis/python/node/src/lib.rs
  5. +35
    -57
      apis/python/operator/src/lib.rs
  6. +3
    -1
      binaries/runtime/src/operator/python.rs
  7. +5
    -0
      examples/python-dataflow/example.py
  8. +2
    -2
      examples/python-dataflow/requirements.txt
  9. +0
    -7
      examples/python-dataflow/run.rs
  10. +1
    -1
      examples/python-operator-dataflow/requirements.txt
  11. +2
    -2
      examples/python-ros2-dataflow/random_turtle.py

+ 1
- 1
.github/workflows/ci.yml View File

@@ -318,7 +318,7 @@ jobs:
dora start dataflow.yml --name ci-python-test
sleep 10
dora stop --name ci-python-test --grace-duration 5s
pip install opencv-python
pip install "numpy<2.0.0" opencv-python
dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic
python ../examples/python-dataflow/plot_dynamic.py
sleep 5


+ 0
- 1
apis/python/node/dora/__init__.py View File

@@ -13,7 +13,6 @@ from .dora import *

from .dora import (
Node,
PyEvent,
Ros2Context,
Ros2Node,
Ros2NodeOptions,


+ 2
- 11
apis/python/node/dora/__init__.pyi View File

@@ -22,7 +22,7 @@ from dora import Node
node = Node()
```"""

def __init__(self) -> None:
def __init__(self, node_id: str=None) -> None:
"""The custom node API lets you integrate `dora` into your application.
It allows you to retrieve input and send output in any fashion you want.

@@ -46,7 +46,7 @@ This method returns the parsed dataflow YAML file."""
"""Merge an external event stream with dora main loop.
This currently only work with ROS2."""

def next(self, timeout: float=None) -> dora.PyEvent:
def next(self, timeout: float=None) -> dict:
"""`.next()` gives you the next input that the node has received.
It blocks until the next event becomes available.
You can use timeout in seconds to return if no input is available.
@@ -88,15 +88,6 @@ node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
def __next__(self) -> typing.Any:
"""Implement next(self)."""

@typing.final
class PyEvent:
"""Dora Event"""

def inner(self):...

def __getitem__(self, key: typing.Any) -> typing.Any:
"""Return self[key]."""

@typing.final
class Ros2Context:
"""ROS2 Context holding all messages definition for receiving and sending messages to ROS2.


+ 17
- 9
apis/python/node/src/lib.rs View File

@@ -24,6 +24,7 @@ use pyo3::types::{PyBytes, PyDict};
/// node = Node()
/// ```
///
/// :type node_id: str, optional
#[pyclass]
pub struct Node {
events: Events,
@@ -67,11 +68,18 @@ impl Node {
/// ```
///
/// :type timeout: float, optional
/// :rtype: dora.PyEvent
/// :rtype: dict
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self, py: Python, timeout: Option<f32>) -> PyResult<Option<PyEvent>> {
pub fn next(&mut self, py: Python, timeout: Option<f32>) -> PyResult<Option<Py<PyDict>>> {
let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32)));
Ok(event)
if let Some(event) = event {
let dict = event
.to_py_dict(py)
.context("Could not convert event into a dict")?;
Ok(Some(dict))
} else {
Ok(None)
}
}

/// You can iterate over the event stream with a loop
@@ -84,10 +92,11 @@ impl Node {
/// case "image":
/// ```
///
/// :rtype: dora.PyEvent
pub fn __next__(&mut self, py: Python) -> PyResult<Option<PyEvent>> {
let event = py.allow_threads(|| self.events.recv(None));
Ok(event)
/// Default behaviour is to timeout after 2 seconds.
///
/// :rtype: dict
pub fn __next__(&mut self, py: Python) -> PyResult<Option<Py<PyDict>>> {
self.next(py, None)
}

/// You can iterate over the event stream with a loop
@@ -100,7 +109,7 @@ impl Node {
/// case "image":
/// ```
///
/// :rtype: dora.PyEvent
/// :rtype: dict
fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
@@ -262,7 +271,6 @@ fn dora(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {

m.add_function(wrap_pyfunction!(start_runtime, &m)?)?;
m.add_class::<Node>()?;
m.add_class::<PyEvent>()?;
m.setattr("__version__", env!("CARGO_PKG_VERSION"))?;
m.setattr("__author__", "Dora-rs Authors")?;



+ 35
- 57
apis/python/operator/src/lib.rs View File

@@ -1,69 +1,52 @@
use arrow::{array::ArrayRef, pyarrow::ToPyArrow};
use std::collections::HashMap;

use arrow::pyarrow::ToPyArrow;
use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters};
use eyre::{Context, Result};
use pyo3::{exceptions::PyLookupError, prelude::*, pybacked::PyBackedStr, types::PyDict};
use pyo3::{
prelude::*,
pybacked::PyBackedStr,
types::{IntoPyDict, PyDict},
};

/// Dora Event
#[pyclass]
#[derive(Debug)]
pub struct PyEvent {
event: MergedEvent<PyObject>,
data: Option<ArrayRef>,
}

// Dora Event
#[pymethods]
impl PyEvent {
///
/// :rtype: dora.PyObject
pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult<Option<PyObject>> {
if key == "kind" {
let kind = match &self.event {
MergedEvent::Dora(_) => "dora",
MergedEvent::External(_) => "external",
};
return Ok(Some(kind.to_object(py)));
}
pub fn to_py_dict(self, py: Python<'_>) -> PyResult<Py<PyDict>> {
let mut pydict = HashMap::new();
match &self.event {
MergedEvent::Dora(_) => pydict.insert("kind", "dora".to_object(py)),
MergedEvent::External(_) => pydict.insert("kind", "external".to_object(py)),
};
match &self.event {
MergedEvent::Dora(event) => {
let value = match key {
"type" => Some(Self::ty(event).to_object(py)),
"id" => Self::id(event).map(|v| v.to_object(py)),
"value" => self.value(py)?,
"metadata" => Self::metadata(event, py),
"error" => Self::error(event).map(|v| v.to_object(py)),
other => {
return Err(PyLookupError::new_err(format!(
"event has no property `{other}`"
)))
}
};
Ok(value)
if let Some(id) = Self::id(event) {
pydict.insert("id", id.into_py(py));
}
pydict.insert("type", Self::ty(event).to_object(py));

if let Some(value) = self.value(py)? {
pydict.insert("value", value);
}
if let Some(metadata) = Self::metadata(event, py) {
pydict.insert("metadata", metadata);
}
if let Some(error) = Self::error(event) {
pydict.insert("error", error.to_object(py));
}
}
MergedEvent::External(event) => {
let value = match key {
"value" => event,
_ => todo!(),
};

Ok(Some(value.clone()))
pydict.insert("value", event.clone());
}
}
}

pub fn inner(&mut self) -> Option<&PyObject> {
match &self.event {
MergedEvent::Dora(_) => None,
MergedEvent::External(event) => Some(event),
}
Ok(pydict.into_py_dict_bound(py).unbind())
}

fn __str__(&self) -> PyResult<String> {
Ok(format!("{:#?}", &self.event))
}
}

impl PyEvent {
fn ty(event: &Event) -> &str {
match event {
Event::Stop => "STOP",
@@ -84,9 +67,9 @@ impl PyEvent {

/// Returns the payload of an input event as an arrow array (if any).
fn value(&self, py: Python<'_>) -> PyResult<Option<PyObject>> {
match (&self.event, &self.data) {
(MergedEvent::Dora(Event::Input { .. }), Some(data)) => {
// TODO: Does this call leak data?
match &self.event {
MergedEvent::Dora(Event::Input { data, .. }) => {
// TODO: Does this call leak data?&
let array_data = data.to_data().to_pyarrow(py)?;
Ok(Some(array_data))
}
@@ -116,13 +99,8 @@ impl From<Event> for PyEvent {
}

impl From<MergedEvent<PyObject>> for PyEvent {
fn from(mut event: MergedEvent<PyObject>) -> Self {
let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event {
Some(data.clone())
} else {
None
};
Self { event, data }
fn from(event: MergedEvent<PyObject>) -> Self {
Self { event }
}
}



+ 3
- 1
binaries/runtime/src/operator/python.rs View File

@@ -208,7 +208,9 @@ pub fn run(
metadata.parameters.open_telemetry_context = string_cx;
}

let py_event = PyEvent::from(event);
let py_event = PyEvent::from(event)
.to_py_dict(py)
.context("Could not convert event to pydict bound")?;

let status_enum = operator
.call_method1(py, "on_event", (py_event, send_output.clone()))


+ 5
- 0
examples/python-dataflow/example.py View File

@@ -0,0 +1,5 @@
from dora import Node

node = Node("plot")

event = node.next()

+ 2
- 2
examples/python-dataflow/requirements.txt View File

@@ -6,7 +6,7 @@ ultralytics
gitpython
ipython # interactive notebook
matplotlib>=3.2.2
numpy>=1.18.5
numpy<2.0.0 # See: https://github.com/opencv/opencv-python/issues/997
opencv-python>=4.1.1
Pillow>=7.1.2
psutil # system resources
@@ -44,4 +44,4 @@ seaborn>=0.11.0
# roboflow

opencv-python>=4.1.1
maturin
maturin

+ 0
- 7
examples/python-dataflow/run.rs View File

@@ -1,5 +1,4 @@
use dora_core::{get_pip_path, get_python_path, run};
use dora_download::download_file;
use dora_tracing::set_up_tracing;
use eyre::{bail, ContextCompat, WrapErr};
use std::path::Path;
@@ -73,12 +72,6 @@ async fn main() -> eyre::Result<()> {
)
.await
.context("maturin develop failed")?;
download_file(
"https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt",
Path::new("yolov8n.pt"),
)
.await
.context("Could not download weights.")?;

let dataflow = Path::new("dataflow.yml");
run_dataflow(dataflow).await?;


+ 1
- 1
examples/python-operator-dataflow/requirements.txt View File

@@ -6,7 +6,7 @@ ultralytics
gitpython
ipython # interactive notebook
matplotlib>=3.2.2
numpy>=1.18.5
numpy<2.0.0
opencv-python>=4.1.1
Pillow>=7.1.2
psutil # system resources


+ 2
- 2
examples/python-ros2-dataflow/random_turtle.py View File

@@ -55,11 +55,11 @@ for i in range(500):

# ROS2 Event
elif event_kind == "external":
pose = event.inner()[0].as_py()
pose = event["value"][0].as_py()
min_x = min([min_x, pose["x"]])
max_x = max([max_x, pose["x"]])
min_y = min([min_y, pose["y"]])
max_y = max([max_y, pose["y"]])
dora_node.send_output("turtle_pose", event.inner())
dora_node.send_output("turtle_pose", event["value"])

assert max_x - min_x > 1 or max_y - min_y > 1, "no turtle movement"

Loading…
Cancel
Save