Browse Source

Adding `next`, `send_output` for Python node API

Adding `next` and `send_output` requires an async threadpool as the
communication Layer defined by the Middleware Layer returns an async Future
stream.

I solve this issue by adding a tokio runtime on a separater thread that is connected with two channels.
One for sending data and one for receiving data.

Those channel are then exposed synchronously to Python. This should not be cause for
concern as channel are really fast.

Looking at Zenoh Python client, they are heavily using `pyo3-asyncio` implementation
of futures to pass Rust futures into Python.

This can be a solution as well, but, from previous experiment, I'm concerned about performance on such
solution. I have experienced that putting futures from Rust into the `asyncio` queue to be slow.

I'm concerned also by mixing `async` and `sync` code in Python, as it might be blocking. This might requires 2 threadpool in Python.
This might seem as heavy overhead for some operations.
tags/v0.0.0-test.4
haixuanTao 3 years ago
parent
commit
128bf6ca43
3 changed files with 105 additions and 5 deletions
  1. +10
    -0
      apis/python/node/Cargo.toml
  2. +12
    -0
      apis/python/node/README.md
  3. +83
    -5
      apis/python/node/src/lib.rs

+ 10
- 0
apis/python/node/Cargo.toml View File

@@ -8,3 +8,13 @@ edition = "2021"
[dependencies]
dora-node-api = { path = "../../rust/node" }
pyo3 = "0.16"
eyre = "0.6"
pollster = "0.2"
futures = "0.3.21"
tokio = { version = "1.17.0", features = ["rt", "sync", "macros"] }
serde_yaml = "0.8.23"


[lib]
name = "dora"
crate-type = ["cdylib"]

+ 12
- 0
apis/python/node/README.md View File

@@ -0,0 +1,12 @@
This crate corresponds to the Node API for Dora.

## Building

To build the Python module for development:

````bash
python3 -m venv .env
source .env/bin/activate
pip install maturin
maturin develop
````

+ 83
- 5
apis/python/node/src/lib.rs View File

@@ -1,15 +1,93 @@
use dora_node_api::DoraNode;
use dora_node_api::config::DataId;
use dora_node_api::{DoraNode, Input};
use eyre::Context;
use futures::StreamExt;
use pyo3::prelude::*;

use pyo3::types::PyBytes;
use std::sync::Arc;
use std::thread;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
#[pyclass]
#[repr(transparent)]
// #[repr(transparent)]
pub struct PyDoraNode {
pub node: DoraNode,
// pub node: DoraNode,
pub rx_input: Receiver<Input>,
pub tx_output: Sender<(String, Vec<u8>)>,
}

pub struct PyInput(Input);

impl IntoPy<PyObject> for PyInput {
fn into_py(self, py: Python) -> PyObject {
(self.0.id.to_string(), PyBytes::new(py, &self.0.data)).into_py(py)
}
}

#[pymethods]
impl PyDoraNode {
#[staticmethod]
pub fn init_from_env() -> Self {
let (tx_input, rx_input) = mpsc::channel(10);
let (tx_output, mut rx_output) = mpsc::channel::<(String, Vec<u8>)>(10);

// Dispatching a tokio threadpool enables us to conveniently use Dora Future stream
// through tokio channel.
// It would have been difficult to expose the FutureStream of Dora directly.
thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
rt.block_on(async move {
let node = Arc::new(DoraNode::init_from_env().await.unwrap());
let _node = node.clone();
let receive_handle = tokio::spawn(async move {
let mut inputs = _node.inputs().await.unwrap();
loop {
if let Some(input) = inputs.next().await {
tx_input.send(input).await.unwrap()
};
}
});
let send_handle = tokio::spawn(async move {
loop {
if let Some((output_str, data)) = rx_output.recv().await {
let output_id = DataId::from(output_str);
node.send_output(&output_id, data.as_slice()).await.unwrap()
};
}
});
let (_, _) = tokio::join!(receive_handle, send_handle);
});
});

PyDoraNode {
rx_input,
tx_output,
}
}

pub fn next(&mut self) -> PyResult<Option<PyInput>> {
self.__next__()
}

pub fn __next__(&mut self) -> PyResult<Option<PyInput>> {
if let Some(input) = self.rx_input.blocking_recv() {
Ok(Some(PyInput(input)))
} else {
Ok(None)
}
}

pub fn send_output(&self, output_str: String, data: Vec<u8>) -> () {
self.tx_output
.blocking_send((output_str, data))
.wrap_err("Could not send output")
.unwrap()
}
}

/// This module is implemented in Rust.
#[pymodule]
fn wonnx(_py: Python, m: &PyModule) -> PyResult<()> {
fn dora(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PyDoraNode>().unwrap();
Ok(())
}

Loading…
Cancel
Save