From b2eace277d04f46601d2e05bccf55092ea285051 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 16 Jun 2024 17:16:46 +0200 Subject: [PATCH 1/7] Transform custom PyEvent into standard python dictionary for easier debuggability. Currently having a custom PyEvent make debugging very hard as fields are hidden within the class PyEvent that is defined within Rust Code. Python user are getting really confused about this obscure class. This PR transforms the class into a standard python dictionary. --- apis/python/node/src/lib.rs | 19 ++-- apis/python/operator/src/lib.rs | 91 +++++++------------ binaries/runtime/src/operator/python.rs | 4 +- examples/python-dataflow/example.py | 5 + .../python-ros2-dataflow/random_turtle.py | 4 +- 5 files changed, 58 insertions(+), 65 deletions(-) create mode 100644 examples/python-dataflow/example.py diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 3c1fcb2a..849e20a2 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -69,9 +69,16 @@ impl Node { /// :type timeout: float, optional /// :rtype: dora.PyEvent #[allow(clippy::should_implement_trait)] - pub fn next(&mut self, py: Python, timeout: Option) -> PyResult> { + pub fn next(&mut self, py: Python, timeout: Option) -> PyResult>> { let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32))); - Ok(event) + if let Some(event) = event { + let dict = event + .to_py_dict_bound(py) + .context("Could not convert event into a dictionnary")?; + Ok(Some(dict)) + } else { + Ok(None) + } } /// You can iterate over the event stream with a loop @@ -84,10 +91,11 @@ impl Node { /// case "image": /// ``` /// + /// Default behaviour is to timeout after 2 seconds. + /// /// :rtype: dora.PyEvent - pub fn __next__(&mut self, py: Python) -> PyResult> { - let event = py.allow_threads(|| self.events.recv(None)); - Ok(event) + pub fn __next__(&mut self, py: Python) -> PyResult>> { + self.next(py, Some(2.0)) } /// You can iterate over the event stream with a loop @@ -262,7 +270,6 @@ fn dora(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(start_runtime, &m)?)?; m.add_class::()?; - m.add_class::()?; m.setattr("__version__", env!("CARGO_PKG_VERSION"))?; m.setattr("__author__", "Dora-rs Authors")?; diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index bc74279e..772f2963 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -1,69 +1,53 @@ -use arrow::{array::ArrayRef, pyarrow::ToPyArrow}; +use std::collections::HashMap; + +use arrow::pyarrow::ToPyArrow; use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters}; use eyre::{Context, Result}; -use pyo3::{exceptions::PyLookupError, prelude::*, pybacked::PyBackedStr, types::PyDict}; +use pyo3::{ + prelude::*, + pybacked::PyBackedStr, + types::{IntoPyDict, PyDict}, +}; /// Dora Event -#[pyclass] #[derive(Debug)] pub struct PyEvent { event: MergedEvent, - data: Option, } -// Dora Event -#[pymethods] impl PyEvent { - /// - /// :rtype: dora.PyObject - pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult> { - if key == "kind" { - let kind = match &self.event { - MergedEvent::Dora(_) => "dora", - MergedEvent::External(_) => "external", - }; - return Ok(Some(kind.to_object(py))); - } + pub fn to_py_dict_bound(self, py: Python<'_>) -> PyResult> { + let mut pydict = HashMap::new(); match &self.event { MergedEvent::Dora(event) => { - let value = match key { - "type" => Some(Self::ty(event).to_object(py)), - "id" => Self::id(event).map(|v| v.to_object(py)), - "value" => self.value(py)?, - "metadata" => Self::metadata(event, py), - "error" => Self::error(event).map(|v| v.to_object(py)), - other => { - return Err(PyLookupError::new_err(format!( - "event has no property `{other}`" - ))) - } + if let Some(id) = Self::id(event) { + pydict.insert("id", id.into_py(py)); + } + pydict.insert("type", Self::ty(event).to_object(py)); + + match &self.event { + MergedEvent::Dora(_) => pydict.insert("kind", "dora".to_object(py)), + MergedEvent::External(_) => pydict.insert("kind", "external".to_object(py)), }; - Ok(value) + + if let Some(value) = self.value(py)? { + pydict.insert("value", value); + } + if let Some(metadata) = Self::metadata(event, py) { + pydict.insert("metadata", metadata); + } + if let Some(error) = Self::error(event) { + pydict.insert("error", error.to_object(py)); + } } MergedEvent::External(event) => { - let value = match key { - "value" => event, - _ => todo!(), - }; - - Ok(Some(value.clone())) + pydict.insert("value", event.clone()); } } - } - pub fn inner(&mut self) -> Option<&PyObject> { - match &self.event { - MergedEvent::Dora(_) => None, - MergedEvent::External(event) => Some(event), - } + Ok(pydict.into_py_dict_bound(py).unbind()) } - fn __str__(&self) -> PyResult { - Ok(format!("{:#?}", &self.event)) - } -} - -impl PyEvent { fn ty(event: &Event) -> &str { match event { Event::Stop => "STOP", @@ -84,9 +68,9 @@ impl PyEvent { /// Returns the payload of an input event as an arrow array (if any). fn value(&self, py: Python<'_>) -> PyResult> { - match (&self.event, &self.data) { - (MergedEvent::Dora(Event::Input { .. }), Some(data)) => { - // TODO: Does this call leak data? + match &self.event { + MergedEvent::Dora(Event::Input { data, .. }) => { + // TODO: Does this call leak data?& let array_data = data.to_data().to_pyarrow(py)?; Ok(Some(array_data)) } @@ -116,13 +100,8 @@ impl From for PyEvent { } impl From> for PyEvent { - fn from(mut event: MergedEvent) -> Self { - let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event { - Some(data.clone()) - } else { - None - }; - Self { event, data } + fn from(event: MergedEvent) -> Self { + Self { event } } } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 82d01ebb..366b9482 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -208,7 +208,9 @@ pub fn run( metadata.parameters.open_telemetry_context = string_cx; } - let py_event = PyEvent::from(event); + let py_event = PyEvent::from(event) + .to_py_dict_bound(py) + .context("Could not convert event to pydict bound")?; let status_enum = operator .call_method1(py, "on_event", (py_event, send_output.clone())) diff --git a/examples/python-dataflow/example.py b/examples/python-dataflow/example.py new file mode 100644 index 00000000..c9221a3a --- /dev/null +++ b/examples/python-dataflow/example.py @@ -0,0 +1,5 @@ +from dora import Node + +node = Node("plot") + +event = node.next() diff --git a/examples/python-ros2-dataflow/random_turtle.py b/examples/python-ros2-dataflow/random_turtle.py index 1e690d07..3b25ac21 100755 --- a/examples/python-ros2-dataflow/random_turtle.py +++ b/examples/python-ros2-dataflow/random_turtle.py @@ -55,11 +55,11 @@ for i in range(500): # ROS2 Event elif event_kind == "external": - pose = event.inner()[0].as_py() + pose = event["value"][0].as_py() min_x = min([min_x, pose["x"]]) max_x = max([max_x, pose["x"]]) min_y = min([min_y, pose["y"]]) max_y = max([max_y, pose["y"]]) - dora_node.send_output("turtle_pose", event.inner()) + dora_node.send_output("turtle_pose", event["value"]) assert max_x - min_x > 1 or max_y - min_y > 1, "no turtle movement" From ae9aea24e04023683992358050f47a82cc30da67 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 16 Jun 2024 17:33:31 +0200 Subject: [PATCH 2/7] update stubs definition --- apis/python/node/dora/__init__.py | 1 - apis/python/node/dora/__init__.pyi | 13 ++----------- apis/python/node/src/lib.rs | 9 +++++---- 3 files changed, 7 insertions(+), 16 deletions(-) diff --git a/apis/python/node/dora/__init__.py b/apis/python/node/dora/__init__.py index 7269bda1..354d9ad4 100644 --- a/apis/python/node/dora/__init__.py +++ b/apis/python/node/dora/__init__.py @@ -13,7 +13,6 @@ from .dora import * from .dora import ( Node, - PyEvent, Ros2Context, Ros2Node, Ros2NodeOptions, diff --git a/apis/python/node/dora/__init__.pyi b/apis/python/node/dora/__init__.pyi index 3dc33676..a6dcb659 100644 --- a/apis/python/node/dora/__init__.pyi +++ b/apis/python/node/dora/__init__.pyi @@ -22,7 +22,7 @@ from dora import Node node = Node() ```""" - def __init__(self) -> None: + def __init__(self, node_id: str=None) -> None: """The custom node API lets you integrate `dora` into your application. It allows you to retrieve input and send output in any fashion you want. @@ -46,7 +46,7 @@ This method returns the parsed dataflow YAML file.""" """Merge an external event stream with dora main loop. This currently only work with ROS2.""" - def next(self, timeout: float=None) -> dora.PyEvent: + def next(self, timeout: float=None) -> dict: """`.next()` gives you the next input that the node has received. It blocks until the next event becomes available. You can use timeout in seconds to return if no input is available. @@ -88,15 +88,6 @@ node.send_output("string", b"string", {"open_telemetry_context": "7632e76"}) def __next__(self) -> typing.Any: """Implement next(self).""" -@typing.final -class PyEvent: - """Dora Event""" - - def inner(self):... - - def __getitem__(self, key: typing.Any) -> typing.Any: - """Return self[key].""" - @typing.final class Ros2Context: """ROS2 Context holding all messages definition for receiving and sending messages to ROS2. diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 849e20a2..fe090354 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -24,6 +24,7 @@ use pyo3::types::{PyBytes, PyDict}; /// node = Node() /// ``` /// +/// :type node_id: str, optional #[pyclass] pub struct Node { events: Events, @@ -67,14 +68,14 @@ impl Node { /// ``` /// /// :type timeout: float, optional - /// :rtype: dora.PyEvent + /// :rtype: dict #[allow(clippy::should_implement_trait)] pub fn next(&mut self, py: Python, timeout: Option) -> PyResult>> { let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32))); if let Some(event) = event { let dict = event .to_py_dict_bound(py) - .context("Could not convert event into a dictionnary")?; + .context("Could not convert event into a dict")?; Ok(Some(dict)) } else { Ok(None) @@ -93,7 +94,7 @@ impl Node { /// /// Default behaviour is to timeout after 2 seconds. /// - /// :rtype: dora.PyEvent + /// :rtype: dict pub fn __next__(&mut self, py: Python) -> PyResult>> { self.next(py, Some(2.0)) } @@ -108,7 +109,7 @@ impl Node { /// case "image": /// ``` /// - /// :rtype: dora.PyEvent + /// :rtype: dict fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { slf } From 525dfa19e3a73f7a6ebf0edbdd20019e794f5599 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 16 Jun 2024 17:48:35 +0200 Subject: [PATCH 3/7] Make kind generic for both type --- apis/python/operator/src/lib.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 772f2963..2f667579 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -18,6 +18,10 @@ pub struct PyEvent { impl PyEvent { pub fn to_py_dict_bound(self, py: Python<'_>) -> PyResult> { let mut pydict = HashMap::new(); + match &self.event { + MergedEvent::Dora(_) => pydict.insert("kind", "dora".to_object(py)), + MergedEvent::External(_) => pydict.insert("kind", "external".to_object(py)), + }; match &self.event { MergedEvent::Dora(event) => { if let Some(id) = Self::id(event) { @@ -25,11 +29,6 @@ impl PyEvent { } pydict.insert("type", Self::ty(event).to_object(py)); - match &self.event { - MergedEvent::Dora(_) => pydict.insert("kind", "dora".to_object(py)), - MergedEvent::External(_) => pydict.insert("kind", "external".to_object(py)), - }; - if let Some(value) = self.value(py)? { pydict.insert("value", value); } From 00a83b93554df8bcb60b027bc24603de606d631f Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 16 Jun 2024 17:54:02 +0200 Subject: [PATCH 4/7] Do not upgrade to numpy 2.0.0 --- examples/python-dataflow/requirements.txt | 4 ++-- examples/python-operator-dataflow/requirements.txt | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/python-dataflow/requirements.txt b/examples/python-dataflow/requirements.txt index a8f02b0a..6740879e 100644 --- a/examples/python-dataflow/requirements.txt +++ b/examples/python-dataflow/requirements.txt @@ -6,7 +6,7 @@ ultralytics gitpython ipython # interactive notebook matplotlib>=3.2.2 -numpy>=1.18.5 +numpy>=1.18.5,<2.0.0 opencv-python>=4.1.1 Pillow>=7.1.2 psutil # system resources @@ -44,4 +44,4 @@ seaborn>=0.11.0 # roboflow opencv-python>=4.1.1 -maturin \ No newline at end of file +maturin diff --git a/examples/python-operator-dataflow/requirements.txt b/examples/python-operator-dataflow/requirements.txt index 68020faa..6740879e 100644 --- a/examples/python-operator-dataflow/requirements.txt +++ b/examples/python-operator-dataflow/requirements.txt @@ -6,7 +6,7 @@ ultralytics gitpython ipython # interactive notebook matplotlib>=3.2.2 -numpy>=1.18.5 +numpy>=1.18.5,<2.0.0 opencv-python>=4.1.1 Pillow>=7.1.2 psutil # system resources From c4983fa822288675962932e6903421cb686dfe11 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 17 Jun 2024 11:00:28 +0200 Subject: [PATCH 5/7] Fix python dependencies --- examples/python-dataflow/requirements.txt | 2 +- examples/python-dataflow/run.rs | 7 ------- examples/python-operator-dataflow/requirements.txt | 2 +- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/examples/python-dataflow/requirements.txt b/examples/python-dataflow/requirements.txt index 6740879e..9c1fc915 100644 --- a/examples/python-dataflow/requirements.txt +++ b/examples/python-dataflow/requirements.txt @@ -6,7 +6,7 @@ ultralytics gitpython ipython # interactive notebook matplotlib>=3.2.2 -numpy>=1.18.5,<2.0.0 +numpy<2.0.0 # See: https://github.com/opencv/opencv-python/issues/997 opencv-python>=4.1.1 Pillow>=7.1.2 psutil # system resources diff --git a/examples/python-dataflow/run.rs b/examples/python-dataflow/run.rs index a14b553f..65ae5831 100644 --- a/examples/python-dataflow/run.rs +++ b/examples/python-dataflow/run.rs @@ -1,5 +1,4 @@ use dora_core::{get_pip_path, get_python_path, run}; -use dora_download::download_file; use dora_tracing::set_up_tracing; use eyre::{bail, ContextCompat, WrapErr}; use std::path::Path; @@ -73,12 +72,6 @@ async fn main() -> eyre::Result<()> { ) .await .context("maturin develop failed")?; - download_file( - "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt", - Path::new("yolov8n.pt"), - ) - .await - .context("Could not download weights.")?; let dataflow = Path::new("dataflow.yml"); run_dataflow(dataflow).await?; diff --git a/examples/python-operator-dataflow/requirements.txt b/examples/python-operator-dataflow/requirements.txt index 6740879e..bd15e396 100644 --- a/examples/python-operator-dataflow/requirements.txt +++ b/examples/python-operator-dataflow/requirements.txt @@ -6,7 +6,7 @@ ultralytics gitpython ipython # interactive notebook matplotlib>=3.2.2 -numpy>=1.18.5,<2.0.0 +numpy<2.0.0 opencv-python>=4.1.1 Pillow>=7.1.2 psutil # system resources From c4263e6c8fee0a69663750fa145f6f52ea88ab01 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 17 Jun 2024 11:00:52 +0200 Subject: [PATCH 6/7] Remove bound from definition of .to_py_dict --- apis/python/node/src/lib.rs | 4 ++-- apis/python/operator/src/lib.rs | 2 +- binaries/runtime/src/operator/python.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index fe090354..45125ec0 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -74,7 +74,7 @@ impl Node { let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32))); if let Some(event) = event { let dict = event - .to_py_dict_bound(py) + .to_py_dict(py) .context("Could not convert event into a dict")?; Ok(Some(dict)) } else { @@ -96,7 +96,7 @@ impl Node { /// /// :rtype: dict pub fn __next__(&mut self, py: Python) -> PyResult>> { - self.next(py, Some(2.0)) + self.next(py, None) } /// You can iterate over the event stream with a loop diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 2f667579..2c027e9a 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -16,7 +16,7 @@ pub struct PyEvent { } impl PyEvent { - pub fn to_py_dict_bound(self, py: Python<'_>) -> PyResult> { + pub fn to_py_dict(self, py: Python<'_>) -> PyResult> { let mut pydict = HashMap::new(); match &self.event { MergedEvent::Dora(_) => pydict.insert("kind", "dora".to_object(py)), diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 366b9482..fa488388 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -209,7 +209,7 @@ pub fn run( } let py_event = PyEvent::from(event) - .to_py_dict_bound(py) + .to_py_dict(py) .context("Could not convert event to pydict bound")?; let status_enum = operator From a8d4c070d2a8373aaf9be6119705ab373c088397 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 17 Jun 2024 13:13:50 +0200 Subject: [PATCH 7/7] Fix CI opencv-python install --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d74da80a..8fec30c3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -318,7 +318,7 @@ jobs: dora start dataflow.yml --name ci-python-test sleep 10 dora stop --name ci-python-test --grace-duration 5s - pip install opencv-python + pip install "numpy<2.0.0" opencv-python dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic python ../examples/python-dataflow/plot_dynamic.py sleep 5