diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index ead7d54f..d299060c 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -223,18 +223,22 @@ async fn start_inner( Event::Dataflow { uuid, event } => match event { DataflowEvent::ReadyOnMachine { machine_id, - success, + exited_before_subscribe, } => { match running_dataflows.entry(uuid) { std::collections::hash_map::Entry::Occupied(mut entry) => { let dataflow = entry.get_mut(); dataflow.pending_machines.remove(&machine_id); - dataflow.init_success &= success; + dataflow + .exited_before_subscribe + .extend(exited_before_subscribe); if dataflow.pending_machines.is_empty() { let message = serde_json::to_vec(&Timestamped { inner: DaemonCoordinatorEvent::AllNodesReady { dataflow_id: uuid, - success: dataflow.init_success, + exited_before_subscribe: dataflow + .exited_before_subscribe + .clone(), }, timestamp: clock.new_timestamp(), }) @@ -674,7 +678,7 @@ struct RunningDataflow { machines: BTreeSet, /// IDs of machines that are waiting until all nodes are started. pending_machines: BTreeSet, - init_success: bool, + exited_before_subscribe: Vec, nodes: Vec, reply_senders: Vec>>, @@ -873,7 +877,7 @@ async fn start_dataflow( } else { BTreeSet::new() }, - init_success: true, + exited_before_subscribe: Default::default(), machines, nodes, reply_senders: Vec::new(), @@ -944,7 +948,7 @@ pub enum DataflowEvent { }, ReadyOnMachine { machine_id: String, - success: bool, + exited_before_subscribe: Vec, }, } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index df105691..f6f9b56c 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -66,13 +66,13 @@ pub async fn handle_connection( coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event { coordinator_messages::DaemonEvent::AllNodesReady { dataflow_id, - success, + exited_before_subscribe, } => { let event = Event::Dataflow { uuid: dataflow_id, event: DataflowEvent::ReadyOnMachine { machine_id, - success, + exited_before_subscribe, }, }; if events_tx.send(event).await.is_err() { diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index bb6a3aa9..d4b4d71f 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -373,18 +373,19 @@ impl Daemon { } DaemonCoordinatorEvent::AllNodesReady { dataflow_id, - success, + exited_before_subscribe, } => { match self.running.get_mut(&dataflow_id) { Some(dataflow) => { + let ready = exited_before_subscribe.is_empty(); dataflow .pending_nodes .handle_external_all_nodes_ready( - success, - &mut dataflow.cascading_errors, + exited_before_subscribe, + &mut dataflow.cascading_error_causes, ) .await?; - if success { + if ready { tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); dataflow.start(&self.events_tx, &self.clock).await?; } @@ -636,7 +637,7 @@ impl Daemon { &node_id, &mut self.coordinator_connection, &self.clock, - &mut dataflow.cascading_errors, + &mut dataflow.cascading_error_causes, ) .await?; } @@ -743,7 +744,7 @@ impl Daemon { reply_sender, &mut self.coordinator_connection, &self.clock, - &mut dataflow.cascading_errors, + &mut dataflow.cascading_error_causes, ) .await?; match status { @@ -1002,7 +1003,7 @@ impl Daemon { node_id, &mut self.coordinator_connection, &self.clock, - &mut dataflow.cascading_errors, + &mut dataflow.cascading_error_causes, ) .await?; @@ -1153,11 +1154,20 @@ impl Daemon { Ok(()) } exit_status => { - let cause = match self.running.get_mut(&dataflow_id) { - Some(dataflow) if dataflow.cascading_errors.contains(&node_id) => { - NodeErrorCause::Cascading + let caused_by_node = self + .running + .get_mut(&dataflow_id) + .and_then(|dataflow| { + dataflow.cascading_error_causes.error_caused_by(&node_id) + }) + .cloned(); + + let cause = match caused_by_node { + Some(caused_by_node) => { + tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"); + NodeErrorCause::Cascading { caused_by_node } } - _ => NodeErrorCause::Other { + None => NodeErrorCause::Other { // TODO: load from file stderr: String::new(), }, @@ -1382,7 +1392,8 @@ pub struct RunningDataflow { /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. empty_set: BTreeSet, - cascading_errors: BTreeSet, + /// Contains the node that caused the error for nodes that experienced a cascading error. + cascading_error_causes: CascadingErrorCauses, } impl RunningDataflow { @@ -1401,7 +1412,7 @@ impl RunningDataflow { _timer_handles: Vec::new(), stop_sent: false, empty_set: BTreeSet::new(), - cascading_errors: BTreeSet::new(), + cascading_error_causes: Default::default(), } } @@ -1651,3 +1662,23 @@ fn set_up_ctrlc_handler( Ok(ReceiverStream::new(ctrlc_rx)) } + +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct CascadingErrorCauses { + caused_by: BTreeMap, +} + +impl CascadingErrorCauses { + pub fn experienced_cascading_error(&self, node: &NodeId) -> bool { + self.caused_by.contains_key(node) + } + + /// Return the ID of the node that caused a cascading error for the given node, if any. + pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> { + self.caused_by.get(node) + } + + pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) { + self.caused_by.entry(affected_node).or_insert(causing_node); + } +} diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index 599ff85b..1feb070b 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use dora_core::{ config::NodeId, @@ -9,7 +9,7 @@ use dora_core::{ use eyre::{bail, Context}; use tokio::{net::TcpStream, sync::oneshot}; -use crate::tcp_utils::tcp_send; +use crate::{tcp_utils::tcp_send, CascadingErrorCauses}; pub struct PendingNodes { dataflow_id: DataflowId, @@ -28,7 +28,7 @@ pub struct PendingNodes { /// /// If this list is non-empty, we should not start the dataflow at all. Instead, /// we report an error to the other nodes. - exited_before_subscribe: HashSet, + exited_before_subscribe: Vec, /// Whether the local init result was already reported to the coordinator. reported_init_to_coordinator: bool, @@ -42,7 +42,7 @@ impl PendingNodes { local_nodes: HashSet::new(), external_nodes: false, waiting_subscribers: HashMap::new(), - exited_before_subscribe: HashSet::new(), + exited_before_subscribe: Default::default(), reported_init_to_coordinator: false, } } @@ -61,7 +61,7 @@ impl PendingNodes { reply_sender: oneshot::Sender, coordinator_connection: &mut Option, clock: &HLC, - cascading_errors: &mut BTreeSet, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result { self.waiting_subscribers .insert(node_id.clone(), reply_sender); @@ -76,11 +76,11 @@ impl PendingNodes { node_id: &NodeId, coordinator_connection: &mut Option, clock: &HLC, - cascading_errors: &mut BTreeSet, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result<()> { if self.local_nodes.remove(node_id) { tracing::warn!("node `{node_id}` exited before initializing dora connection"); - self.exited_before_subscribe.insert(node_id.clone()); + self.exited_before_subscribe.push(node_id.clone()); self.update_dataflow_status(coordinator_connection, clock, cascading_errors) .await?; } @@ -89,18 +89,14 @@ impl PendingNodes { pub async fn handle_external_all_nodes_ready( &mut self, - success: bool, - cascading_errors: &mut BTreeSet, + exited_before_subscribe: Vec, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result<()> { if !self.local_nodes.is_empty() { bail!("received external `all_nodes_ready` event before local nodes were ready"); } - let external_error = if success { - None - } else { - Some("some nodes failed to initialize on remote machines".to_string()) - }; - self.answer_subscribe_requests(external_error, cascading_errors) + + self.answer_subscribe_requests(exited_before_subscribe, cascading_errors) .await; Ok(()) @@ -110,7 +106,7 @@ impl PendingNodes { &mut self, coordinator_connection: &mut Option, clock: &HLC, - cascading_errors: &mut BTreeSet, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result { if self.local_nodes.is_empty() { if self.external_nodes { @@ -121,7 +117,8 @@ impl PendingNodes { } Ok(DataflowStatus::Pending) } else { - self.answer_subscribe_requests(None, cascading_errors).await; + self.answer_subscribe_requests(Vec::new(), cascading_errors) + .await; Ok(DataflowStatus::AllNodesReady) } } else { @@ -131,37 +128,33 @@ impl PendingNodes { async fn answer_subscribe_requests( &mut self, - external_error: Option, - cascading_errors: &mut BTreeSet, + exited_before_subscribe_external: Vec, + cascading_errors: &mut CascadingErrorCauses, ) { - let result = if self.exited_before_subscribe.is_empty() { - match external_error { - Some(err) => Err(err), - None => Ok(()), - } - } else { - let node_id_message = if self.exited_before_subscribe.len() == 1 { - self.exited_before_subscribe - .iter() - .next() - .map(|node_id| node_id.to_string()) - .unwrap_or("".to_string()) - } else { - "".to_string() - }; - Err(format!( + let node_exited_before_subscribe = match self.exited_before_subscribe.as_slice() { + [first, ..] => Some(first), + [] => match exited_before_subscribe_external.as_slice() { + [first, ..] => Some(first), + [] => None, + }, + }; + + let result = match &node_exited_before_subscribe { + Some(causing_node) => Err(format!( "Some nodes exited before subscribing to dora: {:?}\n\n\ This is typically happens when an initialization error occurs - in the node or operator. To check the output of the failed - nodes, run `dora logs {} {node_id_message}`.", + in the node or operator. To check the output of the causing + node, run `dora logs {} {causing_node}`.", self.exited_before_subscribe, self.dataflow_id - )) + )), + None => Ok(()), }; + // answer all subscribe requests let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); for (node_id, reply_sender) in subscribe_replies.into_iter() { - if result.is_err() { - cascading_errors.insert(node_id); + if let Some(causing_node) = node_exited_before_subscribe { + cascading_errors.report_cascading_error(causing_node.clone(), node_id.clone()); } let _ = reply_sender.send(DaemonReply::Result(result.clone())); } @@ -176,15 +169,17 @@ impl PendingNodes { bail!("no coordinator connection to send AllNodesReady"); }; - let success = self.exited_before_subscribe.is_empty(); - tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes"); + tracing::info!( + "all local nodes are ready (exit before subscribe: {:?}), waiting for remote nodes", + self.exited_before_subscribe + ); let msg = serde_json::to_vec(&Timestamped { inner: CoordinatorRequest::Event { machine_id: self.machine_id.clone(), event: DaemonEvent::AllNodesReady { dataflow_id: self.dataflow_id, - success, + exited_before_subscribe: self.exited_before_subscribe.clone(), }, }, timestamp, diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index 1c5118cc..5a4a1db9 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -1,4 +1,4 @@ -use crate::{daemon_messages::DataflowId, topics::DataflowDaemonResult}; +use crate::{config::NodeId, daemon_messages::DataflowId, topics::DataflowDaemonResult}; use eyre::eyre; #[derive(Debug, serde::Serialize, serde::Deserialize)] @@ -18,7 +18,7 @@ pub enum CoordinatorRequest { pub enum DaemonEvent { AllNodesReady { dataflow_id: DataflowId, - success: bool, + exited_before_subscribe: Vec, }, AllNodesFinished { dataflow_id: DataflowId, diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index ae6fc262..ce6c459a 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -234,7 +234,7 @@ pub enum DaemonCoordinatorEvent { Spawn(SpawnDataflowNodes), AllNodesReady { dataflow_id: DataflowId, - success: bool, + exited_before_subscribe: Vec, }, StopDataflow { dataflow_id: DataflowId, diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 539a8e97..a2b21e8a 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -125,7 +125,7 @@ impl std::fmt::Display for RootError<'_> { let mut non_cascading: Vec<_> = failed .clone() - .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading)) + .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. })) .collect(); non_cascading.sort_by_key(|(_, e)| e.timestamp); // try to print earliest non-cascading error @@ -194,14 +194,27 @@ impl std::fmt::Display for NodeError { write!(f, "exited because of signal {signal_str}") } NodeExitStatus::Unknown => write!(f, "unknown exit status"), + }?; + + match &self.cause { + NodeErrorCause::Cascading { caused_by_node } => write!( + f, + "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." + )?, + NodeErrorCause::Other { stderr } if stderr.is_empty() => {} + NodeErrorCause::Other { stderr } => write!(f, "\n\nStderr output:\n{stderr}\n")?, } + + Ok(()) } } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum NodeErrorCause { /// Node failed because another node failed before, - Cascading, + Cascading { + caused_by_node: NodeId, + }, Other { stderr: String, },