From 4e908d170981ec01ab4dda2878041d08ff08da76 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 16 Apr 2025 10:39:43 +0200 Subject: [PATCH] Don't finish dataflow while there are still pending nodes --- binaries/daemon/src/lib.rs | 9 +++++---- binaries/daemon/src/pending.rs | 4 ++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index b8d4fa71..c759d63f 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1377,10 +1377,11 @@ impl Daemon { if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) { pid.mark_as_stopped() } - if dataflow - .running_nodes - .iter() - .all(|(_id, n)| n.node_config.dynamic) + if !dataflow.pending_nodes.local_nodes_pending() + && dataflow + .running_nodes + .iter() + .all(|(_id, n)| n.node_config.dynamic) { let result = DataflowDaemonResult { timestamp: self.clock.new_timestamp(), diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index 89305d80..757a858d 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -59,6 +59,10 @@ impl PendingNodes { self.external_nodes = value; } + pub fn local_nodes_pending(&self) -> bool { + !self.local_nodes.is_empty() + } + pub async fn handle_node_subscription( &mut self, node_id: NodeId,