diff --git a/Cargo.lock b/Cargo.lock index 15f6a9f5..d2628b5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -992,6 +992,7 @@ name = "dora-node-api-python" version = "0.1.0" dependencies = [ "dora-node-api", + "dora-operator-api-python", "eyre", "flume", "pyo3", @@ -1024,6 +1025,17 @@ dependencies = [ "syn", ] +[[package]] +name = "dora-operator-api-python" +version = "0.1.0" +dependencies = [ + "dora-node-api", + "eyre", + "flume", + "pyo3", + "serde_yaml 0.8.23", +] + [[package]] name = "dora-operator-api-types" version = "0.1.0" @@ -1040,6 +1052,7 @@ dependencies = [ "dora-message", "dora-metrics", "dora-node-api", + "dora-operator-api-python", "dora-operator-api-types", "dora-tracing", "eyre", diff --git a/Cargo.toml b/Cargo.toml index f281ff4c..4a806645 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "apis/c/*", "apis/python/node", + "apis/python/operator", "apis/rust/*", "apis/rust/operator/macros", "apis/rust/operator/types", diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 00c88ab3..ce870912 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -8,6 +8,7 @@ license = "Apache-2.0" [dependencies] dora-node-api = { path = "../../rust/node" } +dora-operator-api-python = { path = "../operator" } pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] } eyre = "0.6" serde_yaml = "0.8.23" diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 7d8e10f4..7fec3e78 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,9 +1,13 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] use dora_node_api::{config::NodeId, DoraNode, Input}; +use dora_operator_api_python::{metadata_to_pydict, pydict_to_metadata}; use eyre::{Context, Result}; use flume::Receiver; -use pyo3::{prelude::*, types::PyBytes}; +use pyo3::{ + prelude::*, + types::{PyBytes, PyDict}, +}; #[pyclass] pub struct Node { @@ -16,7 +20,12 @@ pub struct PyInput(Input); impl IntoPy for PyInput { fn into_py(self, py: Python) -> PyObject { - (self.0.id.to_string(), PyBytes::new(py, &self.0.data())).into_py(py) + ( + self.0.id.to_string(), + PyBytes::new(py, &self.0.data()), + metadata_to_pydict(self.0.metadata(), py), + ) + .into_py(py) } } @@ -49,10 +58,16 @@ impl Node { slf } - pub fn send_output(&mut self, output_id: String, data: &PyBytes) -> Result<()> { + pub fn send_output( + &mut self, + output_id: String, + data: &PyBytes, + metadata: Option<&PyDict>, + ) -> Result<()> { let data = &data.as_bytes(); + let metadata = pydict_to_metadata(metadata)?; self.node - .send_output(&output_id.into(), &Default::default(), data.len(), |out| { + .send_output(&output_id.into(), &metadata, data.len(), |out| { out.copy_from_slice(data); }) .wrap_err("Could not send output") diff --git a/apis/python/operator/Cargo.toml b/apis/python/operator/Cargo.toml new file mode 100644 index 00000000..d51de4a0 --- /dev/null +++ b/apis/python/operator/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "dora-operator-api-python" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { path = "../../rust/node" } +pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] } +eyre = "0.6" +serde_yaml = "0.8.23" +flume = "0.10.14" diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs new file mode 100644 index 00000000..ced862ad --- /dev/null +++ b/apis/python/operator/src/lib.rs @@ -0,0 +1,43 @@ +use std::borrow::Cow; + +use dora_node_api::Metadata; +use eyre::{Context, Result}; +use pyo3::{prelude::*, types::PyDict}; + +pub fn pydict_to_metadata<'a>(dict: Option<&'a PyDict>) -> Result> { + let mut default_metadata = Metadata::default(); + if let Some(metadata) = dict { + for (key, value) in metadata.iter() { + match key.extract::<&str>().context("Parsing metadata keys")? { + "metadata_version" => { + default_metadata.metadata_version = + value.extract().context("parsing metadata version failed")?; + } + "watermark" => { + default_metadata.watermark = + value.extract().context("parsing watermark failed")?; + } + "deadline" => { + default_metadata.deadline = + value.extract().context("parsing deadline failed")?; + } + "open_telemetry_context" => { + let otel_context: &str = value + .extract() + .context("parsing open telemetry context failed")?; + default_metadata.open_telemetry_context = Cow::Borrowed(otel_context); + } + _ => (), + } + } + } + Ok(default_metadata) +} + +pub fn metadata_to_pydict<'a>(metadata: &'a Metadata, py: Python<'a>) -> &'a PyDict { + let dict = PyDict::new(py); + dict.set_item("open_telemetry_context", &metadata.open_telemetry_context) + .wrap_err("could not make metadata a python dictionary item") + .unwrap(); + dict +} diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 57e3b81c..4caea8b6 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -12,6 +12,7 @@ dora-node-api = { path = "../../apis/rust/node", default-features = false, featu "zenoh", "iceoryx", ] } +dora-operator-api-python = { path = "../../apis/python/operator" } dora-operator-api-types = { path = "../../apis/rust/operator/types" } dora-core = { version = "0.1.0", path = "../../libraries/core" } dora-tracing = { path = "../../libraries/extensions/telemetry/tracing", optional = true } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 4fefb204..21a5e79e 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -2,6 +2,7 @@ use super::{OperatorEvent, Tracer}; use dora_node_api::{communication::Publisher, config::DataId}; +use dora_operator_api_python::metadata_to_pydict; use eyre::{bail, eyre, Context}; use pyo3::{ pyclass, @@ -10,6 +11,7 @@ use pyo3::{ Py, Python, }; use std::{ + borrow::Cow, collections::HashMap, panic::{catch_unwind, AssertUnwindSafe}, path::Path, @@ -87,7 +89,7 @@ pub fn spawn( let operator = Python::with_gil(init_operator).wrap_err("failed to init python operator")?; - while let Ok(input) = inputs.recv() { + while let Ok(mut input) = inputs.recv() { #[cfg(feature = "tracing")] let cx = { use dora_tracing::{deserialize_context, serialize_context}; @@ -107,14 +109,14 @@ pub fn spawn( let () = tracer; "" }; + input.metadata.open_telemetry_context = Cow::Borrowed(cx); + let status_enum = Python::with_gil(|py| { - let metadata = PyDict::new(py); - metadata.set_item("open_telemetry_context", &cx)?; let input_dict = PyDict::new(py); input_dict.set_item("id", input.id.as_str())?; input_dict.set_item("data", PyBytes::new(py, &input.data()))?; - input_dict.set_item("metadata", metadata)?; + input_dict.set_item("metadata", metadata_to_pydict(input.metadata(), py))?; operator .call_method1(py, "on_input", (input_dict, send_output.clone())) @@ -176,17 +178,14 @@ struct SendOutputCallback { #[allow(unsafe_op_in_unsafe_fn)] mod callback_impl { - use std::borrow::Cow; - use super::SendOutputCallback; - use dora_message::Metadata; + use dora_operator_api_python::pydict_to_metadata; use eyre::{eyre, Context}; use pyo3::{ pymethods, types::{PyBytes, PyDict}, PyResult, }; - use tracing::warn; #[pymethods] impl SendOutputCallback { @@ -198,35 +197,7 @@ mod callback_impl { ) -> PyResult<()> { match self.publishers.get(output) { Some(publisher) => { - let mut default_metadata = Metadata::default(); - if let Some(metadata) = metadata { - for (key, value) in metadata.iter() { - match key.extract::<&str>().context("Parsing metadata keys")? { - "metadata_version" => { - default_metadata.metadata_version = value - .extract() - .context("parsing metadata version failed")?; - } - "watermark" => { - default_metadata.watermark = - value.extract().context("parsing watermark failed")?; - } - "deadline" => { - default_metadata.deadline = - value.extract().context("parsing deadline failed")?; - } - "open_telemetry_context" => { - let otel_context: &str = value - .extract() - .context("parsing open telemetry context failed")?; - default_metadata.open_telemetry_context = - Cow::Borrowed(otel_context); - } - _ => warn!("Unexpected key argument for metadata"), - } - } - }; - let message = default_metadata + let message = pydict_to_metadata(metadata)? .serialize() .context(format!("failed to serialize `{}` metadata", output)); message.and_then(|mut message| { diff --git a/docs/src/python-api.md b/docs/src/python-api.md index 8d33224f..94d1523c 100644 --- a/docs/src/python-api.md +++ b/docs/src/python-api.md @@ -10,8 +10,7 @@ An operator requires an `on_input` method and requires to return a `DoraStatus` class Operator: def on_input( self, - input_id: str, - value: bytes, + dora_input: dict, send_output: Callable[[str, bytes], None], ) -> DoraStatus: ``` @@ -47,11 +46,11 @@ node = Node() `.next()` gives you the next input that the node has received. It blocks until the next input becomes available. It will return `None` when all senders has been dropped. ```python -input_id, value = node.next() +input_id, value, metadata = node.next() # or -for input_id, value in node: +for input_id, value, metadata in node: ``` #### `.send_output(output_id, data)` @@ -59,19 +58,14 @@ for input_id, value in node: `send_output` send data from the node. ```python -node.send_output("string", b"string") +node.send_output("string", b"string", {"open_telemetry_context": "7632e76"}) ``` - ### Try it out! - Install python node API: ```bash -cd apis/python/node -python3 -m venv .env -source .env/bin/activate -pip install maturin -maturin develop +pip install dora-rs ``` - Create a python file called `webcam.py`: