From 71ea44ad017b36c8d933a91fde873fdcd881084e Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 13 Jun 2024 16:45:40 +0200 Subject: [PATCH] Mark node failures as cascading on init errors caused by other nodes The init function returns an error if another node exited before initialization. In this case, we consider the subsequent error as a cascading error and skip it when printing the error message to the user. --- binaries/daemon/src/lib.rs | 30 +++++++++++++++++++++--------- binaries/daemon/src/pending.rs | 20 ++++++++++++++------ 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index dd38543b..05cfee01 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -654,7 +654,8 @@ impl Daemon { ) .await?; match status { - DataflowStatus::AllNodesReady => { + DataflowStatus::AllNodesReady { cascading_errors } => { + dataflow.cascading_errors.extend(cascading_errors); tracing::info!( "all nodes are ready, starting dataflow `{dataflow_id}`" ); @@ -1050,14 +1051,22 @@ impl Daemon { tracing::info!("node {dataflow_id}/{node_id} finished successfully"); Ok(()) } - exit_status => Err(NodeError { - timestamp: self.clock.new_timestamp(), - cause: NodeErrorCause::Other { - // TODO: load from file - stderr: String::new(), - }, - exit_status, - }), + exit_status => { + let cause = match self.running.get_mut(&dataflow_id) { + Some(dataflow) if dataflow.cascading_errors.contains(&node_id) => { + NodeErrorCause::Cascading + } + _ => NodeErrorCause::Other { + // TODO: load from file + stderr: String::new(), + }, + }; + Err(NodeError { + timestamp: self.clock.new_timestamp(), + cause, + exit_status, + }) + } }; self.dataflow_node_results @@ -1297,6 +1306,8 @@ pub struct RunningDataflow { /// /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. empty_set: BTreeSet, + + cascading_errors: BTreeSet, } impl RunningDataflow { @@ -1315,6 +1326,7 @@ impl RunningDataflow { _timer_handles: Vec::new(), stop_sent: false, empty_set: BTreeSet::new(), + cascading_errors: BTreeSet::new(), } } diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index ccba1a56..87bba6e9 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use dora_core::{ config::NodeId, @@ -113,15 +113,18 @@ impl PendingNodes { } Ok(DataflowStatus::Pending) } else { - self.answer_subscribe_requests(None).await; - Ok(DataflowStatus::AllNodesReady) + let cascading_errors = self.answer_subscribe_requests(None).await; + Ok(DataflowStatus::AllNodesReady { cascading_errors }) } } else { Ok(DataflowStatus::Pending) } } - async fn answer_subscribe_requests(&mut self, external_error: Option) { + async fn answer_subscribe_requests( + &mut self, + external_error: Option, + ) -> BTreeSet { let result = if self.exited_before_subscribe.is_empty() { match external_error { Some(err) => Err(err), @@ -147,9 +150,14 @@ impl PendingNodes { }; // answer all subscribe requests let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); - for reply_sender in subscribe_replies.into_values() { + let mut cascading_errors = BTreeSet::new(); + for (node_id, reply_sender) in subscribe_replies.into_iter() { + if result.is_err() { + cascading_errors.insert(node_id); + } let _ = reply_sender.send(DaemonReply::Result(result.clone())); } + cascading_errors } async fn report_nodes_ready( @@ -182,6 +190,6 @@ impl PendingNodes { } pub enum DataflowStatus { - AllNodesReady, + AllNodesReady { cascading_errors: BTreeSet }, Pending, }