Browse Source

Add a special log level for `stdout` output

tags/v0.3.12-rc0
Philipp Oppermann 7 months ago
parent
commit
950862b916
Failed to extract signature
9 changed files with 136 additions and 74 deletions
  1. +16
    -8
      binaries/cli/src/command/build/local.rs
  2. +9
    -4
      binaries/cli/src/output.rs
  3. +1
    -1
      binaries/coordinator/src/lib.rs
  4. +8
    -2
      binaries/coordinator/src/log_subscriber.rs
  5. +79
    -49
      binaries/daemon/src/log.rs
  6. +2
    -2
      binaries/daemon/src/spawn.rs
  7. +6
    -2
      libraries/core/src/build/logger.rs
  8. +2
    -5
      libraries/core/src/build/mod.rs
  9. +13
    -1
      libraries/message/src/common.rs

+ 16
- 8
binaries/cli/src/command/build/local.rs View File

@@ -2,7 +2,7 @@ use std::{collections::BTreeMap, path::PathBuf};

use colored::Colorize;
use dora_core::{
build::{BuildInfo, BuildLogger, Builder, GitManager},
build::{BuildInfo, BuildLogger, Builder, GitManager, LogLevelOrStdout},
descriptor::{Descriptor, DescriptorExt},
};
use dora_message::{common::GitSource, id::NodeId};
@@ -89,16 +89,24 @@ struct LocalBuildLogger {
impl BuildLogger for LocalBuildLogger {
type Clone = Self;

async fn log_message(&mut self, level: log::Level, message: impl Into<String> + Send) {
let level = match level {
log::Level::Error => "ERROR".red(),
log::Level::Warn => "WARN ".yellow(),
log::Level::Info => "INFO ".green(),
other => format!("{other:5}").normal(),
async fn log_message(
&mut self,
level: impl Into<LogLevelOrStdout> + Send,
message: impl Into<String> + Send,
) {
let level = match level.into() {
LogLevelOrStdout::LogLevel(level) => match level {
log::Level::Error => "ERROR ".red(),
log::Level::Warn => "WARN ".yellow(),
log::Level::Info => "INFO ".green(),
log::Level::Debug => "DEBUG ".bright_blue(),
log::Level::Trace => "TRACE ".dimmed(),
},
LogLevelOrStdout::Stdout => "stdout".italic().dimmed(),
};
let node = self.node_id.to_string().bold().bright_black();
let message: String = message.into();
println!("{node}: \t{level}: \t{message}");
println!("{node}: {level} {message}");
}

async fn try_clone(&self) -> eyre::Result<Self::Clone> {


+ 9
- 4
binaries/cli/src/output.rs View File

@@ -1,4 +1,5 @@
use colored::Colorize;
use dora_core::build::LogLevelOrStdout;
use dora_message::common::LogMessage;

pub fn print_log_message(log_message: LogMessage) {
@@ -15,10 +16,14 @@ pub fn print_log_message(log_message: LogMessage) {
message,
} = log_message;
let level = match level {
log::Level::Error => "ERROR".red(),
log::Level::Warn => "WARN ".yellow(),
log::Level::Info => "INFO ".green(),
other => format!("{other:5}").normal(),
LogLevelOrStdout::LogLevel(level) => match level {
log::Level::Error => "ERROR ".red(),
log::Level::Warn => "WARN ".yellow(),
log::Level::Info => "INFO ".green(),
log::Level::Debug => "DEBUG ".bright_blue(),
log::Level::Trace => "TRACE ".dimmed(),
},
LogLevelOrStdout::Stdout => "stdout".bright_blue().italic().dimmed(),
};
let dataflow = if let Some(dataflow_id) = dataflow_id {
format!(" dataflow `{dataflow_id}`\t").cyan()


+ 1
- 1
binaries/coordinator/src/lib.rs View File

@@ -369,7 +369,7 @@ async fn start_inner(
dataflow_id: Some(dataflow_id),
node_id: None,
daemon_id: None,
level: LogLevel::Info,
level: LogLevel::Info.into(),
target: Some("coordinator".into()),
module_path: None,
file: None,


+ 8
- 2
binaries/coordinator/src/log_subscriber.rs View File

@@ -17,9 +17,15 @@ impl LogSubscriber {
}

pub async fn send_message(&mut self, message: &LogMessage) -> eyre::Result<()> {
if message.level > self.level {
return Ok(());
match message.level {
dora_core::build::LogLevelOrStdout::LogLevel(level) => {
if level > self.level {
return Ok(());
}
}
dora_core::build::LogLevelOrStdout::Stdout => {}
}

let message = serde_json::to_vec(&message)?;
let connection = self.connection.as_mut().context("connection is closed")?;
tcp_send(connection, &message)


+ 79
- 49
binaries/daemon/src/log.rs View File

@@ -4,7 +4,11 @@ use std::{
sync::Arc,
};

use dora_core::{build::BuildLogger, config::NodeId, uhlc};
use dora_core::{
build::{BuildLogger, LogLevelOrStdout},
config::NodeId,
uhlc,
};
use dora_message::{
common::{DaemonId, LogLevel, LogMessage, Timestamped},
daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
@@ -101,9 +105,19 @@ pub struct NodeBuildLogger<'a> {
}

impl NodeBuildLogger<'_> {
pub async fn log(&mut self, level: LogLevel, message: impl Into<String>) {
pub async fn log(
&mut self,
level: impl Into<LogLevelOrStdout> + Send,
message: impl Into<String>,
) {
self.logger
.log_build(self.build_id, level, Some(self.node_id.clone()), message)
.log_build(
self.build_id,
level.into(),
None,
Some(self.node_id.clone()),
message,
)
.await
}

@@ -121,7 +135,7 @@ impl BuildLogger for NodeBuildLogger<'_> {

fn log_message(
&mut self,
level: LogLevel,
level: impl Into<LogLevelOrStdout> + Send,
message: impl Into<String> + Send,
) -> impl std::future::Future<Output = ()> + Send {
self.log(level, message)
@@ -170,7 +184,7 @@ impl DaemonLogger {
daemon_id: Some(self.daemon_id.clone()),
dataflow_id,
node_id,
level,
level: level.into(),
target,
module_path: None,
file: None,
@@ -183,7 +197,8 @@ impl DaemonLogger {
pub async fn log_build(
&mut self,
build_id: BuildId,
level: LogLevel,
level: LogLevelOrStdout,
target: Option<String>,
node_id: Option<NodeId>,
message: impl Into<String>,
) {
@@ -193,7 +208,7 @@ impl DaemonLogger {
dataflow_id: None,
node_id,
level,
target: Some("build".into()),
target,
module_path: None,
file: None,
line: None,
@@ -249,33 +264,7 @@ impl Logger {

// log message using tracing if reporting to coordinator is not possible
match message.level {
LogLevel::Error => {
tracing::error!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
file = message.file,
line = message.line,
"{}",
Indent(&message.message)
);
}
LogLevel::Warn => {
tracing::warn!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
file = message.file,
line = message.line,
"{}",
Indent(&message.message)
);
}
LogLevel::Info => {
LogLevelOrStdout::Stdout => {
tracing::info!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
@@ -286,22 +275,63 @@ impl Logger {
line = message.line,
"{}",
Indent(&message.message)
);
}
LogLevel::Debug => {
tracing::debug!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
file = message.file,
line = message.line,
"{}",
Indent(&message.message)
);
)
}
_ => {}
LogLevelOrStdout::LogLevel(level) => match level {
LogLevel::Error => {
tracing::error!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
file = message.file,
line = message.line,
"{}",
Indent(&message.message)
);
}
LogLevel::Warn => {
tracing::warn!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
file = message.file,
line = message.line,
"{}",
Indent(&message.message)
);
}
LogLevel::Info => {
tracing::info!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
file = message.file,
line = message.line,
"{}",
Indent(&message.message)
);
}
LogLevel::Debug => {
tracing::debug!(
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
file = message.file,
line = message.line,
"{}",
Indent(&message.message)
);
}
_ => {}
},
}
}



+ 2
- 2
binaries/daemon/src/spawn.rs View File

@@ -590,9 +590,9 @@ impl PreparedNode {
daemon_id: Some(daemon_id.clone()),
dataflow_id: Some(dataflow_id),
build_id: None,
level: LogLevel::Info,
level: dora_core::build::LogLevelOrStdout::Stdout,
node_id: Some(node_id.clone()),
target: Some("stdout".into()),
target: None,
message: formatted,
file: None,
line: None,


+ 6
- 2
libraries/core/src/build/logger.rs View File

@@ -1,15 +1,19 @@
use std::future::Future;

use dora_message::common::LogLevel;
pub use dora_message::common::LogLevelOrStdout;

pub trait BuildLogger: Send {
type Clone: BuildLogger + 'static;

fn log_message(
&mut self,
level: LogLevel,
level: impl Into<LogLevelOrStdout> + Send,
message: impl Into<String> + Send,
) -> impl Future<Output = ()> + Send;

fn log_stdout(&mut self, message: impl Into<String> + Send) -> impl Future<Output = ()> + Send {
self.log_message(LogLevelOrStdout::Stdout, message)
}

fn try_clone(&self) -> impl Future<Output = eyre::Result<Self::Clone>> + Send;
}

+ 2
- 5
libraries/core/src/build/mod.rs View File

@@ -1,5 +1,5 @@
pub use git::GitManager;
pub use logger::BuildLogger;
pub use logger::{BuildLogger, LogLevelOrStdout};

use url::Url;

@@ -126,10 +126,7 @@ async fn build_node(
tokio::spawn(async move {
while let Some(line) = stdout.recv().await {
logger
.log_message(
LogLevel::Info,
line.unwrap_or_else(|err| format!("io err: {}", err.kind())),
)
.log_stdout(line.unwrap_or_else(|err| format!("io err: {}", err.kind())))
.await;
}
});


+ 13
- 1
libraries/message/src/common.rs View File

@@ -16,7 +16,7 @@ pub struct LogMessage {
pub dataflow_id: Option<DataflowId>,
pub node_id: Option<NodeId>,
pub daemon_id: Option<DaemonId>,
pub level: LogLevel,
pub level: LogLevelOrStdout,
pub target: Option<String>,
pub module_path: Option<String>,
pub file: Option<String>,
@@ -24,6 +24,18 @@ pub struct LogMessage {
pub message: String,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogLevelOrStdout {
LogLevel(LogLevel),
Stdout,
}

impl From<LogLevel> for LogLevelOrStdout {
fn from(level: LogLevel) -> Self {
Self::LogLevel(level)
}
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
pub timestamp: uhlc::Timestamp,


Loading…
Cancel
Save