diff --git a/Cargo.lock b/Cargo.lock index ad71dee9..0043ef5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1925,6 +1925,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -2321,6 +2331,7 @@ dependencies = [ "async-trait", "bincode", "crossbeam", + "crossbeam-skiplist", "ctrlc", "dora-arrow-convert", "dora-core", @@ -9605,7 +9616,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "static_assertions", ] diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 274c68bd..2f1be890 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -40,3 +40,4 @@ ctrlc = "3.2.5" which = "5.0.0" sysinfo = "0.30.11" crossbeam = "0.8.4" +crossbeam-skiplist = "0.1.3" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index f31e284f..d06453b5 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1169,12 +1169,16 @@ impl Daemon { dataflow.cascading_error_causes.error_caused_by(&node_id) }) .cloned(); + let grace_duration_kill = dataflow + .map(|d| d.grace_duration_kills.contains(&node_id)) + .unwrap_or_default(); let cause = match caused_by_node { Some(caused_by_node) => { tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"); NodeErrorCause::Cascading { caused_by_node } } + None if grace_duration_kill => NodeErrorCause::GraceDuration, None => NodeErrorCause::Other { stderr: dataflow .and_then(|d| d.node_stderr_most_recent.get(&node_id)) @@ -1410,6 +1414,7 @@ pub struct RunningDataflow { /// 
Contains the node that caused the error for nodes that experienced a cascading error. cascading_error_causes: CascadingErrorCauses, + grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>, node_stderr_most_recent: BTreeMap>>, } @@ -1431,6 +1436,7 @@ impl RunningDataflow { stop_sent: false, empty_set: BTreeSet::new(), cascading_error_causes: Default::default(), + grace_duration_kills: Default::default(), node_stderr_most_recent: BTreeMap::new(), } } @@ -1494,6 +1500,7 @@ impl RunningDataflow { } let running_nodes = self.running_nodes.clone(); + let grace_duration_kills = self.grace_duration_kills.clone(); tokio::spawn(async move { let duration = grace_duration.unwrap_or(Duration::from_millis(500)); tokio::time::sleep(duration).await; @@ -1503,6 +1510,7 @@ impl RunningDataflow { for (node, node_details) in running_nodes.iter() { if let Some(pid) = node_details.pid { if let Some(process) = system.process(Pid::from(pid as usize)) { + grace_duration_kills.insert(node.clone()); process.kill(); warn!( "{node} was killed due to not stopping within the {:#?} grace period", diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index dd708292..8b587427 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -150,6 +150,10 @@ impl std::fmt::Display for NodeError { }?; match &self.cause { + NodeErrorCause::GraceDuration => write!( + f, + "\n\nThe node was killed by dora because it didn't react to a stop message in time." + )?, NodeErrorCause::Cascading { caused_by_node } => write!( f, "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." @@ -167,6 +171,8 @@ impl std::fmt::Display for NodeError { #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum NodeErrorCause { + /// Node was killed because it didn't react to a stop message in time. + GraceDuration, /// Node failed because another node failed before, Cascading { caused_by_node: NodeId,