|
|
|
@@ -2,7 +2,7 @@ use aligned_vec::{AVec, ConstAlign}; |
|
|
|
use coordinator::CoordinatorEvent; |
|
|
|
use crossbeam::queue::ArrayQueue; |
|
|
|
use dora_core::config::{Input, OperatorId}; |
|
|
|
use dora_core::coordinator_messages::{CoordinatorRequest, LogMessage}; |
|
|
|
use dora_core::coordinator_messages::{CoordinatorRequest, Level, LogMessage}; |
|
|
|
use dora_core::daemon_messages::{ |
|
|
|
DataMessage, DynamicNodeEvent, InterDaemonEvent, NodeConfig, Timestamped, |
|
|
|
}; |
|
|
|
@@ -660,7 +660,16 @@ impl Daemon { |
|
|
|
dataflow.running_nodes.insert(node_id, running_node); |
|
|
|
} |
|
|
|
Err(err) => { |
|
|
|
tracing::error!("{err:?}"); |
|
|
|
log_messages.push(LogMessage { |
|
|
|
dataflow_id, |
|
|
|
node_id: Some(node_id.clone()), |
|
|
|
level: Level::Error, |
|
|
|
target: "spawn".into(), |
|
|
|
module_path: None, |
|
|
|
file: None, |
|
|
|
line: None, |
|
|
|
message: format!("{err:?}"), |
|
|
|
}); |
|
|
|
let messages = dataflow |
|
|
|
.pending_nodes |
|
|
|
.handle_node_stop( |
|
|
|
@@ -1227,6 +1236,25 @@ impl Daemon { |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
self.send_log_message(LogMessage { |
|
|
|
dataflow_id, |
|
|
|
node_id: Some(node_id.clone()), |
|
|
|
level: if node_result.is_ok() { |
|
|
|
Level::Info |
|
|
|
} else { |
|
|
|
Level::Error |
|
|
|
}, |
|
|
|
target: "exit".into(), |
|
|
|
module_path: None, |
|
|
|
file: None, |
|
|
|
line: None, |
|
|
|
message: match &node_result { |
|
|
|
Ok(()) => "node finished successfully".to_string(), |
|
|
|
Err(err) => format!("{err}"), |
|
|
|
}, |
|
|
|
}) |
|
|
|
.await?; |
|
|
|
|
|
|
|
self.dataflow_node_results |
|
|
|
.entry(dataflow_id) |
|
|
|
.or_default() |
|
|
|
|