diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index dd38543b..05cfee01 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -654,7 +654,8 @@ impl Daemon { ) .await?; match status { - DataflowStatus::AllNodesReady => { + DataflowStatus::AllNodesReady { cascading_errors } => { + dataflow.cascading_errors.extend(cascading_errors); tracing::info!( "all nodes are ready, starting dataflow `{dataflow_id}`" ); @@ -1050,14 +1051,22 @@ impl Daemon { tracing::info!("node {dataflow_id}/{node_id} finished successfully"); Ok(()) } - exit_status => Err(NodeError { - timestamp: self.clock.new_timestamp(), - cause: NodeErrorCause::Other { - // TODO: load from file - stderr: String::new(), - }, - exit_status, - }), + exit_status => { + let cause = match self.running.get_mut(&dataflow_id) { + Some(dataflow) if dataflow.cascading_errors.contains(&node_id) => { + NodeErrorCause::Cascading + } + _ => NodeErrorCause::Other { + // TODO: load from file + stderr: String::new(), + }, + }; + Err(NodeError { + timestamp: self.clock.new_timestamp(), + cause, + exit_status, + }) + } }; self.dataflow_node_results @@ -1297,6 +1306,8 @@ pub struct RunningDataflow { /// /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. empty_set: BTreeSet, + + cascading_errors: BTreeSet, } impl RunningDataflow { @@ -1315,6 +1326,7 @@ impl RunningDataflow { _timer_handles: Vec::new(), stop_sent: false, empty_set: BTreeSet::new(), + cascading_errors: BTreeSet::new(), } } diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index ccba1a56..87bba6e9 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use dora_core::{ config::NodeId, @@ -113,15 +113,18 @@ impl PendingNodes { } Ok(DataflowStatus::Pending) } else { - self.answer_subscribe_requests(None).await; - Ok(DataflowStatus::AllNodesReady) + let cascading_errors = self.answer_subscribe_requests(None).await; + Ok(DataflowStatus::AllNodesReady { cascading_errors }) } } else { Ok(DataflowStatus::Pending) } } - async fn answer_subscribe_requests(&mut self, external_error: Option) { + async fn answer_subscribe_requests( + &mut self, + external_error: Option, + ) -> BTreeSet { let result = if self.exited_before_subscribe.is_empty() { match external_error { Some(err) => Err(err), @@ -147,9 +150,14 @@ impl PendingNodes { }; // answer all subscribe requests let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); - for reply_sender in subscribe_replies.into_values() { + let mut cascading_errors = BTreeSet::new(); + for (node_id, reply_sender) in subscribe_replies.into_iter() { + if result.is_err() { + cascading_errors.insert(node_id); + } let _ = reply_sender.send(DaemonReply::Result(result.clone())); } + cascading_errors } async fn report_nodes_ready( @@ -182,6 +190,6 @@ impl PendingNodes { } pub enum DataflowStatus { - AllNodesReady, + AllNodesReady { cascading_errors: BTreeSet }, Pending, }