From 950862b916d9f198dfca06e84f0b44ed5af6f58c Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 18 Jun 2025 19:37:06 +0200 Subject: [PATCH] Add a special log level for `stdout` output --- binaries/cli/src/command/build/local.rs | 24 ++-- binaries/cli/src/output.rs | 13 ++- binaries/coordinator/src/lib.rs | 2 +- binaries/coordinator/src/log_subscriber.rs | 10 +- binaries/daemon/src/log.rs | 128 +++++++++++++-------- binaries/daemon/src/spawn.rs | 4 +- libraries/core/src/build/logger.rs | 8 +- libraries/core/src/build/mod.rs | 7 +- libraries/message/src/common.rs | 14 ++- 9 files changed, 136 insertions(+), 74 deletions(-) diff --git a/binaries/cli/src/command/build/local.rs b/binaries/cli/src/command/build/local.rs index 78d7152f..32c7b319 100644 --- a/binaries/cli/src/command/build/local.rs +++ b/binaries/cli/src/command/build/local.rs @@ -2,7 +2,7 @@ use std::{collections::BTreeMap, path::PathBuf}; use colored::Colorize; use dora_core::{ - build::{BuildInfo, BuildLogger, Builder, GitManager}, + build::{BuildInfo, BuildLogger, Builder, GitManager, LogLevelOrStdout}, descriptor::{Descriptor, DescriptorExt}, }; use dora_message::{common::GitSource, id::NodeId}; @@ -89,16 +89,24 @@ struct LocalBuildLogger { impl BuildLogger for LocalBuildLogger { type Clone = Self; - async fn log_message(&mut self, level: log::Level, message: impl Into + Send) { - let level = match level { - log::Level::Error => "ERROR".red(), - log::Level::Warn => "WARN ".yellow(), - log::Level::Info => "INFO ".green(), - other => format!("{other:5}").normal(), + async fn log_message( + &mut self, + level: impl Into + Send, + message: impl Into + Send, + ) { + let level = match level.into() { + LogLevelOrStdout::LogLevel(level) => match level { + log::Level::Error => "ERROR ".red(), + log::Level::Warn => "WARN ".yellow(), + log::Level::Info => "INFO ".green(), + log::Level::Debug => "DEBUG ".bright_blue(), + log::Level::Trace => "TRACE ".dimmed(), + }, + LogLevelOrStdout::Stdout => "stdout".italic().dimmed(), }; let node = self.node_id.to_string().bold().bright_black(); let message: String = message.into(); - println!("{node}: \t{level}: \t{message}"); + println!("{node}: {level} {message}"); } async fn try_clone(&self) -> eyre::Result { diff --git a/binaries/cli/src/output.rs b/binaries/cli/src/output.rs index bdffc8d3..76db8c17 100644 --- a/binaries/cli/src/output.rs +++ b/binaries/cli/src/output.rs @@ -1,4 +1,5 @@ use colored::Colorize; +use dora_core::build::LogLevelOrStdout; use dora_message::common::LogMessage; pub fn print_log_message(log_message: LogMessage) { @@ -15,10 +16,14 @@ pub fn print_log_message(log_message: LogMessage) { message, } = log_message; let level = match level { - log::Level::Error => "ERROR".red(), - log::Level::Warn => "WARN ".yellow(), - log::Level::Info => "INFO ".green(), - other => format!("{other:5}").normal(), + LogLevelOrStdout::LogLevel(level) => match level { + log::Level::Error => "ERROR ".red(), + log::Level::Warn => "WARN ".yellow(), + log::Level::Info => "INFO ".green(), + log::Level::Debug => "DEBUG ".bright_blue(), + log::Level::Trace => "TRACE ".dimmed(), + }, + LogLevelOrStdout::Stdout => "stdout".bright_blue().italic().dimmed(), }; let dataflow = if let Some(dataflow_id) = dataflow_id { format!(" dataflow `{dataflow_id}`\t").cyan() diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 90d0dfc3..8e3a4c23 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -369,7 +369,7 @@ async fn start_inner( dataflow_id: Some(dataflow_id), node_id: None, daemon_id: None, - level: LogLevel::Info, + level: LogLevel::Info.into(), target: Some("coordinator".into()), module_path: None, file: None, diff --git a/binaries/coordinator/src/log_subscriber.rs b/binaries/coordinator/src/log_subscriber.rs index cb602d47..e5006616 100644 --- a/binaries/coordinator/src/log_subscriber.rs +++ b/binaries/coordinator/src/log_subscriber.rs @@ -17,9 +17,15 @@ impl LogSubscriber { } pub async fn send_message(&mut self, message: &LogMessage) -> eyre::Result<()> { - if message.level > self.level { - return Ok(()); + match message.level { + dora_core::build::LogLevelOrStdout::LogLevel(level) => { + if level > self.level { + return Ok(()); + } + } + dora_core::build::LogLevelOrStdout::Stdout => {} } + let message = serde_json::to_vec(&message)?; let connection = self.connection.as_mut().context("connection is closed")?; tcp_send(connection, &message) diff --git a/binaries/daemon/src/log.rs b/binaries/daemon/src/log.rs index c5fe171a..7092d328 100644 --- a/binaries/daemon/src/log.rs +++ b/binaries/daemon/src/log.rs @@ -4,7 +4,11 @@ use std::{ sync::Arc, }; -use dora_core::{build::BuildLogger, config::NodeId, uhlc}; +use dora_core::{ + build::{BuildLogger, LogLevelOrStdout}, + config::NodeId, + uhlc, +}; use dora_message::{ common::{DaemonId, LogLevel, LogMessage, Timestamped}, daemon_to_coordinator::{CoordinatorRequest, DaemonEvent}, @@ -101,9 +105,19 @@ pub struct NodeBuildLogger<'a> { } impl NodeBuildLogger<'_> { - pub async fn log(&mut self, level: LogLevel, message: impl Into) { + pub async fn log( + &mut self, + level: impl Into + Send, + message: impl Into, + ) { self.logger - .log_build(self.build_id, level, Some(self.node_id.clone()), message) + .log_build( + self.build_id, + level.into(), + None, + Some(self.node_id.clone()), + message, + ) .await } @@ -121,7 +135,7 @@ impl BuildLogger for NodeBuildLogger<'_> { fn log_message( &mut self, - level: LogLevel, + level: impl Into + Send, message: impl Into + Send, ) -> impl std::future::Future + Send { self.log(level, message) @@ -170,7 +184,7 @@ impl DaemonLogger { daemon_id: Some(self.daemon_id.clone()), dataflow_id, node_id, - level, + level: level.into(), target, module_path: None, file: None, @@ -183,7 +197,8 @@ impl DaemonLogger { pub async fn log_build( &mut self, build_id: BuildId, - level: LogLevel, + level: LogLevelOrStdout, + target: Option, node_id: Option, message: impl Into, ) { @@ -193,7 +208,7 @@ impl DaemonLogger { dataflow_id: None, node_id, level, - target: Some("build".into()), + target, module_path: None, file: None, line: None, @@ -249,33 +264,7 @@ impl Logger { // log message using tracing if reporting to coordinator is not possible match message.level { - LogLevel::Error => { - tracing::error!( - build_id = ?message.build_id.map(|id| id.to_string()), - dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), - node_id = ?message.node_id.map(|id| id.to_string()), - target = message.target, - module_path = message.module_path, - file = message.file, - line = message.line, - "{}", - Indent(&message.message) - ); - } - LogLevel::Warn => { - tracing::warn!( - build_id = ?message.build_id.map(|id| id.to_string()), - dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), - node_id = ?message.node_id.map(|id| id.to_string()), - target = message.target, - module_path = message.module_path, - file = message.file, - line = message.line, - "{}", - Indent(&message.message) - ); - } - LogLevel::Info => { + LogLevelOrStdout::Stdout => { tracing::info!( build_id = ?message.build_id.map(|id| id.to_string()), dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), @@ -286,22 +275,63 @@ impl Logger { line = message.line, "{}", Indent(&message.message) - ); - } - LogLevel::Debug => { - tracing::debug!( - build_id = ?message.build_id.map(|id| id.to_string()), - dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), - node_id = ?message.node_id.map(|id| id.to_string()), - target = message.target, - module_path = message.module_path, - file = message.file, - line = message.line, - "{}", - Indent(&message.message) - ); + ) } - _ => {} + LogLevelOrStdout::LogLevel(level) => match level { + LogLevel::Error => { + tracing::error!( + build_id = ?message.build_id.map(|id| id.to_string()), + dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), + node_id = ?message.node_id.map(|id| id.to_string()), + target = message.target, + module_path = message.module_path, + file = message.file, + line = message.line, + "{}", + Indent(&message.message) + ); + } + LogLevel::Warn => { + tracing::warn!( + build_id = ?message.build_id.map(|id| id.to_string()), + dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), + node_id = ?message.node_id.map(|id| id.to_string()), + target = message.target, + module_path = message.module_path, + file = message.file, + line = message.line, + "{}", + Indent(&message.message) + ); + } + LogLevel::Info => { + tracing::info!( + build_id = ?message.build_id.map(|id| id.to_string()), + dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), + node_id = ?message.node_id.map(|id| id.to_string()), + target = message.target, + module_path = message.module_path, + file = message.file, + line = message.line, + "{}", + Indent(&message.message) + ); + } + LogLevel::Debug => { + tracing::debug!( + build_id = ?message.build_id.map(|id| id.to_string()), + dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), + node_id = ?message.node_id.map(|id| id.to_string()), + target = message.target, + module_path = message.module_path, + file = message.file, + line = message.line, + "{}", + Indent(&message.message) + ); + } + _ => {} + }, } } diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 7d75b755..2c24f5e4 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -590,9 +590,9 @@ impl PreparedNode { daemon_id: Some(daemon_id.clone()), dataflow_id: Some(dataflow_id), build_id: None, - level: LogLevel::Info, + level: dora_core::build::LogLevelOrStdout::Stdout, node_id: Some(node_id.clone()), - target: Some("stdout".into()), + target: None, message: formatted, file: None, line: None, diff --git a/libraries/core/src/build/logger.rs b/libraries/core/src/build/logger.rs index d683bcd4..c382b1ac 100644 --- a/libraries/core/src/build/logger.rs +++ b/libraries/core/src/build/logger.rs @@ -1,15 +1,19 @@ use std::future::Future; -use dora_message::common::LogLevel; +pub use dora_message::common::LogLevelOrStdout; pub trait BuildLogger: Send { type Clone: BuildLogger + 'static; fn log_message( &mut self, - level: LogLevel, + level: impl Into + Send, message: impl Into + Send, ) -> impl Future + Send; + fn log_stdout(&mut self, message: impl Into + Send) -> impl Future + Send { + self.log_message(LogLevelOrStdout::Stdout, message) + } + fn try_clone(&self) -> impl Future> + Send; } diff --git a/libraries/core/src/build/mod.rs b/libraries/core/src/build/mod.rs index f5afd3b3..b0449d35 100644 --- a/libraries/core/src/build/mod.rs +++ b/libraries/core/src/build/mod.rs @@ -1,5 +1,5 @@ pub use git::GitManager; -pub use logger::BuildLogger; +pub use logger::{BuildLogger, LogLevelOrStdout}; use url::Url; @@ -126,10 +126,7 @@ async fn build_node( tokio::spawn(async move { while let Some(line) = stdout.recv().await { logger - .log_message( - LogLevel::Info, - line.unwrap_or_else(|err| format!("io err: {}", err.kind())), - ) + .log_stdout(line.unwrap_or_else(|err| format!("io err: {}", err.kind()))) .await; } }); diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index 83591811..d48f1308 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -16,7 +16,7 @@ pub struct LogMessage { pub dataflow_id: Option, pub node_id: Option, pub daemon_id: Option, - pub level: LogLevel, + pub level: LogLevelOrStdout, pub target: Option, pub module_path: Option, pub file: Option, @@ -24,6 +24,18 @@ pub struct LogMessage { pub message: String, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum LogLevelOrStdout { + LogLevel(LogLevel), + Stdout, +} + +impl From for LogLevelOrStdout { + fn from(level: LogLevel) -> Self { + Self::LogLevel(level) + } +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct NodeError { pub timestamp: uhlc::Timestamp,