diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 9130b4cc..364ca6ae 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -106,6 +106,12 @@ pub struct Daemon { type DaemonRunResult = BTreeMap>>; +struct NodePrepareTask { + node_id: NodeId, + dynamic_node: bool, + task: F, +} + impl Daemon { pub async fn run( coordinator_addr: SocketAddr, @@ -393,6 +399,7 @@ impl Daemon { Event::SpawnNodeResult { dataflow_id, node_id, + dynamic_node, result, } => match result { Ok(running_node) => { @@ -407,7 +414,8 @@ impl Daemon { .entry(dataflow_id) .or_default() .insert(node_id.clone(), Err(error)); - self.handle_node_stop(dataflow_id, &node_id).await?; + self.handle_node_stop(dataflow_id, &node_id, dynamic_node) + .await?; } }, Event::SpawnDataflowResult { @@ -845,7 +853,8 @@ impl Daemon { let mut logger = logger.reborrow().for_node(node.id.clone()); let local = spawn_nodes.contains(&node.id); if local { - if node.kind.dynamic() { + let dynamic_node = node.kind.dynamic(); + if dynamic_node { dataflow.dynamic_nodes.insert(node.id.clone()); } else { dataflow.pending_nodes.insert(node.id.clone()); @@ -872,7 +881,11 @@ impl Daemon { .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) { Ok(result) => { - tasks.push((node_id, result)); + tasks.push(NodePrepareTask { + node_id, + task: result, + dynamic_node, + }); } Err(err) => { logger @@ -889,7 +902,7 @@ impl Daemon { exit_status: NodeExitStatus::Unknown, }), ); - stopped.push(node_id.clone()); + stopped.push((node_id.clone(), dynamic_node)); } } } else { @@ -947,8 +960,9 @@ impl Daemon { } } } - for node_id in stopped { - self.handle_node_stop(dataflow_id, &node_id).await?; + for (node_id, dynamic) in stopped { + self.handle_node_stop(dataflow_id, &node_id, dynamic) + .await?; } let spawn_result = Self::spawn_prepared_nodes( @@ -965,24 +979,27 @@ impl Daemon { async fn spawn_prepared_nodes( dataflow_id: Uuid, mut logger: DataflowLogger<'_>, - tasks: Vec<( - NodeId, - impl Future>, - )>, + tasks: Vec>>>, events_tx: mpsc::Sender>, clock: Arc, ) -> eyre::Result<()> { - let node_result = |node_id, result| Timestamped { + let node_result = |node_id, dynamic_node, result| Timestamped { inner: Event::SpawnNodeResult { dataflow_id, node_id, + dynamic_node, result, }, timestamp: clock.new_timestamp(), }; let mut failed_to_prepare = None; let mut prepared_nodes = Vec::new(); - for (node_id, task) in tasks { + for task in tasks { + let NodePrepareTask { + node_id, + dynamic_node, + task, + } = task; match task.await { Ok(node) => prepared_nodes.push(node), Err(err) => { @@ -996,7 +1013,9 @@ impl Daemon { )), exit_status: NodeExitStatus::Unknown, }; - let send_result = events_tx.send(node_result(node_id, Err(node_err))).await; + let send_result = events_tx + .send(node_result(node_id, dynamic_node, Err(node_err))) + .await; if send_result.is_err() { tracing::error!("failed to send SpawnNodeResult to main daemon task") } @@ -1016,7 +1035,11 @@ impl Daemon { exit_status: NodeExitStatus::Unknown, }; let send_result = events_tx - .send(node_result(node.node_id().clone(), Err(err))) + .send(node_result( + node.node_id().clone(), + node.dynamic(), + Err(err), + )) .await; if send_result.is_err() { tracing::error!("failed to send SpawnNodeResult to main daemon task") @@ -1038,6 +1061,7 @@ impl Daemon { // spawn the nodes for node in prepared_nodes { let node_id = node.node_id().clone(); + let dynamic_node = node.dynamic(); let mut logger = logger.reborrow().for_node(node_id.clone()); let result = node.spawn(&mut logger).await; let node_spawn_result = match result { @@ -1055,7 +1079,7 @@ impl Daemon { } }; let send_result = events_tx - .send(node_result(node_id, node_spawn_result)) + .send(node_result(node_id, dynamic_node, node_spawn_result)) .await; if send_result.is_err() { tracing::error!("failed to send SpawnNodeResult to main daemon task") @@ -1486,11 +1510,27 @@ impl Daemon { Ok(()) } - async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> { + async fn handle_node_stop( + &mut self, + dataflow_id: Uuid, + node_id: &NodeId, + dynamic_node: bool, + ) -> eyre::Result<()> { let mut logger = self.logger.for_dataflow(dataflow_id); - let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { - format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`") - })?; + let dataflow = match self.running.get_mut(&dataflow_id) { + Some(dataflow) => dataflow, + None if dynamic_node => { + // The dataflow might be done already as we don't wait for dynamic nodes. In this + // case, we don't need to do anything to handle the node stop. + tracing::debug!( + "dynamic node {dataflow_id}/{node_id} stopped after dataflow was done" + ); + return Ok(()); + } + None => eyre::bail!( + "failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`" + ), + }; dataflow .pending_nodes @@ -1652,6 +1692,7 @@ impl Daemon { DoraEvent::SpawnedNodeResult { dataflow_id, node_id, + dynamic_node, exit_status, } => { let mut logger = self @@ -1739,7 +1780,8 @@ impl Daemon { .or_default() .insert(node_id.clone(), node_result); - self.handle_node_stop(dataflow_id, &node_id).await?; + self.handle_node_stop(dataflow_id, &node_id, dynamic_node) + .await?; if let Some(exit_when_done) = &mut self.exit_when_done { exit_when_done.remove(&(dataflow_id, node_id)); @@ -2227,6 +2269,7 @@ pub enum Event { SpawnNodeResult { dataflow_id: DataflowId, node_id: NodeId, + dynamic_node: bool, result: Result, }, SpawnDataflowResult { @@ -2306,6 +2349,7 @@ pub enum DoraEvent { SpawnedNodeResult { dataflow_id: DataflowId, node_id: NodeId, + dynamic_node: bool, exit_status: NodeExitStatus, }, } diff --git a/binaries/daemon/src/spawn/mod.rs b/binaries/daemon/src/spawn/mod.rs index 1d1eedaa..9bf15360 100644 --- a/binaries/daemon/src/spawn/mod.rs +++ b/binaries/daemon/src/spawn/mod.rs @@ -403,6 +403,10 @@ impl PreparedNode { &self.node.id } + pub fn dynamic(&self) -> bool { + self.node.kind.dynamic() + } + pub async fn spawn(mut self, logger: &mut NodeLogger<'_>) -> eyre::Result { let mut child = match &mut self.command { Some(command) => command.spawn().wrap_err(self.spawn_error_msg)?, @@ -555,6 +559,7 @@ impl PreparedNode { }); let node_id = self.node.id.clone(); + let dynamic_node = self.node.kind.dynamic(); let (log_finish_tx, log_finish_rx) = oneshot::channel(); let clock = self.clock.clone(); let daemon_tx = self.daemon_tx.clone(); @@ -566,6 +571,7 @@ impl PreparedNode { dataflow_id, node_id, exit_status, + dynamic_node, } .into(); let event = Timestamped {