From 992b8251c2f41be0e245c0a2ca73f73629d4bad7 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 21 Nov 2022 18:27:23 -0500 Subject: [PATCH 01/26] DRAFT: Fixing Python linking error This commit is an initial draft at fixing #147. The error is due to the fact that pyo3 has linked the libpython from the compilation and not trying to use libpython that is available in `LD_LIBRARY_PATH`. The current only solution to disable the linking is to use the `extension-module` flag. This requires to make the python `runtime-node` packaged in a python library. The python `runtime-node` should also be fully compatible with the other operators in case we want hybrid runtime. The issue that i'm facing is that. Due to the packaging, I have to deal with the `GIL` that is present from the start of `dora-runtime` node. This however makes the process single threaded wich is impossible. So, I have to disable it, but when I do, I have a race condition: ```bash Exception ignored in: Traceback (most recent call last): File "/usr/lib/python3.8/threading.py", line 1373, in _shutdown assert tlock.locked() AssertionError: ``` The issue is the same as https://github.com/PyO3/pyo3/issues/1274 To fix this issue, I'm going to look again at the different step to make this work. But this is my current investigation. --- Cargo.lock | 1 + apis/python/node/Cargo.toml | 1 + apis/python/node/src/lib.rs | 11 +- binaries/coordinator/src/run/runtime.rs | 18 +- binaries/runtime/Cargo.toml | 2 +- binaries/runtime/src/lib.rs | 189 ++++++++++++++++++ binaries/runtime/src/main.rs | 179 +---------------- binaries/runtime/src/operator/python.rs | 48 ++--- examples/python-dataflow/dataflow.yml | 4 +- .../dataflow_without_webcam.yml | 4 +- examples/python-dataflow/object_detection.py | 4 +- examples/python-dataflow/plot.py | 67 ++++--- examples/python-dataflow/run.sh | 6 +- examples/python-dataflow/webcam.py | 2 +- 14 files changed, 291 insertions(+), 245 deletions(-) create mode 100644 binaries/runtime/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index e3b204c1..ead0b8ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1039,6 +1039,7 @@ version = "0.1.0" dependencies = [ "dora-node-api", "dora-operator-api-python", + "dora-runtime", "eyre", "flume", "pyo3", diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index ce870912..0bbda832 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -13,6 +13,7 @@ pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] } eyre = "0.6" serde_yaml = "0.8.23" flume = "0.10.14" +dora-runtime = { path = "../../../binaries/runtime" } [lib] name = "dora" diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 97f68014..b729b782 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -64,7 +64,7 @@ impl Node { data: &PyBytes, metadata: Option<&PyDict>, ) -> Result<()> { - let data = &data.as_bytes(); + let data = data.as_bytes(); let metadata = pydict_to_metadata(metadata)?; self.node .send_output(&output_id.into(), metadata, data.len(), |out| { @@ -78,8 +78,17 @@ impl Node { } } +#[pyfunction] +fn start_runtime() -> Result<()> { + dora_runtime::main() + .wrap_err("Python Dora Runtime failed.") + .unwrap(); + Ok(()) +} + #[pymodule] fn dora(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_function(wrap_pyfunction!(start_runtime, m)?)?; m.add_class::().unwrap(); Ok(()) } diff --git a/binaries/coordinator/src/run/runtime.rs b/binaries/coordinator/src/run/runtime.rs index e6aba2da..c3d37ea4 100644 --- a/binaries/coordinator/src/run/runtime.rs +++ b/binaries/coordinator/src/run/runtime.rs @@ -1,7 +1,7 @@ use super::command_init_common_env; use dora_core::{ config::NodeId, - descriptor::{self, EnvValue}, + descriptor::{self, EnvValue, OperatorSource}, }; use eyre::{eyre, WrapErr}; use std::{collections::BTreeMap, path::Path}; @@ -15,7 +15,21 @@ pub fn spawn_runtime_node( communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { - let mut command = tokio::process::Command::new(runtime); + let has_python_operator = node + .operators + .iter() + .any(|x| matches!(x.config.source, OperatorSource::Python { .. })); + + let mut command = if has_python_operator { + // Use Python Runtime if runtime is + let mut command = tokio::process::Command::new("python3"); + command.args(["-c", "import dora; dora.start_runtime()"]); + command + } else { + let command = tokio::process::Command::new(runtime); + command + }; + command_init_common_env(&mut command, &node_id, communication)?; command.env( "DORA_OPERATORS", diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index ebf80c12..69f0ba93 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -32,7 +32,7 @@ tokio-stream = "0.1.8" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" } zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" } fern = "0.6.1" -pyo3 = { version = "0.16.5", features = ["auto-initialize", "eyre", "abi3"] } +pyo3 = { version = "0.16", features = ["eyre", "abi3-py37"] } # pyo3-abi3 flag allow simpler linking. See: https://pyo3.rs/v0.13.2/building_and_distribution.html flume = "0.10.14" dora-message = { path = "../../libraries/message" } diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs new file mode 100644 index 00000000..03781ab7 --- /dev/null +++ b/binaries/runtime/src/lib.rs @@ -0,0 +1,189 @@ +#![warn(unsafe_op_in_unsafe_fn)] + +use dora_core::{ + config::{CommunicationConfig, DataId, NodeId, OperatorId}, + descriptor::OperatorDefinition, +}; +use dora_node_api::{ + self, + communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, + manual_stop_publisher, +}; +use eyre::{bail, Context}; +use futures::{Stream, StreamExt}; +use operator::{spawn_operator, OperatorEvent, StopReason}; +use pyo3::Python; +use std::{ + collections::{BTreeSet, HashMap}, + mem, +}; +use tokio::{runtime::Builder, sync::mpsc}; +use tokio_stream::{wrappers::ReceiverStream, StreamMap}; + +mod operator; + +pub fn main() -> eyre::Result<()> { + set_up_tracing().context("failed to set up tracing subscriber")?; + + let node_id = { + let raw = + std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize operator config")? + }; + let communication_config: CommunicationConfig = { + let raw = std::env::var("DORA_COMMUNICATION_CONFIG") + .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize communication config")? + }; + let operators: Vec = { + let raw = + std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize operator config")? + }; + + let mut communication: Box = + communication::init(&communication_config)?; + + let mut operator_events = StreamMap::new(); + let mut operator_stop_publishers = HashMap::new(); + for operator_config in &operators { + let (events_tx, events) = mpsc::channel(1); + spawn_operator( + &node_id, + operator_config.clone(), + events_tx.clone(), + communication.as_mut(), + ) + .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; + operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); + + let stop_publisher = publisher( + &node_id, + operator_config.id.clone(), + STOP_TOPIC.to_owned().into(), + communication.as_mut(), + ) + .with_context(|| { + format!( + "failed to create stop publisher for operator {}", + operator_config.id + ) + })?; + operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher); + } + + let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); + let gil = Python::acquire_gil(); + let py = gil.python(); + py.allow_threads(|| { + let join = std::thread::spawn(move || { + Builder::new_current_thread() + .enable_all() + .build()? + .block_on(run( + node_id, + operator_events, + operator_stop_publishers, + communication.as_mut(), + )) + }); + join.join().unwrap().unwrap(); + }); + Ok(()) +} + +async fn run( + node_id: NodeId, + mut events: impl Stream + Unpin, + mut operator_stop_publishers: HashMap>, + communication: &mut dyn CommunicationLayer, +) -> eyre::Result<()> { + #[cfg(feature = "metrics")] + let _started = { + use dora_metrics::init_meter; + use opentelemetry::global; + use opentelemetry_system_metrics::init_process_observer; + + let _started = init_meter(); + let meter = global::meter(Box::leak(node_id.to_string().into_boxed_str())); + init_process_observer(meter); + _started + }; + + let mut stopped_operators = BTreeSet::new(); + + while let Some(event) = events.next().await { + match event { + Event::Operator { id, event } => { + match event { + OperatorEvent::Error(err) => { + bail!(err.wrap_err(format!("operator {id} failed"))) + } + OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), + OperatorEvent::Finished { reason } => { + if let StopReason::ExplicitStopAll = reason { + let manual_stop_publisher = manual_stop_publisher(communication)?; + tokio::task::spawn_blocking(manual_stop_publisher) + .await + .wrap_err("failed to join stop publish task")? + .map_err(|err| eyre::eyre!(err)) + .wrap_err("failed to send stop message")?; + } + if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { + tracing::info!("operator {node_id}/{id} finished ({reason:?})"); + stopped_operators.insert(id.clone()); + // send stopped message + tokio::task::spawn_blocking(move || stop_publisher.publish(&[])) + .await + .wrap_err("failed to join stop publish task")? + .map_err(|err| eyre::eyre!(err)) + .with_context(|| { + format!( + "failed to send stop message for operator `{node_id}/{id}`" + ) + })?; + if operator_stop_publishers.is_empty() { + break; + } + } else { + tracing::warn!("no stop publisher for {id}"); + } + } + } + } + } + } + + mem::drop(events); + + Ok(()) +} + +fn publisher( + self_id: &NodeId, + operator_id: OperatorId, + output_id: DataId, + communication: &mut dyn CommunicationLayer, +) -> eyre::Result> { + let topic = format!("{self_id}/{operator_id}/{output_id}"); + communication + .publisher(&topic) + .map_err(|err| eyre::eyre!(err)) + .wrap_err_with(|| format!("failed to create publisher for output {output_id}")) +} + +enum Event { + Operator { + id: OperatorId, + event: OperatorEvent, + }, +} + +fn set_up_tracing() -> eyre::Result<()> { + use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; + + let stdout_log = tracing_subscriber::fmt::layer().pretty(); + let subscriber = tracing_subscriber::Registry::default().with(stdout_log); + tracing::subscriber::set_global_default(subscriber) + .context("failed to set tracing global subscriber") +} diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index b8d38d84..73156226 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -1,178 +1,5 @@ -#![warn(unsafe_op_in_unsafe_fn)] +use dora_runtime; -use dora_core::{ - config::{CommunicationConfig, DataId, NodeId, OperatorId}, - descriptor::OperatorDefinition, -}; -use dora_node_api::{ - self, - communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, - manual_stop_publisher, -}; -use eyre::{bail, Context}; -use futures::{Stream, StreamExt}; -use operator::{spawn_operator, OperatorEvent, StopReason}; -use std::{ - collections::{BTreeSet, HashMap}, - mem, -}; -use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, StreamMap}; - -mod operator; - -fn main() -> eyre::Result<()> { - set_up_tracing().context("failed to set up tracing subscriber")?; - - let node_id = { - let raw = - std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; - serde_yaml::from_str(&raw).context("failed to deserialize operator config")? - }; - let communication_config: CommunicationConfig = { - let raw = std::env::var("DORA_COMMUNICATION_CONFIG") - .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; - serde_yaml::from_str(&raw).context("failed to deserialize communication config")? - }; - let operators: Vec = { - let raw = - std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?; - serde_yaml::from_str(&raw).context("failed to deserialize operator config")? - }; - - let mut communication: Box = - communication::init(&communication_config)?; - - let mut operator_events = StreamMap::new(); - let mut operator_stop_publishers = HashMap::new(); - for operator_config in &operators { - let (events_tx, events) = mpsc::channel(1); - spawn_operator( - &node_id, - operator_config.clone(), - events_tx.clone(), - communication.as_mut(), - ) - .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; - operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); - - let stop_publisher = publisher( - &node_id, - operator_config.id.clone(), - STOP_TOPIC.to_owned().into(), - communication.as_mut(), - ) - .with_context(|| { - format!( - "failed to create stop publisher for operator {}", - operator_config.id - ) - })?; - operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher); - } - - let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); - - tokio::runtime::Runtime::new()?.block_on(run( - node_id, - operator_events, - operator_stop_publishers, - communication.as_mut(), - )) -} - -async fn run( - node_id: NodeId, - mut events: impl Stream + Unpin, - mut operator_stop_publishers: HashMap>, - communication: &mut dyn CommunicationLayer, -) -> eyre::Result<()> { - #[cfg(feature = "metrics")] - let _started = { - use dora_metrics::init_meter; - use opentelemetry::global; - use opentelemetry_system_metrics::init_process_observer; - - let _started = init_meter(); - let meter = global::meter(Box::leak(node_id.to_string().into_boxed_str())); - init_process_observer(meter); - _started - }; - - let mut stopped_operators = BTreeSet::new(); - - while let Some(event) = events.next().await { - match event { - Event::Operator { id, event } => { - match event { - OperatorEvent::Error(err) => { - bail!(err.wrap_err(format!("operator {id} failed"))) - } - OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), - OperatorEvent::Finished { reason } => { - if let StopReason::ExplicitStopAll = reason { - let manual_stop_publisher = manual_stop_publisher(communication)?; - tokio::task::spawn_blocking(manual_stop_publisher) - .await - .wrap_err("failed to join stop publish task")? - .map_err(|err| eyre::eyre!(err)) - .wrap_err("failed to send stop message")?; - } - if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { - tracing::info!("operator {node_id}/{id} finished ({reason:?})"); - stopped_operators.insert(id.clone()); - // send stopped message - tokio::task::spawn_blocking(move || stop_publisher.publish(&[])) - .await - .wrap_err("failed to join stop publish task")? - .map_err(|err| eyre::eyre!(err)) - .with_context(|| { - format!( - "failed to send stop message for operator `{node_id}/{id}`" - ) - })?; - if operator_stop_publishers.is_empty() { - break; - } - } else { - tracing::warn!("no stop publisher for {id}"); - } - } - } - } - } - } - - mem::drop(events); - - Ok(()) -} - -fn publisher( - self_id: &NodeId, - operator_id: OperatorId, - output_id: DataId, - communication: &mut dyn CommunicationLayer, -) -> eyre::Result> { - let topic = format!("{self_id}/{operator_id}/{output_id}"); - communication - .publisher(&topic) - .map_err(|err| eyre::eyre!(err)) - .wrap_err_with(|| format!("failed to create publisher for output {output_id}")) -} - -enum Event { - Operator { - id: OperatorId, - event: OperatorEvent, - }, -} - -fn set_up_tracing() -> eyre::Result<()> { - use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; - - let stdout_log = tracing_subscriber::fmt::layer().pretty(); - let subscriber = tracing_subscriber::Registry::default().with(stdout_log); - tracing::subscriber::set_global_default(subscriber) - .context("failed to set tracing global subscriber") +fn main() -> Result<(), eyre::Report> { + dora_runtime::main() } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 189346f0..b8b0d074 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -202,14 +202,15 @@ struct SendOutputCallback { #[allow(unsafe_op_in_unsafe_fn)] mod callback_impl { + use std::thread::sleep; + use super::SendOutputCallback; use dora_message::Metadata; use dora_operator_api_python::pydict_to_metadata; - use eyre::{eyre, Context}; + use eyre::{eyre, Context, Result}; use pyo3::{ pymethods, types::{PyBytes, PyDict}, - PyResult, }; #[pymethods] @@ -219,27 +220,28 @@ mod callback_impl { output: &str, data: &PyBytes, metadata: Option<&PyDict>, - ) -> PyResult<()> { - match self.publishers.get(output) { - Some(publisher) => { - let parameters = pydict_to_metadata(metadata)?; - let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters); - let message = metadata - .serialize() - .context(format!("failed to serialize `{}` metadata", output)); - message.and_then(|mut message| { - message.extend_from_slice(data.as_bytes()); - publisher - .publish(&message) - .map_err(|err| eyre::eyre!(err)) - .context("publish failed") - }) - } - None => Err(eyre!( - "unexpected output {output} (not defined in dataflow config)" - )), - } - .map_err(|err| err.into()) + ) -> Result<()> { + //let data = data.as_bytes(); + //let parameters = pydict_to_metadata(metadata).wrap_err("Could not parse metadata.")?; + //let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters); + //let mut message = metadata + //.serialize() + //.context(format!("failed to serialize `{}` metadata", output))?; + + //match self.publishers.get(output) { + //Some(publisher) => { + //message.extend_from_slice(data); + + //publisher + //.publish(&message) + //.map_err(|err| eyre::eyre!(err)) + //.context("publish failed") + //} + //None => Err(eyre!( + //"unexpected output {output} (not defined in dataflow config)" + //)), + //} + Ok(()) } } } diff --git a/examples/python-dataflow/dataflow.yml b/examples/python-dataflow/dataflow.yml index 1aa1d0ac..f6eafc98 100644 --- a/examples/python-dataflow/dataflow.yml +++ b/examples/python-dataflow/dataflow.yml @@ -1,6 +1,6 @@ communication: - zenoh: - prefix: /example-python-dataflow + iceoryx: + app_name_prefix: example-python-dataflow nodes: - id: webcam diff --git a/examples/python-dataflow/dataflow_without_webcam.yml b/examples/python-dataflow/dataflow_without_webcam.yml index 6b9a00af..ab47eb9b 100644 --- a/examples/python-dataflow/dataflow_without_webcam.yml +++ b/examples/python-dataflow/dataflow_without_webcam.yml @@ -1,6 +1,6 @@ communication: - zenoh: - prefix: /example-python-no-webcam-dataflow + iceoryx: + app_name_prefix: example-python-no-webcam-dataflow nodes: - id: no_webcam diff --git a/examples/python-dataflow/object_detection.py b/examples/python-dataflow/object_detection.py index efd0adc6..19478ff0 100644 --- a/examples/python-dataflow/object_detection.py +++ b/examples/python-dataflow/object_detection.py @@ -34,8 +34,10 @@ class Operator: frame = np.frombuffer(dora_input["data"], dtype="uint8") frame = cv2.imdecode(frame, -1) frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB) - + print("before model") results = self.model(frame) # includes NMS + print("after model") arrays = np.array(results.xyxy[0].cpu()).tobytes() send_output("bbox", arrays, dora_input["metadata"]) + print("after send output") return DoraStatus.CONTINUE diff --git a/examples/python-dataflow/plot.py b/examples/python-dataflow/plot.py index 727494c6..43b20f7b 100644 --- a/examples/python-dataflow/plot.py +++ b/examples/python-dataflow/plot.py @@ -24,6 +24,7 @@ class Operator: def __init__(self): self.image = [] + self.bbox = [] def on_input( self, @@ -45,39 +46,39 @@ class Operator: elif dora_input["id"] == "bbox" and len(self.image) != 0: bboxs = np.frombuffer(dora_input["data"], dtype="float32") - bboxs = np.reshape(bboxs, (-1, 6)) - for bbox in bboxs: - [ - min_x, - min_y, - max_x, - max_y, - confidence, - label, - ] = bbox - cv2.rectangle( - self.image, - (int(min_x), int(min_y)), - (int(max_x), int(max_y)), - (0, 255, 0), - 2, - ) - - cv2.putText( - self.image, - LABELS[int(label)] + f", {confidence:0.2f}", - (int(max_x), int(max_y)), - font, - 0.75, - (0, 255, 0), - 2, - 1, - ) - - if CI != "true": - cv2.imshow("frame", self.image) - if cv2.waitKey(1) & 0xFF == ord("q"): - return DoraStatus.STOP + self.bboxs = np.reshape(bboxs, (-1, 6)) + for bbox in self.bboxs: + [ + min_x, + min_y, + max_x, + max_y, + confidence, + label, + ] = bbox + cv2.rectangle( + self.image, + (int(min_x), int(min_y)), + (int(max_x), int(max_y)), + (0, 255, 0), + 2, + ) + + cv2.putText( + self.image, + LABELS[int(label)] + f", {confidence:0.2f}", + (int(max_x), int(max_y)), + font, + 0.75, + (0, 255, 0), + 2, + 1, + ) + + if CI != "true": + cv2.imshow("frame", self.image) + if cv2.waitKey(1) & 0xFF == ord("q"): + return DoraStatus.STOP return DoraStatus.CONTINUE diff --git a/examples/python-dataflow/run.sh b/examples/python-dataflow/run.sh index 51ee97a3..fe89018a 100644 --- a/examples/python-dataflow/run.sh +++ b/examples/python-dataflow/run.sh @@ -1,5 +1,5 @@ -python3 -m venv .env -. $(pwd)/.env/bin/activate +#python3 -m venv .env +# . $(pwd)/.env/bin/activate # Dev dependencies pip install maturin cd ../../apis/python/node @@ -10,4 +10,4 @@ cd ../../../examples/python-dataflow pip install --upgrade pip pip install -r requirements.txt -cargo run -p dora-coordinator -- --run-dataflow dataflow_without_webcam.yml +cargo run -p dora-coordinator -- --run-dataflow dataflow.yml diff --git a/examples/python-dataflow/webcam.py b/examples/python-dataflow/webcam.py index f64849fa..a44d776a 100755 --- a/examples/python-dataflow/webcam.py +++ b/examples/python-dataflow/webcam.py @@ -13,7 +13,7 @@ video_capture = cv2.VideoCapture(0) start = time.time() # Run for 20 seconds -while time.time() - start < 20: +while time.time() - start < 10: # Wait next dora_input node.next() ret, frame = video_capture.read() From 9f0a0e69575d3c8e9c032f23e015bc5e65f9ccf1 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 28 Nov 2022 20:38:48 -0500 Subject: [PATCH 02/26] Fix GIL race condition By making the stopping loop the first loop, we can avoid using `pyo3/allow_threads`, which seems buggy. --- .github/workflows/release.yml | 2 +- binaries/runtime/src/lib.rs | 68 ++++++++---------- binaries/runtime/src/operator/python.rs | 74 +++++++++----------- examples/python-dataflow/object_detection.py | 3 - examples/python-dataflow/plot.py | 2 +- examples/python-dataflow/run.sh | 4 +- 6 files changed, 69 insertions(+), 84 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 5b37ec1e..de214425 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -15,7 +15,7 @@ jobs: strategy: matrix: platform: [ubuntu-latest, macos-latest, windows-latest] - python-version: ["3.7"] + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] fail-fast: false runs-on: ${{ matrix.platform }} diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index 03781ab7..6aa07e24 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -7,12 +7,11 @@ use dora_core::{ use dora_node_api::{ self, communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, - manual_stop_publisher, }; use eyre::{bail, Context}; use futures::{Stream, StreamExt}; -use operator::{spawn_operator, OperatorEvent, StopReason}; -use pyo3::Python; +use operator::{spawn_operator, OperatorEvent}; + use std::{ collections::{BTreeSet, HashMap}, mem, @@ -25,7 +24,7 @@ mod operator; pub fn main() -> eyre::Result<()> { set_up_tracing().context("failed to set up tracing subscriber")?; - let node_id = { + let node_id: NodeId = { let raw = std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; serde_yaml::from_str(&raw).context("failed to deserialize operator config")? @@ -46,17 +45,10 @@ pub fn main() -> eyre::Result<()> { let mut operator_events = StreamMap::new(); let mut operator_stop_publishers = HashMap::new(); + let mut operator_events_tx = HashMap::new(); + for operator_config in &operators { let (events_tx, events) = mpsc::channel(1); - spawn_operator( - &node_id, - operator_config.clone(), - events_tx.clone(), - communication.as_mut(), - ) - .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; - operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); - let stop_publisher = publisher( &node_id, operator_config.id.clone(), @@ -70,25 +62,36 @@ pub fn main() -> eyre::Result<()> { ) })?; operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher); + + operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); + operator_events_tx.insert(operator_config.id.clone(), events_tx); } let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); - let gil = Python::acquire_gil(); - let py = gil.python(); - py.allow_threads(|| { - let join = std::thread::spawn(move || { - Builder::new_current_thread() - .enable_all() - .build()? - .block_on(run( - node_id, - operator_events, - operator_stop_publishers, - communication.as_mut(), - )) - }); - join.join().unwrap().unwrap(); + let node_id_clone = node_id.clone(); + std::thread::spawn(move || { + Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(run( + node_id_clone, + operator_events, + operator_stop_publishers, + )) + .unwrap(); }); + + for operator_config in &operators { + let events_tx = operator_events_tx.get(&operator_config.id).unwrap(); + spawn_operator( + &node_id, + operator_config.clone(), + events_tx.clone(), + communication.as_mut(), + ) + .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; + } Ok(()) } @@ -96,7 +99,6 @@ async fn run( node_id: NodeId, mut events: impl Stream + Unpin, mut operator_stop_publishers: HashMap>, - communication: &mut dyn CommunicationLayer, ) -> eyre::Result<()> { #[cfg(feature = "metrics")] let _started = { @@ -121,14 +123,6 @@ async fn run( } OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), OperatorEvent::Finished { reason } => { - if let StopReason::ExplicitStopAll = reason { - let manual_stop_publisher = manual_stop_publisher(communication)?; - tokio::task::spawn_blocking(manual_stop_publisher) - .await - .wrap_err("failed to join stop publish task")? - .map_err(|err| eyre::eyre!(err)) - .wrap_err("failed to send stop message")?; - } if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { tracing::info!("operator {node_id}/{id} finished ({reason:?})"); stopped_operators.insert(id.clone()); diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index b8b0d074..814e5d45 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -23,7 +23,6 @@ use std::{ panic::{catch_unwind, AssertUnwindSafe}, path::Path, sync::Arc, - thread, }; use tokio::sync::mpsc::Sender; @@ -170,24 +169,22 @@ pub fn spawn( Result::<_, eyre::Report>::Ok(reason) }; - thread::spawn(move || { - let closure = AssertUnwindSafe(|| { - python_runner() - .wrap_err_with(|| format!("error in Python module at {}", path_cloned.display())) - }); + let closure = AssertUnwindSafe(|| { + python_runner() + .wrap_err_with(|| format!("error in Python module at {}", path_cloned.display())) + }); - match catch_unwind(closure) { - Ok(Ok(reason)) => { - let _ = events_tx.blocking_send(OperatorEvent::Finished { reason }); - } - Ok(Err(err)) => { - let _ = events_tx.blocking_send(OperatorEvent::Error(err)); - } - Err(panic) => { - let _ = events_tx.blocking_send(OperatorEvent::Panic(panic)); - } + match catch_unwind(closure) { + Ok(Ok(reason)) => { + let _ = events_tx.blocking_send(OperatorEvent::Finished { reason }); } - }); + Ok(Err(err)) => { + let _ = events_tx.blocking_send(OperatorEvent::Error(err)); + } + Err(panic) => { + let _ = events_tx.blocking_send(OperatorEvent::Panic(panic)); + } + } Ok(()) } @@ -202,8 +199,6 @@ struct SendOutputCallback { #[allow(unsafe_op_in_unsafe_fn)] mod callback_impl { - use std::thread::sleep; - use super::SendOutputCallback; use dora_message::Metadata; use dora_operator_api_python::pydict_to_metadata; @@ -221,27 +216,26 @@ mod callback_impl { data: &PyBytes, metadata: Option<&PyDict>, ) -> Result<()> { - //let data = data.as_bytes(); - //let parameters = pydict_to_metadata(metadata).wrap_err("Could not parse metadata.")?; - //let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters); - //let mut message = metadata - //.serialize() - //.context(format!("failed to serialize `{}` metadata", output))?; - - //match self.publishers.get(output) { - //Some(publisher) => { - //message.extend_from_slice(data); - - //publisher - //.publish(&message) - //.map_err(|err| eyre::eyre!(err)) - //.context("publish failed") - //} - //None => Err(eyre!( - //"unexpected output {output} (not defined in dataflow config)" - //)), - //} - Ok(()) + let data = data.as_bytes(); + let parameters = pydict_to_metadata(metadata).wrap_err("Could not parse metadata.")?; + let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters); + let mut message = metadata + .serialize() + .context(format!("failed to serialize `{}` metadata", output))?; + + match self.publishers.get(output) { + Some(publisher) => { + message.extend_from_slice(data); + + publisher + .publish(&message) + .map_err(|err| eyre::eyre!(err)) + .context("publish failed") + } + None => Err(eyre!( + "unexpected output {output} (not defined in dataflow config)" + )), + } } } } diff --git a/examples/python-dataflow/object_detection.py b/examples/python-dataflow/object_detection.py index 19478ff0..05c66b1b 100644 --- a/examples/python-dataflow/object_detection.py +++ b/examples/python-dataflow/object_detection.py @@ -34,10 +34,7 @@ class Operator: frame = np.frombuffer(dora_input["data"], dtype="uint8") frame = cv2.imdecode(frame, -1) frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB) - print("before model") results = self.model(frame) # includes NMS - print("after model") arrays = np.array(results.xyxy[0].cpu()).tobytes() send_output("bbox", arrays, dora_input["metadata"]) - print("after send output") return DoraStatus.CONTINUE diff --git a/examples/python-dataflow/plot.py b/examples/python-dataflow/plot.py index 43b20f7b..57a2a293 100644 --- a/examples/python-dataflow/plot.py +++ b/examples/python-dataflow/plot.py @@ -24,7 +24,7 @@ class Operator: def __init__(self): self.image = [] - self.bbox = [] + self.bboxs = [] def on_input( self, diff --git a/examples/python-dataflow/run.sh b/examples/python-dataflow/run.sh index fe89018a..5ea69805 100644 --- a/examples/python-dataflow/run.sh +++ b/examples/python-dataflow/run.sh @@ -1,5 +1,5 @@ -#python3 -m venv .env -# . $(pwd)/.env/bin/activate +python3 -m venv .env +. $(pwd)/.env/bin/activate # Dev dependencies pip install maturin cd ../../apis/python/node From e7172bec8f18f771e9568b0954d570a34bec4f59 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 28 Nov 2022 20:51:53 -0500 Subject: [PATCH 03/26] refactor debugging change --- examples/python-dataflow/dataflow.yml | 4 ++-- examples/python-dataflow/dataflow_without_webcam.yml | 4 ++-- examples/python-dataflow/run.sh | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/python-dataflow/dataflow.yml b/examples/python-dataflow/dataflow.yml index f6eafc98..1aa1d0ac 100644 --- a/examples/python-dataflow/dataflow.yml +++ b/examples/python-dataflow/dataflow.yml @@ -1,6 +1,6 @@ communication: - iceoryx: - app_name_prefix: example-python-dataflow + zenoh: + prefix: /example-python-dataflow nodes: - id: webcam diff --git a/examples/python-dataflow/dataflow_without_webcam.yml b/examples/python-dataflow/dataflow_without_webcam.yml index ab47eb9b..376da8c1 100644 --- a/examples/python-dataflow/dataflow_without_webcam.yml +++ b/examples/python-dataflow/dataflow_without_webcam.yml @@ -1,6 +1,6 @@ communication: - iceoryx: - app_name_prefix: example-python-no-webcam-dataflow + zenoh: + prefix: /example-python-dataflow nodes: - id: no_webcam diff --git a/examples/python-dataflow/run.sh b/examples/python-dataflow/run.sh index 5ea69805..51ee97a3 100644 --- a/examples/python-dataflow/run.sh +++ b/examples/python-dataflow/run.sh @@ -10,4 +10,4 @@ cd ../../../examples/python-dataflow pip install --upgrade pip pip install -r requirements.txt -cargo run -p dora-coordinator -- --run-dataflow dataflow.yml +cargo run -p dora-coordinator -- --run-dataflow dataflow_without_webcam.yml From 9ec9d226208d247e9db9a69c4e60588f8688f4b3 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 29 Nov 2022 11:54:58 -0500 Subject: [PATCH 04/26] wait for stop thread to finish before stopping runtime --- .github/workflows/ci-python.yml | 2 +- binaries/runtime/src/lib.rs | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci-python.yml b/.github/workflows/ci-python.yml index 0ab79c8a..022c339b 100644 --- a/.github/workflows/ci-python.yml +++ b/.github/workflows/ci-python.yml @@ -20,7 +20,7 @@ jobs: sudo apt-get install -y capnproto libcapnp-dev libacl1-dev - uses: actions/setup-python@v2 with: - python-version: 3.8.10 + python-version: 3.8 - uses: r7kamura/rust-problem-matchers@v1.1.0 - run: cargo --version --verbose diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index 6aa07e24..58c4024c 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -69,7 +69,7 @@ pub fn main() -> eyre::Result<()> { let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); let node_id_clone = node_id.clone(); - std::thread::spawn(move || { + let stop_thread = std::thread::spawn(move || { Builder::new_current_thread() .enable_all() .build() @@ -92,6 +92,10 @@ pub fn main() -> eyre::Result<()> { ) .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; } + + stop_thread + .join() + .map_err(|err| eyre::eyre!("Stop thread failed with err: {err:#?}"))?; Ok(()) } From 1d080a6edb6398e91ce90ec87da907db5d5f323a Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 29 Nov 2022 12:08:40 -0500 Subject: [PATCH 05/26] Split pip and github release --- .github/workflows/pip-release.yml | 45 +++++++++++++++++++++++++++++++ .github/workflows/release.yml | 24 +---------------- 2 files changed, 46 insertions(+), 23 deletions(-) create mode 100644 .github/workflows/pip-release.yml diff --git a/.github/workflows/pip-release.yml b/.github/workflows/pip-release.yml new file mode 100644 index 00000000..cc6f0eb1 --- /dev/null +++ b/.github/workflows/pip-release.yml @@ -0,0 +1,45 @@ +name: Release + +permissions: + contents: write + +on: + release: + types: + - "published" + +jobs: + release: + name: "Release" + + strategy: + matrix: + platform: [ubuntu-latest, macos-latest, windows-latest] + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] + fail-fast: false + runs-on: ${{ matrix.platform }} + + steps: + - uses: actions/checkout@v3 + - uses: r7kamura/rust-problem-matchers@v1.1.0 + + # Publish Dora Node Python API + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install maturin==0.13.2 + - name: Publish wheel + shell: bash + env: + MATURIN_PASSWORD: ${{ secrets.PYPI_PASS }} + run: | + cd apis/python/node + maturin publish \ + --skip-existing \ + -o wheels \ + -i python \ + --username __token__ \ diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index de214425..c690b1d8 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -15,7 +15,7 @@ jobs: strategy: matrix: platform: [ubuntu-latest, macos-latest, windows-latest] - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] + python-version: ["3.7"] fail-fast: false runs-on: ${{ matrix.platform }} @@ -80,25 +80,3 @@ jobs: asset_path: archive.zip asset_name: dora-${{ github.ref_name }}-x86_64-${{ runner.os }}.zip asset_content_type: application/zip - - # Publish Dora Node Python API - - name: Set up Python - uses: actions/setup-python@v2 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install maturin==0.13.2 - - name: Publish wheel - shell: bash - env: - MATURIN_PASSWORD: ${{ secrets.PYPI_PASS }} - run: | - cd apis/python/node - maturin publish \ - --no-sdist \ - --skip-existing \ - -o wheels \ - -i python \ - --username __token__ \ From 0a7105a3299b33738a76a47846495fd88716ff28 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 29 Nov 2022 15:03:55 -0500 Subject: [PATCH 06/26] Fix `ndk-sys` yank by upgrading locked version --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ead0b8ef..7316cccb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2219,9 +2219,9 @@ dependencies = [ [[package]] name = "ndk-sys" -version = "0.4.0" +version = "0.4.1+23.1.7779620" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21d83ec9c63ec5bf950200a8e508bdad6659972187b625469f58ef8c08e29046" +checksum = "3cf2aae958bd232cac5069850591667ad422d263686d75b52a065f9badeee5a3" dependencies = [ "jni-sys", ] From 5ef226b0714148e8678585ffb37f6391feeb16bc Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 29 Nov 2022 15:24:04 -0500 Subject: [PATCH 07/26] Fix libacl CI error and make CI simpler --- .github/workflows/ci-python.yml | 4 +-- .github/workflows/ci.yml | 42 ++++++------------------------- .github/workflows/pip-release.yml | 7 ++++++ .github/workflows/release.yml | 17 ++----------- 4 files changed, 19 insertions(+), 51 deletions(-) diff --git a/.github/workflows/ci-python.yml b/.github/workflows/ci-python.yml index 022c339b..d04d8e24 100644 --- a/.github/workflows/ci-python.yml +++ b/.github/workflows/ci-python.yml @@ -14,10 +14,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - name: Install Cap'n Proto and libacl-dev + - name: Install libacl-dev run: | export DEBIAN_FRONTEND=noninteractive - sudo apt-get install -y capnproto libcapnp-dev libacl1-dev + sudo apt-get install -y libacl1-dev - uses: actions/setup-python@v2 with: python-version: 3.8 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 108bbcfb..b9e5624a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,24 +16,11 @@ jobs: timeout-minutes: 30 steps: - uses: actions/checkout@v3 - - name: Install Cap'n Proto and libacl-dev (Linux) + - name: Install libacl-dev (Linux) if: runner.os == 'Linux' run: | export DEBIAN_FRONTEND=noninteractive - sudo apt-get install -y capnproto libcapnp-dev libacl1-dev - - name: Install Cap'n Proto (macOS) - if: runner.os == 'macOS' - run: brew install capnp - env: - HOMEBREW_NO_AUTO_UPDATE: 1 - HOMEBREW_NO_BOTTLE_SOURCE_FALLBACK: 1 - HOMEBREW_NO_INSTALL_CLEANUP: 1 - - name: Install Cap'n Proto (Windows) - if: runner.os == 'Windows' - shell: pwsh - run: | - choco install capnproto - echo "$Env:Programfiles\capnproto" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append + sudo apt-get install -y libacl1-dev - uses: r7kamura/rust-problem-matchers@v1.1.0 - run: cargo --version --verbose @@ -56,24 +43,11 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Install Cap'n Proto and libacl-dev (Linux) + - name: Install libacl-dev (Linux) if: runner.os == 'Linux' run: | export DEBIAN_FRONTEND=noninteractive - sudo apt-get install -y capnproto libcapnp-dev libacl1-dev - - name: Install Cap'n Proto (macOS) - if: runner.os == 'macOS' - run: brew install capnp - env: - HOMEBREW_NO_AUTO_UPDATE: 1 - HOMEBREW_NO_BOTTLE_SOURCE_FALLBACK: 1 - HOMEBREW_NO_INSTALL_CLEANUP: 1 - - name: Install Cap'n Proto (Windows) - if: runner.os == 'Windows' - shell: pwsh - run: | - choco install capnproto - echo "$Env:Programfiles\capnproto" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append + sudo apt-get install -y libacl1-dev - uses: r7kamura/rust-problem-matchers@v1.1.0 - run: cargo --version --verbose @@ -108,11 +82,11 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Install Cap'n Proto and libacl-dev (Linux) + - name: Install libacl-dev (Linux) if: runner.os == 'Linux' run: | export DEBIAN_FRONTEND=noninteractive - sudo apt-get install -y capnproto libcapnp-dev libacl1-dev + sudo apt-get install -y libacl1-dev - uses: r7kamura/rust-problem-matchers@v1.1.0 - run: cargo --version --verbose @@ -127,10 +101,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - name: Install Cap'n Proto and libacl-dev + - name: Install libacl-dev run: | export DEBIAN_FRONTEND=noninteractive - sudo apt-get install -y capnproto libcapnp-dev libacl1-dev + sudo apt-get install -y libacl1-dev - uses: r7kamura/rust-problem-matchers@v1.1.0 - run: cargo --version --verbose diff --git a/.github/workflows/pip-release.yml b/.github/workflows/pip-release.yml index cc6f0eb1..6c134b7f 100644 --- a/.github/workflows/pip-release.yml +++ b/.github/workflows/pip-release.yml @@ -21,6 +21,13 @@ jobs: steps: - uses: actions/checkout@v3 + + - name: Install libacl-dev (Linux) + if: runner.os == 'Linux' + run: | + export DEBIAN_FRONTEND=noninteractive + sudo apt-get install -y libacl1-dev + - uses: r7kamura/rust-problem-matchers@v1.1.0 # Publish Dora Node Python API diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index c690b1d8..1074a407 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -22,24 +22,11 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Install Cap'n Proto and libacl-dev (Linux) + - name: Install libacl-dev (Linux) if: runner.os == 'Linux' run: | export DEBIAN_FRONTEND=noninteractive - sudo apt-get install -y capnproto libcapnp-dev libacl1-dev - - name: Install Cap'n Proto (macOS) - if: runner.os == 'macOS' - run: brew install capnp - env: - HOMEBREW_NO_AUTO_UPDATE: 1 - HOMEBREW_NO_BOTTLE_SOURCE_FALLBACK: 1 - HOMEBREW_NO_INSTALL_CLEANUP: 1 - - name: Install Cap'n Proto (Windows) - if: runner.os == 'Windows' - shell: pwsh - run: | - choco install capnproto - echo "$Env:Programfiles\capnproto" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append + sudo apt-get install -y libacl1-dev - uses: r7kamura/rust-problem-matchers@v1.1.0 From baf81d10edbd70ec4647f9b2650bdf1bc2b97759 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 29 Nov 2022 16:44:52 -0500 Subject: [PATCH 08/26] remove sdist from pypi release --- .github/workflows/pip-release.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pip-release.yml b/.github/workflows/pip-release.yml index 6c134b7f..1fb14d71 100644 --- a/.github/workflows/pip-release.yml +++ b/.github/workflows/pip-release.yml @@ -38,7 +38,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install maturin==0.13.2 + pip install maturin==0.14 - name: Publish wheel shell: bash env: @@ -48,5 +48,5 @@ jobs: maturin publish \ --skip-existing \ -o wheels \ - -i python \ + --no-sdist \ --username __token__ \ From 3fe3d23252677f308234c07974c65824c847e299 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 29 Nov 2022 19:12:28 -0500 Subject: [PATCH 09/26] add `XCode` for macOS pypi release --- .github/workflows/pip-release.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pip-release.yml b/.github/workflows/pip-release.yml index 1fb14d71..ca388c76 100644 --- a/.github/workflows/pip-release.yml +++ b/.github/workflows/pip-release.yml @@ -27,7 +27,11 @@ jobs: run: | export DEBIAN_FRONTEND=noninteractive sudo apt-get install -y libacl1-dev - + - name: Install XCode (macOS) + if: runner.os == 'macOS' + uses: maxim-lobanov/setup-xcode@v1 + with: + xcode-version: latest-stable - uses: r7kamura/rust-problem-matchers@v1.1.0 # Publish Dora Node Python API From 9ed73d2d4a941f082cd3576a3fb88e79dc5d4c7b Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 1 Dec 2022 10:39:05 -0500 Subject: [PATCH 10/26] Remove `macOS` from `runtime` distribution --- .github/workflows/pip-release.yml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/.github/workflows/pip-release.yml b/.github/workflows/pip-release.yml index ca388c76..1012a66d 100644 --- a/.github/workflows/pip-release.yml +++ b/.github/workflows/pip-release.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: - platform: [ubuntu-latest, macos-latest, windows-latest] + platform: [ubuntu-latest, windows-latest] python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] fail-fast: false runs-on: ${{ matrix.platform }} @@ -27,11 +27,6 @@ jobs: run: | export DEBIAN_FRONTEND=noninteractive sudo apt-get install -y libacl1-dev - - name: Install XCode (macOS) - if: runner.os == 'macOS' - uses: maxim-lobanov/setup-xcode@v1 - with: - xcode-version: latest-stable - uses: r7kamura/rust-problem-matchers@v1.1.0 # Publish Dora Node Python API From 294990e3077477e9b39405efdcb1e5bddba03b0c Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 1 Dec 2022 12:21:11 -0500 Subject: [PATCH 11/26] Make `stop_thread` not borrow `communication` for static --- apis/rust/node/src/lib.rs | 9 ++-- binaries/coordinator/src/lib.rs | 20 ++++++--- binaries/runtime/src/lib.rs | 41 ++++++++++++------- .../dataflow_without_webcam.yml | 2 +- 4 files changed, 45 insertions(+), 27 deletions(-) diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 53e35701..c0765142 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -1,6 +1,6 @@ pub use communication::Input; use communication::STOP_TOPIC; -use communication_layer_pub_sub::CommunicationLayer; +use communication_layer_pub_sub::{CommunicationLayer, Publisher}; pub use dora_core; use dora_core::config::{CommunicationConfig, DataId, NodeId, NodeRunConfig}; pub use dora_message::{uhlc, Metadata, MetadataParameters}; @@ -147,14 +147,11 @@ fn set_up_tracing() -> eyre::Result<()> { #[must_use] pub fn manual_stop_publisher( communication: &mut dyn CommunicationLayer, -) -> eyre::Result Result<(), BoxError>> { - let hlc = dora_message::uhlc::HLC::default(); - let metadata = dora_message::Metadata::new(hlc.new_timestamp()); - let data = metadata.serialize().unwrap(); +) -> eyre::Result> { let publisher = communication .publisher(dora_core::topics::MANUAL_STOP) .map_err(|err| eyre::eyre!(err))?; - Ok(move || publisher.publish(&data)) + Ok(publisher) } #[cfg(test)] diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index aec5212e..8c3fe678 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -8,7 +8,7 @@ use dora_core::{ }, }; use dora_node_api::{communication, manual_stop_publisher}; -use eyre::{bail, eyre, WrapErr}; +use eyre::{bail, eyre, Result, WrapErr}; use futures::StreamExt; use futures_concurrency::stream::Merge; use run::{await_tasks, SpawnedDataflow}; @@ -266,11 +266,19 @@ async fn stop_dataflow( .wrap_err("failed to init communication layer")?; tracing::info!("sending stop message to dataflow `{uuid}`"); let manual_stop_publisher = manual_stop_publisher(communication.as_mut())?; - tokio::task::spawn_blocking(move || manual_stop_publisher()) - .await - .wrap_err("failed to join stop publish task")? - .map_err(|err| eyre!(err)) - .wrap_err("failed to send stop message")?; + tokio::task::spawn_blocking(move || -> Result<()> { + let hlc = dora_message::uhlc::HLC::default(); + let metadata = dora_message::Metadata::new(hlc.new_timestamp()); + let data = metadata.serialize().unwrap(); + manual_stop_publisher + .publish(&data) + .map_err(|err| eyre::eyre!(err)) + .wrap_err("failed to send stop message") + }) + .await + .wrap_err("failed to join stop publish task")? + .map_err(|err| eyre!(err)) + .wrap_err("failed to send stop message")?; Ok(()) } diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index 58c4024c..b7648bd0 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -7,10 +7,11 @@ use dora_core::{ use dora_node_api::{ self, communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, + manual_stop_publisher, }; -use eyre::{bail, Context}; +use eyre::{bail, Context, Result}; use futures::{Stream, StreamExt}; -use operator::{spawn_operator, OperatorEvent}; +use operator::{spawn_operator, OperatorEvent, StopReason}; use std::{ collections::{BTreeSet, HashMap}, @@ -69,17 +70,18 @@ pub fn main() -> eyre::Result<()> { let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); let node_id_clone = node_id.clone(); - let stop_thread = std::thread::spawn(move || { - Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(run( - node_id_clone, - operator_events, - operator_stop_publishers, - )) - .unwrap(); + let tokio_runtime = Builder::new_current_thread() + .enable_all() + .build() + .wrap_err("Could not build a tokio runtime.")?; + let manual_stop_publisher = manual_stop_publisher(communication.as_mut())?; + let stop_thread = std::thread::spawn(move || -> Result<()> { + tokio_runtime.block_on(run( + node_id_clone, + operator_events, + operator_stop_publishers, + manual_stop_publisher, + )) }); for operator_config in &operators { @@ -95,7 +97,8 @@ pub fn main() -> eyre::Result<()> { stop_thread .join() - .map_err(|err| eyre::eyre!("Stop thread failed with err: {err:#?}"))?; + .map_err(|err| eyre::eyre!("Stop thread failed with err: {err:#?}"))? + .wrap_err("Stop loop thread failed unexpectedly.")?; Ok(()) } @@ -103,6 +106,7 @@ async fn run( node_id: NodeId, mut events: impl Stream + Unpin, mut operator_stop_publishers: HashMap>, + manual_stop_publisher: Box, ) -> eyre::Result<()> { #[cfg(feature = "metrics")] let _started = { @@ -127,6 +131,15 @@ async fn run( } OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), OperatorEvent::Finished { reason } => { + if let StopReason::ExplicitStopAll = reason { + let hlc = dora_message::uhlc::HLC::default(); + let metadata = dora_message::Metadata::new(hlc.new_timestamp()); + let data = metadata.serialize().unwrap(); + manual_stop_publisher + .publish(&data) + .map_err(|err| eyre::eyre!(err)) + .wrap_err("failed to send stop message")?; + } if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { tracing::info!("operator {node_id}/{id} finished ({reason:?})"); stopped_operators.insert(id.clone()); diff --git a/examples/python-dataflow/dataflow_without_webcam.yml b/examples/python-dataflow/dataflow_without_webcam.yml index 376da8c1..6b9a00af 100644 --- a/examples/python-dataflow/dataflow_without_webcam.yml +++ b/examples/python-dataflow/dataflow_without_webcam.yml @@ -1,6 +1,6 @@ communication: zenoh: - prefix: /example-python-dataflow + prefix: /example-python-no-webcam-dataflow nodes: - id: no_webcam From 32e11f538c8ef7b6a57f8906cc02b2c4ac4bb2d0 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 1 Dec 2022 14:15:13 -0500 Subject: [PATCH 12/26] Remove `stop_thread` loop after a `StopAll` --- binaries/runtime/src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index b7648bd0..fedf559b 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -134,11 +134,14 @@ async fn run( if let StopReason::ExplicitStopAll = reason { let hlc = dora_message::uhlc::HLC::default(); let metadata = dora_message::Metadata::new(hlc.new_timestamp()); - let data = metadata.serialize().unwrap(); + let data = metadata + .serialize() + .wrap_err("failed to serialize stop message")?; manual_stop_publisher .publish(&data) .map_err(|err| eyre::eyre!(err)) .wrap_err("failed to send stop message")?; + break; } if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { tracing::info!("operator {node_id}/{id} finished ({reason:?})"); From 83cc80e073fbadb3e2ccda75d842187d9cf8bec2 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 1 Dec 2022 15:09:18 -0500 Subject: [PATCH 13/26] FIx documentation --- .github/workflows/pip-release.yml | 2 +- README.md | 1 + binaries/coordinator/src/run/runtime.rs | 3 ++- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pip-release.yml b/.github/workflows/pip-release.yml index 1012a66d..11ecf5ad 100644 --- a/.github/workflows/pip-release.yml +++ b/.github/workflows/pip-release.yml @@ -1,4 +1,4 @@ -name: Release +name: Pypi Release permissions: contents: write diff --git a/README.md b/README.md index a96b4619..2d4795c2 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ For linux ```bash wget https://github.com/dora-rs/dora/releases/download//dora--x86_64-Linux.zip unzip dora--x86_64-Linux.zip +python3 -m pip install dora-rs== PATH=$PATH:$(pwd):$(pwd)/iceoryx dora --help ``` diff --git a/binaries/coordinator/src/run/runtime.rs b/binaries/coordinator/src/run/runtime.rs index c3d37ea4..65bf36d3 100644 --- a/binaries/coordinator/src/run/runtime.rs +++ b/binaries/coordinator/src/run/runtime.rs @@ -21,11 +21,12 @@ pub fn spawn_runtime_node( .any(|x| matches!(x.config.source, OperatorSource::Python { .. })); let mut command = if has_python_operator { - // Use Python Runtime if runtime is + // Use python to spawn runtime if there is a python operator let mut command = tokio::process::Command::new("python3"); command.args(["-c", "import dora; dora.start_runtime()"]); command } else { + // Use default runtime if there is no python operator let command = tokio::process::Command::new(runtime); command }; From 40c0175169777dec0726c27214e35034e08c4057 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 2 Dec 2022 15:49:54 -0500 Subject: [PATCH 14/26] Make python and rust operator not mixable --- binaries/coordinator/src/run/runtime.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/binaries/coordinator/src/run/runtime.rs b/binaries/coordinator/src/run/runtime.rs index 65bf36d3..2a3c5541 100644 --- a/binaries/coordinator/src/run/runtime.rs +++ b/binaries/coordinator/src/run/runtime.rs @@ -20,15 +20,23 @@ pub fn spawn_runtime_node( .iter() .any(|x| matches!(x.config.source, OperatorSource::Python { .. })); - let mut command = if has_python_operator { + let has_other_operator = node + .operators + .iter() + .any(|x| !matches!(x.config.source, OperatorSource::Python { .. })); + + let mut command = if has_python_operator && !has_other_operator { // Use python to spawn runtime if there is a python operator let mut command = tokio::process::Command::new("python3"); command.args(["-c", "import dora; dora.start_runtime()"]); command - } else { + } else if !has_python_operator && has_other_operator { // Use default runtime if there is no python operator - let command = tokio::process::Command::new(runtime); - command + tokio::process::Command::new(runtime) + } else { + return Err(eyre!( + "Runtime can not mix Python Operator with other type of operator." + )); }; command_init_common_env(&mut command, &node_id, communication)?; From e4cc3cf2a18a4426e591197da8fbb46a6e370f7d Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 1 Dec 2022 15:18:08 -0500 Subject: [PATCH 15/26] Fix template for `v0.1.1` --- Cargo.lock | 2 +- binaries/cli/src/template/rust/node/Cargo-template.toml | 2 +- binaries/cli/src/template/rust/node/main-template.rs | 2 +- binaries/cli/src/template/rust/operator/Cargo-template.toml | 2 +- libraries/communication-layer/pub-sub/Cargo.toml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7316cccb..a9c12de0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -529,7 +529,7 @@ dependencies = [ [[package]] name = "communication-layer-pub-sub" -version = "0.1.0" +version = "0.1.1" dependencies = [ "eyre", "iceoryx-rs", diff --git a/binaries/cli/src/template/rust/node/Cargo-template.toml b/binaries/cli/src/template/rust/node/Cargo-template.toml index 1493f3fd..518d4272 100644 --- a/binaries/cli/src/template/rust/node/Cargo-template.toml +++ b/binaries/cli/src/template/rust/node/Cargo-template.toml @@ -6,4 +6,4 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { git = "https://github.com/dora-rs/dora.git" } +dora-node-api = { git = "https://github.com/dora-rs/dora.git", tag = "v0.1.1" } diff --git a/binaries/cli/src/template/rust/node/main-template.rs b/binaries/cli/src/template/rust/node/main-template.rs index a7c01a66..f1fdfbbb 100644 --- a/binaries/cli/src/template/rust/node/main-template.rs +++ b/binaries/cli/src/template/rust/node/main-template.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, core::config::DataId, DoraNode}; +use dora_node_api::DoraNode; use std::error::Error; fn main() -> Result<(), Box> { diff --git a/binaries/cli/src/template/rust/operator/Cargo-template.toml b/binaries/cli/src/template/rust/operator/Cargo-template.toml index 0b27ecfe..32317f32 100644 --- a/binaries/cli/src/template/rust/operator/Cargo-template.toml +++ b/binaries/cli/src/template/rust/operator/Cargo-template.toml @@ -9,4 +9,4 @@ edition = "2021" crate-type = ["cdylib"] [dependencies] -dora-operator-api = { git = "https://github.com/dora-rs/dora.git" } +dora-operator-api = { git = "https://github.com/dora-rs/dora.git", tag = "v0.1.1" } diff --git a/libraries/communication-layer/pub-sub/Cargo.toml b/libraries/communication-layer/pub-sub/Cargo.toml index b246998d..525e5093 100644 --- a/libraries/communication-layer/pub-sub/Cargo.toml +++ b/libraries/communication-layer/pub-sub/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "communication-layer-pub-sub" -version = "0.1.0" +version = "0.1.1" edition = "2021" [features] From 9a0b303e987f6a5ca665086cd9d2ad24f70ad22f Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 1 Dec 2022 15:21:09 -0500 Subject: [PATCH 16/26] Replace all versions to `v0.1.1` --- Cargo.lock | 52 +++++++++---------- apis/c++/node/Cargo.toml | 4 +- apis/c++/operator/Cargo.toml | 4 +- apis/c/node/Cargo.toml | 2 +- apis/c/operator/Cargo.toml | 2 +- apis/python/node/Cargo.toml | 2 +- apis/python/operator/Cargo.toml | 2 +- apis/rust/node/Cargo.toml | 2 +- apis/rust/operator/Cargo.toml | 2 +- apis/rust/operator/macros/Cargo.toml | 2 +- apis/rust/operator/types/Cargo.toml | 2 +- binaries/cli/Cargo.toml | 2 +- binaries/coordinator/Cargo.toml | 4 +- binaries/runtime/Cargo.toml | 4 +- examples/iceoryx/node/Cargo.toml | 4 +- examples/iceoryx/operator/Cargo.toml | 2 +- examples/iceoryx/sink/Cargo.toml | 4 +- examples/rust-dataflow-url/sink/Cargo.toml | 4 +- examples/rust-dataflow/node/Cargo.toml | 4 +- examples/rust-dataflow/operator/Cargo.toml | 2 +- examples/rust-dataflow/sink/Cargo.toml | 4 +- .../request-reply/Cargo.toml | 2 +- libraries/core/Cargo.toml | 2 +- libraries/extensions/download/Cargo.toml | 2 +- .../extensions/telemetry/metrics/Cargo.toml | 2 +- .../extensions/telemetry/tracing/Cargo.toml | 2 +- libraries/extensions/zenoh-logger/Cargo.toml | 2 +- libraries/message/Cargo.toml | 2 +- 28 files changed, 62 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9c12de0..02c640bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -540,7 +540,7 @@ dependencies = [ [[package]] name = "communication-layer-request-reply" -version = "0.1.0" +version = "0.1.1" dependencies = [ "eyre", ] @@ -891,7 +891,7 @@ dependencies = [ [[package]] name = "dora-cli" -version = "0.1.0" +version = "0.1.1" dependencies = [ "atty", "clap 4.0.3", @@ -911,7 +911,7 @@ dependencies = [ [[package]] name = "dora-coordinator" -version = "0.1.0" +version = "0.1.1" dependencies = [ "bincode", "clap 3.2.20", @@ -941,7 +941,7 @@ dependencies = [ [[package]] name = "dora-core" -version = "0.1.0" +version = "0.1.1" dependencies = [ "eyre", "once_cell", @@ -954,7 +954,7 @@ dependencies = [ [[package]] name = "dora-download" -version = "0.1.0" +version = "0.1.1" dependencies = [ "eyre", "reqwest", @@ -975,7 +975,7 @@ dependencies = [ [[package]] name = "dora-message" -version = "0.1.0" +version = "0.1.1" dependencies = [ "capnp", "capnpc", @@ -984,7 +984,7 @@ dependencies = [ [[package]] name = "dora-metrics" -version = "0.1.0" +version = "0.1.1" dependencies = [ "futures", "opentelemetry", @@ -995,7 +995,7 @@ dependencies = [ [[package]] name = "dora-node-api" -version = "0.1.0" +version = "0.1.1" dependencies = [ "capnp", "communication-layer-pub-sub", @@ -1015,7 +1015,7 @@ dependencies = [ [[package]] name = "dora-node-api-c" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "eyre", @@ -1025,7 +1025,7 @@ dependencies = [ [[package]] name = "dora-node-api-cxx" -version = "0.1.0" +version = "0.1.1" dependencies = [ "cxx", "cxx-build", @@ -1035,7 +1035,7 @@ dependencies = [ [[package]] name = "dora-node-api-python" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "dora-operator-api-python", @@ -1048,7 +1048,7 @@ dependencies = [ [[package]] name = "dora-operator-api" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-operator-api-macros", "dora-operator-api-types", @@ -1056,14 +1056,14 @@ dependencies = [ [[package]] name = "dora-operator-api-c" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-operator-api-types", ] [[package]] name = "dora-operator-api-cxx" -version = "0.1.0" +version = "0.1.1" dependencies = [ "cxx", "cxx-build", @@ -1076,7 +1076,7 @@ dependencies = [ [[package]] name = "dora-operator-api-macros" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-operator-api", "dora-operator-api-types", @@ -1087,7 +1087,7 @@ dependencies = [ [[package]] name = "dora-operator-api-python" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "eyre", @@ -1098,14 +1098,14 @@ dependencies = [ [[package]] name = "dora-operator-api-types" -version = "0.1.0" +version = "0.1.1" dependencies = [ "safer-ffi", ] [[package]] name = "dora-runtime" -version = "0.1.0" +version = "0.1.1" dependencies = [ "clap 3.2.20", "dora-core", @@ -1136,7 +1136,7 @@ dependencies = [ [[package]] name = "dora-tracing" -version = "0.1.0" +version = "0.1.1" dependencies = [ "opentelemetry", "opentelemetry-jaeger", @@ -1665,7 +1665,7 @@ dependencies = [ [[package]] name = "iceoryx-example-node" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "eyre", @@ -1674,14 +1674,14 @@ dependencies = [ [[package]] name = "iceoryx-example-operator" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-operator-api", ] [[package]] name = "iceoryx-example-sink" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "eyre", @@ -3251,7 +3251,7 @@ dependencies = [ [[package]] name = "rust-dataflow-example-node" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "eyre", @@ -3262,14 +3262,14 @@ dependencies = [ [[package]] name = "rust-dataflow-example-operator" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-operator-api", ] [[package]] name = "rust-dataflow-example-sink" -version = "0.1.0" +version = "0.1.1" dependencies = [ "dora-node-api", "eyre", @@ -4924,7 +4924,7 @@ dependencies = [ [[package]] name = "zenoh-logger" -version = "0.1.0" +version = "0.1.1" dependencies = [ "zenoh", "zenoh-config", diff --git a/apis/c++/node/Cargo.toml b/apis/c++/node/Cargo.toml index 3b7ab157..4c60e9a5 100644 --- a/apis/c++/node/Cargo.toml +++ b/apis/c++/node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-node-api-cxx" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -10,7 +10,7 @@ crate-type = ["staticlib"] [dependencies] cxx = "1.0.73" -dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node", default-features = false, features = [ +dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node", default-features = false, features = [ "zenoh", ] } eyre = "0.6.8" diff --git a/apis/c++/operator/Cargo.toml b/apis/c++/operator/Cargo.toml index 980e07ca..9f359563 100644 --- a/apis/c++/operator/Cargo.toml +++ b/apis/c++/operator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-operator-api-cxx" -version = "0.1.0" +version = "0.1.1" edition = "2021" [lib] @@ -8,7 +8,7 @@ crate-type = ["staticlib"] [dependencies] cxx = "1.0.73" -dora-operator-api = { version = "0.1.0", path = "../../../apis/rust/operator" } +dora-operator-api = { version = "0.1.1", path = "../../../apis/rust/operator" } eyre = "0.6.8" futures = "0.3.21" rand = "0.8.5" diff --git a/apis/c/node/Cargo.toml b/apis/c/node/Cargo.toml index 9351e75c..c8cf6366 100644 --- a/apis/c/node/Cargo.toml +++ b/apis/c/node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-node-api-c" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/apis/c/operator/Cargo.toml b/apis/c/operator/Cargo.toml index ca457efd..82d857f8 100644 --- a/apis/c/operator/Cargo.toml +++ b/apis/c/operator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-operator-api-c" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" description = "C API implemetation for Dora Operator" diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 0bbda832..3cebc412 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-node-api-python" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/apis/python/operator/Cargo.toml b/apis/python/operator/Cargo.toml index d51de4a0..fd3a42b7 100644 --- a/apis/python/operator/Cargo.toml +++ b/apis/python/operator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-operator-api-python" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index 4ae127d8..fd790684 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-node-api" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/apis/rust/operator/Cargo.toml b/apis/rust/operator/Cargo.toml index 08c74a9b..233b42d8 100644 --- a/apis/rust/operator/Cargo.toml +++ b/apis/rust/operator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-operator-api" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" description = "Rust API implemetation for Dora Operator" diff --git a/apis/rust/operator/macros/Cargo.toml b/apis/rust/operator/macros/Cargo.toml index 2565aa4e..066d808c 100644 --- a/apis/rust/operator/macros/Cargo.toml +++ b/apis/rust/operator/macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-operator-api-macros" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" description = "Rust API Macros for Dora Operator" diff --git a/apis/rust/operator/types/Cargo.toml b/apis/rust/operator/types/Cargo.toml index 43cab511..2d6cbc21 100644 --- a/apis/rust/operator/types/Cargo.toml +++ b/apis/rust/operator/types/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-operator-api-types" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 6803514b..96c77e33 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-cli" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index af5ef556..fc3f18b8 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-coordinator" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" @@ -20,7 +20,7 @@ clap = { version = "3.1.8", features = ["derive"] } uuid = { version = "1.2.1" } time = "0.3.9" rand = "0.8.5" -dora-core = { version = "0.1.0", path = "../../libraries/core" } +dora-core = { version = "0.1.1", path = "../../libraries/core" } dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 69f0ba93..97b65b03 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-runtime" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" @@ -14,7 +14,7 @@ dora-node-api = { path = "../../apis/rust/node", default-features = false, featu ] } dora-operator-api-python = { path = "../../apis/python/operator" } dora-operator-api-types = { path = "../../apis/rust/operator/types" } -dora-core = { version = "0.1.0", path = "../../libraries/core" } +dora-core = { version = "0.1.1", path = "../../libraries/core" } dora-tracing = { path = "../../libraries/extensions/telemetry/tracing", optional = true } dora-metrics = { path = "../../libraries/extensions/telemetry/metrics", optional = true } opentelemetry = { version = "0.17", features = [ diff --git a/examples/iceoryx/node/Cargo.toml b/examples/iceoryx/node/Cargo.toml index dafe3892..66bea6e5 100644 --- a/examples/iceoryx/node/Cargo.toml +++ b/examples/iceoryx/node/Cargo.toml @@ -1,11 +1,11 @@ [package] name = "iceoryx-example-node" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } eyre = "0.6.8" rand = "0.8.5" diff --git a/examples/iceoryx/operator/Cargo.toml b/examples/iceoryx/operator/Cargo.toml index 474d6103..eb5f3032 100644 --- a/examples/iceoryx/operator/Cargo.toml +++ b/examples/iceoryx/operator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceoryx-example-operator" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/examples/iceoryx/sink/Cargo.toml b/examples/iceoryx/sink/Cargo.toml index fa48b258..4f3b049e 100644 --- a/examples/iceoryx/sink/Cargo.toml +++ b/examples/iceoryx/sink/Cargo.toml @@ -1,12 +1,12 @@ [package] name = "iceoryx-example-sink" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } eyre = "0.6.8" futures = "0.3.21" tokio = { version = "1.20.1", features = ["macros"] } diff --git a/examples/rust-dataflow-url/sink/Cargo.toml b/examples/rust-dataflow-url/sink/Cargo.toml index e80b5a61..f98c5d28 100644 --- a/examples/rust-dataflow-url/sink/Cargo.toml +++ b/examples/rust-dataflow-url/sink/Cargo.toml @@ -1,10 +1,10 @@ [package] name = "rust-dataflow-example-sink" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } eyre = "0.6.8" diff --git a/examples/rust-dataflow/node/Cargo.toml b/examples/rust-dataflow/node/Cargo.toml index d7bcd656..71f91475 100644 --- a/examples/rust-dataflow/node/Cargo.toml +++ b/examples/rust-dataflow/node/Cargo.toml @@ -1,12 +1,12 @@ [package] name = "rust-dataflow-example-node" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } eyre = "0.6.8" futures = "0.3.21" rand = "0.8.5" diff --git a/examples/rust-dataflow/operator/Cargo.toml b/examples/rust-dataflow/operator/Cargo.toml index edcac2b8..e18c676c 100644 --- a/examples/rust-dataflow/operator/Cargo.toml +++ b/examples/rust-dataflow/operator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rust-dataflow-example-operator" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/examples/rust-dataflow/sink/Cargo.toml b/examples/rust-dataflow/sink/Cargo.toml index e80b5a61..f98c5d28 100644 --- a/examples/rust-dataflow/sink/Cargo.toml +++ b/examples/rust-dataflow/sink/Cargo.toml @@ -1,10 +1,10 @@ [package] name = "rust-dataflow-example-sink" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +dora-node-api = { version = "0.1.1", path = "../../../apis/rust/node" } eyre = "0.6.8" diff --git a/libraries/communication-layer/request-reply/Cargo.toml b/libraries/communication-layer/request-reply/Cargo.toml index 8f4a9392..e580b981 100644 --- a/libraries/communication-layer/request-reply/Cargo.toml +++ b/libraries/communication-layer/request-reply/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "communication-layer-request-reply" -version = "0.1.0" +version = "0.1.1" edition = "2021" [features] diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 6abc3a34..d419f7fb 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-core" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/libraries/extensions/download/Cargo.toml b/libraries/extensions/download/Cargo.toml index 5ae30c93..ad851bc6 100644 --- a/libraries/extensions/download/Cargo.toml +++ b/libraries/extensions/download/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-download" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/libraries/extensions/telemetry/metrics/Cargo.toml b/libraries/extensions/telemetry/metrics/Cargo.toml index 00f83ca5..77c1620d 100644 --- a/libraries/extensions/telemetry/metrics/Cargo.toml +++ b/libraries/extensions/telemetry/metrics/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-metrics" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/libraries/extensions/telemetry/tracing/Cargo.toml b/libraries/extensions/telemetry/tracing/Cargo.toml index ee855913..ccc59e57 100644 --- a/libraries/extensions/telemetry/tracing/Cargo.toml +++ b/libraries/extensions/telemetry/tracing/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-tracing" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/libraries/extensions/zenoh-logger/Cargo.toml b/libraries/extensions/zenoh-logger/Cargo.toml index 1ce72e50..1782b11a 100644 --- a/libraries/extensions/zenoh-logger/Cargo.toml +++ b/libraries/extensions/zenoh-logger/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "zenoh-logger" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/libraries/message/Cargo.toml b/libraries/message/Cargo.toml index 89fd0584..e39d4f9d 100644 --- a/libraries/message/Cargo.toml +++ b/libraries/message/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-message" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" From b15d25b9a3bdafedcae4671cdc0e70643795d665 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 1 Dec 2022 15:30:26 -0500 Subject: [PATCH 17/26] Fix clippy warnings --- apis/rust/node/src/lib.rs | 1 - binaries/cli/src/check.rs | 1 - binaries/cli/src/template/cxx/mod.rs | 4 ++-- binaries/runtime/src/main.rs | 2 -- binaries/runtime/src/operator/python.rs | 2 +- libraries/communication-layer/request-reply/src/tcp.rs | 2 +- 6 files changed, 4 insertions(+), 8 deletions(-) diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index c0765142..0be0308d 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -144,7 +144,6 @@ fn set_up_tracing() -> eyre::Result<()> { .context("failed to set tracing global subscriber") } -#[must_use] pub fn manual_stop_publisher( communication: &mut dyn CommunicationLayer, ) -> eyre::Result> { diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index c0685454..64bf3e47 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -3,7 +3,6 @@ use dora_core::{ adjust_shared_library_path, config::{InputMapping, UserInputMapping}, descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, - topics::ControlRequest, }; use eyre::{bail, eyre, Context}; use std::{env::consts::EXE_EXTENSION, io::Write, path::Path}; diff --git a/binaries/cli/src/template/cxx/mod.rs b/binaries/cli/src/template/cxx/mod.rs index 6532bc85..6d96fc5e 100644 --- a/binaries/cli/src/template/cxx/mod.rs +++ b/binaries/cli/src/template/cxx/mod.rs @@ -64,7 +64,7 @@ fn create_operator(name: String, path: Option) -> Result<(), eyre::ErrR // create directories let root = path.as_deref().unwrap_or_else(|| Path::new(&name)); - fs::create_dir(&root) + fs::create_dir(root) .with_context(|| format!("failed to create directory `{}`", root.display()))?; let operator_path = root.join("operator.cc"); @@ -96,7 +96,7 @@ fn create_custom_node(name: String, path: Option) -> Result<(), eyre::E // create directories let root = path.as_deref().unwrap_or_else(|| Path::new(&name)); - fs::create_dir(&root) + fs::create_dir(root) .with_context(|| format!("failed to create directory `{}`", root.display()))?; let node_path = root.join("node.cc"); diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index 73156226..98a70f34 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -1,5 +1,3 @@ -use dora_runtime; - fn main() -> Result<(), eyre::Report> { dora_runtime::main() } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 814e5d45..b4849cca 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -50,7 +50,7 @@ pub fn spawn( let path = if source_is_url(source) { let target_path = Path::new("build") .join(node_id.to_string()) - .join(format!("{}.py", operator_id.to_string())); + .join(format!("{}.py", operator_id)); // try to download the shared library let rt = tokio::runtime::Builder::new_current_thread() .enable_all() diff --git a/libraries/communication-layer/request-reply/src/tcp.rs b/libraries/communication-layer/request-reply/src/tcp.rs index 515b5684..a8ffb622 100644 --- a/libraries/communication-layer/request-reply/src/tcp.rs +++ b/libraries/communication-layer/request-reply/src/tcp.rs @@ -125,7 +125,7 @@ impl TcpConnection { fn send(&mut self, request: &[u8]) -> std::io::Result<()> { let len_raw = (request.len() as u64).to_le_bytes(); self.stream.write_all(&len_raw)?; - self.stream.write_all(&request)?; + self.stream.write_all(request)?; Ok(()) } From 756c024727ca890d8caab308b7b309c7733f236f Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 1 Dec 2022 17:09:58 -0500 Subject: [PATCH 18/26] Make pip wheel for several linux version Pip wheels does not work across Ubuntu version apparently due to glibc version not being the same. --- .github/workflows/pip-release.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pip-release.yml b/.github/workflows/pip-release.yml index 11ecf5ad..c7a27d8c 100644 --- a/.github/workflows/pip-release.yml +++ b/.github/workflows/pip-release.yml @@ -14,8 +14,8 @@ jobs: strategy: matrix: - platform: [ubuntu-latest, windows-latest] - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] + platform: [ubuntu-latest, ubuntu-20.04, ubuntu-18.04, windows-latest] + python-version: ["3.7"] fail-fast: false runs-on: ${{ matrix.platform }} From c7c32223d494650ea2463d0399b5c067562588f4 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 2 Dec 2022 13:52:35 -0500 Subject: [PATCH 19/26] Create different ubuntu release as they use different glibc The rus compiler compile package against a specific glibc version that changes depending on the ubuntu version we're using. This commit should fix error such as: ```bash ./dora: /lib/x86_64-linux-gnu/libc.so.6: version `GLIBC_2.33' not found (required by ./dora) ./dora: /lib/x86_64-linux-gnu/libc.so.6: version `GLIBC_2.32' not found (required by ./dora) ./dora: /lib/x86_64-linux-gnu/libc.so.6: version `GLIBC_2.34' not found (required by ./dora) ``` --- .github/workflows/release.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 1074a407..05a29b61 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,7 +14,7 @@ jobs: strategy: matrix: - platform: [ubuntu-latest, macos-latest, windows-latest] + platform: [ubuntu-20.04, ubuntu-22.04, macos-12, windows-2022] python-version: ["3.7"] fail-fast: false runs-on: ${{ matrix.platform }} @@ -65,5 +65,5 @@ jobs: with: upload_url: ${{ github.event.release.upload_url }} asset_path: archive.zip - asset_name: dora-${{ github.ref_name }}-x86_64-${{ runner.os }}.zip + asset_name: dora-${{ github.ref_name }}-x86_64-${{ matrix.platform }}.zip asset_content_type: application/zip From 7e93962b846de1305f033df3829658216b473d3d Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 4 Dec 2022 11:49:27 -0500 Subject: [PATCH 20/26] Upgrading `patchelf` to fix `.libs` path issues. Fix https://github.com/PyO3/maturin/issues/1165#issuecomment-1269992882. Thanks to @messense. --- .github/workflows/pip-release.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/pip-release.yml b/.github/workflows/pip-release.yml index c7a27d8c..78407727 100644 --- a/.github/workflows/pip-release.yml +++ b/.github/workflows/pip-release.yml @@ -38,6 +38,7 @@ jobs: run: | python -m pip install --upgrade pip pip install maturin==0.14 + pip install patchelf --upgrade - name: Publish wheel shell: bash env: From 73fc94ef81fae435ababf359d25700f27a64ab6d Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 4 Dec 2022 12:20:14 -0500 Subject: [PATCH 21/26] Removing `ubuntu-18` and `windows` from pypi release --- .github/workflows/pip-release.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pip-release.yml b/.github/workflows/pip-release.yml index 78407727..5e2a5933 100644 --- a/.github/workflows/pip-release.yml +++ b/.github/workflows/pip-release.yml @@ -10,11 +10,11 @@ on: jobs: release: - name: "Release" + name: "Pypi Release" strategy: matrix: - platform: [ubuntu-latest, ubuntu-20.04, ubuntu-18.04, windows-latest] + platform: [ubuntu-latest, ubuntu-20.04] python-version: ["3.7"] fail-fast: false runs-on: ${{ matrix.platform }} From 35b70f0e3b3b8e34c50e46d2739f3aa337b19806 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 4 Dec 2022 12:24:37 -0500 Subject: [PATCH 22/26] Add warning about low support of MacOS and Windows --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 2d4795c2..a5ef87b8 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,8 @@ Dataflow Oriented Robotic Architecture ⚡ This project is in early development, and many features have yet to be implemented with breaking changes. Please don't take for granted the current design. +`dora` primary support is with `Linux` ( Ubuntu 20.04 and Ubuntu 22.04 ) as it is the primary OS for both Cloud and small computers. If you wish to use `dora` with another OS, please compile from source. + --- ## 📖 Documentation From 0fd38ceedda9988c382effa689487ba362d96f84 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 4 Dec 2022 14:46:57 -0500 Subject: [PATCH 23/26] Remove windows and macOS as they are not well supported --- README.md | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/README.md b/README.md index a5ef87b8..c36b2cfc 100644 --- a/README.md +++ b/README.md @@ -35,30 +35,6 @@ PATH=$PATH:$(pwd):$(pwd)/iceoryx dora --help ``` -
- For Macos - -```bash -wget https://github.com/dora-rs/dora/releases/download//dora--x86_64-macOS.zip -unzip dora--x86_64-macOS.zip -PATH=$PATH:$(pwd):$(pwd)/iceoryx -dora --help -``` - -
- -
- For Windows - -```bash -wget https://github.com/dora-rs/dora/releases/download//dora--x86_64-Windows.zip -unzip dora--x86_64-Windows.zip -PATH=$PATH:$(pwd):$(pwd)/iceoryx -dora --help -``` - -
- > This is `x86_64` only for the moment. 2. Create a new dataflow From 222cb80b5426ce02288cb68803ce467ecdd02895 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 4 Dec 2022 14:47:17 -0500 Subject: [PATCH 24/26] change version name to be able to republish package in pypi --- Cargo.lock | 2 +- apis/python/node/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 02c640bc..a8d11497 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1035,7 +1035,7 @@ dependencies = [ [[package]] name = "dora-node-api-python" -version = "0.1.1" +version = "0.1.1-2" dependencies = [ "dora-node-api", "dora-operator-api-python", diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 3cebc412..fdce146c 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dora-node-api-python" -version = "0.1.1" +version = "0.1.1-2" edition = "2021" license = "Apache-2.0" From 00e6bb8192db5411d33246773347f666c11d8f01 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 7 Dec 2022 14:09:02 +0100 Subject: [PATCH 25/26] Fix high CPU usage in coordinator by properly handling closed connections We forgot to handle closed control connections in the `handle_requests` loop. So instead of breaking out of that loop, we continued polling the connection for new messages again and again. --- binaries/coordinator/src/control.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs index a2066952..3b254ca5 100644 --- a/binaries/coordinator/src/control.rs +++ b/binaries/coordinator/src/control.rs @@ -81,14 +81,23 @@ fn handle_requests( Ok(reply) })); if let Err(err) = result { - if err.kind() == ErrorKind::Other { - let inner = err.into_inner().unwrap(); - let downcasted = inner.downcast_ref().unwrap(); - match downcasted { - HandlerError::ParseError(err) => { - tracing::warn!("failed to parse request: {err}"); + match err.kind() { + ErrorKind::UnexpectedEof => { + tracing::trace!("Control connection closed"); + break; + } + ErrorKind::Other => { + let inner = err.into_inner().unwrap(); + let downcasted = inner.downcast_ref().unwrap(); + match downcasted { + HandlerError::ParseError(err) => { + tracing::warn!("failed to parse request: {err}"); + } + HandlerError::ServerStopped => break, } - HandlerError::ServerStopped => break, + } + _ => { + tracing::warn!("I/O error while trying to receive control request: {err:?}"); } } } From 7ce86a3cfbcdb5869d25df0fdf8cb0862a267aa2 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 7 Dec 2022 14:10:35 +0100 Subject: [PATCH 26/26] Log received control events --- binaries/coordinator/src/control.rs | 1 + binaries/coordinator/src/lib.rs | 3 +++ 2 files changed, 4 insertions(+) diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs index 3b254ca5..e0a0bd5a 100644 --- a/binaries/coordinator/src/control.rs +++ b/binaries/coordinator/src/control.rs @@ -112,6 +112,7 @@ enum HandlerError { ServerStopped, } +#[derive(Debug)] pub enum ControlEvent { IncomingRequest { request: ControlRequest, diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 8c3fe678..d3c1c014 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -77,6 +77,7 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { let mut running_dataflows = HashMap::new(); while let Some(event) = events.next().await { + tracing::trace!("Handling event {event:?}"); match event { Event::Dataflow { uuid, event } => match event { DataflowEvent::Finished { result } => { @@ -319,11 +320,13 @@ async fn start_dataflow( }) } +#[derive(Debug)] enum Event { Dataflow { uuid: Uuid, event: DataflowEvent }, Control(ControlEvent), } +#[derive(Debug)] enum DataflowEvent { Finished { result: eyre::Result<()> }, }