diff --git a/binaries/daemon/src/main.rs b/binaries/daemon/src/main.rs index 4286fd36..46a464d2 100644 --- a/binaries/daemon/src/main.rs +++ b/binaries/daemon/src/main.rs @@ -160,10 +160,9 @@ impl Daemon { } } - let task = spawn::spawn_node(dataflow_id, params, self.port) + spawn::spawn_node(dataflow_id, params, self.port, self.dora_events_tx.clone()) .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?; - dataflow.node_tasks.insert(node_id, task); } // spawn timer tasks @@ -311,6 +310,12 @@ impl Daemon { } // TODO: notify remote nodes + + dataflow.subscribe_channels.remove(&node_id); + if dataflow.subscribe_channels.is_empty() { + tracing::info!("Dataflow `{dataflow_id}` finished"); + self.running.remove(&dataflow_id); + } } } Ok(()) @@ -353,17 +358,42 @@ impl Daemon { for id in closed { dataflow.subscribe_channels.remove(id); } - - Ok(()) + } + DoraEvent::SpawnedNodeResult { + dataflow_id, + node_id, + result, + } => { + if self + .running + .get(&dataflow_id) + .and_then(|d| d.subscribe_channels.get(&node_id)) + .is_some() + { + tracing::warn!( + "node `{dataflow_id}/{node_id}` finished without sending `Stopped` message" + ); + } + match result { + Ok(()) => { + tracing::info!("node {dataflow_id}/{node_id} finished"); + } + Err(err) => { + tracing::error!( + "{:?}", + err.wrap_err(format!("error in node `{dataflow_id}/{node_id}`")) + ); + } + } } } + Ok(()) } } #[derive(Default)] pub struct RunningDataflow { subscribe_channels: HashMap>, - node_tasks: HashMap>>, mappings: HashMap>, timers: BTreeMap>, } @@ -406,6 +436,11 @@ pub enum DoraEvent { interval: Duration, metadata: dora_message::Metadata<'static>, }, + SpawnedNodeResult { + dataflow_id: DataflowId, + node_id: NodeId, + result: eyre::Result<()>, + }, } type MessageId = String; diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 5fcc8237..3fd98429 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -1,3 +1,4 @@ +use crate::DoraEvent; use dora_core::{ daemon_messages::{DataflowId, NodeConfig, SpawnNodeParams}, descriptor::{resolve_path, source_is_url}, @@ -5,13 +6,15 @@ use dora_core::{ use dora_download::download_file; use eyre::{eyre, WrapErr}; use std::{env::consts::EXE_EXTENSION, path::Path}; +use tokio::sync::mpsc; #[tracing::instrument] pub async fn spawn_node( dataflow_id: DataflowId, params: SpawnNodeParams, daemon_port: u16, -) -> eyre::Result>> { + result_tx: mpsc::Sender, +) -> eyre::Result<()> { let SpawnNodeParams { node_id, node, @@ -63,16 +66,26 @@ pub async fn spawn_node( node.args.as_deref().unwrap_or_default() ) })?; - let result = tokio::spawn(async move { + let node_id_cloned = node_id.clone(); + let wait_task = async move { let status = child.wait().await.context("child process failed")?; if status.success() { - tracing::info!("node {node_id} finished"); Ok(()) } else if let Some(code) = status.code() { Err(eyre!("node {node_id} failed with exit code: {code}")) } else { Err(eyre!("node {node_id} failed (unknown exit code)")) } + }; + tokio::spawn(async move { + let result = wait_task.await; + let _ = result_tx + .send(DoraEvent::SpawnedNodeResult { + dataflow_id, + node_id: node_id_cloned, + result, + }) + .await; }); - Ok(result) + Ok(()) }