From b2eace277d04f46601d2e05bccf55092ea285051 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 16 Jun 2024 17:16:46 +0200 Subject: [PATCH] Transform custom PyEvent into standard python dictionary for easier debuggability. Currently having a custom PyEvent make debugging very hard as fields are hidden within the class PyEvent that is defined within Rust Code. Python user are getting really confused about this obscure class. This PR transforms the class into a standard python dictionary. --- apis/python/node/src/lib.rs | 19 ++-- apis/python/operator/src/lib.rs | 91 +++++++------------ binaries/runtime/src/operator/python.rs | 4 +- examples/python-dataflow/example.py | 5 + .../python-ros2-dataflow/random_turtle.py | 4 +- 5 files changed, 58 insertions(+), 65 deletions(-) create mode 100644 examples/python-dataflow/example.py diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 3c1fcb2a..849e20a2 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -69,9 +69,16 @@ impl Node { /// :type timeout: float, optional /// :rtype: dora.PyEvent #[allow(clippy::should_implement_trait)] - pub fn next(&mut self, py: Python, timeout: Option) -> PyResult> { + pub fn next(&mut self, py: Python, timeout: Option) -> PyResult>> { let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32))); - Ok(event) + if let Some(event) = event { + let dict = event + .to_py_dict_bound(py) + .context("Could not convert event into a dictionnary")?; + Ok(Some(dict)) + } else { + Ok(None) + } } /// You can iterate over the event stream with a loop @@ -84,10 +91,11 @@ impl Node { /// case "image": /// ``` /// + /// Default behaviour is to timeout after 2 seconds. + /// /// :rtype: dora.PyEvent - pub fn __next__(&mut self, py: Python) -> PyResult> { - let event = py.allow_threads(|| self.events.recv(None)); - Ok(event) + pub fn __next__(&mut self, py: Python) -> PyResult>> { + self.next(py, Some(2.0)) } /// You can iterate over the event stream with a loop @@ -262,7 +270,6 @@ fn dora(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(start_runtime, &m)?)?; m.add_class::()?; - m.add_class::()?; m.setattr("__version__", env!("CARGO_PKG_VERSION"))?; m.setattr("__author__", "Dora-rs Authors")?; diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index bc74279e..772f2963 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -1,69 +1,53 @@ -use arrow::{array::ArrayRef, pyarrow::ToPyArrow}; +use std::collections::HashMap; + +use arrow::pyarrow::ToPyArrow; use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters}; use eyre::{Context, Result}; -use pyo3::{exceptions::PyLookupError, prelude::*, pybacked::PyBackedStr, types::PyDict}; +use pyo3::{ + prelude::*, + pybacked::PyBackedStr, + types::{IntoPyDict, PyDict}, +}; /// Dora Event -#[pyclass] #[derive(Debug)] pub struct PyEvent { event: MergedEvent, - data: Option, } -// Dora Event -#[pymethods] impl PyEvent { - /// - /// :rtype: dora.PyObject - pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult> { - if key == "kind" { - let kind = match &self.event { - MergedEvent::Dora(_) => "dora", - MergedEvent::External(_) => "external", - }; - return Ok(Some(kind.to_object(py))); - } + pub fn to_py_dict_bound(self, py: Python<'_>) -> PyResult> { + let mut pydict = HashMap::new(); match &self.event { MergedEvent::Dora(event) => { - let value = match key { - "type" => Some(Self::ty(event).to_object(py)), - "id" => Self::id(event).map(|v| v.to_object(py)), - "value" => self.value(py)?, - "metadata" => Self::metadata(event, py), - "error" => Self::error(event).map(|v| v.to_object(py)), - other => { - return Err(PyLookupError::new_err(format!( - "event has no property `{other}`" - ))) - } + if let Some(id) = Self::id(event) { + pydict.insert("id", id.into_py(py)); + } + pydict.insert("type", Self::ty(event).to_object(py)); + + match &self.event { + MergedEvent::Dora(_) => pydict.insert("kind", "dora".to_object(py)), + MergedEvent::External(_) => pydict.insert("kind", "external".to_object(py)), }; - Ok(value) + + if let Some(value) = self.value(py)? { + pydict.insert("value", value); + } + if let Some(metadata) = Self::metadata(event, py) { + pydict.insert("metadata", metadata); + } + if let Some(error) = Self::error(event) { + pydict.insert("error", error.to_object(py)); + } } MergedEvent::External(event) => { - let value = match key { - "value" => event, - _ => todo!(), - }; - - Ok(Some(value.clone())) + pydict.insert("value", event.clone()); } } - } - pub fn inner(&mut self) -> Option<&PyObject> { - match &self.event { - MergedEvent::Dora(_) => None, - MergedEvent::External(event) => Some(event), - } + Ok(pydict.into_py_dict_bound(py).unbind()) } - fn __str__(&self) -> PyResult { - Ok(format!("{:#?}", &self.event)) - } -} - -impl PyEvent { fn ty(event: &Event) -> &str { match event { Event::Stop => "STOP", @@ -84,9 +68,9 @@ impl PyEvent { /// Returns the payload of an input event as an arrow array (if any). fn value(&self, py: Python<'_>) -> PyResult> { - match (&self.event, &self.data) { - (MergedEvent::Dora(Event::Input { .. }), Some(data)) => { - // TODO: Does this call leak data? + match &self.event { + MergedEvent::Dora(Event::Input { data, .. }) => { + // TODO: Does this call leak data?& let array_data = data.to_data().to_pyarrow(py)?; Ok(Some(array_data)) } @@ -116,13 +100,8 @@ impl From for PyEvent { } impl From> for PyEvent { - fn from(mut event: MergedEvent) -> Self { - let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event { - Some(data.clone()) - } else { - None - }; - Self { event, data } + fn from(event: MergedEvent) -> Self { + Self { event } } } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 82d01ebb..366b9482 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -208,7 +208,9 @@ pub fn run( metadata.parameters.open_telemetry_context = string_cx; } - let py_event = PyEvent::from(event); + let py_event = PyEvent::from(event) + .to_py_dict_bound(py) + .context("Could not convert event to pydict bound")?; let status_enum = operator .call_method1(py, "on_event", (py_event, send_output.clone())) diff --git a/examples/python-dataflow/example.py b/examples/python-dataflow/example.py new file mode 100644 index 00000000..c9221a3a --- /dev/null +++ b/examples/python-dataflow/example.py @@ -0,0 +1,5 @@ +from dora import Node + +node = Node("plot") + +event = node.next() diff --git a/examples/python-ros2-dataflow/random_turtle.py b/examples/python-ros2-dataflow/random_turtle.py index 1e690d07..3b25ac21 100755 --- a/examples/python-ros2-dataflow/random_turtle.py +++ b/examples/python-ros2-dataflow/random_turtle.py @@ -55,11 +55,11 @@ for i in range(500): # ROS2 Event elif event_kind == "external": - pose = event.inner()[0].as_py() + pose = event["value"][0].as_py() min_x = min([min_x, pose["x"]]) max_x = max([max_x, pose["x"]]) min_y = min([min_y, pose["y"]]) max_y = max([max_y, pose["y"]]) - dora_node.send_output("turtle_pose", event.inner()) + dora_node.send_output("turtle_pose", event["value"]) assert max_x - min_x > 1 or max_y - min_y > 1, "no turtle movement"