Browse Source

Release the Python interpreter lock when waiting for data (#39)

As long as we keep the global interpreter lock (GIL) active, no other Python thread can make progress. This lead to starvation when there are multiple Python operators on the same runtime node. This PR fixes this by holding the GIL only as long as needed, i.e. when calling code of the operator. Most importantly, we don't hold the GIL anymore when the operator is idle and waiting for new inputs.
tags/v0.0.0-test.4
Philipp Oppermann GitHub 3 years ago
parent
commit
cfe8e8c571
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 25 additions and 15 deletions
  1. +25
    -15
      runtime/src/operator/python.rs

+ 25
- 15
runtime/src/operator/python.rs View File

@@ -1,6 +1,6 @@
use super::{OperatorEvent, OperatorInput};
use eyre::{bail, eyre, Context};
use pyo3::{pyclass, types::IntoPyDict, Python};
use pyo3::{pyclass, types::IntoPyDict, Py, Python};
use std::{
panic::{catch_unwind, AssertUnwindSafe},
path::Path,
@@ -25,7 +25,7 @@ pub fn spawn(
events_tx: events_tx.clone(),
};

let python_runner = move |py: pyo3::Python| {
let init_operator = move |py: Python| {
if let Some(parent_path) = path.parent() {
let parent_path = parent_path
.to_str()
@@ -58,32 +58,42 @@ pub fn spawn(
let operator = py
.eval("Operator()", None, Some(locals))
.wrap_err("failed to create Operator instance")?;
Result::<_, eyre::Report>::Ok(Py::from(operator))
};

let python_runner = move || {
let operator =
Python::with_gil(init_operator).wrap_err("failed to init python operator")?;

while let Some(input) = inputs.blocking_recv() {
operator
.call_method1(
Python::with_gil(|py| {
operator.call_method1(
py,
"on_input",
(input.id.to_string(), input.value, send_output.clone()),
)
.wrap_err("on_input failed")?;
})
.wrap_err("on_input failed")?;
}

if operator
.hasattr("drop_operator")
.wrap_err("failed to look for drop_operator")?
{
operator
.call_method0("drop_operator")
.wrap_err("drop_operator failed")?;
}
Python::with_gil(|py| {
let operator = operator.as_ref(py);
if operator
.hasattr("drop_operator")
.wrap_err("failed to look for drop_operator")?
{
operator.call_method0("drop_operator")?;
}
Result::<_, eyre::Report>::Ok(())
})?;

Result::<_, eyre::Report>::Ok(())
};

thread::spawn(move || {
let closure = AssertUnwindSafe(|| {
let result = Python::with_gil(python_runner);
result.wrap_err_with(|| format!("error in Python module at {}", path_cloned.display()))
python_runner()
.wrap_err_with(|| format!("error in Python module at {}", path_cloned.display()))
});

match catch_unwind(closure) {


Loading…
Cancel
Save