diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 48d7a584..e002f859 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -12,7 +12,7 @@ use dora_message::{ common::DaemonId, coordinator_to_cli::{ ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult, - DataflowStatus, LogMessage, + DataflowStatus, LogLevel, LogMessage, }, coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped}, daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult}, @@ -344,7 +344,24 @@ async fn start_inner( archived_dataflows .entry(uuid) .or_insert_with(|| ArchivedDataflow::from(entry.get())); - let finished_dataflow = entry.remove(); + let mut finished_dataflow = entry.remove(); + let dataflow_id = finished_dataflow.uuid; + send_log_message( + &mut finished_dataflow, + &LogMessage { + dataflow_id, + node_id: None, + daemon_id: None, + level: LogLevel::Info, + target: Some("coordinator".into()), + module_path: None, + file: None, + line: None, + message: "dataflow finished".into(), + }, + ) + .await; + let reply = ControlRequestReply::DataflowStopped { uuid, result: dataflow_results @@ -679,17 +696,7 @@ async fn start_inner( } Event::Log(message) => { if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) { - for subscriber in &mut dataflow.log_subscribers { - let send_result = tokio::time::timeout( - Duration::from_millis(100), - subscriber.send_message(&message), - ); - - if send_result.await.is_err() { - subscriber.close(); - } - } - dataflow.log_subscribers.retain(|s| !s.is_closed()); + send_log_message(dataflow, &message).await; } } Event::DaemonExit { daemon_id } => { @@ -704,6 +711,18 @@ async fn start_inner( Ok(()) } +async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage) { + for subscriber in &mut dataflow.log_subscribers { + let send_result = + tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message)); + + if send_result.await.is_err() { + subscriber.close(); + } + } + dataflow.log_subscribers.retain(|s| !s.is_closed()); +} + fn dataflow_result( results: &BTreeMap, dataflow_uuid: Uuid, diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs index d5477a07..c8f1d3c8 100644 --- a/libraries/message/src/coordinator_to_cli.rs +++ b/libraries/message/src/coordinator_to_cli.rs @@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet}; use uuid::Uuid; -pub use crate::common::{LogMessage, NodeError, NodeErrorCause, NodeExitStatus}; +pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus}; use crate::{common::DaemonId, id::NodeId}; #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]