Browse Source

Fix: Keep track of `InputClosed` and `Stop` messages and send them on subscribe

These two messages can be essential for correctness. For example, a node might not finish properly when an `InputClosed` event is lost. So we need to always send them, even if the target node was not subscribed yet when the event occurred.
tags/v0.2.0-candidate
Philipp Oppermann 2 years ago
parent
commit
9439b08f16
Failed to extract signature
1 changed files with 71 additions and 16 deletions
  1. +71
    -16
      binaries/daemon/src/lib.rs

+ 71
- 16
binaries/daemon/src/lib.rs View File

@@ -252,9 +252,7 @@ impl Daemon {
}
Event::CtrlC => {
for dataflow in self.running.values_mut() {
for (_node_id, channel) in dataflow.subscribe_channels.drain() {
let _ = channel.send_async(daemon_messages::NodeEvent::Stop).await;
}
dataflow.stop_all().await;
}
}
}
@@ -295,10 +293,7 @@ impl Daemon {
.running
.get_mut(&dataflow_id)
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;

for (_node_id, channel) in dataflow.subscribe_channels.drain() {
let _ = channel.send_async(daemon_messages::NodeEvent::Stop).await;
}
dataflow.stop_all().await;
Result::<(), eyre::Report>::Ok(())
};
let reply = DaemonCoordinatorReply::StopResult(
@@ -408,15 +403,7 @@ impl Daemon {
) -> eyre::Result<()> {
match event {
DaemonNodeEvent::Subscribe { event_sender } => {
let result = match self.running.get_mut(&dataflow_id) {
Some(dataflow) => {
dataflow.subscribe_channels.insert(node_id, event_sender);
Ok(())
}
None => Err(format!(
"subscribe failed: no running dataflow with ID `{dataflow_id}`"
)),
};
let result = self.subscribe(dataflow_id, node_id, event_sender).await;
let _ = reply_sender.send(DaemonReply::Result(result));
}
DaemonNodeEvent::CloseOutputs(outputs) => {
@@ -448,6 +435,55 @@ impl Daemon {
Ok(())
}

async fn subscribe(
&mut self,
dataflow_id: Uuid,
node_id: NodeId,
event_sender: flume::Sender<daemon_messages::NodeEvent>,
) -> Result<(), String> {
let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
})?;

// some inputs might have been closed already -> report those events
let closed_inputs = dataflow
.mappings
.values()
.flatten()
.filter(|(node, _)| node == &node_id)
.map(|(_, input)| input)
.filter(|input| {
dataflow
.open_inputs
.get(&node_id)
.map(|open_inputs| !open_inputs.contains(*input))
.unwrap_or(true)
});
for input_id in closed_inputs {
let _ = event_sender
.send_async(daemon_messages::NodeEvent::InputClosed {
id: input_id.clone(),
})
.await;
}

// if a stop event was already sent for the dataflow, send it to
// the newly connected node too
if dataflow.stop_sent {
let _ = event_sender
.send_async(daemon_messages::NodeEvent::Stop)
.await;
}

if dataflow.stop_sent || dataflow.open_inputs(&node_id).is_empty() {
tracing::debug!("Received subscribe message for closed event stream");
} else {
dataflow.subscribe_channels.insert(node_id, event_sender);
}

Ok(())
}

#[tracing::instrument(skip(self))]
async fn handle_node_stop(
&mut self,
@@ -781,6 +817,25 @@ pub struct RunningDataflow {
running_nodes: BTreeSet<NodeId>,
/// Keep handles to all timer tasks of this dataflow to cancel them on drop.
_timer_handles: Vec<futures::future::RemoteHandle<()>>,
stop_sent: bool,

/// Used in `open_inputs`.
///
/// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable.
empty_set: BTreeSet<DataId>,
}

impl RunningDataflow {
async fn stop_all(&mut self) {
for (_node_id, channel) in self.subscribe_channels.drain() {
let _ = channel.send_async(daemon_messages::NodeEvent::Stop).await;
}
self.stop_sent = true;
}

fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
}
}

type OutputId = (NodeId, DataId);


Loading…
Cancel
Save