Browse Source

Log in coordinator when dataflow is finished

tags/v0.3.10
Philipp Oppermann haixuantao 11 months ago
parent
commit
a692c2ee5b
2 changed files with 33 additions and 14 deletions
  1. +32
    -13
      binaries/coordinator/src/lib.rs
  2. +1
    -1
      libraries/message/src/coordinator_to_cli.rs

+ 32
- 13
binaries/coordinator/src/lib.rs View File

@@ -12,7 +12,7 @@ use dora_message::{
common::DaemonId,
coordinator_to_cli::{
ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
DataflowStatus, LogMessage,
DataflowStatus, LogLevel, LogMessage,
},
coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped},
daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
@@ -344,7 +344,24 @@ async fn start_inner(
archived_dataflows
.entry(uuid)
.or_insert_with(|| ArchivedDataflow::from(entry.get()));
let finished_dataflow = entry.remove();
let mut finished_dataflow = entry.remove();
let dataflow_id = finished_dataflow.uuid;
send_log_message(
&mut finished_dataflow,
&LogMessage {
dataflow_id,
node_id: None,
daemon_id: None,
level: LogLevel::Info,
target: Some("coordinator".into()),
module_path: None,
file: None,
line: None,
message: "dataflow finished".into(),
},
)
.await;

let reply = ControlRequestReply::DataflowStopped {
uuid,
result: dataflow_results
@@ -679,17 +696,7 @@ async fn start_inner(
}
Event::Log(message) => {
if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) {
for subscriber in &mut dataflow.log_subscribers {
let send_result = tokio::time::timeout(
Duration::from_millis(100),
subscriber.send_message(&message),
);

if send_result.await.is_err() {
subscriber.close();
}
}
dataflow.log_subscribers.retain(|s| !s.is_closed());
send_log_message(dataflow, &message).await;
}
}
Event::DaemonExit { daemon_id } => {
@@ -704,6 +711,18 @@ async fn start_inner(
Ok(())
}

async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage) {
for subscriber in &mut dataflow.log_subscribers {
let send_result =
tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));

if send_result.await.is_err() {
subscriber.close();
}
}
dataflow.log_subscribers.retain(|s| !s.is_closed());
}

fn dataflow_result(
results: &BTreeMap<DaemonId, DataflowDaemonResult>,
dataflow_uuid: Uuid,


+ 1
- 1
libraries/message/src/coordinator_to_cli.rs View File

@@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet};

use uuid::Uuid;

pub use crate::common::{LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
use crate::{common::DaemonId, id::NodeId};

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]


Loading…
Cancel
Save