Browse Source

Forward more daemon log messages to CLI

tags/v0.3.10
Philipp Oppermann haixuantao 11 months ago
parent
commit
ec4f229094
1 changed files with 93 additions and 18 deletions
  1. +93
    -18
      binaries/daemon/src/lib.rs

+ 93
- 18
binaries/daemon/src/lib.rs View File

@@ -467,7 +467,9 @@ impl Daemon {
dataflow_id,
exited_before_subscribe,
} => {
tracing::debug!("received AllNodesReady (dataflow id: {dataflow_id}, exited_before_subscribe: {exited_before_subscribe:?})");
self.send_log_message(log(LogLevel::Debug, dataflow_id,
format!("received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
))).await?;
match self.running.get_mut(&dataflow_id) {
Some(dataflow) => {
let ready = exited_before_subscribe.is_empty();
@@ -479,7 +481,13 @@ impl Daemon {
)
.await?;
if ready {
tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
self.send_log_message(log(
LogLevel::Info,
dataflow_id,
"coordinator reported that all nodes are ready, starting dataflow",
))
.await?;
let dataflow = self.running.get_mut(&dataflow_id).unwrap(); // reborrow
dataflow.start(&self.events_tx, &self.clock).await?;
}
}
@@ -633,7 +641,13 @@ impl Daemon {
.await
.wrap_err("failed to forward remote output to local receivers")
{
tracing::warn!("{err:?}")
self.send_log_message(log_node(
LogLevel::Warn,
dataflow_id,
node_id,
format!("{err:?}"),
))
.await?;
}
Ok(())
}
@@ -642,8 +656,14 @@ impl Daemon {
node_id,
output_id,
} => {
let output_id = OutputId(node_id, output_id);
tracing::debug!(?dataflow_id, ?output_id, "received OutputClosed event");
let output_id = OutputId(node_id.clone(), output_id);
self.send_log_message(log_node(
LogLevel::Debug,
dataflow_id,
node_id.clone(),
format!("received OutputClosed event for output {output_id:?}"),
))
.await?;
let inner = async {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("send out failed: no running dataflow with ID `{dataflow_id}`")
@@ -660,7 +680,13 @@ impl Daemon {
.await
.wrap_err("failed to handle InputsClosed event sent by coordinator")
{
tracing::warn!("{err:?}")
self.send_log_message(log_node(
LogLevel::Warn,
dataflow_id,
node_id,
format!("{err:?}"),
))
.await?;
}
Ok(())
}
@@ -922,6 +948,14 @@ impl Daemon {
event_sender,
reply_sender,
} => {
self.send_log_message(log_node(
LogLevel::Info,
dataflow_id,
node_id.clone(),
"node is ready",
))
.await?;

let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
});
@@ -931,7 +965,6 @@ impl Daemon {
let _ = reply_sender.send(DaemonReply::Result(Err(err)));
}
Ok(dataflow) => {
tracing::info!("node `{node_id}` is ready");
Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

let status = dataflow
@@ -946,9 +979,13 @@ impl Daemon {
.await?;
match status {
DataflowStatus::AllNodesReady => {
tracing::info!(
"all nodes are ready, starting dataflow `{dataflow_id}`"
);
self.send_log_message(log(
LogLevel::Info,
dataflow_id,
"all nodes are ready, starting dataflow",
))
.await?;
let dataflow = self.running.get_mut(&dataflow_id).unwrap(); // reborrow
dataflow.start(&self.events_tx, &self.clock).await?;
}
DataflowStatus::Pending => {}
@@ -1293,10 +1330,12 @@ impl Daemon {
.clone(),
};

tracing::info!(
"Dataflow `{dataflow_id}` finished on machine `{}`",
self.daemon_id
);
self.send_log_message(log(
LogLevel::Info,
dataflow_id,
format!("dataflow finished on machine `{}`", self.daemon_id),
))
.await?;
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
@@ -1416,9 +1455,14 @@ impl Daemon {
node_id,
exit_status,
} => {
tracing::debug!(
"handling node stop {dataflow_id}/{node_id} with exit status {exit_status:?}"
);
self.send_log_message(log_node(
LogLevel::Debug,
dataflow_id,
node_id.clone(),
format!("handling node stop with exit status {exit_status:?}"),
))
.await?;

let node_result = match exit_status {
NodeExitStatus::Success => Ok(()),
exit_status => {
@@ -1434,7 +1478,7 @@ impl Daemon {

let cause = match caused_by_node {
Some(caused_by_node) => {
tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`");
self.send_log_message(log_node(LogLevel::Info, dataflow_id,node_id.clone(), format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"))).await?;
NodeErrorCause::Cascading { caused_by_node }
}
None if grace_duration_kill => NodeErrorCause::GraceDuration,
@@ -2149,3 +2193,34 @@ impl CoreNodeKindExt for CoreNodeKind {
}
}
}

fn log(level: LogLevel, dataflow_id: Uuid, message: impl Into<String>) -> LogMessage {
LogMessage {
dataflow_id,
node_id: None,
level,
target: Some("deamon".into()),
module_path: None,
file: None,
line: None,
message: message.into(),
}
}

fn log_node(
level: LogLevel,
dataflow_id: Uuid,
node_id: NodeId,
message: impl Into<String>,
) -> LogMessage {
LogMessage {
dataflow_id,
node_id: Some(node_id),
level,
target: Some("deamon".into()),
module_path: None,
file: None,
line: None,
message: message.into(),
}
}

Loading…
Cancel
Save