DRAFT: Fixing Python linking error

This commit is an initial draft at fixing #147. The error comes from the fact that pyo3 links against the libpython found at compile time instead of using the libpython that is available in `LD_LIBRARY_PATH` at runtime.

Currently, the only way to disable that linking is to use pyo3's `extension-module` feature flag. This requires packaging the Python `runtime-node` as a Python library. The Python `runtime-node` should also stay fully compatible with the other operators in case we want a hybrid runtime.
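A minimal sketch of that packaging with pyo3 0.16 — it mirrors the `start_runtime` entry point that the diff below adds to `apis/python/node/src/lib.rs`; the error mapping here is illustrative (the actual diff unwraps):

```rust
use pyo3::prelude::*;

/// Entry point callable from Python, e.g.
/// `python3 -c "import dora; dora.start_runtime()"`.
#[pyfunction]
fn start_runtime() -> PyResult<()> {
    dora_runtime::main()
        .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))
}

/// The `dora` extension module exposing the runtime entry point.
#[pymodule]
fn dora(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(start_runtime, m)?)?;
    Ok(())
}
```

The coordinator can then start this packaged runtime with `python3 -c "import dora; dora.start_runtime()"` instead of spawning the native runtime binary, which is what the change to `binaries/coordinator/src/run/runtime.rs` below does for nodes that contain a Python operator.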

The issue I'm facing is this: because of the packaging, I have to deal with the GIL, which is held from the start of the `dora-runtime` node. This makes the process single-threaded, which is not workable.

So I have to release it, but when I do, I hit a race condition:
```bash
Exception ignored in: <module 'threading' from '/usr/lib/python3.8/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 1373, in _shutdown
    assert tlock.locked()
AssertionError:
```
The issue is the same as https://github.com/PyO3/pyo3/issues/1274
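For context, releasing the GIL here concretely means wrapping the blocking runtime in pyo3's `allow_threads`. A minimal sketch of that idea with pyo3 0.16 (the new `binaries/runtime/src/lib.rs` below additionally spawns the runtime on a dedicated thread before blocking on it):

```rust
use pyo3::Python;

// Minimal sketch: release the GIL while the blocking tokio runtime runs, so
// that Python operator threads spawned by the runtime can make progress.
fn run_with_gil_released() -> eyre::Result<()> {
    Python::with_gil(|py| {
        py.allow_threads(|| {
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()?
                .block_on(async {
                    // ... the runtime event loop goes here ...
                    Ok(())
                })
        })
    })
}
```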

To fix this issue, I'm going to look again at the different steps needed to make this work.

For now, this is where my investigation stands.
haixuanTao committed 3 years ago · commit 992b8251c2 · tags/v0.1.1-test-pypi-release
14 changed files with 291 additions and 245 deletions:

1. Cargo.lock (+1, -0)
2. apis/python/node/Cargo.toml (+1, -0)
3. apis/python/node/src/lib.rs (+10, -1)
4. binaries/coordinator/src/run/runtime.rs (+16, -2)
5. binaries/runtime/Cargo.toml (+1, -1)
6. binaries/runtime/src/lib.rs (+189, -0)
7. binaries/runtime/src/main.rs (+3, -176)
8. binaries/runtime/src/operator/python.rs (+25, -23)
9. examples/python-dataflow/dataflow.yml (+2, -2)
10. examples/python-dataflow/dataflow_without_webcam.yml (+2, -2)
11. examples/python-dataflow/object_detection.py (+3, -1)
12. examples/python-dataflow/plot.py (+34, -33)
13. examples/python-dataflow/run.sh (+3, -3)
14. examples/python-dataflow/webcam.py (+1, -1)

Cargo.lock (+1, -0)

@@ -1039,6 +1039,7 @@ version = "0.1.0"
dependencies = [
"dora-node-api",
"dora-operator-api-python",
"dora-runtime",
"eyre",
"flume",
"pyo3",


apis/python/node/Cargo.toml (+1, -0)

@@ -13,6 +13,7 @@ pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] }
eyre = "0.6"
serde_yaml = "0.8.23"
flume = "0.10.14"
dora-runtime = { path = "../../../binaries/runtime" }

[lib]
name = "dora"


apis/python/node/src/lib.rs (+10, -1)

@@ -64,7 +64,7 @@ impl Node {
data: &PyBytes,
metadata: Option<&PyDict>,
) -> Result<()> {
let data = &data.as_bytes();
let data = data.as_bytes();
let metadata = pydict_to_metadata(metadata)?;
self.node
.send_output(&output_id.into(), metadata, data.len(), |out| {
@@ -78,8 +78,17 @@ impl Node {
}
}

#[pyfunction]
fn start_runtime() -> Result<()> {
dora_runtime::main()
.wrap_err("Python Dora Runtime failed.")
.unwrap();
Ok(())
}

#[pymodule]
fn dora(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(start_runtime, m)?)?;
m.add_class::<Node>().unwrap();
Ok(())
}

binaries/coordinator/src/run/runtime.rs (+16, -2)

@@ -1,7 +1,7 @@
use super::command_init_common_env;
use dora_core::{
config::NodeId,
descriptor::{self, EnvValue},
descriptor::{self, EnvValue, OperatorSource},
};
use eyre::{eyre, WrapErr};
use std::{collections::BTreeMap, path::Path};
@@ -15,7 +15,21 @@ pub fn spawn_runtime_node(
communication: &dora_core::config::CommunicationConfig,
working_dir: &Path,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut command = tokio::process::Command::new(runtime);
let has_python_operator = node
.operators
.iter()
.any(|x| matches!(x.config.source, OperatorSource::Python { .. }));

let mut command = if has_python_operator {
// Use the Python runtime when the node contains a Python operator
let mut command = tokio::process::Command::new("python3");
command.args(["-c", "import dora; dora.start_runtime()"]);
command
} else {
let command = tokio::process::Command::new(runtime);
command
};

command_init_common_env(&mut command, &node_id, communication)?;
command.env(
"DORA_OPERATORS",


binaries/runtime/Cargo.toml (+1, -1)

@@ -32,7 +32,7 @@ tokio-stream = "0.1.8"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" }
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" }
fern = "0.6.1"
pyo3 = { version = "0.16.5", features = ["auto-initialize", "eyre", "abi3"] }
pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] }
# pyo3-abi3 flag allow simpler linking. See: https://pyo3.rs/v0.13.2/building_and_distribution.html
flume = "0.10.14"
dora-message = { path = "../../libraries/message" }


binaries/runtime/src/lib.rs (+189, -0)

@@ -0,0 +1,189 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_core::{
config::{CommunicationConfig, DataId, NodeId, OperatorId},
descriptor::OperatorDefinition,
};
use dora_node_api::{
self,
communication::{self, CommunicationLayer, Publisher, STOP_TOPIC},
manual_stop_publisher,
};
use eyre::{bail, Context};
use futures::{Stream, StreamExt};
use operator::{spawn_operator, OperatorEvent, StopReason};
use pyo3::Python;
use std::{
collections::{BTreeSet, HashMap},
mem,
};
use tokio::{runtime::Builder, sync::mpsc};
use tokio_stream::{wrappers::ReceiverStream, StreamMap};

mod operator;

pub fn main() -> eyre::Result<()> {
set_up_tracing().context("failed to set up tracing subscriber")?;

let node_id = {
let raw =
std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
};
let communication_config: CommunicationConfig = {
let raw = std::env::var("DORA_COMMUNICATION_CONFIG")
.wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize communication config")?
};
let operators: Vec<OperatorDefinition> = {
let raw =
std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
};

let mut communication: Box<dyn CommunicationLayer> =
communication::init(&communication_config)?;

let mut operator_events = StreamMap::new();
let mut operator_stop_publishers = HashMap::new();
for operator_config in &operators {
let (events_tx, events) = mpsc::channel(1);
spawn_operator(
&node_id,
operator_config.clone(),
events_tx.clone(),
communication.as_mut(),
)
.wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?;
operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events));

let stop_publisher = publisher(
&node_id,
operator_config.id.clone(),
STOP_TOPIC.to_owned().into(),
communication.as_mut(),
)
.with_context(|| {
format!(
"failed to create stop publisher for operator {}",
operator_config.id
)
})?;
operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher);
}

let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event });
let gil = Python::acquire_gil();
let py = gil.python();
py.allow_threads(|| {
let join = std::thread::spawn(move || {
Builder::new_current_thread()
.enable_all()
.build()?
.block_on(run(
node_id,
operator_events,
operator_stop_publishers,
communication.as_mut(),
))
});
join.join().unwrap().unwrap();
});
Ok(())
}

async fn run(
node_id: NodeId,
mut events: impl Stream<Item = Event> + Unpin,
mut operator_stop_publishers: HashMap<OperatorId, Box<dyn Publisher>>,
communication: &mut dyn CommunicationLayer,
) -> eyre::Result<()> {
#[cfg(feature = "metrics")]
let _started = {
use dora_metrics::init_meter;
use opentelemetry::global;
use opentelemetry_system_metrics::init_process_observer;

let _started = init_meter();
let meter = global::meter(Box::leak(node_id.to_string().into_boxed_str()));
init_process_observer(meter);
_started
};

let mut stopped_operators = BTreeSet::new();

while let Some(event) = events.next().await {
match event {
Event::Operator { id, event } => {
match event {
OperatorEvent::Error(err) => {
bail!(err.wrap_err(format!("operator {id} failed")))
}
OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload),
OperatorEvent::Finished { reason } => {
if let StopReason::ExplicitStopAll = reason {
let manual_stop_publisher = manual_stop_publisher(communication)?;
tokio::task::spawn_blocking(manual_stop_publisher)
.await
.wrap_err("failed to join stop publish task")?
.map_err(|err| eyre::eyre!(err))
.wrap_err("failed to send stop message")?;
}
if let Some(stop_publisher) = operator_stop_publishers.remove(&id) {
tracing::info!("operator {node_id}/{id} finished ({reason:?})");
stopped_operators.insert(id.clone());
// send stopped message
tokio::task::spawn_blocking(move || stop_publisher.publish(&[]))
.await
.wrap_err("failed to join stop publish task")?
.map_err(|err| eyre::eyre!(err))
.with_context(|| {
format!(
"failed to send stop message for operator `{node_id}/{id}`"
)
})?;
if operator_stop_publishers.is_empty() {
break;
}
} else {
tracing::warn!("no stop publisher for {id}");
}
}
}
}
}
}

mem::drop(events);

Ok(())
}

fn publisher(
self_id: &NodeId,
operator_id: OperatorId,
output_id: DataId,
communication: &mut dyn CommunicationLayer,
) -> eyre::Result<Box<dyn Publisher>> {
let topic = format!("{self_id}/{operator_id}/{output_id}");
communication
.publisher(&topic)
.map_err(|err| eyre::eyre!(err))
.wrap_err_with(|| format!("failed to create publisher for output {output_id}"))
}

enum Event {
Operator {
id: OperatorId,
event: OperatorEvent,
},
}

fn set_up_tracing() -> eyre::Result<()> {
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;

let stdout_log = tracing_subscriber::fmt::layer().pretty();
let subscriber = tracing_subscriber::Registry::default().with(stdout_log);
tracing::subscriber::set_global_default(subscriber)
.context("failed to set tracing global subscriber")
}

binaries/runtime/src/main.rs (+3, -176)

@@ -1,178 +1,5 @@
#![warn(unsafe_op_in_unsafe_fn)]
use dora_runtime;

use dora_core::{
config::{CommunicationConfig, DataId, NodeId, OperatorId},
descriptor::OperatorDefinition,
};
use dora_node_api::{
self,
communication::{self, CommunicationLayer, Publisher, STOP_TOPIC},
manual_stop_publisher,
};
use eyre::{bail, Context};
use futures::{Stream, StreamExt};
use operator::{spawn_operator, OperatorEvent, StopReason};
use std::{
collections::{BTreeSet, HashMap},
mem,
};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamMap};

mod operator;

fn main() -> eyre::Result<()> {
set_up_tracing().context("failed to set up tracing subscriber")?;

let node_id = {
let raw =
std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
};
let communication_config: CommunicationConfig = {
let raw = std::env::var("DORA_COMMUNICATION_CONFIG")
.wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize communication config")?
};
let operators: Vec<OperatorDefinition> = {
let raw =
std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
};

let mut communication: Box<dyn CommunicationLayer> =
communication::init(&communication_config)?;

let mut operator_events = StreamMap::new();
let mut operator_stop_publishers = HashMap::new();
for operator_config in &operators {
let (events_tx, events) = mpsc::channel(1);
spawn_operator(
&node_id,
operator_config.clone(),
events_tx.clone(),
communication.as_mut(),
)
.wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?;
operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events));

let stop_publisher = publisher(
&node_id,
operator_config.id.clone(),
STOP_TOPIC.to_owned().into(),
communication.as_mut(),
)
.with_context(|| {
format!(
"failed to create stop publisher for operator {}",
operator_config.id
)
})?;
operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher);
}

let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event });

tokio::runtime::Runtime::new()?.block_on(run(
node_id,
operator_events,
operator_stop_publishers,
communication.as_mut(),
))
}

async fn run(
node_id: NodeId,
mut events: impl Stream<Item = Event> + Unpin,
mut operator_stop_publishers: HashMap<OperatorId, Box<dyn Publisher>>,
communication: &mut dyn CommunicationLayer,
) -> eyre::Result<()> {
#[cfg(feature = "metrics")]
let _started = {
use dora_metrics::init_meter;
use opentelemetry::global;
use opentelemetry_system_metrics::init_process_observer;

let _started = init_meter();
let meter = global::meter(Box::leak(node_id.to_string().into_boxed_str()));
init_process_observer(meter);
_started
};

let mut stopped_operators = BTreeSet::new();

while let Some(event) = events.next().await {
match event {
Event::Operator { id, event } => {
match event {
OperatorEvent::Error(err) => {
bail!(err.wrap_err(format!("operator {id} failed")))
}
OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload),
OperatorEvent::Finished { reason } => {
if let StopReason::ExplicitStopAll = reason {
let manual_stop_publisher = manual_stop_publisher(communication)?;
tokio::task::spawn_blocking(manual_stop_publisher)
.await
.wrap_err("failed to join stop publish task")?
.map_err(|err| eyre::eyre!(err))
.wrap_err("failed to send stop message")?;
}
if let Some(stop_publisher) = operator_stop_publishers.remove(&id) {
tracing::info!("operator {node_id}/{id} finished ({reason:?})");
stopped_operators.insert(id.clone());
// send stopped message
tokio::task::spawn_blocking(move || stop_publisher.publish(&[]))
.await
.wrap_err("failed to join stop publish task")?
.map_err(|err| eyre::eyre!(err))
.with_context(|| {
format!(
"failed to send stop message for operator `{node_id}/{id}`"
)
})?;
if operator_stop_publishers.is_empty() {
break;
}
} else {
tracing::warn!("no stop publisher for {id}");
}
}
}
}
}
}

mem::drop(events);

Ok(())
}

fn publisher(
self_id: &NodeId,
operator_id: OperatorId,
output_id: DataId,
communication: &mut dyn CommunicationLayer,
) -> eyre::Result<Box<dyn Publisher>> {
let topic = format!("{self_id}/{operator_id}/{output_id}");
communication
.publisher(&topic)
.map_err(|err| eyre::eyre!(err))
.wrap_err_with(|| format!("failed to create publisher for output {output_id}"))
}

enum Event {
Operator {
id: OperatorId,
event: OperatorEvent,
},
}

fn set_up_tracing() -> eyre::Result<()> {
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;

let stdout_log = tracing_subscriber::fmt::layer().pretty();
let subscriber = tracing_subscriber::Registry::default().with(stdout_log);
tracing::subscriber::set_global_default(subscriber)
.context("failed to set tracing global subscriber")
fn main() -> Result<(), eyre::Report> {
dora_runtime::main()
}

binaries/runtime/src/operator/python.rs (+25, -23)

@@ -202,14 +202,15 @@ struct SendOutputCallback {
#[allow(unsafe_op_in_unsafe_fn)]
mod callback_impl {

use std::thread::sleep;

use super::SendOutputCallback;
use dora_message::Metadata;
use dora_operator_api_python::pydict_to_metadata;
use eyre::{eyre, Context};
use eyre::{eyre, Context, Result};
use pyo3::{
pymethods,
types::{PyBytes, PyDict},
PyResult,
};

#[pymethods]
@@ -219,27 +220,28 @@ mod callback_impl {
output: &str,
data: &PyBytes,
metadata: Option<&PyDict>,
) -> PyResult<()> {
match self.publishers.get(output) {
Some(publisher) => {
let parameters = pydict_to_metadata(metadata)?;
let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters);
let message = metadata
.serialize()
.context(format!("failed to serialize `{}` metadata", output));
message.and_then(|mut message| {
message.extend_from_slice(data.as_bytes());
publisher
.publish(&message)
.map_err(|err| eyre::eyre!(err))
.context("publish failed")
})
}
None => Err(eyre!(
"unexpected output {output} (not defined in dataflow config)"
)),
}
.map_err(|err| err.into())
) -> Result<()> {
//let data = data.as_bytes();
//let parameters = pydict_to_metadata(metadata).wrap_err("Could not parse metadata.")?;
//let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters);
//let mut message = metadata
//.serialize()
//.context(format!("failed to serialize `{}` metadata", output))?;

//match self.publishers.get(output) {
//Some(publisher) => {
//message.extend_from_slice(data);

//publisher
//.publish(&message)
//.map_err(|err| eyre::eyre!(err))
//.context("publish failed")
//}
//None => Err(eyre!(
//"unexpected output {output} (not defined in dataflow config)"
//)),
//}
Ok(())
}
}
}

examples/python-dataflow/dataflow.yml (+2, -2)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /example-python-dataflow
iceoryx:
app_name_prefix: example-python-dataflow

nodes:
- id: webcam


examples/python-dataflow/dataflow_without_webcam.yml (+2, -2)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /example-python-no-webcam-dataflow
iceoryx:
app_name_prefix: example-python-no-webcam-dataflow

nodes:
- id: no_webcam


examples/python-dataflow/object_detection.py (+3, -1)

@@ -34,8 +34,10 @@ class Operator:
frame = np.frombuffer(dora_input["data"], dtype="uint8")
frame = cv2.imdecode(frame, -1)
frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB)
print("before model")
results = self.model(frame) # includes NMS
print("after model")
arrays = np.array(results.xyxy[0].cpu()).tobytes()
send_output("bbox", arrays, dora_input["metadata"])
print("after send output")
return DoraStatus.CONTINUE

examples/python-dataflow/plot.py (+34, -33)

@@ -24,6 +24,7 @@ class Operator:

def __init__(self):
self.image = []
self.bbox = []

def on_input(
self,
@@ -45,39 +46,39 @@ class Operator:

elif dora_input["id"] == "bbox" and len(self.image) != 0:
bboxs = np.frombuffer(dora_input["data"], dtype="float32")
bboxs = np.reshape(bboxs, (-1, 6))
for bbox in bboxs:
[
min_x,
min_y,
max_x,
max_y,
confidence,
label,
] = bbox
cv2.rectangle(
self.image,
(int(min_x), int(min_y)),
(int(max_x), int(max_y)),
(0, 255, 0),
2,
)
cv2.putText(
self.image,
LABELS[int(label)] + f", {confidence:0.2f}",
(int(max_x), int(max_y)),
font,
0.75,
(0, 255, 0),
2,
1,
)
if CI != "true":
cv2.imshow("frame", self.image)
if cv2.waitKey(1) & 0xFF == ord("q"):
return DoraStatus.STOP
self.bboxs = np.reshape(bboxs, (-1, 6))
for bbox in self.bboxs:
[
min_x,
min_y,
max_x,
max_y,
confidence,
label,
] = bbox
cv2.rectangle(
self.image,
(int(min_x), int(min_y)),
(int(max_x), int(max_y)),
(0, 255, 0),
2,
)
cv2.putText(
self.image,
LABELS[int(label)] + f", {confidence:0.2f}",
(int(max_x), int(max_y)),
font,
0.75,
(0, 255, 0),
2,
1,
)
if CI != "true":
cv2.imshow("frame", self.image)
if cv2.waitKey(1) & 0xFF == ord("q"):
return DoraStatus.STOP

return DoraStatus.CONTINUE



examples/python-dataflow/run.sh (+3, -3)

@@ -1,5 +1,5 @@
python3 -m venv .env
. $(pwd)/.env/bin/activate
#python3 -m venv .env
# . $(pwd)/.env/bin/activate
# Dev dependencies
pip install maturin
cd ../../apis/python/node
@@ -10,4 +10,4 @@ cd ../../../examples/python-dataflow
pip install --upgrade pip
pip install -r requirements.txt

cargo run -p dora-coordinator -- --run-dataflow dataflow_without_webcam.yml
cargo run -p dora-coordinator -- --run-dataflow dataflow.yml

examples/python-dataflow/webcam.py (+1, -1)

@@ -13,7 +13,7 @@ video_capture = cv2.VideoCapture(0)
start = time.time()

# Run for 20 seconds
while time.time() - start < 20:
while time.time() - start < 10:
# Wait next dora_input
node.next()
ret, frame = video_capture.read()

