From 992b8251c2f41be0e245c0a2ca73f73629d4bad7 Mon Sep 17 00:00:00 2001
From: haixuanTao
Date: Mon, 21 Nov 2022 18:27:23 -0500
Subject: [PATCH] DRAFT: Fixing Python linking error

This commit is an initial draft at fixing #147.

The error comes from the fact that pyo3 links against the libpython found at
compile time instead of using the libpython available in `LD_LIBRARY_PATH`.
Currently, the only way to disable that linking is the `extension-module`
feature flag, which requires packaging the Python `runtime-node` as a Python
library. The Python `runtime-node` should also remain fully compatible with
the other operators in case we want a hybrid runtime.

The issue I'm facing is that, because of this packaging, I have to deal with
the `GIL`, which is held from the start of the `dora-runtime` node. This makes
the process single-threaded, which is not workable. So I have to release it,
but when I do, I hit a race condition:

```bash
Exception ignored in:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 1373, in _shutdown
    assert tlock.locked()
AssertionError:
```

The issue is the same as https://github.com/PyO3/pyo3/issues/1274

To fix this, I'm going to go over the different steps again to make this work,
but this is my current investigation.
---
 Cargo.lock | 1 +
 apis/python/node/Cargo.toml | 1 +
 apis/python/node/src/lib.rs | 11 +-
 binaries/coordinator/src/run/runtime.rs | 18 +-
 binaries/runtime/Cargo.toml | 2 +-
 binaries/runtime/src/lib.rs | 189 ++++++++++++++++++
 binaries/runtime/src/main.rs | 179 +----------------
 binaries/runtime/src/operator/python.rs | 48 ++---
 examples/python-dataflow/dataflow.yml | 4 +-
 .../dataflow_without_webcam.yml | 4 +-
 examples/python-dataflow/object_detection.py | 4 +-
 examples/python-dataflow/plot.py | 67 ++++---
 examples/python-dataflow/run.sh | 6 +-
 examples/python-dataflow/webcam.py | 2 +-
 14 files changed, 291 insertions(+), 245 deletions(-)
 create mode 100644 binaries/runtime/src/lib.rs
diff --git a/Cargo.lock b/Cargo.lock index e3b204c1..ead0b8ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1039,6 +1039,7 @@ version = "0.1.0" dependencies = [ "dora-node-api", "dora-operator-api-python", + "dora-runtime", "eyre", "flume", "pyo3",
diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index ce870912..0bbda832 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -13,6 +13,7 @@ pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] } eyre = "0.6" serde_yaml = "0.8.23" flume = "0.10.14" +dora-runtime = { path = "../../../binaries/runtime" } [lib] name = "dora"
diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 97f68014..b729b782 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -64,7 +64,7 @@ impl Node { data: &PyBytes, metadata: Option<&PyDict>, ) -> Result<()> { - let data = &data.as_bytes(); + let data = data.as_bytes(); let metadata = pydict_to_metadata(metadata)?; self.node .send_output(&output_id.into(), metadata, data.len(), |out| { @@ -78,8 +78,17 @@ impl Node { } } +#[pyfunction] +fn start_runtime() -> Result<()> { + dora_runtime::main() + .wrap_err("Python Dora Runtime failed.") + .unwrap(); + Ok(()) +} + #[pymodule] fn dora(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_function(wrap_pyfunction!(start_runtime, m)?)?; m.add_class::().unwrap(); Ok(()) }
diff --git a/binaries/coordinator/src/run/runtime.rs b/binaries/coordinator/src/run/runtime.rs index e6aba2da..c3d37ea4 100644 ---
a/binaries/coordinator/src/run/runtime.rs +++ b/binaries/coordinator/src/run/runtime.rs @@ -1,7 +1,7 @@ use super::command_init_common_env; use dora_core::{ config::NodeId, - descriptor::{self, EnvValue}, + descriptor::{self, EnvValue, OperatorSource}, }; use eyre::{eyre, WrapErr}; use std::{collections::BTreeMap, path::Path}; @@ -15,7 +15,21 @@ pub fn spawn_runtime_node( communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { - let mut command = tokio::process::Command::new(runtime); + let has_python_operator = node + .operators + .iter() + .any(|x| matches!(x.config.source, OperatorSource::Python { .. })); + + let mut command = if has_python_operator { + // Use the Python runtime when the node contains a Python operator + let mut command = tokio::process::Command::new("python3"); + command.args(["-c", "import dora; dora.start_runtime()"]); + command + } else { + let command = tokio::process::Command::new(runtime); + command + }; + command_init_common_env(&mut command, &node_id, communication)?; command.env( "DORA_OPERATORS",
diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index ebf80c12..69f0ba93 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -32,7 +32,7 @@ tokio-stream = "0.1.8" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" } zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" } fern = "0.6.1" -pyo3 = { version = "0.16.5", features = ["auto-initialize", "eyre", "abi3"] } +pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] } # The pyo3 abi3 flag allows simpler linking. See: https://pyo3.rs/v0.13.2/building_and_distribution.html flume = "0.10.14" dora-message = { path = "../../libraries/message" }
diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs new file mode 100644 index 00000000..03781ab7 --- /dev/null +++ b/binaries/runtime/src/lib.rs @@ -0,0 +1,189 @@ +#![warn(unsafe_op_in_unsafe_fn)] + +use dora_core::{ + config::{CommunicationConfig, DataId, NodeId, OperatorId}, + descriptor::OperatorDefinition, +}; +use dora_node_api::{ + self, + communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, + manual_stop_publisher, +}; +use eyre::{bail, Context}; +use futures::{Stream, StreamExt}; +use operator::{spawn_operator, OperatorEvent, StopReason}; +use pyo3::Python; +use std::{ + collections::{BTreeSet, HashMap}, + mem, +}; +use tokio::{runtime::Builder, sync::mpsc}; +use tokio_stream::{wrappers::ReceiverStream, StreamMap}; + +mod operator; + +pub fn main() -> eyre::Result<()> { + set_up_tracing().context("failed to set up tracing subscriber")?; + + let node_id = { + let raw = + std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize operator config")? + }; + let communication_config: CommunicationConfig = { + let raw = std::env::var("DORA_COMMUNICATION_CONFIG") + .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize communication config")? + }; + let operators: Vec = { + let raw = + std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
+ }; + + let mut communication: Box = + communication::init(&communication_config)?; + + let mut operator_events = StreamMap::new(); + let mut operator_stop_publishers = HashMap::new(); + for operator_config in &operators { + let (events_tx, events) = mpsc::channel(1); + spawn_operator( + &node_id, + operator_config.clone(), + events_tx.clone(), + communication.as_mut(), + ) + .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; + operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); + + let stop_publisher = publisher( + &node_id, + operator_config.id.clone(), + STOP_TOPIC.to_owned().into(), + communication.as_mut(), + ) + .with_context(|| { + format!( + "failed to create stop publisher for operator {}", + operator_config.id + ) + })?; + operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher); + } + + let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); + let gil = Python::acquire_gil(); + let py = gil.python(); + py.allow_threads(|| { + let join = std::thread::spawn(move || { + Builder::new_current_thread() + .enable_all() + .build()? + .block_on(run( + node_id, + operator_events, + operator_stop_publishers, + communication.as_mut(), + )) + }); + join.join().unwrap().unwrap(); + }); + Ok(()) +} + +async fn run( + node_id: NodeId, + mut events: impl Stream + Unpin, + mut operator_stop_publishers: HashMap>, + communication: &mut dyn CommunicationLayer, +) -> eyre::Result<()> { + #[cfg(feature = "metrics")] + let _started = { + use dora_metrics::init_meter; + use opentelemetry::global; + use opentelemetry_system_metrics::init_process_observer; + + let _started = init_meter(); + let meter = global::meter(Box::leak(node_id.to_string().into_boxed_str())); + init_process_observer(meter); + _started + }; + + let mut stopped_operators = BTreeSet::new(); + + while let Some(event) = events.next().await { + match event { + Event::Operator { id, event } => { + match event { + OperatorEvent::Error(err) => { + bail!(err.wrap_err(format!("operator {id} failed"))) + } + OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), + OperatorEvent::Finished { reason } => { + if let StopReason::ExplicitStopAll = reason { + let manual_stop_publisher = manual_stop_publisher(communication)?; + tokio::task::spawn_blocking(manual_stop_publisher) + .await + .wrap_err("failed to join stop publish task")? + .map_err(|err| eyre::eyre!(err)) + .wrap_err("failed to send stop message")?; + } + if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { + tracing::info!("operator {node_id}/{id} finished ({reason:?})"); + stopped_operators.insert(id.clone()); + // send stopped message + tokio::task::spawn_blocking(move || stop_publisher.publish(&[])) + .await + .wrap_err("failed to join stop publish task")? 
+ .map_err(|err| eyre::eyre!(err)) + .with_context(|| { + format!( + "failed to send stop message for operator `{node_id}/{id}`" + ) + })?; + if operator_stop_publishers.is_empty() { + break; + } + } else { + tracing::warn!("no stop publisher for {id}"); + } + } + } + } + } + } + + mem::drop(events); + + Ok(()) +} + +fn publisher( + self_id: &NodeId, + operator_id: OperatorId, + output_id: DataId, + communication: &mut dyn CommunicationLayer, +) -> eyre::Result> { + let topic = format!("{self_id}/{operator_id}/{output_id}"); + communication + .publisher(&topic) + .map_err(|err| eyre::eyre!(err)) + .wrap_err_with(|| format!("failed to create publisher for output {output_id}")) +} + +enum Event { + Operator { + id: OperatorId, + event: OperatorEvent, + }, +} + +fn set_up_tracing() -> eyre::Result<()> { + use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; + + let stdout_log = tracing_subscriber::fmt::layer().pretty(); + let subscriber = tracing_subscriber::Registry::default().with(stdout_log); + tracing::subscriber::set_global_default(subscriber) + .context("failed to set tracing global subscriber") +} diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index b8d38d84..73156226 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -1,178 +1,5 @@ -#![warn(unsafe_op_in_unsafe_fn)] +use dora_runtime; -use dora_core::{ - config::{CommunicationConfig, DataId, NodeId, OperatorId}, - descriptor::OperatorDefinition, -}; -use dora_node_api::{ - self, - communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, - manual_stop_publisher, -}; -use eyre::{bail, Context}; -use futures::{Stream, StreamExt}; -use operator::{spawn_operator, OperatorEvent, StopReason}; -use std::{ - collections::{BTreeSet, HashMap}, - mem, -}; -use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, StreamMap}; - -mod operator; - -fn main() -> eyre::Result<()> { - set_up_tracing().context("failed to set up tracing subscriber")?; - - let node_id = { - let raw = - std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; - serde_yaml::from_str(&raw).context("failed to deserialize operator config")? - }; - let communication_config: CommunicationConfig = { - let raw = std::env::var("DORA_COMMUNICATION_CONFIG") - .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; - serde_yaml::from_str(&raw).context("failed to deserialize communication config")? - }; - let operators: Vec = { - let raw = - std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?; - serde_yaml::from_str(&raw).context("failed to deserialize operator config")? 
- }; - - let mut communication: Box = - communication::init(&communication_config)?; - - let mut operator_events = StreamMap::new(); - let mut operator_stop_publishers = HashMap::new(); - for operator_config in &operators { - let (events_tx, events) = mpsc::channel(1); - spawn_operator( - &node_id, - operator_config.clone(), - events_tx.clone(), - communication.as_mut(), - ) - .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; - operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); - - let stop_publisher = publisher( - &node_id, - operator_config.id.clone(), - STOP_TOPIC.to_owned().into(), - communication.as_mut(), - ) - .with_context(|| { - format!( - "failed to create stop publisher for operator {}", - operator_config.id - ) - })?; - operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher); - } - - let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); - - tokio::runtime::Runtime::new()?.block_on(run( - node_id, - operator_events, - operator_stop_publishers, - communication.as_mut(), - )) -} - -async fn run( - node_id: NodeId, - mut events: impl Stream + Unpin, - mut operator_stop_publishers: HashMap>, - communication: &mut dyn CommunicationLayer, -) -> eyre::Result<()> { - #[cfg(feature = "metrics")] - let _started = { - use dora_metrics::init_meter; - use opentelemetry::global; - use opentelemetry_system_metrics::init_process_observer; - - let _started = init_meter(); - let meter = global::meter(Box::leak(node_id.to_string().into_boxed_str())); - init_process_observer(meter); - _started - }; - - let mut stopped_operators = BTreeSet::new(); - - while let Some(event) = events.next().await { - match event { - Event::Operator { id, event } => { - match event { - OperatorEvent::Error(err) => { - bail!(err.wrap_err(format!("operator {id} failed"))) - } - OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), - OperatorEvent::Finished { reason } => { - if let StopReason::ExplicitStopAll = reason { - let manual_stop_publisher = manual_stop_publisher(communication)?; - tokio::task::spawn_blocking(manual_stop_publisher) - .await - .wrap_err("failed to join stop publish task")? - .map_err(|err| eyre::eyre!(err)) - .wrap_err("failed to send stop message")?; - } - if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { - tracing::info!("operator {node_id}/{id} finished ({reason:?})"); - stopped_operators.insert(id.clone()); - // send stopped message - tokio::task::spawn_blocking(move || stop_publisher.publish(&[])) - .await - .wrap_err("failed to join stop publish task")? 
- .map_err(|err| eyre::eyre!(err)) - .with_context(|| { - format!( - "failed to send stop message for operator `{node_id}/{id}`" - ) - })?; - if operator_stop_publishers.is_empty() { - break; - } - } else { - tracing::warn!("no stop publisher for {id}"); - } - } - } - } - } - } - - mem::drop(events); - - Ok(()) -} - -fn publisher( - self_id: &NodeId, - operator_id: OperatorId, - output_id: DataId, - communication: &mut dyn CommunicationLayer, -) -> eyre::Result> { - let topic = format!("{self_id}/{operator_id}/{output_id}"); - communication - .publisher(&topic) - .map_err(|err| eyre::eyre!(err)) - .wrap_err_with(|| format!("failed to create publisher for output {output_id}")) -} - -enum Event { - Operator { - id: OperatorId, - event: OperatorEvent, - }, -} - -fn set_up_tracing() -> eyre::Result<()> { - use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; - - let stdout_log = tracing_subscriber::fmt::layer().pretty(); - let subscriber = tracing_subscriber::Registry::default().with(stdout_log); - tracing::subscriber::set_global_default(subscriber) - .context("failed to set tracing global subscriber") +fn main() -> Result<(), eyre::Report> { + dora_runtime::main() } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 189346f0..b8b0d074 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -202,14 +202,15 @@ struct SendOutputCallback { #[allow(unsafe_op_in_unsafe_fn)] mod callback_impl { + use std::thread::sleep; + use super::SendOutputCallback; use dora_message::Metadata; use dora_operator_api_python::pydict_to_metadata; - use eyre::{eyre, Context}; + use eyre::{eyre, Context, Result}; use pyo3::{ pymethods, types::{PyBytes, PyDict}, - PyResult, }; #[pymethods] @@ -219,27 +220,28 @@ mod callback_impl { output: &str, data: &PyBytes, metadata: Option<&PyDict>, - ) -> PyResult<()> { - match self.publishers.get(output) { - Some(publisher) => { - let parameters = pydict_to_metadata(metadata)?; - let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters); - let message = metadata - .serialize() - .context(format!("failed to serialize `{}` metadata", output)); - message.and_then(|mut message| { - message.extend_from_slice(data.as_bytes()); - publisher - .publish(&message) - .map_err(|err| eyre::eyre!(err)) - .context("publish failed") - }) - } - None => Err(eyre!( - "unexpected output {output} (not defined in dataflow config)" - )), - } - .map_err(|err| err.into()) + ) -> Result<()> { + //let data = data.as_bytes(); + //let parameters = pydict_to_metadata(metadata).wrap_err("Could not parse metadata.")?; + //let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters); + //let mut message = metadata + //.serialize() + //.context(format!("failed to serialize `{}` metadata", output))?; + + //match self.publishers.get(output) { + //Some(publisher) => { + //message.extend_from_slice(data); + + //publisher + //.publish(&message) + //.map_err(|err| eyre::eyre!(err)) + //.context("publish failed") + //} + //None => Err(eyre!( + //"unexpected output {output} (not defined in dataflow config)" + //)), + //} + Ok(()) } } } diff --git a/examples/python-dataflow/dataflow.yml b/examples/python-dataflow/dataflow.yml index 1aa1d0ac..f6eafc98 100644 --- a/examples/python-dataflow/dataflow.yml +++ b/examples/python-dataflow/dataflow.yml @@ -1,6 +1,6 @@ communication: - zenoh: - prefix: /example-python-dataflow + iceoryx: + app_name_prefix: example-python-dataflow 
nodes: - id: webcam diff --git a/examples/python-dataflow/dataflow_without_webcam.yml b/examples/python-dataflow/dataflow_without_webcam.yml index 6b9a00af..ab47eb9b 100644 --- a/examples/python-dataflow/dataflow_without_webcam.yml +++ b/examples/python-dataflow/dataflow_without_webcam.yml @@ -1,6 +1,6 @@ communication: - zenoh: - prefix: /example-python-no-webcam-dataflow + iceoryx: + app_name_prefix: example-python-no-webcam-dataflow nodes: - id: no_webcam diff --git a/examples/python-dataflow/object_detection.py b/examples/python-dataflow/object_detection.py index efd0adc6..19478ff0 100644 --- a/examples/python-dataflow/object_detection.py +++ b/examples/python-dataflow/object_detection.py @@ -34,8 +34,10 @@ class Operator: frame = np.frombuffer(dora_input["data"], dtype="uint8") frame = cv2.imdecode(frame, -1) frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB) - + print("before model") results = self.model(frame) # includes NMS + print("after model") arrays = np.array(results.xyxy[0].cpu()).tobytes() send_output("bbox", arrays, dora_input["metadata"]) + print("after send output") return DoraStatus.CONTINUE diff --git a/examples/python-dataflow/plot.py b/examples/python-dataflow/plot.py index 727494c6..43b20f7b 100644 --- a/examples/python-dataflow/plot.py +++ b/examples/python-dataflow/plot.py @@ -24,6 +24,7 @@ class Operator: def __init__(self): self.image = [] + self.bbox = [] def on_input( self, @@ -45,39 +46,39 @@ class Operator: elif dora_input["id"] == "bbox" and len(self.image) != 0: bboxs = np.frombuffer(dora_input["data"], dtype="float32") - bboxs = np.reshape(bboxs, (-1, 6)) - for bbox in bboxs: - [ - min_x, - min_y, - max_x, - max_y, - confidence, - label, - ] = bbox - cv2.rectangle( - self.image, - (int(min_x), int(min_y)), - (int(max_x), int(max_y)), - (0, 255, 0), - 2, - ) - - cv2.putText( - self.image, - LABELS[int(label)] + f", {confidence:0.2f}", - (int(max_x), int(max_y)), - font, - 0.75, - (0, 255, 0), - 2, - 1, - ) - - if CI != "true": - cv2.imshow("frame", self.image) - if cv2.waitKey(1) & 0xFF == ord("q"): - return DoraStatus.STOP + self.bboxs = np.reshape(bboxs, (-1, 6)) + for bbox in self.bboxs: + [ + min_x, + min_y, + max_x, + max_y, + confidence, + label, + ] = bbox + cv2.rectangle( + self.image, + (int(min_x), int(min_y)), + (int(max_x), int(max_y)), + (0, 255, 0), + 2, + ) + + cv2.putText( + self.image, + LABELS[int(label)] + f", {confidence:0.2f}", + (int(max_x), int(max_y)), + font, + 0.75, + (0, 255, 0), + 2, + 1, + ) + + if CI != "true": + cv2.imshow("frame", self.image) + if cv2.waitKey(1) & 0xFF == ord("q"): + return DoraStatus.STOP return DoraStatus.CONTINUE diff --git a/examples/python-dataflow/run.sh b/examples/python-dataflow/run.sh index 51ee97a3..fe89018a 100644 --- a/examples/python-dataflow/run.sh +++ b/examples/python-dataflow/run.sh @@ -1,5 +1,5 @@ -python3 -m venv .env -. $(pwd)/.env/bin/activate +#python3 -m venv .env +# . 
$(pwd)/.env/bin/activate # Dev dependencies pip install maturin cd ../../apis/python/node @@ -10,4 +10,4 @@ cd ../../../examples/python-dataflow pip install --upgrade pip pip install -r requirements.txt -cargo run -p dora-coordinator -- --run-dataflow dataflow_without_webcam.yml +cargo run -p dora-coordinator -- --run-dataflow dataflow.yml diff --git a/examples/python-dataflow/webcam.py b/examples/python-dataflow/webcam.py index f64849fa..a44d776a 100755 --- a/examples/python-dataflow/webcam.py +++ b/examples/python-dataflow/webcam.py @@ -13,7 +13,7 @@ video_capture = cv2.VideoCapture(0) start = time.time() # Run for 20 seconds -while time.time() - start < 20: +while time.time() - start < 10: # Wait next dora_input node.next() ret, frame = video_capture.read()
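
Note on the approach (not part of the patch itself): the GIL handling this draft is converging on can be sketched independently of the dora code base. This is a minimal sketch assuming pyo3 0.16 and a tokio dependency with the runtime feature enabled; `run_blocking` and the async body are placeholders rather than functions from this repository. It is the same idea as the `acquire_gil`/`allow_threads` pair in the new `binaries/runtime/src/lib.rs` above, written with `Python::with_gil`: release the GIL before blocking on the Rust runtime so that Python operator threads are not serialized behind the interpreter lock held by the `dora.start_runtime()` entry point.

```rust
use pyo3::Python;

// Minimal sketch: release the GIL while the Rust runtime blocks, so threads
// that later re-acquire it (e.g. Python operators) are neither serialized nor
// deadlocked behind the Python call that started the runtime.
fn run_blocking() -> eyre::Result<()> {
    Python::with_gil(|py| {
        py.allow_threads(|| {
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()?
                .block_on(async {
                    // ... the actual dataflow/event loop would run here ...
                    Ok(())
                })
        })
    })
}
```

On the linking side, pyo3's `extension-module` feature is what stops the build from linking against a concrete libpython; the interpreter that imports the module resolves the symbols instead. That is why the runtime has to become importable from Python, and why the coordinator now spawns `python3 -c "import dora; dora.start_runtime()"` for nodes that contain Python operators.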