|
|
|
@@ -1169,12 +1169,16 @@ impl Daemon { |
|
|
|
dataflow.cascading_error_causes.error_caused_by(&node_id) |
|
|
|
}) |
|
|
|
.cloned(); |
|
|
|
let grace_duration_kill = dataflow |
|
|
|
.map(|d| d.grace_duration_kills.contains(&node_id)) |
|
|
|
.unwrap_or_default(); |
|
|
|
|
|
|
|
let cause = match caused_by_node { |
|
|
|
Some(caused_by_node) => { |
|
|
|
tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"); |
|
|
|
NodeErrorCause::Cascading { caused_by_node } |
|
|
|
} |
|
|
|
None if grace_duration_kill => NodeErrorCause::GraceDuration, |
|
|
|
None => NodeErrorCause::Other { |
|
|
|
stderr: dataflow |
|
|
|
.and_then(|d| d.node_stderr_most_recent.get(&node_id)) |
|
|
|
@@ -1410,6 +1414,7 @@ pub struct RunningDataflow { |
|
|
|
|
|
|
|
/// Contains the node that caused the error for nodes that experienced a cascading error. |
|
|
|
cascading_error_causes: CascadingErrorCauses, |
|
|
|
grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>, |
|
|
|
|
|
|
|
node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>, |
|
|
|
} |
|
|
|
@@ -1431,6 +1436,7 @@ impl RunningDataflow { |
|
|
|
stop_sent: false, |
|
|
|
empty_set: BTreeSet::new(), |
|
|
|
cascading_error_causes: Default::default(), |
|
|
|
grace_duration_kills: Default::default(), |
|
|
|
node_stderr_most_recent: BTreeMap::new(), |
|
|
|
} |
|
|
|
} |
|
|
|
@@ -1494,6 +1500,7 @@ impl RunningDataflow { |
|
|
|
} |
|
|
|
|
|
|
|
let running_nodes = self.running_nodes.clone(); |
|
|
|
let grace_duration_kills = self.grace_duration_kills.clone(); |
|
|
|
tokio::spawn(async move { |
|
|
|
let duration = grace_duration.unwrap_or(Duration::from_millis(500)); |
|
|
|
tokio::time::sleep(duration).await; |
|
|
|
@@ -1503,6 +1510,7 @@ impl RunningDataflow { |
|
|
|
for (node, node_details) in running_nodes.iter() { |
|
|
|
if let Some(pid) = node_details.pid { |
|
|
|
if let Some(process) = system.process(Pid::from(pid as usize)) { |
|
|
|
grace_duration_kills.insert(node.clone()); |
|
|
|
process.kill(); |
|
|
|
warn!( |
|
|
|
"{node} was killed due to not stopping within the {:#?} grace period", |
|
|
|
|