From 53756289307e88bc314bbf7367202dba06cb9336 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 9 Nov 2022 14:22:44 +0100 Subject: [PATCH] Allow operators to stop the full dataflow Useful in case of emergencies. --- apis/c/operator/operator_types.h | 2 ++ apis/python/node/src/lib.rs | 2 +- apis/rust/node/src/lib.rs | 15 ++++++++++++++- apis/rust/operator/types/src/lib.rs | 1 + binaries/coordinator/src/lib.rs | 20 ++++++++------------ binaries/runtime/src/main.rs | 17 ++++++++++++++--- binaries/runtime/src/operator/mod.rs | 9 ++++++++- binaries/runtime/src/operator/python.rs | 20 ++++++++++++-------- binaries/runtime/src/operator/shared_lib.rs | 20 ++++++++++++-------- examples/iceoryx/node/src/main.rs | 2 +- examples/rust-dataflow/node/src/main.rs | 2 +- 11 files changed, 74 insertions(+), 36 deletions(-) diff --git a/apis/c/operator/operator_types.h b/apis/c/operator/operator_types.h index 251ed99b..d8074338 100644 --- a/apis/c/operator/operator_types.h +++ b/apis/c/operator/operator_types.h @@ -68,6 +68,8 @@ enum DoraStatus { DORA_STATUS_CONTINUE = 0, /** */ DORA_STATUS_STOP = 1, + /** */ + DORA_STATUS_STOP_ALL = 2, } #ifndef DOXYGEN ; typedef uint8_t diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 9a40e299..97f68014 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,6 +1,6 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] -use dora_node_api::{core::config::NodeId, DoraNode, Input}; +use dora_node_api::{dora_core::config::NodeId, DoraNode, Input}; use dora_operator_api_python::{metadata_to_pydict, pydict_to_metadata}; use eyre::{Context, Result}; use flume::Receiver; diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index f3b06415..53e35701 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -1,7 +1,7 @@ pub use communication::Input; use communication::STOP_TOPIC; use communication_layer_pub_sub::CommunicationLayer; 
-pub use dora_core as core; +pub use dora_core; use dora_core::config::{CommunicationConfig, DataId, NodeId, NodeRunConfig}; pub use dora_message::{uhlc, Metadata, MetadataParameters}; use eyre::WrapErr; @@ -144,6 +144,19 @@ fn set_up_tracing() -> eyre::Result<()> { .context("failed to set tracing global subscriber") } +#[must_use] +pub fn manual_stop_publisher( + communication: &mut dyn CommunicationLayer, +) -> eyre::Result Result<(), BoxError>> { + let hlc = dora_message::uhlc::HLC::default(); + let metadata = dora_message::Metadata::new(hlc.new_timestamp()); + let data = metadata.serialize().unwrap(); + let publisher = communication + .publisher(dora_core::topics::MANUAL_STOP) + .map_err(|err| eyre::eyre!(err))?; + Ok(move || publisher.publish(&data)) +} + #[cfg(test)] mod tests { use dora_core::config; diff --git a/apis/rust/operator/types/src/lib.rs b/apis/rust/operator/types/src/lib.rs index 2d66ec1b..312caf85 100644 --- a/apis/rust/operator/types/src/lib.rs +++ b/apis/rust/operator/types/src/lib.rs @@ -103,6 +103,7 @@ pub struct OnInputResult { pub enum DoraStatus { Continue = 0, Stop = 1, + StopAll = 2, } pub fn generate_headers(target_file: &Path) -> ::std::io::Result<()> { diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 766fde8a..445c50dd 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -2,11 +2,11 @@ use crate::run::spawn_dataflow; use dora_core::{ config::CommunicationConfig, topics::{ - self, StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_LIST, + StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_LIST, ZENOH_CONTROL_START, ZENOH_CONTROL_STOP, }, }; -use dora_node_api::communication; +use dora_node_api::{communication, manual_stop_publisher}; use eyre::{bail, eyre, WrapErr}; use futures::StreamExt; use futures_concurrency::stream::Merge; @@ -188,16 +188,12 @@ async fn stop_dataflow( .wrap_err("failed to join communication 
layer init task")? .wrap_err("failed to init communication layer")?; tracing::info!("sending stop message to dataflow `{uuid}`"); - tokio::task::spawn_blocking(move || { - let hlc = dora_message::uhlc::HLC::default(); - let metadata = dora_message::Metadata::new(hlc.new_timestamp()); - let data = metadata.serialize().unwrap(); - communication.publisher(topics::MANUAL_STOP)?.publish(&data) - }) - .await - .wrap_err("failed to join stop publish task")? - .map_err(|err| eyre!(err)) - .wrap_err("failed to send stop message")?; + let manual_stop_publisher = manual_stop_publisher(communication.as_mut())?; + tokio::task::spawn_blocking(move || manual_stop_publisher()) + .await + .wrap_err("failed to join stop publish task")? + .map_err(|err| eyre!(err)) + .wrap_err("failed to send stop message")?; Ok(()) } diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index 9253e798..b8d38d84 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -7,10 +7,11 @@ use dora_core::{ use dora_node_api::{ self, communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, + manual_stop_publisher, }; use eyre::{bail, Context}; use futures::{Stream, StreamExt}; -use operator::{spawn_operator, OperatorEvent}; +use operator::{spawn_operator, OperatorEvent, StopReason}; use std::{ collections::{BTreeSet, HashMap}, mem, @@ -76,6 +77,7 @@ fn main() -> eyre::Result<()> { node_id, operator_events, operator_stop_publishers, + communication.as_mut(), )) } @@ -83,6 +85,7 @@ async fn run( node_id: NodeId, mut events: impl Stream + Unpin, mut operator_stop_publishers: HashMap>, + communication: &mut dyn CommunicationLayer, ) -> eyre::Result<()> { #[cfg(feature = "metrics")] let _started = { @@ -106,9 +109,17 @@ async fn run( bail!(err.wrap_err(format!("operator {id} failed"))) } OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), - OperatorEvent::Finished => { + OperatorEvent::Finished { reason } => { + if let StopReason::ExplicitStopAll 
= reason { + let manual_stop_publisher = manual_stop_publisher(communication)?; + tokio::task::spawn_blocking(manual_stop_publisher) + .await + .wrap_err("failed to join stop publish task")? + .map_err(|err| eyre::eyre!(err)) + .wrap_err("failed to send stop message")?; + } if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { - tracing::info!("operator {node_id}/{id} finished"); + tracing::info!("operator {node_id}/{id} finished ({reason:?})"); stopped_operators.insert(id.clone()); // send stopped message tokio::task::spawn_blocking(move || stop_publisher.publish(&[])) diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index 4d8c246c..9d5e2cf7 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -99,5 +99,12 @@ pub fn spawn_operator( pub enum OperatorEvent { Error(eyre::Error), Panic(Box), - Finished, + Finished { reason: StopReason }, +} + +#[derive(Debug)] +pub enum StopReason { + InputsClosed, + ExplicitStop, + ExplicitStopAll, } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index f40f7251..cfc37aae 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -1,6 +1,6 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] -use super::{OperatorEvent, Tracer}; +use super::{OperatorEvent, StopReason, Tracer}; use dora_core::{ config::{DataId, NodeId, OperatorId}, descriptor::source_is_url, @@ -9,6 +9,7 @@ use dora_download::download_file; use dora_message::uhlc; use dora_node_api::communication::Publisher; use dora_operator_api_python::metadata_to_pydict; +use dora_operator_api_types::DoraStatus; use eyre::{bail, eyre, Context}; use pyo3::{ pyclass, @@ -113,7 +114,9 @@ pub fn spawn( let operator = Python::with_gil(init_operator).wrap_err("failed to init python operator")?; - while let Ok(mut input) = inputs.recv() { + let reason = loop { + let 
Ok(mut input) = inputs.recv() else {break StopReason::InputsClosed }; + #[cfg(feature = "tracing")] let (_child_cx, string_cx) = { use dora_tracing::{deserialize_context, serialize_context}; @@ -151,11 +154,12 @@ pub fn spawn( let status: i32 = Python::with_gil(|py| status_val.extract(py)) .wrap_err("on_input has invalid return value")?; match status { - 0 => {} // ok - 1 => break, // stop + s if s == DoraStatus::Continue as i32 => {} // ok + s if s == DoraStatus::Stop as i32 => break StopReason::ExplicitStop, + s if s == DoraStatus::StopAll as i32 => break StopReason::ExplicitStopAll, other => bail!("on_input returned invalid status {other}"), } - } + }; // Dropping the operator using Python garbage collector. // Locking the GIL for immediate release. @@ -163,7 +167,7 @@ pub fn spawn( drop(operator); }); - Result::<_, eyre::Report>::Ok(()) + Result::<_, eyre::Report>::Ok(reason) }; thread::spawn(move || { @@ -173,8 +177,8 @@ pub fn spawn( }); match catch_unwind(closure) { - Ok(Ok(())) => { - let _ = events_tx.blocking_send(OperatorEvent::Finished); + Ok(Ok(reason)) => { + let _ = events_tx.blocking_send(OperatorEvent::Finished { reason }); } Ok(Err(err)) => { let _ = events_tx.blocking_send(OperatorEvent::Error(err)); diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 71616bed..e8d40709 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,4 +1,4 @@ -use super::{OperatorEvent, Tracer}; +use super::{OperatorEvent, StopReason, Tracer}; use dora_core::{ adjust_shared_library_path, config::{DataId, NodeId, OperatorId}, @@ -70,8 +70,8 @@ pub fn spawn( operator.run(publishers, tracer) }); match catch_unwind(closure) { - Ok(Ok(())) => { - let _ = events_tx.blocking_send(OperatorEvent::Finished); + Ok(Ok(reason)) => { + let _ = events_tx.blocking_send(OperatorEvent::Finished { reason }); } Ok(Err(err)) => { let _ = 
events_tx.blocking_send(OperatorEvent::Error(err)); @@ -97,7 +97,7 @@ impl<'lib> SharedLibraryOperator<'lib> { self, publishers: HashMap>, tracer: Tracer, - ) -> eyre::Result<()> { + ) -> eyre::Result<StopReason> { let operator_context = { let DoraInitResult { result, @@ -150,7 +150,10 @@ impl<'lib> SharedLibraryOperator<'lib> { DoraResult { error } }); - while let Ok(input) = self.inputs.recv() { + let reason = loop { + let Ok(input) = self.inputs.recv() else { + break StopReason::InputsClosed + }; #[cfg(feature = "tracing")] let (_child_cx, string_cx) = { use dora_tracing::{deserialize_context, serialize_context}; @@ -197,11 +200,12 @@ impl<'lib> SharedLibraryOperator<'lib> { Some(error) => bail!("on_input failed: {}", String::from(error)), None => match status { DoraStatus::Continue => {} - DoraStatus::Stop => break, + DoraStatus::Stop => break StopReason::ExplicitStop, + DoraStatus::StopAll => break StopReason::ExplicitStopAll, }, } - } - Ok(()) + }; + Ok(reason) } } diff --git a/examples/iceoryx/node/src/main.rs b/examples/iceoryx/node/src/main.rs index 03692e9d..7e8f92fb 100644 --- a/examples/iceoryx/node/src/main.rs +++ b/examples/iceoryx/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, core::config::DataId, DoraNode}; +use dora_node_api::{self, dora_core::config::DataId, DoraNode}; fn main() -> eyre::Result<()> { let output = DataId::from("random".to_owned()); diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index 03692e9d..7e8f92fb 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, core::config::DataId, DoraNode}; +use dora_node_api::{self, dora_core::config::DataId, DoraNode}; fn main() -> eyre::Result<()> { let output = DataId::from("random".to_owned());