diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 0ebf4111..64512ab8 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -8,3 +8,13 @@ edition = "2021" [dependencies] dora-node-api = { path = "../../rust/node" } pyo3 = "0.16" +eyre = "0.6" +pollster = "0.2" +futures = "0.3.21" +tokio = { version = "1.17.0", features = ["rt", "sync", "macros"] } +serde_yaml = "0.8.23" + + +[lib] +name = "dora" +crate-type = ["cdylib"] diff --git a/apis/python/node/README.md b/apis/python/node/README.md new file mode 100644 index 00000000..408cd83c --- /dev/null +++ b/apis/python/node/README.md @@ -0,0 +1,12 @@ +This crate corresponds to the Node API for Dora. + +## Building + +To build the Python module for development: + +````bash +python3 -m venv .env +source .env/bin/activate +pip install maturin +maturin develop +```` diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 3f487b5e..7035d46b 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,15 +1,93 @@ -use dora_node_api::DoraNode; +use dora_node_api::config::DataId; +use dora_node_api::{DoraNode, Input}; +use eyre::Context; +use futures::StreamExt; use pyo3::prelude::*; - +use pyo3::types::PyBytes; +use std::sync::Arc; +use std::thread; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{Receiver, Sender}; #[pyclass] -#[repr(transparent)] +// #[repr(transparent)] pub struct PyDoraNode { - pub node: DoraNode, + // pub node: DoraNode, + pub rx_input: Receiver, + pub tx_output: Sender<(String, Vec)>, +} + +pub struct PyInput(Input); + +impl IntoPy for PyInput { + fn into_py(self, py: Python) -> PyObject { + (self.0.id.to_string(), PyBytes::new(py, &self.0.data)).into_py(py) + } +} + +#[pymethods] +impl PyDoraNode { + #[staticmethod] + pub fn init_from_env() -> Self { + let (tx_input, rx_input) = mpsc::channel(10); + let (tx_output, mut rx_output) = mpsc::channel::<(String, Vec)>(10); + + // Dispatching a tokio threadpool enables us to conveniently use Dora Future stream + // through tokio channel. + // It would have been difficult to expose the FutureStream of Dora directly. + thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + rt.block_on(async move { + let node = Arc::new(DoraNode::init_from_env().await.unwrap()); + let _node = node.clone(); + let receive_handle = tokio::spawn(async move { + let mut inputs = _node.inputs().await.unwrap(); + loop { + if let Some(input) = inputs.next().await { + tx_input.send(input).await.unwrap() + }; + } + }); + let send_handle = tokio::spawn(async move { + loop { + if let Some((output_str, data)) = rx_output.recv().await { + let output_id = DataId::from(output_str); + node.send_output(&output_id, data.as_slice()).await.unwrap() + }; + } + }); + let (_, _) = tokio::join!(receive_handle, send_handle); + }); + }); + + PyDoraNode { + rx_input, + tx_output, + } + } + + pub fn next(&mut self) -> PyResult> { + self.__next__() + } + + pub fn __next__(&mut self) -> PyResult> { + if let Some(input) = self.rx_input.blocking_recv() { + Ok(Some(PyInput(input))) + } else { + Ok(None) + } + } + + pub fn send_output(&self, output_str: String, data: Vec) -> () { + self.tx_output + .blocking_send((output_str, data)) + .wrap_err("Could not send output") + .unwrap() + } } /// This module is implemented in Rust. #[pymodule] -fn wonnx(_py: Python, m: &PyModule) -> PyResult<()> { +fn dora(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::().unwrap(); Ok(()) }