|
|
|
@@ -325,7 +325,9 @@ impl Daemon { |
|
|
|
} |
|
|
|
Event::CtrlC => { |
|
|
|
for dataflow in self.running.values_mut() { |
|
|
|
dataflow.stop_all(&self.clock, None).await; |
|
|
|
dataflow |
|
|
|
.stop_all(&mut self.coordinator_connection, &self.clock, None) |
|
|
|
.await?; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@@ -496,7 +498,13 @@ impl Daemon { |
|
|
|
.send(Some(reply)) |
|
|
|
.map_err(|_| error!("could not send stop reply from daemon to coordinator")); |
|
|
|
|
|
|
|
dataflow.stop_all(&self.clock, grace_duration).await; |
|
|
|
dataflow |
|
|
|
.stop_all( |
|
|
|
&mut self.coordinator_connection, |
|
|
|
&self.clock, |
|
|
|
grace_duration, |
|
|
|
) |
|
|
|
.await?; |
|
|
|
RunStatus::Continue |
|
|
|
} |
|
|
|
DaemonCoordinatorEvent::Destroy => { |
|
|
|
@@ -640,6 +648,10 @@ impl Daemon { |
|
|
|
if local { |
|
|
|
dataflow.pending_nodes.insert(node.id.clone()); |
|
|
|
|
|
|
|
if node.kind.dynamic() { |
|
|
|
dataflow.dynamic_nodes.insert(node.id.clone()); |
|
|
|
} |
|
|
|
|
|
|
|
let node_id = node.id.clone(); |
|
|
|
let node_stderr_most_recent = dataflow |
|
|
|
.node_stderr_most_recent |
|
|
|
@@ -1464,6 +1476,12 @@ pub struct RunningDataflow { |
|
|
|
open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>, |
|
|
|
running_nodes: BTreeMap<NodeId, RunningNode>, |
|
|
|
|
|
|
|
/// List of all dynamic node IDs. |
|
|
|
/// |
|
|
|
/// We want to treat dynamic nodes differently in some cases, so we need |
|
|
|
/// to know which nodes are dynamic. |
|
|
|
dynamic_nodes: BTreeSet<NodeId>, |
|
|
|
|
|
|
|
open_external_mappings: HashMap<OutputId, BTreeMap<String, BTreeSet<InputId>>>, |
|
|
|
|
|
|
|
pending_drop_tokens: HashMap<DropToken, DropTokenInformation>, |
|
|
|
@@ -1495,6 +1513,7 @@ impl RunningDataflow { |
|
|
|
timers: BTreeMap::new(), |
|
|
|
open_inputs: BTreeMap::new(), |
|
|
|
running_nodes: BTreeMap::new(), |
|
|
|
dynamic_nodes: BTreeSet::new(), |
|
|
|
open_external_mappings: HashMap::new(), |
|
|
|
pending_drop_tokens: HashMap::new(), |
|
|
|
_timer_handles: Vec::new(), |
|
|
|
@@ -1559,7 +1578,21 @@ impl RunningDataflow { |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
async fn stop_all(&mut self, clock: &HLC, grace_duration: Option<Duration>) { |
|
|
|
async fn stop_all( |
|
|
|
&mut self, |
|
|
|
coordinator_connection: &mut Option<TcpStream>, |
|
|
|
clock: &HLC, |
|
|
|
grace_duration: Option<Duration>, |
|
|
|
) -> eyre::Result<()> { |
|
|
|
self.pending_nodes |
|
|
|
.handle_dataflow_stop( |
|
|
|
coordinator_connection, |
|
|
|
clock, |
|
|
|
&mut self.cascading_error_causes, |
|
|
|
&self.dynamic_nodes, |
|
|
|
) |
|
|
|
.await?; |
|
|
|
|
|
|
|
for (_node_id, channel) in self.subscribe_channels.drain() { |
|
|
|
let _ = send_with_timestamp(&channel, daemon_messages::NodeEvent::Stop, clock); |
|
|
|
} |
|
|
|
@@ -1586,6 +1619,7 @@ impl RunningDataflow { |
|
|
|
} |
|
|
|
}); |
|
|
|
self.stop_sent = true; |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> { |
|
|
|
|