diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 0afb2e44..c71ab10d 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -467,7 +467,9 @@ impl Daemon { dataflow_id, exited_before_subscribe, } => { - tracing::debug!("received AllNodesReady (dataflow id: {dataflow_id}, exited_before_subscribe: {exited_before_subscribe:?})"); + self.send_log_message(log(LogLevel::Debug, dataflow_id, + format!("received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})" + ))).await?; match self.running.get_mut(&dataflow_id) { Some(dataflow) => { let ready = exited_before_subscribe.is_empty(); @@ -479,7 +481,13 @@ impl Daemon { ) .await?; if ready { - tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); + self.send_log_message(log( + LogLevel::Info, + dataflow_id, + "coordinator reported that all nodes are ready, starting dataflow", + )) + .await?; + let dataflow = self.running.get_mut(&dataflow_id).unwrap(); // reborrow dataflow.start(&self.events_tx, &self.clock).await?; } } @@ -633,7 +641,13 @@ impl Daemon { .await .wrap_err("failed to forward remote output to local receivers") { - tracing::warn!("{err:?}") + self.send_log_message(log_node( + LogLevel::Warn, + dataflow_id, + node_id, + format!("{err:?}"), + )) + .await?; } Ok(()) } @@ -642,8 +656,14 @@ impl Daemon { node_id, output_id, } => { - let output_id = OutputId(node_id, output_id); - tracing::debug!(?dataflow_id, ?output_id, "received OutputClosed event"); + let output_id = OutputId(node_id.clone(), output_id); + self.send_log_message(log_node( + LogLevel::Debug, + dataflow_id, + node_id.clone(), + format!("received OutputClosed event for output {output_id:?}"), + )) + .await?; let inner = async { let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { format!("send out failed: no running dataflow with ID `{dataflow_id}`") @@ -660,7 +680,13 @@ impl Daemon { .await .wrap_err("failed to handle 
InputsClosed event sent by coordinator") { - tracing::warn!("{err:?}") + self.send_log_message(log_node( + LogLevel::Warn, + dataflow_id, + node_id, + format!("{err:?}"), + )) + .await?; } Ok(()) } @@ -922,6 +948,14 @@ impl Daemon { event_sender, reply_sender, } => { + self.send_log_message(log_node( + LogLevel::Info, + dataflow_id, + node_id.clone(), + "node is ready", + )) + .await?; + let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| { format!("subscribe failed: no running dataflow with ID `{dataflow_id}`") }); @@ -931,7 +965,6 @@ impl Daemon { let _ = reply_sender.send(DaemonReply::Result(Err(err))); } Ok(dataflow) => { - tracing::info!("node `{node_id}` is ready"); Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await; let status = dataflow @@ -946,9 +979,13 @@ impl Daemon { .await?; match status { DataflowStatus::AllNodesReady => { - tracing::info!( - "all nodes are ready, starting dataflow `{dataflow_id}`" - ); + self.send_log_message(log( + LogLevel::Info, + dataflow_id, + "all nodes are ready, starting dataflow", + )) + .await?; + let dataflow = self.running.get_mut(&dataflow_id).unwrap(); // reborrow dataflow.start(&self.events_tx, &self.clock).await?; } DataflowStatus::Pending => {} @@ -1293,10 +1330,12 @@ impl Daemon { .clone(), }; - tracing::info!( - "Dataflow `{dataflow_id}` finished on machine `{}`", - self.daemon_id - ); + self.send_log_message(log( + LogLevel::Info, + dataflow_id, + format!("dataflow finished on machine `{}`", self.daemon_id), + )) + .await?; if let Some(connection) = &mut self.coordinator_connection { let msg = serde_json::to_vec(&Timestamped { inner: CoordinatorRequest::Event { @@ -1416,9 +1455,14 @@ impl Daemon { node_id, exit_status, } => { - tracing::debug!( - "handling node stop {dataflow_id}/{node_id} with exit status {exit_status:?}" - ); + self.send_log_message(log_node( + LogLevel::Debug, + dataflow_id, + node_id.clone(), + format!("handling node stop with exit status {exit_status:?}"), 
+ )) + .await?; + let node_result = match exit_status { NodeExitStatus::Success => Ok(()), exit_status => { @@ -1434,7 +1478,7 @@ impl Daemon { let cause = match caused_by_node { Some(caused_by_node) => { - tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"); + self.send_log_message(log_node(LogLevel::Info, dataflow_id,node_id.clone(), format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"))).await?; NodeErrorCause::Cascading { caused_by_node } } None if grace_duration_kill => NodeErrorCause::GraceDuration,
@@ -2149,3 +2193,36 @@ impl CoreNodeKindExt for CoreNodeKind {
         }
     }
 }
+
+/// Builds a dataflow-level [`LogMessage`] that is not attributed to any node.
+///
+/// `node_id` is `None`, the target is `"daemon"`, and the source-location fields
+/// (`module_path`, `file`, `line`) are left unset.
+fn log(level: LogLevel, dataflow_id: Uuid, message: impl Into<String>) -> LogMessage {
+    LogMessage {
+        dataflow_id,
+        node_id: None,
+        level,
+        target: Some("daemon".into()),
+        module_path: None,
+        file: None,
+        line: None,
+        message: message.into(),
+    }
+}
+
+/// Builds a [`LogMessage`] attributed to node `node_id` of the given dataflow.
+///
+/// Delegates to [`log()`] for every other field so both constructors always agree
+/// on the target and the unset source-location fields.
+fn log_node(
+    level: LogLevel,
+    dataflow_id: Uuid,
+    node_id: NodeId,
+    message: impl Into<String>,
+) -> LogMessage {
+    LogMessage {
+        node_id: Some(node_id),
+        ..log(level, dataflow_id, message)
+    }
+}