From 128bf6ca43e9e03eed9696a265bc360c4de3d010 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 29 Jul 2022 11:41:54 +0200 Subject: [PATCH] Adding `next`, `send_output` for Python node API Adding `next` and `send_output` requires an async threadpool as the communication Layer defined by the Middleware Layer returns an async Future stream. I solve this issue by adding a tokio runtime on a separater thread that is connected with two channels. One for sending data and one for receiving data. Those channel are then exposed synchronously to Python. This should not be cause for concern as channel are really fast. Looking at Zenoh Python client, they are heavily using `pyo3-asyncio` implementation of futures to pass Rust futures into Python. This can be a solution as well, but, from previous experiment, I'm concerned about performance on such solution. I have experienced that putting futures from Rust into the `asyncio` queue to be slow. I'm concerned also by mixing `async` and `sync` code in Python, as it might be blocking. This might requires 2 threadpool in Python. This might seem as heavy overhead for some operations. --- apis/python/node/Cargo.toml | 10 +++++ apis/python/node/README.md | 12 +++++ apis/python/node/src/lib.rs | 88 ++++++++++++++++++++++++++++++++++--- 3 files changed, 105 insertions(+), 5 deletions(-) create mode 100644 apis/python/node/README.md diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 0ebf4111..64512ab8 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -8,3 +8,13 @@ edition = "2021" [dependencies] dora-node-api = { path = "../../rust/node" } pyo3 = "0.16" +eyre = "0.6" +pollster = "0.2" +futures = "0.3.21" +tokio = { version = "1.17.0", features = ["rt", "sync", "macros"] } +serde_yaml = "0.8.23" + + +[lib] +name = "dora" +crate-type = ["cdylib"] diff --git a/apis/python/node/README.md b/apis/python/node/README.md new file mode 100644 index 00000000..408cd83c --- /dev/null +++ b/apis/python/node/README.md @@ -0,0 +1,12 @@ +This crate corresponds to the Node API for Dora. + +## Building + +To build the Python module for development: + +````bash +python3 -m venv .env +source .env/bin/activate +pip install maturin +maturin develop +```` diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 3f487b5e..7035d46b 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,15 +1,93 @@ -use dora_node_api::DoraNode; +use dora_node_api::config::DataId; +use dora_node_api::{DoraNode, Input}; +use eyre::Context; +use futures::StreamExt; use pyo3::prelude::*; - +use pyo3::types::PyBytes; +use std::sync::Arc; +use std::thread; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{Receiver, Sender}; #[pyclass] -#[repr(transparent)] +// #[repr(transparent)] pub struct PyDoraNode { - pub node: DoraNode, + // pub node: DoraNode, + pub rx_input: Receiver, + pub tx_output: Sender<(String, Vec)>, +} + +pub struct PyInput(Input); + +impl IntoPy for PyInput { + fn into_py(self, py: Python) -> PyObject { + (self.0.id.to_string(), PyBytes::new(py, &self.0.data)).into_py(py) + } +} + +#[pymethods] +impl PyDoraNode { + #[staticmethod] + pub fn init_from_env() -> Self { + let (tx_input, rx_input) = mpsc::channel(10); + let (tx_output, mut rx_output) = mpsc::channel::<(String, Vec)>(10); + + // Dispatching a tokio threadpool enables us to conveniently use Dora Future stream + // through tokio channel. + // It would have been difficult to expose the FutureStream of Dora directly. + thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + rt.block_on(async move { + let node = Arc::new(DoraNode::init_from_env().await.unwrap()); + let _node = node.clone(); + let receive_handle = tokio::spawn(async move { + let mut inputs = _node.inputs().await.unwrap(); + loop { + if let Some(input) = inputs.next().await { + tx_input.send(input).await.unwrap() + }; + } + }); + let send_handle = tokio::spawn(async move { + loop { + if let Some((output_str, data)) = rx_output.recv().await { + let output_id = DataId::from(output_str); + node.send_output(&output_id, data.as_slice()).await.unwrap() + }; + } + }); + let (_, _) = tokio::join!(receive_handle, send_handle); + }); + }); + + PyDoraNode { + rx_input, + tx_output, + } + } + + pub fn next(&mut self) -> PyResult> { + self.__next__() + } + + pub fn __next__(&mut self) -> PyResult> { + if let Some(input) = self.rx_input.blocking_recv() { + Ok(Some(PyInput(input))) + } else { + Ok(None) + } + } + + pub fn send_output(&self, output_str: String, data: Vec) -> () { + self.tx_output + .blocking_send((output_str, data)) + .wrap_err("Could not send output") + .unwrap() + } } /// This module is implemented in Rust. #[pymodule] -fn wonnx(_py: Python, m: &PyModule) -> PyResult<()> { +fn dora(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::().unwrap(); Ok(()) }