| @@ -223,18 +223,22 @@ async fn start_inner( | |||
| Event::Dataflow { uuid, event } => match event { | |||
| DataflowEvent::ReadyOnMachine { | |||
| machine_id, | |||
| success, | |||
| exited_before_subscribe, | |||
| } => { | |||
| match running_dataflows.entry(uuid) { | |||
| std::collections::hash_map::Entry::Occupied(mut entry) => { | |||
| let dataflow = entry.get_mut(); | |||
| dataflow.pending_machines.remove(&machine_id); | |||
| dataflow.init_success &= success; | |||
| dataflow | |||
| .exited_before_subscribe | |||
| .extend(exited_before_subscribe); | |||
| if dataflow.pending_machines.is_empty() { | |||
| let message = serde_json::to_vec(&Timestamped { | |||
| inner: DaemonCoordinatorEvent::AllNodesReady { | |||
| dataflow_id: uuid, | |||
| success: dataflow.init_success, | |||
| exited_before_subscribe: dataflow | |||
| .exited_before_subscribe | |||
| .clone(), | |||
| }, | |||
| timestamp: clock.new_timestamp(), | |||
| }) | |||
| @@ -674,7 +678,7 @@ struct RunningDataflow { | |||
| machines: BTreeSet<String>, | |||
| /// IDs of machines that are waiting until all nodes are started. | |||
| pending_machines: BTreeSet<String>, | |||
| init_success: bool, | |||
| exited_before_subscribe: Vec<NodeId>, | |||
| nodes: Vec<ResolvedNode>, | |||
| reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>, | |||
| @@ -873,7 +877,7 @@ async fn start_dataflow( | |||
| } else { | |||
| BTreeSet::new() | |||
| }, | |||
| init_success: true, | |||
| exited_before_subscribe: Default::default(), | |||
| machines, | |||
| nodes, | |||
| reply_senders: Vec::new(), | |||
| @@ -944,7 +948,7 @@ pub enum DataflowEvent { | |||
| }, | |||
| ReadyOnMachine { | |||
| machine_id: String, | |||
| success: bool, | |||
| exited_before_subscribe: Vec<NodeId>, | |||
| }, | |||
| } | |||
| @@ -66,13 +66,13 @@ pub async fn handle_connection( | |||
| coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event { | |||
| coordinator_messages::DaemonEvent::AllNodesReady { | |||
| dataflow_id, | |||
| success, | |||
| exited_before_subscribe, | |||
| } => { | |||
| let event = Event::Dataflow { | |||
| uuid: dataflow_id, | |||
| event: DataflowEvent::ReadyOnMachine { | |||
| machine_id, | |||
| success, | |||
| exited_before_subscribe, | |||
| }, | |||
| }; | |||
| if events_tx.send(event).await.is_err() { | |||
| @@ -373,18 +373,19 @@ impl Daemon { | |||
| } | |||
| DaemonCoordinatorEvent::AllNodesReady { | |||
| dataflow_id, | |||
| success, | |||
| exited_before_subscribe, | |||
| } => { | |||
| match self.running.get_mut(&dataflow_id) { | |||
| Some(dataflow) => { | |||
| let ready = exited_before_subscribe.is_empty(); | |||
| dataflow | |||
| .pending_nodes | |||
| .handle_external_all_nodes_ready( | |||
| success, | |||
| &mut dataflow.cascading_errors, | |||
| exited_before_subscribe, | |||
| &mut dataflow.cascading_error_causes, | |||
| ) | |||
| .await?; | |||
| if success { | |||
| if ready { | |||
| tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); | |||
| dataflow.start(&self.events_tx, &self.clock).await?; | |||
| } | |||
| @@ -636,7 +637,7 @@ impl Daemon { | |||
| &node_id, | |||
| &mut self.coordinator_connection, | |||
| &self.clock, | |||
| &mut dataflow.cascading_errors, | |||
| &mut dataflow.cascading_error_causes, | |||
| ) | |||
| .await?; | |||
| } | |||
| @@ -743,7 +744,7 @@ impl Daemon { | |||
| reply_sender, | |||
| &mut self.coordinator_connection, | |||
| &self.clock, | |||
| &mut dataflow.cascading_errors, | |||
| &mut dataflow.cascading_error_causes, | |||
| ) | |||
| .await?; | |||
| match status { | |||
| @@ -1002,7 +1003,7 @@ impl Daemon { | |||
| node_id, | |||
| &mut self.coordinator_connection, | |||
| &self.clock, | |||
| &mut dataflow.cascading_errors, | |||
| &mut dataflow.cascading_error_causes, | |||
| ) | |||
| .await?; | |||
| @@ -1153,11 +1154,20 @@ impl Daemon { | |||
| Ok(()) | |||
| } | |||
| exit_status => { | |||
| let cause = match self.running.get_mut(&dataflow_id) { | |||
| Some(dataflow) if dataflow.cascading_errors.contains(&node_id) => { | |||
| NodeErrorCause::Cascading | |||
| let caused_by_node = self | |||
| .running | |||
| .get_mut(&dataflow_id) | |||
| .and_then(|dataflow| { | |||
| dataflow.cascading_error_causes.error_caused_by(&node_id) | |||
| }) | |||
| .cloned(); | |||
| let cause = match caused_by_node { | |||
| Some(caused_by_node) => { | |||
| tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"); | |||
| NodeErrorCause::Cascading { caused_by_node } | |||
| } | |||
| _ => NodeErrorCause::Other { | |||
| None => NodeErrorCause::Other { | |||
| // TODO: load from file | |||
| stderr: String::new(), | |||
| }, | |||
| @@ -1382,7 +1392,8 @@ pub struct RunningDataflow { | |||
| /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. | |||
| empty_set: BTreeSet<DataId>, | |||
| cascading_errors: BTreeSet<NodeId>, | |||
| /// Contains the node that caused the error for nodes that experienced a cascading error. | |||
| cascading_error_causes: CascadingErrorCauses, | |||
| } | |||
| impl RunningDataflow { | |||
| @@ -1401,7 +1412,7 @@ impl RunningDataflow { | |||
| _timer_handles: Vec::new(), | |||
| stop_sent: false, | |||
| empty_set: BTreeSet::new(), | |||
| cascading_errors: BTreeSet::new(), | |||
| cascading_error_causes: Default::default(), | |||
| } | |||
| } | |||
| @@ -1651,3 +1662,23 @@ fn set_up_ctrlc_handler( | |||
| Ok(ReceiverStream::new(ctrlc_rx)) | |||
| } | |||
| #[derive(Debug, Default, Clone, PartialEq, Eq)] | |||
| pub struct CascadingErrorCauses { | |||
| caused_by: BTreeMap<NodeId, NodeId>, | |||
| } | |||
| impl CascadingErrorCauses { | |||
| pub fn experienced_cascading_error(&self, node: &NodeId) -> bool { | |||
| self.caused_by.contains_key(node) | |||
| } | |||
| /// Return the ID of the node that caused a cascading error for the given node, if any. | |||
| pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> { | |||
| self.caused_by.get(node) | |||
| } | |||
| pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) { | |||
| self.caused_by.entry(affected_node).or_insert(causing_node); | |||
| } | |||
| } | |||
| @@ -1,4 +1,4 @@ | |||
| use std::collections::{BTreeSet, HashMap, HashSet}; | |||
| use std::collections::{HashMap, HashSet}; | |||
| use dora_core::{ | |||
| config::NodeId, | |||
| @@ -9,7 +9,7 @@ use dora_core::{ | |||
| use eyre::{bail, Context}; | |||
| use tokio::{net::TcpStream, sync::oneshot}; | |||
| use crate::tcp_utils::tcp_send; | |||
| use crate::{tcp_utils::tcp_send, CascadingErrorCauses}; | |||
| pub struct PendingNodes { | |||
| dataflow_id: DataflowId, | |||
| @@ -28,7 +28,7 @@ pub struct PendingNodes { | |||
| /// | |||
| /// If this list is non-empty, we should not start the dataflow at all. Instead, | |||
| /// we report an error to the other nodes. | |||
| exited_before_subscribe: HashSet<NodeId>, | |||
| exited_before_subscribe: Vec<NodeId>, | |||
| /// Whether the local init result was already reported to the coordinator. | |||
| reported_init_to_coordinator: bool, | |||
| @@ -42,7 +42,7 @@ impl PendingNodes { | |||
| local_nodes: HashSet::new(), | |||
| external_nodes: false, | |||
| waiting_subscribers: HashMap::new(), | |||
| exited_before_subscribe: HashSet::new(), | |||
| exited_before_subscribe: Default::default(), | |||
| reported_init_to_coordinator: false, | |||
| } | |||
| } | |||
| @@ -61,7 +61,7 @@ impl PendingNodes { | |||
| reply_sender: oneshot::Sender<DaemonReply>, | |||
| coordinator_connection: &mut Option<TcpStream>, | |||
| clock: &HLC, | |||
| cascading_errors: &mut BTreeSet<NodeId>, | |||
| cascading_errors: &mut CascadingErrorCauses, | |||
| ) -> eyre::Result<DataflowStatus> { | |||
| self.waiting_subscribers | |||
| .insert(node_id.clone(), reply_sender); | |||
| @@ -76,11 +76,11 @@ impl PendingNodes { | |||
| node_id: &NodeId, | |||
| coordinator_connection: &mut Option<TcpStream>, | |||
| clock: &HLC, | |||
| cascading_errors: &mut BTreeSet<NodeId>, | |||
| cascading_errors: &mut CascadingErrorCauses, | |||
| ) -> eyre::Result<()> { | |||
| if self.local_nodes.remove(node_id) { | |||
| tracing::warn!("node `{node_id}` exited before initializing dora connection"); | |||
| self.exited_before_subscribe.insert(node_id.clone()); | |||
| self.exited_before_subscribe.push(node_id.clone()); | |||
| self.update_dataflow_status(coordinator_connection, clock, cascading_errors) | |||
| .await?; | |||
| } | |||
| @@ -89,18 +89,14 @@ impl PendingNodes { | |||
| pub async fn handle_external_all_nodes_ready( | |||
| &mut self, | |||
| success: bool, | |||
| cascading_errors: &mut BTreeSet<NodeId>, | |||
| exited_before_subscribe: Vec<NodeId>, | |||
| cascading_errors: &mut CascadingErrorCauses, | |||
| ) -> eyre::Result<()> { | |||
| if !self.local_nodes.is_empty() { | |||
| bail!("received external `all_nodes_ready` event before local nodes were ready"); | |||
| } | |||
| let external_error = if success { | |||
| None | |||
| } else { | |||
| Some("some nodes failed to initialize on remote machines".to_string()) | |||
| }; | |||
| self.answer_subscribe_requests(external_error, cascading_errors) | |||
| self.answer_subscribe_requests(exited_before_subscribe, cascading_errors) | |||
| .await; | |||
| Ok(()) | |||
| @@ -110,7 +106,7 @@ impl PendingNodes { | |||
| &mut self, | |||
| coordinator_connection: &mut Option<TcpStream>, | |||
| clock: &HLC, | |||
| cascading_errors: &mut BTreeSet<NodeId>, | |||
| cascading_errors: &mut CascadingErrorCauses, | |||
| ) -> eyre::Result<DataflowStatus> { | |||
| if self.local_nodes.is_empty() { | |||
| if self.external_nodes { | |||
| @@ -121,7 +117,8 @@ impl PendingNodes { | |||
| } | |||
| Ok(DataflowStatus::Pending) | |||
| } else { | |||
| self.answer_subscribe_requests(None, cascading_errors).await; | |||
| self.answer_subscribe_requests(Vec::new(), cascading_errors) | |||
| .await; | |||
| Ok(DataflowStatus::AllNodesReady) | |||
| } | |||
| } else { | |||
| @@ -131,37 +128,33 @@ impl PendingNodes { | |||
| async fn answer_subscribe_requests( | |||
| &mut self, | |||
| external_error: Option<String>, | |||
| cascading_errors: &mut BTreeSet<NodeId>, | |||
| exited_before_subscribe_external: Vec<NodeId>, | |||
| cascading_errors: &mut CascadingErrorCauses, | |||
| ) { | |||
| let result = if self.exited_before_subscribe.is_empty() { | |||
| match external_error { | |||
| Some(err) => Err(err), | |||
| None => Ok(()), | |||
| } | |||
| } else { | |||
| let node_id_message = if self.exited_before_subscribe.len() == 1 { | |||
| self.exited_before_subscribe | |||
| .iter() | |||
| .next() | |||
| .map(|node_id| node_id.to_string()) | |||
| .unwrap_or("<node_id>".to_string()) | |||
| } else { | |||
| "<node_id>".to_string() | |||
| }; | |||
| Err(format!( | |||
| let node_exited_before_subscribe = match self.exited_before_subscribe.as_slice() { | |||
| [first, ..] => Some(first), | |||
| [] => match exited_before_subscribe_external.as_slice() { | |||
| [first, ..] => Some(first), | |||
| [] => None, | |||
| }, | |||
| }; | |||
| let result = match &node_exited_before_subscribe { | |||
| Some(causing_node) => Err(format!( | |||
| "Some nodes exited before subscribing to dora: {:?}\n\n\ | |||
| This is typically happens when an initialization error occurs | |||
| in the node or operator. To check the output of the failed | |||
| nodes, run `dora logs {} {node_id_message}`.", | |||
| in the node or operator. To check the output of the causing | |||
| node, run `dora logs {} {causing_node}`.", | |||
| self.exited_before_subscribe, self.dataflow_id | |||
| )) | |||
| )), | |||
| None => Ok(()), | |||
| }; | |||
| // answer all subscribe requests | |||
| let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); | |||
| for (node_id, reply_sender) in subscribe_replies.into_iter() { | |||
| if result.is_err() { | |||
| cascading_errors.insert(node_id); | |||
| if let Some(causing_node) = node_exited_before_subscribe { | |||
| cascading_errors.report_cascading_error(causing_node.clone(), node_id.clone()); | |||
| } | |||
| let _ = reply_sender.send(DaemonReply::Result(result.clone())); | |||
| } | |||
| @@ -176,15 +169,17 @@ impl PendingNodes { | |||
| bail!("no coordinator connection to send AllNodesReady"); | |||
| }; | |||
| let success = self.exited_before_subscribe.is_empty(); | |||
| tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes"); | |||
| tracing::info!( | |||
| "all local nodes are ready (exit before subscribe: {:?}), waiting for remote nodes", | |||
| self.exited_before_subscribe | |||
| ); | |||
| let msg = serde_json::to_vec(&Timestamped { | |||
| inner: CoordinatorRequest::Event { | |||
| machine_id: self.machine_id.clone(), | |||
| event: DaemonEvent::AllNodesReady { | |||
| dataflow_id: self.dataflow_id, | |||
| success, | |||
| exited_before_subscribe: self.exited_before_subscribe.clone(), | |||
| }, | |||
| }, | |||
| timestamp, | |||
| @@ -1,4 +1,4 @@ | |||
| use crate::{daemon_messages::DataflowId, topics::DataflowDaemonResult}; | |||
| use crate::{config::NodeId, daemon_messages::DataflowId, topics::DataflowDaemonResult}; | |||
| use eyre::eyre; | |||
| #[derive(Debug, serde::Serialize, serde::Deserialize)] | |||
| @@ -18,7 +18,7 @@ pub enum CoordinatorRequest { | |||
| pub enum DaemonEvent { | |||
| AllNodesReady { | |||
| dataflow_id: DataflowId, | |||
| success: bool, | |||
| exited_before_subscribe: Vec<NodeId>, | |||
| }, | |||
| AllNodesFinished { | |||
| dataflow_id: DataflowId, | |||
| @@ -234,7 +234,7 @@ pub enum DaemonCoordinatorEvent { | |||
| Spawn(SpawnDataflowNodes), | |||
| AllNodesReady { | |||
| dataflow_id: DataflowId, | |||
| success: bool, | |||
| exited_before_subscribe: Vec<NodeId>, | |||
| }, | |||
| StopDataflow { | |||
| dataflow_id: DataflowId, | |||
| @@ -125,7 +125,7 @@ impl std::fmt::Display for RootError<'_> { | |||
| let mut non_cascading: Vec<_> = failed | |||
| .clone() | |||
| .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading)) | |||
| .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. })) | |||
| .collect(); | |||
| non_cascading.sort_by_key(|(_, e)| e.timestamp); | |||
| // try to print earliest non-cascading error | |||
| @@ -194,14 +194,27 @@ impl std::fmt::Display for NodeError { | |||
| write!(f, "exited because of signal {signal_str}") | |||
| } | |||
| NodeExitStatus::Unknown => write!(f, "unknown exit status"), | |||
| }?; | |||
| match &self.cause { | |||
| NodeErrorCause::Cascading { caused_by_node } => write!( | |||
| f, | |||
| "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." | |||
| )?, | |||
| NodeErrorCause::Other { stderr } if stderr.is_empty() => {} | |||
| NodeErrorCause::Other { stderr } => write!(f, "\n\nStderr output:\n{stderr}\n")?, | |||
| } | |||
| Ok(()) | |||
| } | |||
| } | |||
| #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] | |||
| pub enum NodeErrorCause { | |||
| /// Node failed because another node failed before, | |||
| Cascading, | |||
| Cascading { | |||
| caused_by_node: NodeId, | |||
| }, | |||
| Other { | |||
| stderr: String, | |||
| }, | |||