Browse Source

Allow operators to stop the full dataflow

Useful in case of emergencies.
tags/v0.1.0
Philipp Oppermann 3 years ago
parent
commit
5375628930
Failed to extract signature
11 changed files with 74 additions and 36 deletions
  1. +2
    -0
      apis/c/operator/operator_types.h
  2. +1
    -1
      apis/python/node/src/lib.rs
  3. +14
    -1
      apis/rust/node/src/lib.rs
  4. +1
    -0
      apis/rust/operator/types/src/lib.rs
  5. +8
    -12
      binaries/coordinator/src/lib.rs
  6. +14
    -3
      binaries/runtime/src/main.rs
  7. +8
    -1
      binaries/runtime/src/operator/mod.rs
  8. +12
    -8
      binaries/runtime/src/operator/python.rs
  9. +12
    -8
      binaries/runtime/src/operator/shared_lib.rs
  10. +1
    -1
      examples/iceoryx/node/src/main.rs
  11. +1
    -1
      examples/rust-dataflow/node/src/main.rs

+ 2
- 0
apis/c/operator/operator_types.h View File

@@ -68,6 +68,8 @@ enum DoraStatus {
DORA_STATUS_CONTINUE = 0,
/** <No documentation available> */
DORA_STATUS_STOP = 1,
/** <No documentation available> */
DORA_STATUS_STOP_ALL = 2,
}
#ifndef DOXYGEN
; typedef uint8_t


+ 1
- 1
apis/python/node/src/lib.rs View File

@@ -1,6 +1,6 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use dora_node_api::{core::config::NodeId, DoraNode, Input};
use dora_node_api::{dora_core::config::NodeId, DoraNode, Input};
use dora_operator_api_python::{metadata_to_pydict, pydict_to_metadata};
use eyre::{Context, Result};
use flume::Receiver;


+ 14
- 1
apis/rust/node/src/lib.rs View File

@@ -1,7 +1,7 @@
pub use communication::Input;
use communication::STOP_TOPIC;
use communication_layer_pub_sub::CommunicationLayer;
pub use dora_core as core;
pub use dora_core;
use dora_core::config::{CommunicationConfig, DataId, NodeId, NodeRunConfig};
pub use dora_message::{uhlc, Metadata, MetadataParameters};
use eyre::WrapErr;
@@ -144,6 +144,19 @@ fn set_up_tracing() -> eyre::Result<()> {
.context("failed to set tracing global subscriber")
}

#[must_use]
pub fn manual_stop_publisher(
communication: &mut dyn CommunicationLayer,
) -> eyre::Result<impl FnOnce() -> Result<(), BoxError>> {
let hlc = dora_message::uhlc::HLC::default();
let metadata = dora_message::Metadata::new(hlc.new_timestamp());
let data = metadata.serialize().unwrap();
let publisher = communication
.publisher(dora_core::topics::MANUAL_STOP)
.map_err(|err| eyre::eyre!(err))?;
Ok(move || publisher.publish(&data))
}

