Browse Source

Add Python Node API metadata (#101)

* Add a `dora-python-operator` crate to hold utils functions for dora python

* Remove python serialisation and deserialisation from `dora-runtime`

* Update `python` documentation
tags/v0.0.0-test-pr-120
Xavier Tao GitHub 3 years ago
parent
commit
44a7c16c21
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 105 additions and 52 deletions
  1. +13
    -0
      Cargo.lock
  2. +1
    -0
      Cargo.toml
  3. +1
    -0
      apis/python/node/Cargo.toml
  4. +19
    -4
      apis/python/node/src/lib.rs
  5. +14
    -0
      apis/python/operator/Cargo.toml
  6. +43
    -0
      apis/python/operator/src/lib.rs
  7. +1
    -0
      binaries/runtime/Cargo.toml
  8. +8
    -37
      binaries/runtime/src/operator/python.rs
  9. +5
    -11
      docs/src/python-api.md

+ 13
- 0
Cargo.lock View File

@@ -992,6 +992,7 @@ name = "dora-node-api-python"
version = "0.1.0"
dependencies = [
"dora-node-api",
"dora-operator-api-python",
"eyre",
"flume",
"pyo3",
@@ -1024,6 +1025,17 @@ dependencies = [
"syn",
]

[[package]]
name = "dora-operator-api-python"
version = "0.1.0"
dependencies = [
"dora-node-api",
"eyre",
"flume",
"pyo3",
"serde_yaml 0.8.23",
]

[[package]]
name = "dora-operator-api-types"
version = "0.1.0"
@@ -1040,6 +1052,7 @@ dependencies = [
"dora-message",
"dora-metrics",
"dora-node-api",
"dora-operator-api-python",
"dora-operator-api-types",
"dora-tracing",
"eyre",


+ 1
- 0
Cargo.toml View File

@@ -2,6 +2,7 @@
members = [
"apis/c/*",
"apis/python/node",
"apis/python/operator",
"apis/rust/*",
"apis/rust/operator/macros",
"apis/rust/operator/types",


+ 1
- 0
apis/python/node/Cargo.toml View File

@@ -8,6 +8,7 @@ license = "Apache-2.0"

[dependencies]
dora-node-api = { path = "../../rust/node" }
dora-operator-api-python = { path = "../operator" }
pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] }
eyre = "0.6"
serde_yaml = "0.8.23"


+ 19
- 4
apis/python/node/src/lib.rs View File

@@ -1,9 +1,13 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use dora_node_api::{config::NodeId, DoraNode, Input};
use dora_operator_api_python::{metadata_to_pydict, pydict_to_metadata};
use eyre::{Context, Result};
use flume::Receiver;
use pyo3::{prelude::*, types::PyBytes};
use pyo3::{
prelude::*,
types::{PyBytes, PyDict},
};

#[pyclass]
pub struct Node {
@@ -16,7 +20,12 @@ pub struct PyInput(Input);

impl IntoPy<PyObject> for PyInput {
fn into_py(self, py: Python) -> PyObject {
(self.0.id.to_string(), PyBytes::new(py, &self.0.data())).into_py(py)
(
self.0.id.to_string(),
PyBytes::new(py, &self.0.data()),
metadata_to_pydict(self.0.metadata(), py),
)
.into_py(py)
}
}

@@ -49,10 +58,16 @@ impl Node {
slf
}

pub fn send_output(&mut self, output_id: String, data: &PyBytes) -> Result<()> {
pub fn send_output(
&mut self,
output_id: String,
data: &PyBytes,
metadata: Option<&PyDict>,
) -> Result<()> {
let data = &data.as_bytes();
let metadata = pydict_to_metadata(metadata)?;
self.node
.send_output(&output_id.into(), &Default::default(), data.len(), |out| {
.send_output(&output_id.into(), &metadata, data.len(), |out| {
out.copy_from_slice(data);
})
.wrap_err("Could not send output")


+ 14
- 0
apis/python/operator/Cargo.toml View File

@@ -0,0 +1,14 @@
[package]
name = "dora-operator-api-python"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { path = "../../rust/node" }
pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] }
eyre = "0.6"
serde_yaml = "0.8.23"
flume = "0.10.14"

+ 43
- 0
apis/python/operator/src/lib.rs View File

@@ -0,0 +1,43 @@
use std::borrow::Cow;

use dora_node_api::Metadata;
use eyre::{Context, Result};
use pyo3::{prelude::*, types::PyDict};

pub fn pydict_to_metadata<'a>(dict: Option<&'a PyDict>) -> Result<Metadata<'a>> {
let mut default_metadata = Metadata::default();
if let Some(metadata) = dict {
for (key, value) in metadata.iter() {
match key.extract::<&str>().context("Parsing metadata keys")? {
"metadata_version" => {
default_metadata.metadata_version =
value.extract().context("parsing metadata version failed")?;
}
"watermark" => {
default_metadata.watermark =
value.extract().context("parsing watermark failed")?;
}
"deadline" => {
default_metadata.deadline =
value.extract().context("parsing deadline failed")?;
}
"open_telemetry_context" => {
let otel_context: &str = value
.extract()
.context("parsing open telemetry context failed")?;
default_metadata.open_telemetry_context = Cow::Borrowed(otel_context);
}
_ => (),
}
}
}
Ok(default_metadata)
}

pub fn metadata_to_pydict<'a>(metadata: &'a Metadata, py: Python<'a>) -> &'a PyDict {
let dict = PyDict::new(py);
dict.set_item("open_telemetry_context", &metadata.open_telemetry_context)
.wrap_err("could not make metadata a python dictionary item")
.unwrap();
dict
}

+ 1
- 0
binaries/runtime/Cargo.toml View File

@@ -12,6 +12,7 @@ dora-node-api = { path = "../../apis/rust/node", default-features = false, featu
"zenoh",
"iceoryx",
] }
dora-operator-api-python = { path = "../../apis/python/operator" }
dora-operator-api-types = { path = "../../apis/rust/operator/types" }
dora-core = { version = "0.1.0", path = "../../libraries/core" }
dora-tracing = { path = "../../libraries/extensions/telemetry/tracing", optional = true }


+ 8
- 37
binaries/runtime/src/operator/python.rs View File

@@ -2,6 +2,7 @@

use super::{OperatorEvent, Tracer};
use dora_node_api::{communication::Publisher, config::DataId};
use dora_operator_api_python::metadata_to_pydict;
use eyre::{bail, eyre, Context};
use pyo3::{
pyclass,
@@ -10,6 +11,7 @@ use pyo3::{
Py, Python,
};
use std::{
borrow::Cow,
collections::HashMap,
panic::{catch_unwind, AssertUnwindSafe},
path::Path,
@@ -87,7 +89,7 @@ pub fn spawn(
let operator =
Python::with_gil(init_operator).wrap_err("failed to init python operator")?;

while let Ok(input) = inputs.recv() {
while let Ok(mut input) = inputs.recv() {
#[cfg(feature = "tracing")]
let cx = {
use dora_tracing::{deserialize_context, serialize_context};
@@ -107,14 +109,14 @@ pub fn spawn(
let () = tracer;
""
};
input.metadata.open_telemetry_context = Cow::Borrowed(cx);

let status_enum = Python::with_gil(|py| {
let metadata = PyDict::new(py);
metadata.set_item("open_telemetry_context", &cx)?;
let input_dict = PyDict::new(py);

input_dict.set_item("id", input.id.as_str())?;
input_dict.set_item("data", PyBytes::new(py, &input.data()))?;
input_dict.set_item("metadata", metadata)?;
input_dict.set_item("metadata", metadata_to_pydict(input.metadata(), py))?;

operator
.call_method1(py, "on_input", (input_dict, send_output.clone()))
@@ -176,17 +178,14 @@ struct SendOutputCallback {
#[allow(unsafe_op_in_unsafe_fn)]
mod callback_impl {

use std::borrow::Cow;

use super::SendOutputCallback;
use dora_message::Metadata;
use dora_operator_api_python::pydict_to_metadata;
use eyre::{eyre, Context};
use pyo3::{
pymethods,
types::{PyBytes, PyDict},
PyResult,
};
use tracing::warn;

#[pymethods]
impl SendOutputCallback {
@@ -198,35 +197,7 @@ mod callback_impl {
) -> PyResult<()> {
match self.publishers.get(output) {
Some(publisher) => {
let mut default_metadata = Metadata::default();
if let Some(metadata) = metadata {
for (key, value) in metadata.iter() {
match key.extract::<&str>().context("Parsing metadata keys")? {
"metadata_version" => {
default_metadata.metadata_version = value
.extract()
.context("parsing metadata version failed")?;
}
"watermark" => {
default_metadata.watermark =
value.extract().context("parsing watermark failed")?;
}
"deadline" => {
default_metadata.deadline =
value.extract().context("parsing deadline failed")?;
}
"open_telemetry_context" => {
let otel_context: &str = value
.extract()
.context("parsing open telemetry context failed")?;
default_metadata.open_telemetry_context =
Cow::Borrowed(otel_context);
}
_ => warn!("Unexpected key argument for metadata"),
}
}
};
let message = default_metadata
let message = pydict_to_metadata(metadata)?
.serialize()
.context(format!("failed to serialize `{}` metadata", output));
message.and_then(|mut message| {


+ 5
- 11
docs/src/python-api.md View File

@@ -10,8 +10,7 @@ An operator requires an `on_input` method and requires to return a `DoraStatus`
class Operator:
def on_input(
self,
input_id: str,
value: bytes,
dora_input: dict,
send_output: Callable[[str, bytes], None],
) -> DoraStatus:
```
@@ -47,11 +46,11 @@ node = Node()
`.next()` gives you the next input that the node has received. It blocks until the next input becomes available. It will return `None` when all senders has been dropped.

```python
input_id, value = node.next()
input_id, value, metadata = node.next()

# or

for input_id, value in node:
for input_id, value, metadata in node:
```

#### `.send_output(output_id, data)`
@@ -59,19 +58,14 @@ for input_id, value in node:
`send_output` send data from the node.

```python
node.send_output("string", b"string")
node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
```


### Try it out!

- Install python node API:
```bash
cd apis/python/node
python3 -m venv .env
source .env/bin/activate
pip install maturin
maturin develop
pip install dora-rs
```

- Create a python file called `webcam.py`:


Loading…
Cancel
Save