diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 9b9e81ff..bb6a3aa9 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -379,7 +379,10 @@ impl Daemon { Some(dataflow) => { dataflow .pending_nodes - .handle_external_all_nodes_ready(success) + .handle_external_all_nodes_ready( + success, + &mut dataflow.cascading_errors, + ) .await?; if success { tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); @@ -633,6 +636,7 @@ impl Daemon { &node_id, &mut self.coordinator_connection, &self.clock, + &mut dataflow.cascading_errors, ) .await?; } @@ -739,11 +743,11 @@ impl Daemon { reply_sender, &mut self.coordinator_connection, &self.clock, + &mut dataflow.cascading_errors, ) .await?; match status { - DataflowStatus::AllNodesReady { cascading_errors } => { - dataflow.cascading_errors.extend(cascading_errors); + DataflowStatus::AllNodesReady => { tracing::info!( "all nodes are ready, starting dataflow `{dataflow_id}`" ); @@ -994,7 +998,12 @@ impl Daemon { dataflow .pending_nodes - .handle_node_stop(node_id, &mut self.coordinator_connection, &self.clock) + .handle_node_stop( + node_id, + &mut self.coordinator_connection, + &self.clock, + &mut dataflow.cascading_errors, + ) .await?; Self::handle_outputs_done( diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index 87bba6e9..599ff85b 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -61,12 +61,13 @@ impl PendingNodes { reply_sender: oneshot::Sender, coordinator_connection: &mut Option, clock: &HLC, + cascading_errors: &mut BTreeSet, ) -> eyre::Result { self.waiting_subscribers .insert(node_id.clone(), reply_sender); self.local_nodes.remove(&node_id); - self.update_dataflow_status(coordinator_connection, clock) + self.update_dataflow_status(coordinator_connection, clock, cascading_errors) .await } @@ -75,17 +76,22 @@ impl PendingNodes { node_id: &NodeId, coordinator_connection: &mut Option, clock: &HLC, + cascading_errors: &mut BTreeSet, ) -> eyre::Result<()> { if self.local_nodes.remove(node_id) { tracing::warn!("node `{node_id}` exited before initializing dora connection"); self.exited_before_subscribe.insert(node_id.clone()); - self.update_dataflow_status(coordinator_connection, clock) + self.update_dataflow_status(coordinator_connection, clock, cascading_errors) .await?; } Ok(()) } - pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> { + pub async fn handle_external_all_nodes_ready( + &mut self, + success: bool, + cascading_errors: &mut BTreeSet, + ) -> eyre::Result<()> { if !self.local_nodes.is_empty() { bail!("received external `all_nodes_ready` event before local nodes were ready"); } @@ -94,7 +100,8 @@ impl PendingNodes { } else { Some("some nodes failed to initialize on remote machines".to_string()) }; - self.answer_subscribe_requests(external_error).await; + self.answer_subscribe_requests(external_error, cascading_errors) + .await; Ok(()) } @@ -103,6 +110,7 @@ impl PendingNodes { &mut self, coordinator_connection: &mut Option, clock: &HLC, + cascading_errors: &mut BTreeSet, ) -> eyre::Result { if self.local_nodes.is_empty() { if self.external_nodes { @@ -113,8 +121,8 @@ impl PendingNodes { } Ok(DataflowStatus::Pending) } else { - let cascading_errors = self.answer_subscribe_requests(None).await; - Ok(DataflowStatus::AllNodesReady { cascading_errors }) + self.answer_subscribe_requests(None, cascading_errors).await; + Ok(DataflowStatus::AllNodesReady) } } else { Ok(DataflowStatus::Pending) @@ -124,7 +132,8 @@ impl PendingNodes { async fn answer_subscribe_requests( &mut self, external_error: Option, - ) -> BTreeSet { + cascading_errors: &mut BTreeSet, + ) { let result = if self.exited_before_subscribe.is_empty() { match external_error { Some(err) => Err(err), @@ -150,14 +159,12 @@ impl PendingNodes { }; // answer all subscribe requests let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); - let mut cascading_errors = BTreeSet::new(); for (node_id, reply_sender) in subscribe_replies.into_iter() { if result.is_err() { cascading_errors.insert(node_id); } let _ = reply_sender.send(DaemonReply::Result(result.clone())); } - cascading_errors } async fn report_nodes_ready( @@ -190,6 +197,6 @@ impl PendingNodes { } pub enum DataflowStatus { - AllNodesReady { cascading_errors: BTreeSet }, + AllNodesReady, Pending, }