From 60c2ac3b6aa2656affe0713a9e1061fdc8ee9cfd Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 22 Jun 2022 14:44:58 +0200 Subject: [PATCH] Replace output callback function with method implementation --- Cargo.lock | 1 + runtime/Cargo.toml | 2 +- runtime/src/operator/python.rs | 41 +++++++++++++++------------------- 3 files changed, 20 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8e33f05..4f122dd7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2131,6 +2131,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e6302e85060011447471887705bb7838f14aba43fcb06957d823739a496b3dc" dependencies = [ "cfg-if", + "eyre", "indoc", "libc", "parking_lot", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 15e3f5db..d795c6aa 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -20,4 +20,4 @@ zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" } zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" } log = "0.4.17" fern = "0.6.1" -pyo3 = { version = "0.16.5", features = ["auto-initialize"] } +pyo3 = { version = "0.16.5", features = ["auto-initialize", "eyre"] } diff --git a/runtime/src/operator/python.rs b/runtime/src/operator/python.rs index 692bd188..10a28226 100644 --- a/runtime/src/operator/python.rs +++ b/runtime/src/operator/python.rs @@ -20,7 +20,10 @@ pub fn spawn( .canonicalize() .wrap_err_with(|| format!("no file found at `{}`", path.display()))?; let path_cloned = path.clone(); - let events_tx_cloned = events_tx.clone(); + + let send_output = SendOutputCallback { + events_tx: events_tx.clone(), + }; let python_runner = move |py: pyo3::Python| { if let Some(parent_path) = path.parent() { @@ -57,26 +60,10 @@ pub fn spawn( .wrap_err("dora_init_operator failed")?; while let Some(input) = inputs.blocking_recv() { - let events_tx = events_tx.clone(); - let send_output_callback = move |output_id: &str, data: &[u8]| { - println!("RUNTIME received python output `{output_id}` with value `{data:?}`"); - let result = events_tx.blocking_send(OperatorEvent::Output { - id: output_id.to_owned().into(), - value: data.to_owned(), - }); - if result.is_ok() { - 0 - } else { - -1 - } - }; - let send_output = SendOutputCallback { - callback: Box::new(send_output_callback), - }; operator_context .call_method1( "dora_on_input", - (input.id.to_string(), input.value, send_output), + (input.id.to_string(), input.value, send_output.clone()), ) .wrap_err("dora_on_input failed")?; } @@ -97,10 +84,10 @@ pub fn spawn( match catch_unwind(closure) { Ok(Ok(())) => {} Ok(Err(err)) => { - let _ = events_tx_cloned.blocking_send(OperatorEvent::Error(err)); + let _ = events_tx.blocking_send(OperatorEvent::Error(err)); } Err(panic) => { - let _ = events_tx_cloned.blocking_send(OperatorEvent::Panic(panic)); + let _ = events_tx.blocking_send(OperatorEvent::Panic(panic)); } } }); @@ -109,19 +96,27 @@ pub fn spawn( } #[pyclass] +#[derive(Clone)] struct SendOutputCallback { - callback: Box isize + Send>, + events_tx: Sender, } #[allow(unsafe_op_in_unsafe_fn)] mod callback_impl { use super::SendOutputCallback; + use crate::operator::OperatorEvent; use pyo3::{pymethods, PyResult}; #[pymethods] impl SendOutputCallback { - fn __call__(&mut self, output: &str, data: &[u8]) -> PyResult { - Ok((self.callback)(output, data)) + fn __call__(&mut self, output: &str, data: &[u8]) -> PyResult<()> { + println!("RUNTIME received python output `{output}` with value `{data:?}`"); + let result = self.events_tx.blocking_send(OperatorEvent::Output { + id: output.to_owned().into(), + value: data.to_owned(), + }); + result + .map_err(|_| eyre::eyre!("channel to dora runtime was closed unexpectedly").into()) } } }