From 8aecbb249c66f225423feec0c323b0e5e093615c Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 26 Jun 2024 13:00:02 +0200 Subject: [PATCH] Log when node finishes or fails --- binaries/daemon/src/lib.rs | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 8353e361..6fdc7bcf 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -2,7 +2,7 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; use crossbeam::queue::ArrayQueue; use dora_core::config::{Input, OperatorId}; -use dora_core::coordinator_messages::{CoordinatorRequest, LogMessage}; +use dora_core::coordinator_messages::{CoordinatorRequest, Level, LogMessage}; use dora_core::daemon_messages::{ DataMessage, DynamicNodeEvent, InterDaemonEvent, NodeConfig, Timestamped, }; @@ -660,7 +660,16 @@ impl Daemon { dataflow.running_nodes.insert(node_id, running_node); } Err(err) => { - tracing::error!("{err:?}"); + log_messages.push(LogMessage { + dataflow_id, + node_id: Some(node_id.clone()), + level: Level::Error, + target: "spawn".into(), + module_path: None, + file: None, + line: None, + message: format!("{err:?}"), + }); let messages = dataflow .pending_nodes .handle_node_stop( @@ -1227,6 +1236,25 @@ impl Daemon { } }; + self.send_log_message(LogMessage { + dataflow_id, + node_id: Some(node_id.clone()), + level: if node_result.is_ok() { + Level::Info + } else { + Level::Error + }, + target: "exit".into(), + module_path: None, + file: None, + line: None, + message: match &node_result { + Ok(()) => "node finished successfully".to_string(), + Err(err) => format!("{err}"), + }, + }) + .await?; + self.dataflow_node_results .entry(dataflow_id) .or_default()