diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index b375c7c8..54d7284e 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -325,7 +325,9 @@ impl Daemon { } Event::CtrlC => { for dataflow in self.running.values_mut() { - dataflow.stop_all(&self.clock, None).await; + dataflow + .stop_all(&mut self.coordinator_connection, &self.clock, None) + .await?; } } } @@ -496,7 +498,13 @@ impl Daemon { .send(Some(reply)) .map_err(|_| error!("could not send stop reply from daemon to coordinator")); - dataflow.stop_all(&self.clock, grace_duration).await; + dataflow + .stop_all( + &mut self.coordinator_connection, + &self.clock, + grace_duration, + ) + .await?; RunStatus::Continue } DaemonCoordinatorEvent::Destroy => { @@ -640,6 +648,10 @@ impl Daemon { if local { dataflow.pending_nodes.insert(node.id.clone()); + if node.kind.dynamic() { + dataflow.dynamic_nodes.insert(node.id.clone()); + } + let node_id = node.id.clone(); let node_stderr_most_recent = dataflow .node_stderr_most_recent @@ -1464,6 +1476,12 @@ pub struct RunningDataflow { open_inputs: BTreeMap>, running_nodes: BTreeMap, + /// List of all dynamic node IDs. + /// + /// We want to treat dynamic nodes differently in some cases, so we need + /// to know which nodes are dynamic. + dynamic_nodes: BTreeSet, + open_external_mappings: HashMap>>, pending_drop_tokens: HashMap, @@ -1495,6 +1513,7 @@ impl RunningDataflow { timers: BTreeMap::new(), open_inputs: BTreeMap::new(), running_nodes: BTreeMap::new(), + dynamic_nodes: BTreeSet::new(), open_external_mappings: HashMap::new(), pending_drop_tokens: HashMap::new(), _timer_handles: Vec::new(), @@ -1559,7 +1578,21 @@ impl RunningDataflow { Ok(()) } - async fn stop_all(&mut self, clock: &HLC, grace_duration: Option) { + async fn stop_all( + &mut self, + coordinator_connection: &mut Option, + clock: &HLC, + grace_duration: Option, + ) -> eyre::Result<()> { + self.pending_nodes + .handle_dataflow_stop( + coordinator_connection, + clock, + &mut self.cascading_error_causes, + &self.dynamic_nodes, + ) + .await?; + for (_node_id, channel) in self.subscribe_channels.drain() { let _ = send_with_timestamp(&channel, daemon_messages::NodeEvent::Stop, clock); } @@ -1586,6 +1619,7 @@ impl RunningDataflow { } }); self.stop_sent = true; + Ok(()) } fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet { diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index f0a92e99..0e42dca3 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use dora_core::{ config::NodeId, @@ -97,6 +97,24 @@ impl PendingNodes { Ok(log) } + pub async fn handle_dataflow_stop( + &mut self, + coordinator_connection: &mut Option, + clock: &HLC, + cascading_errors: &mut CascadingErrorCauses, + dynamic_nodes: &BTreeSet, + ) -> eyre::Result> { + // remove all local dynamic nodes that are not yet started + for node_id in dynamic_nodes { + if self.local_nodes.remove(node_id) { + self.update_dataflow_status(coordinator_connection, clock, cascading_errors) + .await?; + } + } + + Ok(Vec::new()) + } + pub async fn handle_external_all_nodes_ready( &mut self, exited_before_subscribe: Vec,