diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 4db816cd..81bc9699 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -252,9 +252,7 @@ impl Daemon { } Event::CtrlC => { for dataflow in self.running.values_mut() { - for (_node_id, channel) in dataflow.subscribe_channels.drain() { - let _ = channel.send_async(daemon_messages::NodeEvent::Stop).await; - } + dataflow.stop_all().await; } } } @@ -295,10 +293,7 @@ impl Daemon { .running .get_mut(&dataflow_id) .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?; - - for (_node_id, channel) in dataflow.subscribe_channels.drain() { - let _ = channel.send_async(daemon_messages::NodeEvent::Stop).await; - } + dataflow.stop_all().await; Result::<(), eyre::Report>::Ok(()) }; let reply = DaemonCoordinatorReply::StopResult( @@ -408,15 +403,7 @@ impl Daemon { ) -> eyre::Result<()> { match event { DaemonNodeEvent::Subscribe { event_sender } => { - let result = match self.running.get_mut(&dataflow_id) { - Some(dataflow) => { - dataflow.subscribe_channels.insert(node_id, event_sender); - Ok(()) - } - None => Err(format!( - "subscribe failed: no running dataflow with ID `{dataflow_id}`" - )), - }; + let result = self.subscribe(dataflow_id, node_id, event_sender).await; let _ = reply_sender.send(DaemonReply::Result(result)); } DaemonNodeEvent::CloseOutputs(outputs) => { @@ -448,6 +435,55 @@ impl Daemon { Ok(()) } + async fn subscribe( + &mut self, + dataflow_id: Uuid, + node_id: NodeId, + event_sender: flume::Sender, + ) -> Result<(), String> { + let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| { + format!("subscribe failed: no running dataflow with ID `{dataflow_id}`") + })?; + + // some inputs might have been closed already -> report those events + let closed_inputs = dataflow + .mappings + .values() + .flatten() + .filter(|(node, _)| node == &node_id) + .map(|(_, input)| input) + .filter(|input| { + dataflow + .open_inputs + .get(&node_id) + .map(|open_inputs| !open_inputs.contains(*input)) + .unwrap_or(true) + }); + for input_id in closed_inputs { + let _ = event_sender + .send_async(daemon_messages::NodeEvent::InputClosed { + id: input_id.clone(), + }) + .await; + } + + // if a stop event was already sent for the dataflow, send it to + // the newly connected node too + if dataflow.stop_sent { + let _ = event_sender + .send_async(daemon_messages::NodeEvent::Stop) + .await; + } + + if dataflow.stop_sent || dataflow.open_inputs(&node_id).is_empty() { + tracing::debug!("Received subscribe message for closed event stream"); + } else { + dataflow.subscribe_channels.insert(node_id, event_sender); + } + + Ok(()) + } + #[tracing::instrument(skip(self))] async fn handle_node_stop( &mut self, @@ -781,6 +817,25 @@ pub struct RunningDataflow { running_nodes: BTreeSet, /// Keep handles to all timer tasks of this dataflow to cancel them on drop. _timer_handles: Vec>, + stop_sent: bool, + + /// Used in `open_inputs`. + /// + /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. + empty_set: BTreeSet, +} + +impl RunningDataflow { + async fn stop_all(&mut self) { + for (_node_id, channel) in self.subscribe_channels.drain() { + let _ = channel.send_async(daemon_messages::NodeEvent::Stop).await; + } + self.stop_sent = true; + } + + fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet { + self.open_inputs.get(node_id).unwrap_or(&self.empty_set) + } } type OutputId = (NodeId, DataId);