From 29ce4ea7e5ba4ca863153bfb9cbc051d09c8072b Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Wed, 26 Feb 2025 19:15:48 +0100
Subject: [PATCH] Introduce logger structs to forward more log messages

---
 binaries/cli/src/attach.rs      |  11 +-
 binaries/daemon/src/lib.rs      | 310 ++++++++++++++------------------
 binaries/daemon/src/log.rs      | 204 ++++++++++++++++++++-
 binaries/daemon/src/pending.rs  |  60 ++++---
 binaries/daemon/src/spawn.rs    |  92 +++++++---
 libraries/message/src/common.rs |   3 +-
 6 files changed, 453 insertions(+), 227 deletions(-)
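The core of the patch is a chain of progressively narrower logger scopes, defined in binaries/daemon/src/log.rs below: a base Logger owns a dedicated coordinator connection, and each wrapper type pins one more field of the final LogMessage. The intended call pattern looks roughly like this (a sketch assembled from the code below, not compilable on its own):

    // Logger -> DaemonLogger -> DataflowLogger -> NodeLogger
    let logger: Logger = /* built in Daemon::new with its own TCP connection */;
    let mut daemon_logger = logger.for_daemon(daemon_id);              // pins daemon_id
    let mut dataflow_logger = daemon_logger.for_dataflow(dataflow_id); // pins dataflow_id
    let mut node_logger = dataflow_logger.for_node(node_id);           // pins node_id
    // call sites now pass only level, target, and message:
    node_logger
        .log(LogLevel::Info, Some("daemon".into()), "node is ready")
        .await;

This replaces the old pattern of assembling a complete LogMessage (or calling the removed log/log_node helpers) at every call site and funneling it through Daemon::send_log_message.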
diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs
index 7f65446e..39c3a056 100644
--- a/binaries/cli/src/attach.rs
+++ b/binaries/cli/src/attach.rs
@@ -156,8 +156,9 @@ pub fn attach_dataflow(
             Ok(AttachEvent::Control(control_request)) => control_request,
             Ok(AttachEvent::Log(Ok(log_message))) => {
                 let LogMessage {
-                    dataflow_id: _,
+                    dataflow_id,
                     node_id,
+                    daemon_id,
                     level,
                     target,
                     module_path: _,
@@ -171,6 +172,12 @@ pub fn attach_dataflow(
                     log::Level::Info => "INFO ".green(),
                     other => format!("{other:5}").normal(),
                 };
+                let dataflow = format!(" dataflow `{dataflow_id}`").cyan();
+                let daemon = match daemon_id {
+                    Some(id) => format!(" on daemon `{id}`"),
+                    None => " on default daemon".to_string(),
+                }
+                .bright_black();
                 let node = match node_id {
                     Some(node_id) => format!(" {node_id}").bold(),
                     None => "".normal(),
@@ -180,7 +187,7 @@ pub fn attach_dataflow(
                     None => "".normal(),
                 };
 
-                println!("{level}{node}{target}: {message}");
+                println!("{level}{dataflow}{daemon}{node}{target}: {message}");
                 continue;
             }
             Ok(AttachEvent::Log(Err(err))) => {
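With the two fragments added above, an attached CLI now prints the dataflow and daemon scope in front of every forwarded log line. The rendered output looks roughly like this (IDs invented for illustration, colors omitted):

    INFO  dataflow `3f2a9c` on daemon `daemon-b` camera daemon: node is ready
    ERROR dataflow `3f2a9c` on default daemon plot stdout: Traceback (most recent call last):

The `on default daemon` branch covers daemon_id = None, presumably the single-daemon setup where no explicit daemon ID is assigned.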
diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index c71ab10d..de531fd0 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -17,7 +17,7 @@ use dora_message::{
     coordinator_to_cli::DataflowResult,
     coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
     daemon_to_coordinator::{
-        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult, LogMessage,
+        CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
     },
     daemon_to_daemon::InterDaemonEvent,
     daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
@@ -30,6 +30,7 @@ use eyre::{bail, eyre, Context, ContextCompat, Result};
 use futures::{future, stream, FutureExt, TryFutureExt};
 use futures_concurrency::stream::Merge;
 use local_listener::DynamicNodeEventWrapper;
+use log::{DaemonLogger, DataflowLogger, Logger};
 use pending::PendingNodes;
 use shared_memory_server::ShmemConf;
 use socket_stream_utils::socket_stream_send;
@@ -94,6 +95,8 @@ pub struct Daemon {
     zenoh_session: zenoh::Session,
     remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
+
+    logger: DaemonLogger,
 }
 
 type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
 
@@ -234,6 +237,20 @@ impl Daemon {
             None => None,
         };
 
+        // additional connection for logging
+        let logger_coordinator_connection = match coordinator_addr {
+            Some(addr) => {
+                let stream = TcpStream::connect(addr)
+                    .await
+                    .wrap_err("failed to connect log to dora-coordinator")?;
+                stream
+                    .set_nodelay(true)
+                    .wrap_err("failed to set TCP_NODELAY")?;
+                Some(stream)
+            }
+            None => None,
+        };
+
         let zenoh_config = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
             Ok(path) => zenoh::Config::from_file(&path)
                 .map_err(|e| eyre!(e))
@@ -251,6 +268,12 @@ impl Daemon {
         let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
 
         let daemon = Self {
+            logger: Logger {
+                coordinator_connection: logger_coordinator_connection,
+                daemon_id: daemon_id.clone(),
+                clock: clock.clone(),
+            }
+            .for_daemon(daemon_id.clone()),
             running: HashMap::new(),
             working_dir: HashMap::new(),
             events_tx: dora_events_tx,
@@ -334,8 +357,14 @@ impl Daemon {
             Event::CtrlC => {
                 tracing::info!("received ctrlc signal -> stopping all dataflows");
                 for dataflow in self.running.values_mut() {
+                    let mut logger = self.logger.for_dataflow(dataflow.id);
                     dataflow
-                        .stop_all(&mut self.coordinator_connection, &self.clock, None)
+                        .stop_all(
+                            &mut self.coordinator_connection,
+                            &self.clock,
+                            None,
+                            &mut logger,
+                        )
                         .await?;
                 }
                 self.exit_when_all_finished = true;
@@ -369,55 +398,6 @@ impl Daemon {
         Ok(self.dataflow_node_results)
     }
 
-    async fn send_log_message(&mut self, message: LogMessage) -> eyre::Result<()> {
-        if let Some(connection) = &mut self.coordinator_connection {
-            let msg = serde_json::to_vec(&Timestamped {
-                inner: CoordinatorRequest::Event {
-                    daemon_id: self.daemon_id.clone(),
-                    event: DaemonEvent::Log(message),
-                },
-                timestamp: self.clock.new_timestamp(),
-            })?;
-            socket_stream_send(connection, &msg)
-                .await
-                .wrap_err("failed to send log message to dora-coordinator")?;
-
-            if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
-                bail!("lost connection to coordinator")
-            }
-        } else {
-            match message.level {
-                LogLevel::Error => {
-                    if let Some(node_id) = message.node_id {
-                        tracing::error!("{}/{} errored:", message.dataflow_id.to_string(), node_id);
-                    }
-                    for line in message.message.lines() {
-                        tracing::error!("    {}", line);
-                    }
-                }
-                LogLevel::Warn => {
-                    if let Some(node_id) = message.node_id {
-                        tracing::warn!("{}/{} warned:", message.dataflow_id.to_string(), node_id);
-                    }
-                    for line in message.message.lines() {
-                        tracing::warn!("    {}", line);
-                    }
-                }
-                LogLevel::Info => {
-                    if let Some(node_id) = message.node_id {
-                        tracing::info!("{}/{} info:", message.dataflow_id.to_string(), node_id);
-                    }
-
-                    for line in message.message.lines() {
-                        tracing::info!("    {}", line);
-                    }
-                }
-                _ => {}
-            }
-        }
-        Ok(())
-    }
-
     async fn handle_coordinator_event(
         &mut self,
         event: DaemonCoordinatorEvent,
@@ -467,9 +447,11 @@ impl Daemon {
                 dataflow_id,
                 exited_before_subscribe,
             } => {
-                self.send_log_message(log(LogLevel::Debug, dataflow_id,
+                let mut logger = self.logger.for_dataflow(dataflow_id);
+                logger.log(LogLevel::Debug, None,
+                    Some("daemon".into()),
                     format!("received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
-                ))).await?;
+                )).await;
                 match self.running.get_mut(&dataflow_id) {
                     Some(dataflow) => {
                         let ready = exited_before_subscribe.is_empty();
@@ -481,13 +463,10 @@ impl Daemon {
                         dataflow
                             .pending_nodes
                             .handle_external_all_nodes_ready(exited_before_subscribe)
                             .await?;
                         if ready {
-                            self.send_log_message(log(
-                                LogLevel::Info,
-                                dataflow_id,
+                            logger.log(LogLevel::Info, None,
+                                Some("daemon".into()),
                                 "coordinator reported that all nodes are ready, starting dataflow",
-                            ))
-                            .await?;
-                            let dataflow = self.running.get_mut(&dataflow_id).unwrap(); // reborrow
+                            ).await;
                             dataflow.start(&self.events_tx, &self.clock).await?;
                         }
                     }
@@ -562,6 +541,7 @@ impl Daemon {
                 dataflow_id,
                 grace_duration,
             } => {
+                let mut logger = self.logger.for_dataflow(dataflow_id);
                 let dataflow = self
                     .running
                     .get_mut(&dataflow_id)
@@ -571,6 +551,7 @@ impl Daemon {
                 let future = dataflow.stop_all(
                     &mut self.coordinator_connection,
                     &self.clock,
                     grace_duration,
+                    &mut logger,
                 );
                 (Ok(()), Some(future))
             }
@@ -641,13 +622,10 @@ impl Daemon {
                     .await
                     .wrap_err("failed to forward remote output to local receivers")
                 {
-                    self.send_log_message(log_node(
-                        LogLevel::Warn,
-                        dataflow_id,
-                        node_id,
-                        format!("{err:?}"),
-                    ))
-                    .await?;
+                    let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
+                    logger
+                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
+                        .await;
                 }
                 Ok(())
             }
@@ -657,13 +635,18 @@ impl Daemon {
                 node_id,
                 output_id,
             } => {
                 let output_id = OutputId(node_id.clone(), output_id);
-                self.send_log_message(log_node(
-                    LogLevel::Debug,
-                    dataflow_id,
-                    node_id.clone(),
-                    format!("received OutputClosed event for output {output_id:?}"),
-                ))
-                .await?;
+                let mut logger = self
+                    .logger
+                    .for_dataflow(dataflow_id)
+                    .for_node(node_id.clone());
+                logger
+                    .log(
+                        LogLevel::Debug,
+                        Some("daemon".into()),
+                        format!("received OutputClosed event for output {output_id:?}"),
+                    )
+                    .await;
+
                 let inner = async {
                     let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
                         format!("send out failed: no running dataflow with ID `{dataflow_id}`")
                     })?;
                 if let Err(err) = inner
                     .await
                     .wrap_err("failed to handle InputsClosed event sent by coordinator")
                 {
-                    self.send_log_message(log_node(
-                        LogLevel::Warn,
-                        dataflow_id,
-                        node_id,
-                        format!("{err:?}"),
-                    ))
-                    .await?;
+                    logger
+                        .log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
+                        .await;
                 }
                 Ok(())
             }
@@ -702,6 +681,7 @@ impl Daemon {
         spawn_nodes: BTreeSet<NodeId>,
         uv: bool,
     ) -> eyre::Result<()> {
+        let mut logger = self.logger.for_dataflow(dataflow_id);
         let dataflow =
             RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
         let dataflow = match self.running.entry(dataflow_id) {
@@ -714,7 +694,6 @@ impl Daemon {
             }
         };
 
-        let mut log_messages = Vec::new();
        let mut stopped = Vec::new();
 
         // calculate info about mappings
@@ -755,6 +734,7 @@ impl Daemon {
 
         // spawn nodes and set up subscriptions
         for node in nodes.into_values() {
+            let mut logger = logger.reborrow().for_node(node.id.clone());
             let local = spawn_nodes.contains(&node.id);
             if local {
                 if node.kind.dynamic() {
@@ -769,6 +749,9 @@ impl Daemon {
                     .entry(node.id.clone())
                     .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
                     .clone();
+                logger
+                    .log(LogLevel::Info, Some("daemon".into()), "spawning")
+                    .await;
                 match spawn::spawn_node(
                     dataflow_id,
                     &working_dir,
                     node,
                     self.events_tx.clone(),
                     self.clock.clone(),
                     node_stderr_most_recent,
                     uv,
+                    &mut logger,
                 )
                 .await
                 .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
                 {
@@ -786,16 +770,9 @@ impl Daemon {
                     Ok(running_node) => {
                         dataflow.running_nodes.insert(node_id, running_node);
                     }
                     Err(err) => {
-                        log_messages.push(LogMessage {
-                            dataflow_id,
-                            node_id: Some(node_id.clone()),
-                            level: LogLevel::Error,
-                            target: None,
-                            module_path: None,
-                            file: None,
-                            line: None,
-                            message: format!("{err:?}"),
-                        });
+                        logger
+                            .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
+                            .await;
                         self.dataflow_node_results
                             .entry(dataflow_id)
                             .or_default()
@@ -871,10 +848,6 @@ impl Daemon {
         for node_id in stopped {
             self.handle_node_stop(dataflow_id, &node_id).await?;
         }
 
-        for log_message in log_messages {
-            self.send_log_message(log_message).await?;
-        }
-
         Ok(())
     }
@@ -948,13 +921,15 @@ impl Daemon {
                 event_sender,
                 reply_sender,
             } => {
-                self.send_log_message(log_node(
-                    LogLevel::Info,
-                    dataflow_id,
-                    node_id.clone(),
-                    "node is ready",
-                ))
-                .await?;
+                let mut logger = self.logger.for_dataflow(dataflow_id);
+                logger
+                    .log(
+                        LogLevel::Info,
+                        Some(node_id.clone()),
+                        Some("daemon".into()),
+                        "node is ready",
+                    )
+                    .await;
 
                 let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
                     format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
@@ -975,17 +950,19 @@ impl Daemon {
                         &mut self.coordinator_connection,
                         &self.clock,
                         &mut dataflow.cascading_error_causes,
+                        &mut logger,
                     )
                     .await?;
                 match status {
                     DataflowStatus::AllNodesReady => {
-                        self.send_log_message(log(
-                            LogLevel::Info,
-                            dataflow_id,
-                            "all nodes are ready, starting dataflow",
-                        ))
-                        .await?;
-                        let dataflow = self.running.get_mut(&dataflow_id).unwrap(); // reborrow
+                        logger
+                            .log(
+                                LogLevel::Info,
+                                None,
+                                Some("daemon".into()),
+                                "all nodes are ready, starting dataflow",
+                            )
+                            .await;
                         dataflow.start(&self.events_tx, &self.clock).await?;
                     }
                     DataflowStatus::Pending => {}
@@ -1294,22 +1271,25 @@ impl Daemon {
     async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
+        let mut logger = self.logger.for_dataflow(dataflow_id);
         let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
             format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
         })?;
 
-        let log_messages = dataflow
+        dataflow
             .pending_nodes
             .handle_node_stop(
                 node_id,
                 &mut self.coordinator_connection,
                 &self.clock,
                 &mut dataflow.cascading_error_causes,
+                &mut logger,
             )
             .await?;
 
         self.handle_outputs_done(dataflow_id, node_id).await?;
 
+        let mut logger = self.logger.for_dataflow(dataflow_id);
         let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
             format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
         })?;
@@ -1330,12 +1310,14 @@ impl Daemon {
                     .clone(),
             };
 
-            self.send_log_message(log(
-                LogLevel::Info,
-                dataflow_id,
-                format!("dataflow finished on machine `{}`", self.daemon_id),
-            ))
-            .await?;
+            logger
+                .log(
+                    LogLevel::Info,
+                    None,
+                    Some("daemon".into()),
+                    format!("dataflow finished on machine `{}`", self.daemon_id),
+                )
+                .await;
             if let Some(connection) = &mut self.coordinator_connection {
                 let msg = serde_json::to_vec(&Timestamped {
                     inner: CoordinatorRequest::Event {
@@ -1354,10 +1336,6 @@ impl Daemon {
             self.running.remove(&dataflow_id);
         }
 
-        for log_message in log_messages {
-            self.send_log_message(log_message).await?;
-        }
-
         Ok(())
     }
@@ -1455,13 +1433,17 @@ impl Daemon {
                 node_id,
                 exit_status,
             } => {
-                self.send_log_message(log_node(
-                    LogLevel::Debug,
-                    dataflow_id,
-                    node_id.clone(),
-                    format!("handling node stop with exit status {exit_status:?}"),
-                ))
-                .await?;
+                let mut logger = self
+                    .logger
+                    .for_dataflow(dataflow_id)
+                    .for_node(node_id.clone());
+                logger
+                    .log(
+                        LogLevel::Debug,
+                        Some("daemon".into()),
+                        format!("handling node stop with exit status {exit_status:?}"),
+                    )
+                    .await;
 
                 let node_result = match exit_status {
                     NodeExitStatus::Success => Ok(()),
@@ -1478,7 +1460,14 @@ impl Daemon {
 
                 let cause = match caused_by_node {
                     Some(caused_by_node) => {
-                        self.send_log_message(log_node(LogLevel::Info, dataflow_id,node_id.clone(), format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"))).await?;
+                        logger
+                            .log(
+                                LogLevel::Info,
+                                Some("daemon".into()),
+                                format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
+                            )
+                            .await;
+
                         NodeErrorCause::Cascading { caused_by_node }
                     }
                     None if grace_duration_kill => NodeErrorCause::GraceDuration,
@@ -1509,24 +1498,20 @@ impl Daemon {
                     }
                 };
 
-                self.send_log_message(LogMessage {
-                    dataflow_id,
-                    node_id: Some(node_id.clone()),
-                    level: if node_result.is_ok() {
-                        LogLevel::Info
-                    } else {
-                        LogLevel::Error
-                    },
-                    target: None,
-                    module_path: None,
-                    file: None,
-                    line: None,
-                    message: match &node_result {
-                        Ok(()) => format!("{node_id} finished successfully"),
-                        Err(err) => format!("{err}"),
-                    },
-                })
-                .await?;
+                logger
+                    .log(
+                        if node_result.is_ok() {
+                            LogLevel::Info
+                        } else {
+                            LogLevel::Error
+                        },
+                        Some("daemon".into()),
+                        match &node_result {
+                            Ok(()) => format!("{node_id} finished successfully"),
+                            Err(err) => format!("{err}"),
+                        },
+                    )
+                    .await;
 
                 self.dataflow_node_results
                     .entry(dataflow_id)
@@ -1893,6 +1878,7 @@ impl RunningDataflow {
         coordinator_connection: &mut Option<TcpStream>,
         clock: &HLC,
         grace_duration: Option<Duration>,
+        logger: &mut DataflowLogger<'_>,
     ) -> eyre::Result<()> {
         self.pending_nodes
             .handle_dataflow_stop(
@@ -1899,6 +1885,7 @@ impl RunningDataflow {
                 coordinator_connection,
                 clock,
                 &mut self.cascading_error_causes,
                 &self.dynamic_nodes,
+                logger,
             )
             .await?;
@@ -2193,34 +2180,3 @@ impl CoreNodeKindExt for CoreNodeKind {
         }
     }
 }
-
-fn log(level: LogLevel, dataflow_id: Uuid, message: impl Into<String>) -> LogMessage {
-    LogMessage {
-        dataflow_id,
-        node_id: None,
-        level,
-        target: Some("deamon".into()),
-        module_path: None,
-        file: None,
-        line: None,
-        message: message.into(),
-    }
-}
-
-fn log_node(
-    level: LogLevel,
-    dataflow_id: Uuid,
-    node_id: NodeId,
-    message: impl Into<String>,
-) -> LogMessage {
-    LogMessage {
-        dataflow_id,
-        node_id: Some(node_id),
-        level,
-        target: Some("deamon".into()),
-        module_path: None,
-        file: None,
-        line: None,
-        message: message.into(),
-    }
-}
diff --git a/binaries/daemon/src/log.rs b/binaries/daemon/src/log.rs
index 55368a23..c9e41334 100644
--- a/binaries/daemon/src/log.rs
+++ b/binaries/daemon/src/log.rs
@@ -1,9 +1,209 @@
-use std::path::{Path, PathBuf};
+use std::{
+    path::{Path, PathBuf},
+    sync::Arc,
+};
 
-use dora_core::config::NodeId;
+use dora_core::{config::NodeId, uhlc};
+use dora_message::{
+    common::{DaemonId, LogLevel, LogMessage, Timestamped},
+    daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
+};
+use eyre::Context;
+use tokio::net::TcpStream;
 use uuid::Uuid;
 
+use crate::socket_stream_utils::socket_stream_send;
+
 pub fn log_path(working_dir: &Path, dataflow_id: &Uuid, node_id: &NodeId) -> PathBuf {
     let dataflow_dir = working_dir.join("out").join(dataflow_id.to_string());
     dataflow_dir.join(format!("log_{node_id}.txt"))
 }
+
+pub struct NodeLogger<'a> {
+    node_id: NodeId,
+    logger: DataflowLogger<'a>,
+}
+
+impl NodeLogger<'_> {
+    pub fn inner(&self) -> &DataflowLogger {
+        &self.logger
+    }
+
+    pub async fn log(
+        &mut self,
+        level: LogLevel,
+        target: Option<String>,
+        message: impl Into<String>,
+    ) {
+        self.logger
+            .log(level, Some(self.node_id.clone()), target, message)
+            .await
+    }
+}
+
+pub struct DataflowLogger<'a> {
+    dataflow_id: Uuid,
+    logger: &'a mut DaemonLogger,
+}
+
+impl<'a> DataflowLogger<'a> {
+    pub fn for_node(self, node_id: NodeId) -> NodeLogger<'a> {
+        NodeLogger {
+            node_id,
+            logger: self,
+        }
+    }
+
+    pub fn reborrow(&mut self) -> DataflowLogger {
+        DataflowLogger {
+            dataflow_id: self.dataflow_id,
+            logger: self.logger,
+        }
+    }
+
+    pub fn inner(&self) -> &DaemonLogger {
+        self.logger
+    }
+
+    pub async fn log(
+        &mut self,
+        level: LogLevel,
+        node_id: Option<NodeId>,
+        target: Option<String>,
+        message: impl Into<String>,
+    ) {
+        self.logger
+            .log(level, self.dataflow_id, node_id, target, message)
+            .await
+    }
+}
+
+pub struct DaemonLogger {
+    daemon_id: DaemonId,
+    logger: Logger,
+}
+
+impl DaemonLogger {
+    pub fn for_dataflow(&mut self, dataflow_id: Uuid) -> DataflowLogger {
+        DataflowLogger {
+            dataflow_id,
+            logger: self,
+        }
+    }
+
+    pub fn inner(&self) -> &Logger {
+        &self.logger
+    }
+
+    pub async fn log(
+        &mut self,
+        level: LogLevel,
+        dataflow_id: Uuid,
+        node_id: Option<NodeId>,
+        target: Option<String>,
+        message: impl Into<String>,
+    ) {
+        let message = LogMessage {
+            daemon_id: Some(self.daemon_id.clone()),
+            dataflow_id,
+            node_id,
+            level,
+            target,
+            module_path: None,
+            file: None,
+            line: None,
+            message: message.into(),
+        };
+        self.logger.log(message).await
+    }
+
+    pub(crate) fn daemon_id(&self) -> &DaemonId {
+        &self.daemon_id
+    }
+}
+
+pub struct Logger {
+    pub(super) coordinator_connection: Option<TcpStream>,
+    pub(super) daemon_id: DaemonId,
+    pub(super) clock: Arc<uhlc::HLC>,
+}
+
+impl Logger {
+    pub fn for_daemon(self, daemon_id: DaemonId) -> DaemonLogger {
+        DaemonLogger {
+            daemon_id,
+            logger: self,
+        }
+    }
+
+    pub async fn log(&mut self, message: LogMessage) {
+        if let Some(connection) = &mut self.coordinator_connection {
+            let msg = serde_json::to_vec(&Timestamped {
+                inner: CoordinatorRequest::Event {
+                    daemon_id: self.daemon_id.clone(),
+                    event: DaemonEvent::Log(message.clone()),
+                },
+                timestamp: self.clock.new_timestamp(),
+            })
+            .expect("failed to serialize log message");
+            match socket_stream_send(connection, &msg)
+                .await
+                .wrap_err("failed to send log message to dora-coordinator")
+            {
+                Ok(()) => return,
+                Err(err) => tracing::warn!("{err:?}"),
+            }
+        }
+
+        // log message using tracing if reporting to coordinator is not possible
+        match message.level {
+            LogLevel::Error => {
+                if let Some(node_id) = message.node_id {
+                    tracing::error!("{}/{} errored:", message.dataflow_id.to_string(), node_id);
+                }
+                for line in message.message.lines() {
+                    tracing::error!("    {}", line);
+                }
+            }
+            LogLevel::Warn => {
+                if let Some(node_id) = message.node_id {
+                    tracing::warn!("{}/{} warned:", message.dataflow_id.to_string(), node_id);
+                }
+                for line in message.message.lines() {
+                    tracing::warn!("    {}", line);
+                }
+            }
+            LogLevel::Info => {
+                if let Some(node_id) = message.node_id {
+                    tracing::info!("{}/{} info:", message.dataflow_id.to_string(), node_id);
+                }
+
+                for line in message.message.lines() {
+                    tracing::info!("    {}", line);
+                }
+            }
+            _ => {}
+        }
+    }
+
+    pub async fn try_clone(&self) -> eyre::Result<Self> {
+        let coordinator_connection = match &self.coordinator_connection {
+            Some(c) => {
+                let addr = c
+                    .peer_addr()
+                    .context("failed to get coordinator peer addr")?;
+                let new_connection = TcpStream::connect(addr)
+                    .await
+                    .context("failed to connect to coordinator during logger clone")?;
+                Some(new_connection)
+            }
+            None => None,
+        };
+
+        Ok(Self {
+            coordinator_connection,
+            daemon_id: self.daemon_id.clone(),
+            clock: self.clock.clone(),
+        })
+    }
+}
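Note the error-handling contract of Logger::log above: it is infallible from the caller's point of view. Sending to the coordinator is attempted first; on failure (or when no coordinator is configured) the message is downgraded to local tracing output instead of propagating an error, so logging can never abort a daemon code path. The old send_log_message returned a Result and also piggybacked a heartbeat check; both are gone. A hypothetical standalone use (all IDs assumed to be in scope; the fields are pub(super), so this only works from within the daemon crate):

    let mut logger = Logger {
        coordinator_connection: None, // no coordinator: falls back to tracing
        daemon_id: daemon_id.clone(),
        clock: Arc::new(uhlc::HLC::default()),
    }
    .for_daemon(daemon_id);
    logger
        .for_dataflow(dataflow_id)
        .for_node(node_id)
        .log(LogLevel::Warn, None, "something looks off")
        .await;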
diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs
index b36807d1..89305d80 100644
--- a/binaries/daemon/src/pending.rs
+++ b/binaries/daemon/src/pending.rs
@@ -13,7 +13,7 @@ use dora_message::{
 use eyre::{bail, Context};
 use tokio::{net::TcpStream, sync::oneshot};
 
-use crate::{socket_stream_utils::socket_stream_send, CascadingErrorCauses};
+use crate::{log::DataflowLogger, socket_stream_utils::socket_stream_send, CascadingErrorCauses};
 
 pub struct PendingNodes {
     dataflow_id: DataflowId,
@@ -66,12 +66,13 @@ impl PendingNodes {
         coordinator_connection: &mut Option<TcpStream>,
         clock: &HLC,
         cascading_errors: &mut CascadingErrorCauses,
+        logger: &mut DataflowLogger<'_>,
     ) -> eyre::Result<DataflowStatus> {
         self.waiting_subscribers
             .insert(node_id.clone(), reply_sender);
         self.local_nodes.remove(&node_id);
 
-        self.update_dataflow_status(coordinator_connection, clock, cascading_errors)
+        self.update_dataflow_status(coordinator_connection, clock, cascading_errors, logger)
             .await
     }
 
@@ -81,24 +82,22 @@ impl PendingNodes {
     pub async fn handle_node_stop(
         &mut self,
         node_id: &NodeId,
         coordinator_connection: &mut Option<TcpStream>,
         clock: &HLC,
         cascading_errors: &mut CascadingErrorCauses,
-    ) -> eyre::Result<Vec<LogMessage>> {
-        let mut log = Vec::new();
+        logger: &mut DataflowLogger<'_>,
+    ) -> eyre::Result<()> {
         if self.local_nodes.remove(node_id) {
-            log.push(LogMessage {
-                dataflow_id: self.dataflow_id,
-                node_id: Some(node_id.clone()),
-                level: LogLevel::Warn,
-                target: None,
-                module_path: None,
-                file: None,
-                line: None,
-                message: "node exited before initializing dora connection".into(),
-            });
+            logger
+                .log(
+                    LogLevel::Warn,
+                    Some(node_id.clone()),
+                    Some("daemon::pending".into()),
+                    "node exited before initializing dora connection",
+                )
+                .await;
             self.exited_before_subscribe.push(node_id.clone());
-            self.update_dataflow_status(coordinator_connection, clock, cascading_errors)
+            self.update_dataflow_status(coordinator_connection, clock, cascading_errors, logger)
                 .await?;
         }
-        Ok(log)
+        Ok(())
     }
 
@@ -107,12 +106,18 @@ impl PendingNodes {
     pub async fn handle_dataflow_stop(
         &mut self,
         coordinator_connection: &mut Option<TcpStream>,
         clock: &HLC,
         cascading_errors: &mut CascadingErrorCauses,
         dynamic_nodes: &BTreeSet<NodeId>,
+        logger: &mut DataflowLogger<'_>,
     ) -> eyre::Result<Vec<LogMessage>> {
         // remove all local dynamic nodes that are not yet started
         for node_id in dynamic_nodes {
             if self.local_nodes.remove(node_id) {
-                self.update_dataflow_status(coordinator_connection, clock, cascading_errors)
-                    .await?;
+                self.update_dataflow_status(
+                    coordinator_connection,
+                    clock,
+                    cascading_errors,
+                    logger,
+                )
+                .await?;
             }
         }
 
@@ -139,11 +144,12 @@ impl PendingNodes {
         coordinator_connection: &mut Option<TcpStream>,
         clock: &HLC,
         cascading_errors: &mut CascadingErrorCauses,
+        logger: &mut DataflowLogger<'_>,
     ) -> eyre::Result<DataflowStatus> {
         if self.local_nodes.is_empty() {
             if self.external_nodes {
                 if !self.reported_init_to_coordinator {
-                    self.report_nodes_ready(coordinator_connection, clock.new_timestamp())
+                    self.report_nodes_ready(coordinator_connection, clock.new_timestamp(), logger)
                         .await?;
                     self.reported_init_to_coordinator = true;
                 }
@@ -194,15 +200,23 @@ impl PendingNodes {
         &self,
         coordinator_connection: &mut Option<TcpStream>,
         timestamp: Timestamp,
+        logger: &mut DataflowLogger<'_>,
     ) -> eyre::Result<()> {
         let Some(connection) = coordinator_connection else {
             bail!("no coordinator connection to send AllNodesReady");
         };
 
-        tracing::info!(
-            "all local nodes are ready (exit before subscribe: {:?}), waiting for remote nodes",
-            self.exited_before_subscribe
-        );
+        logger
+            .log(
+                LogLevel::Info,
+                None,
+                Some("daemon".into()),
+                format!(
+                    "all local nodes are ready (exit before subscribe: {:?}), waiting for remote nodes",
+                    self.exited_before_subscribe
+                ),
+            )
+            .await;
 
         let msg = serde_json::to_vec(&Timestamped {
             inner: CoordinatorRequest::Event {
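The signature change above inverts the data flow: handle_node_stop used to return a Vec<LogMessage> that the caller buffered and forwarded later, while now messages leave through the scoped logger as soon as they occur. Schematically (paraphrased caller code, argument names shortened):

    // Before: buffered, sent after the fact
    let log_messages = pending_nodes.handle_node_stop(node_id, conn, clock, causes).await?;
    for message in log_messages {
        self.send_log_message(message).await?;
    }

    // After: emitted immediately through the scoped logger
    pending_nodes
        .handle_node_stop(node_id, conn, clock, causes, &mut logger)
        .await?;

The same direct-emission style also removes the log_messages vector from spawn_dataflow in lib.rs above.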
Some("daemon::spawner".into()), + "spawning node", + ) + .await; let queue_sizes = node_inputs(&node) .into_iter() @@ -118,20 +127,32 @@ pub async fn spawn_node( let mut cmd = tokio::process::Command::new("uv"); cmd.arg("run"); cmd.arg("python"); - tracing::info!( - "spawning: uv run python -u {}", - resolved_path.display() - ); + logger + .log( + LogLevel::Info, + Some("spawner".into()), + format!( + "spawning: uv run python -u {}", + resolved_path.display() + ), + ) + .await; cmd } else { let python = get_python_path().wrap_err( "Could not find python path when spawning custom node", )?; - tracing::info!( - "spawning: {:?} -u {}", - &python, - resolved_path.display() - ); + logger + .log( + LogLevel::Info, + Some("spawner".into()), + format!( + "spawning: {:?} -u {}", + &python, + resolved_path.display() + ), + ) + .await; let cmd = tokio::process::Command::new(python); cmd }; @@ -141,7 +162,13 @@ pub async fn spawn_node( cmd } _ => { - tracing::info!("spawning: {}", resolved_path.display()); + logger + .log( + LogLevel::Info, + Some("spawner".into()), + format!("spawning: {}", resolved_path.display()), + ) + .await; if uv { let mut cmd = tokio::process::Command::new(&"uv"); cmd.arg("run"); @@ -304,7 +331,13 @@ pub async fn spawn_node( let pid = crate::ProcessId::new(child.id().context( "Could not get the pid for the just spawned node and indicate that there is an error", )?); - tracing::debug!("Spawned node `{dataflow_id}/{node_id}` with pid {pid:?}"); + logger + .log( + LogLevel::Debug, + Some("spawner".into()), + format!("spawned node with pid {pid:?}"), + ) + .await; let dataflow_dir: PathBuf = working_dir.join("out").join(dataflow_id.to_string()); if !dataflow_dir.exists() { @@ -368,11 +401,6 @@ pub async fn spawn_node( // send the buffered lines let lines = std::mem::take(&mut buffer); - if std::env::var("DORA_QUIET").is_err() { - if lines.len() > 1 { - tracing::info!("log_{}: {}", node_id, &lines[..lines.len() - 1]); - } - } let sent = stdout_tx.send(lines.clone()).await; if sent.is_err() { println!("Could not log: {lines}"); @@ -450,6 +478,14 @@ pub async fn spawn_node( }); let node_id = node.id.clone(); + let daemon_id = logger.inner().inner().daemon_id().clone(); + let mut cloned_logger = logger + .inner() + .inner() + .inner() + .try_clone() + .await + .context("failed to clone logger")?; // Log to file stream. tokio::spawn(async move { while let Some(message) = rx.recv().await { @@ -488,12 +524,24 @@ pub async fn spawn_node( .await .map_err(|err| error!("Could not log {message} to file due to {err}")); let formatted = message.lines().fold(String::default(), |mut output, line| { - output.push_str(" "); output.push_str(line); - output.push('\n'); output }); - tracing::trace!("{dataflow_id}/{} logged:\n{formatted}", node.id.clone()); + if std::env::var("DORA_QUIET").is_err() { + cloned_logger + .log(LogMessage { + daemon_id: Some(daemon_id.clone()), + dataflow_id, + level: LogLevel::Info, + node_id: Some(node_id.clone()), + target: Some("stdout".into()), + message: formatted, + file: None, + line: None, + module_path: None, + }) + .await; + } // Make sure that all data has been synced to disk. 
diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs
index a392cfb2..93e2f8d9 100644
--- a/libraries/message/src/common.rs
+++ b/libraries/message/src/common.rs
@@ -9,11 +9,12 @@ use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, DataflowId};
 
 pub use log::Level as LogLevel;
 
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 #[must_use]
 pub struct LogMessage {
     pub dataflow_id: DataflowId,
     pub node_id: Option<NodeId>,
+    pub daemon_id: Option<DaemonId>,
     pub level: LogLevel,
     pub target: Option<String>,
     pub module_path: Option<String>,
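The message-type change is deliberately additive: daemon_id is an Option, so senders that predate the field (or contexts without a daemon) serialize None, which the CLI renders as `on default daemon` above, and Clone is newly derived because Logger::log now needs to keep the message after a failed send for the tracing fallback. Constructing the extended message looks like this (values assumed; field order is free in Rust struct literals; the file, line, and message fields follow the truncated excerpt above):

    let msg = LogMessage {
        dataflow_id,
        node_id: Some(node_id),
        daemon_id: Some(daemon_id), // new in this patch
        level: LogLevel::Info,
        target: Some("daemon".into()),
        module_path: None,
        file: None,
        line: None,
        message: "node is ready".into(),
    };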