diff --git a/runtime/src/operator/python.rs b/runtime/src/operator/python.rs index 90702dde..ca0cfa66 100644 --- a/runtime/src/operator/python.rs +++ b/runtime/src/operator/python.rs @@ -1,6 +1,6 @@ use super::{OperatorEvent, OperatorInput}; use eyre::{bail, eyre, Context}; -use pyo3::{pyclass, types::IntoPyDict, Python}; +use pyo3::{pyclass, types::IntoPyDict, Py, Python}; use std::{ panic::{catch_unwind, AssertUnwindSafe}, path::Path, @@ -25,7 +25,7 @@ pub fn spawn( events_tx: events_tx.clone(), }; - let python_runner = move |py: pyo3::Python| { + let init_operator = move |py: Python| { if let Some(parent_path) = path.parent() { let parent_path = parent_path .to_str() @@ -58,32 +58,42 @@ pub fn spawn( let operator = py .eval("Operator()", None, Some(locals)) .wrap_err("failed to create Operator instance")?; + Result::<_, eyre::Report>::Ok(Py::from(operator)) + }; + + let python_runner = move || { + let operator = + Python::with_gil(init_operator).wrap_err("failed to init python operator")?; while let Some(input) = inputs.blocking_recv() { - operator - .call_method1( + Python::with_gil(|py| { + operator.call_method1( + py, "on_input", (input.id.to_string(), input.value, send_output.clone()), ) - .wrap_err("on_input failed")?; + }) + .wrap_err("on_input failed")?; } - if operator - .hasattr("drop_operator") - .wrap_err("failed to look for drop_operator")? - { - operator - .call_method0("drop_operator") - .wrap_err("drop_operator failed")?; - } + Python::with_gil(|py| { + let operator = operator.as_ref(py); + if operator + .hasattr("drop_operator") + .wrap_err("failed to look for drop_operator")? + { + operator.call_method0("drop_operator")?; + } + Result::<_, eyre::Report>::Ok(()) + })?; Result::<_, eyre::Report>::Ok(()) }; thread::spawn(move || { let closure = AssertUnwindSafe(|| { - let result = Python::with_gil(python_runner); - result.wrap_err_with(|| format!("error in Python module at {}", path_cloned.display())) + python_runner() + .wrap_err_with(|| format!("error in Python module at {}", path_cloned.display())) }); match catch_unwind(closure) {