#[cfg(test)]
mod tests {
use dora_core::config;


+ 1
- 0
apis/rust/operator/types/src/lib.rs View File

@@ -103,6 +103,7 @@ pub struct OnInputResult {
pub enum DoraStatus {
Continue = 0,
Stop = 1,
StopAll = 2,
}

pub fn generate_headers(target_file: &Path) -> ::std::io::Result<()> {


+ 8
- 12
binaries/coordinator/src/lib.rs View File

@@ -2,11 +2,11 @@ use crate::run::spawn_dataflow;
use dora_core::{
config::CommunicationConfig,
topics::{
self, StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_LIST,
StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_LIST,
ZENOH_CONTROL_START, ZENOH_CONTROL_STOP,
},
};
use dora_node_api::communication;
use dora_node_api::{communication, manual_stop_publisher};
use eyre::{bail, eyre, WrapErr};
use futures::StreamExt;
use futures_concurrency::stream::Merge;
@@ -188,16 +188,12 @@ async fn stop_dataflow(
.wrap_err("failed to join communication layer init task")?
.wrap_err("failed to init communication layer")?;
tracing::info!("sending stop message to dataflow `{uuid}`");
tokio::task::spawn_blocking(move || {
let hlc = dora_message::uhlc::HLC::default();
let metadata = dora_message::Metadata::new(hlc.new_timestamp());
let data = metadata.serialize().unwrap();
communication.publisher(topics::MANUAL_STOP)?.publish(&data)
})
.await
.wrap_err("failed to join stop publish task")?
.map_err(|err| eyre!(err))
.wrap_err("failed to send stop message")?;
let manual_stop_publisher = manual_stop_publisher(communication.as_mut())?;
tokio::task::spawn_blocking(move || manual_stop_publisher())
.await
.wrap_err("failed to join stop publish task")?
.map_err(|err| eyre!(err))
.wrap_err("failed to send stop message")?;
Ok(())
}



+ 14
- 3
binaries/runtime/src/main.rs View File

@@ -7,10 +7,11 @@ use dora_core::{
use dora_node_api::{
self,
communication::{self, CommunicationLayer, Publisher, STOP_TOPIC},
manual_stop_publisher,
};
use eyre::{bail, Context};
use futures::{Stream, StreamExt};
use operator::{spawn_operator, OperatorEvent};
use operator::{spawn_operator, OperatorEvent, StopReason};
use std::{
collections::{BTreeSet, HashMap},
mem,
@@ -76,6 +77,7 @@ fn main() -> eyre::Result<()> {
node_id,
operator_events,
operator_stop_publishers,
communication.as_mut(),
))
}

@@ -83,6 +85,7 @@ async fn run(
node_id: NodeId,
mut events: impl Stream<Item = Event> + Unpin,
mut operator_stop_publishers: HashMap<OperatorId, Box<dyn Publisher>>,
communication: &mut dyn CommunicationLayer,
) -> eyre::Result<()> {
#[cfg(feature = "metrics")]
let _started = {
@@ -106,9 +109,17 @@ async fn run(
bail!(err.wrap_err(format!("operator {id} failed")))
}
OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload),
OperatorEvent::Finished => {
OperatorEvent::Finished { reason } => {
if let StopReason::ExplicitStopAll = reason {
let manual_stop_publisher = manual_stop_publisher(communication)?;
tokio::task::spawn_blocking(manual_stop_publisher)
.await
.wrap_err("failed to join stop publish task")?
.map_err(|err| eyre::eyre!(err))
.wrap_err("failed to send stop message")?;
}
if let Some(stop_publisher) = operator_stop_publishers.remove(&id) {
tracing::info!("operator {node_id}/{id} finished");
tracing::info!("operator {node_id}/{id} finished ({reason:?})");
stopped_operators.insert(id.clone());
// send stopped message
tokio::task::spawn_blocking(move || stop_publisher.publish(&[]))


+ 8
- 1
binaries/runtime/src/operator/mod.rs View File

@@ -99,5 +99,12 @@ pub fn spawn_operator(
pub enum OperatorEvent {
Error(eyre::Error),
Panic(Box<dyn Any + Send>),
Finished,
Finished { reason: StopReason },
}

#[derive(Debug)]
pub enum StopReason {
InputsClosed,
ExplicitStop,
ExplicitStopAll,
}

+ 12
- 8
binaries/runtime/src/operator/python.rs View File

@@ -1,6 +1,6 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use super::{OperatorEvent, Tracer};
use super::{OperatorEvent, StopReason, Tracer};
use dora_core::{
config::{DataId, NodeId, OperatorId},
descriptor::source_is_url,
@@ -9,6 +9,7 @@ use dora_download::download_file;
use dora_message::uhlc;
use dora_node_api::communication::Publisher;
use dora_operator_api_python::metadata_to_pydict;
use dora_operator_api_types::DoraStatus;
use eyre::{bail, eyre, Context};
use pyo3::{
pyclass,
@@ -113,7 +114,9 @@ pub fn spawn(
let operator =
Python::with_gil(init_operator).wrap_err("failed to init python operator")?;

while let Ok(mut input) = inputs.recv() {
let reason = loop {
let Ok(mut input) = inputs.recv() else {break StopReason::InputsClosed };

#[cfg(feature = "tracing")]
let (_child_cx, string_cx) = {
use dora_tracing::{deserialize_context, serialize_context};
@@ -151,11 +154,12 @@ pub fn spawn(
let status: i32 = Python::with_gil(|py| status_val.extract(py))
.wrap_err("on_input has invalid return value")?;
match status {
0 => {} // ok
1 => break, // stop
s if s == DoraStatus::Continue as i32 => {} // ok
s if s == DoraStatus::Stop as i32 => break StopReason::ExplicitStop,
s if s == DoraStatus::StopAll as i32 => break StopReason::ExplicitStopAll,
other => bail!("on_input returned invalid status {other}"),
}
}
};

// Dropping the operator using Python garbage collector.
// Locking the GIL for immediate release.
@@ -163,7 +167,7 @@ pub fn spawn(
drop(operator);
});

Result::<_, eyre::Report>::Ok(())
Result::<_, eyre::Report>::Ok(reason)
};

thread::spawn(move || {
@@ -173,8 +177,8 @@ pub fn spawn(
});

match catch_unwind(closure) {
Ok(Ok(())) => {
let _ = events_tx.blocking_send(OperatorEvent::Finished);
Ok(Ok(reason)) => {
let _ = events_tx.blocking_send(OperatorEvent::Finished { reason });
}
Ok(Err(err)) => {
let _ = events_tx.blocking_send(OperatorEvent::Error(err));


+ 12
- 8
binaries/runtime/src/operator/shared_lib.rs View File

@@ -1,4 +1,4 @@
use super::{OperatorEvent, Tracer};
use super::{OperatorEvent, StopReason, Tracer};
use dora_core::{
adjust_shared_library_path,
config::{DataId, NodeId, OperatorId},
@@ -70,8 +70,8 @@ pub fn spawn(
operator.run(publishers, tracer)
});
match catch_unwind(closure) {
Ok(Ok(())) => {
let _ = events_tx.blocking_send(OperatorEvent::Finished);
Ok(Ok(reason)) => {
let _ = events_tx.blocking_send(OperatorEvent::Finished { reason });
}
Ok(Err(err)) => {
let _ = events_tx.blocking_send(OperatorEvent::Error(err));
@@ -97,7 +97,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
self,
publishers: HashMap<DataId, Box<dyn Publisher>>,
tracer: Tracer,
) -> eyre::Result<()> {
) -> eyre::Result<StopReason> {
let operator_context = {
let DoraInitResult {
result,
@@ -150,7 +150,10 @@ impl<'lib> SharedLibraryOperator<'lib> {
DoraResult { error }
});

while let Ok(input) = self.inputs.recv() {
let reason = loop {
let Ok(input) = self.inputs.recv() else {
break StopReason::InputsClosed
};
#[cfg(feature = "tracing")]
let (_child_cx, string_cx) = {
use dora_tracing::{deserialize_context, serialize_context};
@@ -197,11 +200,12 @@ impl<'lib> SharedLibraryOperator<'lib> {
Some(error) => bail!("on_input failed: {}", String::from(error)),
None => match status {
DoraStatus::Continue => {}
DoraStatus::Stop => break,
DoraStatus::Stop => break StopReason::ExplicitStop,
DoraStatus::StopAll => break StopReason::ExplicitStopAll,
},
}
}
Ok(())
};
Ok(reason)
}
}



+ 1
- 1
examples/iceoryx/node/src/main.rs View File

@@ -1,4 +1,4 @@
use dora_node_api::{self, core::config::DataId, DoraNode};
use dora_node_api::{self, dora_core::config::DataId, DoraNode};

fn main() -> eyre::Result<()> {
let output = DataId::from("random".to_owned());


+ 1
- 1
examples/rust-dataflow/node/src/main.rs View File

@@ -1,4 +1,4 @@
use dora_node_api::{self, core::config::DataId, DoraNode};
use dora_node_api::{self, dora_core::config::DataId, DoraNode};

fn main() -> eyre::Result<()> {
let output = DataId::from("random".to_owned());


Loading…
Cancel
Save