Browse Source

Replace output callback function with method implementation

tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
60c2ac3b6a
3 changed files with 20 additions and 24 deletions
  1. +1
    -0
      Cargo.lock
  2. +1
    -1
      runtime/Cargo.toml
  3. +18
    -23
      runtime/src/operator/python.rs

+ 1
- 0
Cargo.lock View File

@@ -2131,6 +2131,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e6302e85060011447471887705bb7838f14aba43fcb06957d823739a496b3dc"
dependencies = [
"cfg-if",
"eyre",
"indoc",
"libc",
"parking_lot",


+ 1
- 1
runtime/Cargo.toml View File

@@ -20,4 +20,4 @@ zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
log = "0.4.17"
fern = "0.6.1"
pyo3 = { version = "0.16.5", features = ["auto-initialize"] }
pyo3 = { version = "0.16.5", features = ["auto-initialize", "eyre"] }

+ 18
- 23
runtime/src/operator/python.rs View File

@@ -20,7 +20,10 @@ pub fn spawn(
.canonicalize()
.wrap_err_with(|| format!("no file found at `{}`", path.display()))?;
let path_cloned = path.clone();
let events_tx_cloned = events_tx.clone();

let send_output = SendOutputCallback {
events_tx: events_tx.clone(),
};

let python_runner = move |py: pyo3::Python| {
if let Some(parent_path) = path.parent() {
@@ -57,26 +60,10 @@ pub fn spawn(
.wrap_err("dora_init_operator failed")?;

while let Some(input) = inputs.blocking_recv() {
let events_tx = events_tx.clone();
let send_output_callback = move |output_id: &str, data: &[u8]| {
println!("RUNTIME received python output `{output_id}` with value `{data:?}`");
let result = events_tx.blocking_send(OperatorEvent::Output {
id: output_id.to_owned().into(),
value: data.to_owned(),
});
if result.is_ok() {
0
} else {
-1
}
};
let send_output = SendOutputCallback {
callback: Box::new(send_output_callback),
};
operator_context
.call_method1(
"dora_on_input",
(input.id.to_string(), input.value, send_output),
(input.id.to_string(), input.value, send_output.clone()),
)
.wrap_err("dora_on_input failed")?;
}
@@ -97,10 +84,10 @@ pub fn spawn(
match catch_unwind(closure) {
Ok(Ok(())) => {}
Ok(Err(err)) => {
let _ = events_tx_cloned.blocking_send(OperatorEvent::Error(err));
let _ = events_tx.blocking_send(OperatorEvent::Error(err));
}
Err(panic) => {
let _ = events_tx_cloned.blocking_send(OperatorEvent::Panic(panic));
let _ = events_tx.blocking_send(OperatorEvent::Panic(panic));
}
}
});
@@ -109,19 +96,27 @@ pub fn spawn(
}

#[pyclass]
#[derive(Clone)]
struct SendOutputCallback {
callback: Box<dyn FnMut(&str, &[u8]) -> isize + Send>,
events_tx: Sender<OperatorEvent>,
}

#[allow(unsafe_op_in_unsafe_fn)]
mod callback_impl {
use super::SendOutputCallback;
use crate::operator::OperatorEvent;
use pyo3::{pymethods, PyResult};

#[pymethods]
impl SendOutputCallback {
fn __call__(&mut self, output: &str, data: &[u8]) -> PyResult<isize> {
Ok((self.callback)(output, data))
fn __call__(&mut self, output: &str, data: &[u8]) -> PyResult<()> {
println!("RUNTIME received python output `{output}` with value `{data:?}`");
let result = self.events_tx.blocking_send(OperatorEvent::Output {
id: output.to_owned().into(),
value: data.to_owned(),
});
result
.map_err(|_| eyre::eyre!("channel to dora runtime was closed unexpectedly").into())
}
}
}

Loading…
Cancel
Save