The init function returns an error if another node exited before initialization. In this case, we treat the subsequent error as a cascading error and skip it when printing the error message to the user.
tags/v0.3.5-rc0
| @@ -654,7 +654,8 @@ impl Daemon { | |||
| ) | |||
| .await?; | |||
| match status { | |||
| DataflowStatus::AllNodesReady => { | |||
| DataflowStatus::AllNodesReady { cascading_errors } => { | |||
| dataflow.cascading_errors.extend(cascading_errors); | |||
| tracing::info!( | |||
| "all nodes are ready, starting dataflow `{dataflow_id}`" | |||
| ); | |||
| @@ -1050,14 +1051,22 @@ impl Daemon { | |||
| tracing::info!("node {dataflow_id}/{node_id} finished successfully"); | |||
| Ok(()) | |||
| } | |||
| exit_status => Err(NodeError { | |||
| timestamp: self.clock.new_timestamp(), | |||
| cause: NodeErrorCause::Other { | |||
| // TODO: load from file | |||
| stderr: String::new(), | |||
| }, | |||
| exit_status, | |||
| }), | |||
| exit_status => { | |||
| let cause = match self.running.get_mut(&dataflow_id) { | |||
| Some(dataflow) if dataflow.cascading_errors.contains(&node_id) => { | |||
| NodeErrorCause::Cascading | |||
| } | |||
| _ => NodeErrorCause::Other { | |||
| // TODO: load from file | |||
| stderr: String::new(), | |||
| }, | |||
| }; | |||
| Err(NodeError { | |||
| timestamp: self.clock.new_timestamp(), | |||
| cause, | |||
| exit_status, | |||
| }) | |||
| } | |||
| }; | |||
| self.dataflow_node_results | |||
| @@ -1297,6 +1306,8 @@ pub struct RunningDataflow { | |||
| /// | |||
| /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. | |||
| empty_set: BTreeSet<DataId>, | |||
| cascading_errors: BTreeSet<NodeId>, | |||
| } | |||
| impl RunningDataflow { | |||
| @@ -1315,6 +1326,7 @@ impl RunningDataflow { | |||
| _timer_handles: Vec::new(), | |||
| stop_sent: false, | |||
| empty_set: BTreeSet::new(), | |||
| cascading_errors: BTreeSet::new(), | |||
| } | |||
| } | |||
| @@ -1,4 +1,4 @@ | |||
| use std::collections::{HashMap, HashSet}; | |||
| use std::collections::{BTreeSet, HashMap, HashSet}; | |||
| use dora_core::{ | |||
| config::NodeId, | |||
| @@ -113,15 +113,18 @@ impl PendingNodes { | |||
| } | |||
| Ok(DataflowStatus::Pending) | |||
| } else { | |||
| self.answer_subscribe_requests(None).await; | |||
| Ok(DataflowStatus::AllNodesReady) | |||
| let cascading_errors = self.answer_subscribe_requests(None).await; | |||
| Ok(DataflowStatus::AllNodesReady { cascading_errors }) | |||
| } | |||
| } else { | |||
| Ok(DataflowStatus::Pending) | |||
| } | |||
| } | |||
| async fn answer_subscribe_requests(&mut self, external_error: Option<String>) { | |||
| async fn answer_subscribe_requests( | |||
| &mut self, | |||
| external_error: Option<String>, | |||
| ) -> BTreeSet<NodeId> { | |||
| let result = if self.exited_before_subscribe.is_empty() { | |||
| match external_error { | |||
| Some(err) => Err(err), | |||
| @@ -147,9 +150,14 @@ impl PendingNodes { | |||
| }; | |||
| // answer all subscribe requests | |||
| let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); | |||
| for reply_sender in subscribe_replies.into_values() { | |||
| let mut cascading_errors = BTreeSet::new(); | |||
| for (node_id, reply_sender) in subscribe_replies.into_iter() { | |||
| if result.is_err() { | |||
| cascading_errors.insert(node_id); | |||
| } | |||
| let _ = reply_sender.send(DaemonReply::Result(result.clone())); | |||
| } | |||
| cascading_errors | |||
| } | |||
| async fn report_nodes_ready( | |||
| @@ -182,6 +190,6 @@ impl PendingNodes { | |||
| } | |||
| pub enum DataflowStatus { | |||
| AllNodesReady, | |||
| AllNodesReady { cascading_errors: BTreeSet<NodeId> }, | |||
| Pending, | |||
| } | |||