diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 3c1fcb2a..849e20a2 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -69,9 +69,16 @@ impl Node { /// :type timeout: float, optional /// :rtype: dora.PyEvent #[allow(clippy::should_implement_trait)] - pub fn next(&mut self, py: Python, timeout: Option) -> PyResult> { + pub fn next(&mut self, py: Python, timeout: Option) -> PyResult>> { let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32))); - Ok(event) + if let Some(event) = event { + let dict = event + .to_py_dict_bound(py) + .context("Could not convert event into a dictionnary")?; + Ok(Some(dict)) + } else { + Ok(None) + } } /// You can iterate over the event stream with a loop @@ -84,10 +91,11 @@ impl Node { /// case "image": /// ``` /// + /// Default behaviour is to timeout after 2 seconds. + /// /// :rtype: dora.PyEvent - pub fn __next__(&mut self, py: Python) -> PyResult> { - let event = py.allow_threads(|| self.events.recv(None)); - Ok(event) + pub fn __next__(&mut self, py: Python) -> PyResult>> { + self.next(py, Some(2.0)) } /// You can iterate over the event stream with a loop @@ -262,7 +270,6 @@ fn dora(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(start_runtime, &m)?)?; m.add_class::()?; - m.add_class::()?; m.setattr("__version__", env!("CARGO_PKG_VERSION"))?; m.setattr("__author__", "Dora-rs Authors")?; diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index bc74279e..772f2963 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -1,69 +1,53 @@ -use arrow::{array::ArrayRef, pyarrow::ToPyArrow}; +use std::collections::HashMap; + +use arrow::pyarrow::ToPyArrow; use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters}; use eyre::{Context, Result}; -use pyo3::{exceptions::PyLookupError, prelude::*, pybacked::PyBackedStr, types::PyDict}; +use pyo3::{ + prelude::*, + pybacked::PyBackedStr, + types::{IntoPyDict, PyDict}, +}; /// Dora Event -#[pyclass] #[derive(Debug)] pub struct PyEvent { event: MergedEvent, - data: Option, } -// Dora Event -#[pymethods] impl PyEvent { - /// - /// :rtype: dora.PyObject - pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult> { - if key == "kind" { - let kind = match &self.event { - MergedEvent::Dora(_) => "dora", - MergedEvent::External(_) => "external", - }; - return Ok(Some(kind.to_object(py))); - } + pub fn to_py_dict_bound(self, py: Python<'_>) -> PyResult> { + let mut pydict = HashMap::new(); match &self.event { MergedEvent::Dora(event) => { - let value = match key { - "type" => Some(Self::ty(event).to_object(py)), - "id" => Self::id(event).map(|v| v.to_object(py)), - "value" => self.value(py)?, - "metadata" => Self::metadata(event, py), - "error" => Self::error(event).map(|v| v.to_object(py)), - other => { - return Err(PyLookupError::new_err(format!( - "event has no property `{other}`" - ))) - } + if let Some(id) = Self::id(event) { + pydict.insert("id", id.into_py(py)); + } + pydict.insert("type", Self::ty(event).to_object(py)); + + match &self.event { + MergedEvent::Dora(_) => pydict.insert("kind", "dora".to_object(py)), + MergedEvent::External(_) => pydict.insert("kind", "external".to_object(py)), }; - Ok(value) + + if let Some(value) = self.value(py)? { + pydict.insert("value", value); + } + if let Some(metadata) = Self::metadata(event, py) { + pydict.insert("metadata", metadata); + } + if let Some(error) = Self::error(event) { + pydict.insert("error", error.to_object(py)); + } } MergedEvent::External(event) => { - let value = match key { - "value" => event, - _ => todo!(), - }; - - Ok(Some(value.clone())) + pydict.insert("value", event.clone()); } } - } - pub fn inner(&mut self) -> Option<&PyObject> { - match &self.event { - MergedEvent::Dora(_) => None, - MergedEvent::External(event) => Some(event), - } + Ok(pydict.into_py_dict_bound(py).unbind()) } - fn __str__(&self) -> PyResult { - Ok(format!("{:#?}", &self.event)) - } -} - -impl PyEvent { fn ty(event: &Event) -> &str { match event { Event::Stop => "STOP", @@ -84,9 +68,9 @@ impl PyEvent { /// Returns the payload of an input event as an arrow array (if any). fn value(&self, py: Python<'_>) -> PyResult> { - match (&self.event, &self.data) { - (MergedEvent::Dora(Event::Input { .. }), Some(data)) => { - // TODO: Does this call leak data? + match &self.event { + MergedEvent::Dora(Event::Input { data, .. }) => { + // TODO: Does this call leak data?& let array_data = data.to_data().to_pyarrow(py)?; Ok(Some(array_data)) } @@ -116,13 +100,8 @@ impl From for PyEvent { } impl From> for PyEvent { - fn from(mut event: MergedEvent) -> Self { - let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event { - Some(data.clone()) - } else { - None - }; - Self { event, data } + fn from(event: MergedEvent) -> Self { + Self { event } } } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 82d01ebb..366b9482 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -208,7 +208,9 @@ pub fn run( metadata.parameters.open_telemetry_context = string_cx; } - let py_event = PyEvent::from(event); + let py_event = PyEvent::from(event) + .to_py_dict_bound(py) + .context("Could not convert event to pydict bound")?; let status_enum = operator .call_method1(py, "on_event", (py_event, send_output.clone())) diff --git a/examples/python-dataflow/example.py b/examples/python-dataflow/example.py new file mode 100644 index 00000000..c9221a3a --- /dev/null +++ b/examples/python-dataflow/example.py @@ -0,0 +1,5 @@ +from dora import Node + +node = Node("plot") + +event = node.next() diff --git a/examples/python-ros2-dataflow/random_turtle.py b/examples/python-ros2-dataflow/random_turtle.py index 1e690d07..3b25ac21 100755 --- a/examples/python-ros2-dataflow/random_turtle.py +++ b/examples/python-ros2-dataflow/random_turtle.py @@ -55,11 +55,11 @@ for i in range(500): # ROS2 Event elif event_kind == "external": - pose = event.inner()[0].as_py() + pose = event["value"][0].as_py() min_x = min([min_x, pose["x"]]) max_x = max([max_x, pose["x"]]) min_y = min([min_y, pose["y"]]) max_y = max([max_y, pose["y"]]) - dora_node.send_output("turtle_pose", event.inner()) + dora_node.send_output("turtle_pose", event["value"]) assert max_x - min_x > 1 or max_y - min_y > 1, "no turtle movement"