@@ -17,7 +17,7 @@ use dora_message::{
coordinator_to_cli::DataflowResult,
coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
daemon_to_coordinator::{
CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult, LogMessage,
CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
},
daemon_to_daemon::InterDaemonEvent,
daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
@@ -30,6 +30,7 @@ use eyre::{bail, eyre, Context, ContextCompat, Result};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
use local_listener::DynamicNodeEventWrapper;
use log::{DaemonLogger, DataflowLogger, Logger};
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use socket_stream_utils::socket_stream_send;
@@ -94,6 +95,8 @@ pub struct Daemon {
zenoh_session: zenoh::Session,
remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
logger: DaemonLogger,
}
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
@@ -234,6 +237,20 @@ impl Daemon {
None => None,
};
// additional connection for logging
let logger_coordinator_connection = match coordinator_addr {
Some(addr) => {
let stream = TcpStream::connect(addr)
.await
.wrap_err("failed to connect log to dora-coordinator")?;
stream
.set_nodelay(true)
.wrap_err("failed to set TCP_NODELAY")?;
Some(stream)
}
None => None,
};
let zenoh_config = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) {
Ok(path) => zenoh::Config::from_file(&path)
.map_err(|e| eyre!(e))
@@ -251,6 +268,12 @@ impl Daemon {
let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
let daemon = Self {
logger: Logger {
coordinator_connection: logger_coordinator_connection,
daemon_id: daemon_id.clone(),
clock: clock.clone(),
}
.for_daemon(daemon_id.clone()),
running: HashMap::new(),
working_dir: HashMap::new(),
events_tx: dora_events_tx,
@@ -334,8 +357,14 @@ impl Daemon {
Event::CtrlC => {
tracing::info!("received ctrlc signal -> stopping all dataflows");
for dataflow in self.running.values_mut() {
let mut logger = self.logger.for_dataflow(dataflow.id);
dataflow
.stop_all(&mut self.coordinator_connection, &self.clock, None)
.stop_all(
&mut self.coordinator_connection,
&self.clock,
None,
&mut logger,
)
.await?;
}
self.exit_when_all_finished = true;
@@ -369,55 +398,6 @@ impl Daemon {
Ok(self.dataflow_node_results)
}
/// Forwards a `LogMessage` to the coordinator when connected; otherwise
/// prints it through the local `tracing` subscriber.
///
/// With a coordinator connection, the message is serialized as a
/// timestamped `DaemonEvent::Log` and sent over the TCP stream. If the
/// last coordinator heartbeat is older than 20 seconds, the connection is
/// considered lost and an error is returned.
///
/// Without a connection, only `Error`, `Warn`, and `Info` levels are
/// emitted locally; all other levels are silently dropped.
async fn send_log_message(&mut self, message: LogMessage) -> eyre::Result<()> {
    if let Some(connection) = &mut self.coordinator_connection {
        // Wrap the log message in a timestamped coordinator request so the
        // coordinator can attribute it to this daemon.
        let msg = serde_json::to_vec(&Timestamped {
            inner: CoordinatorRequest::Event {
                daemon_id: self.daemon_id.clone(),
                event: DaemonEvent::Log(message),
            },
            timestamp: self.clock.new_timestamp(),
        })?;
        socket_stream_send(connection, &msg)
            .await
            .wrap_err("failed to send log message to dora-coordinator")?;
        // NOTE(review): the heartbeat is checked after the send, so one
        // message may still go out over a connection already deemed lost.
        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
            bail!("lost connection to coordinator")
        }
    } else {
        // No coordinator connection: fall back to local tracing output.
        // Each level uses the matching tracing macro; multi-line messages
        // are emitted line by line, indented under a header line.
        match message.level {
            LogLevel::Error => {
                if let Some(node_id) = message.node_id {
                    tracing::error!("{}/{} errored:", message.dataflow_id.to_string(), node_id);
                }
                for line in message.message.lines() {
                    tracing::error!("  {}", line);
                }
            }
            LogLevel::Warn => {
                if let Some(node_id) = message.node_id {
                    tracing::warn!("{}/{} warned:", message.dataflow_id.to_string(), node_id);
                }
                for line in message.message.lines() {
                    tracing::warn!("  {}", line);
                }
            }
            LogLevel::Info => {
                if let Some(node_id) = message.node_id {
                    tracing::info!("{}/{} info:", message.dataflow_id.to_string(), node_id);
                }
                for line in message.message.lines() {
                    tracing::info!("  {}", line);
                }
            }
            // Other levels (e.g. Debug/Trace) are intentionally not
            // mirrored locally.
            _ => {}
        }
    }
    Ok(())
}
async fn handle_coordinator_event(
&mut self,
event: DaemonCoordinatorEvent,
@@ -467,9 +447,11 @@ impl Daemon {
dataflow_id,
exited_before_subscribe,
} => {
self.send_log_message(log(LogLevel::Debug, dataflow_id,
let mut logger = self.logger.for_dataflow(dataflow_id);
logger.log(LogLevel::Debug, None,
Some("daemon".into()),
format!("received AllNodesReady (exited_before_subscribe: {exited_before_subscribe:?})"
))).await?;
)).await;
match self.running.get_mut(&dataflow_id) {
Some(dataflow) => {
let ready = exited_before_subscribe.is_empty();
@@ -481,13 +463,10 @@ impl Daemon {
)
.await?;
if ready {
self.send_log_message(log(
LogLevel::Info,
dataflow_id,
logger.log(LogLevel::Info, None,
Some("daemon".into()),
"coordinator reported that all nodes are ready, starting dataflow",
))
.await?;
let dataflow = self.running.get_mut(&dataflow_id).unwrap(); // reborrow
).await;
dataflow.start(&self.events_tx, &self.clock).await?;
}
}
@@ -562,6 +541,7 @@ impl Daemon {
dataflow_id,
grace_duration,
} => {
let mut logger = self.logger.for_dataflow(dataflow_id);
let dataflow = self
.running
.get_mut(&dataflow_id)
@@ -572,6 +552,7 @@ impl Daemon {
&mut self.coordinator_connection,
&self.clock,
grace_duration,
&mut logger,
);
(Ok(()), Some(future))
}
@@ -641,13 +622,10 @@ impl Daemon {
.await
.wrap_err("failed to forward remote output to local receivers")
{
self.send_log_message(log_node(
LogLevel::Warn,
dataflow_id,
node_id,
format!("{err:?}"),
))
.await?;
let mut logger = self.logger.for_dataflow(dataflow_id).for_node(node_id);
logger
.log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
.await;
}
Ok(())
}
@@ -657,13 +635,18 @@ impl Daemon {
output_id,
} => {
let output_id = OutputId(node_id.clone(), output_id);
self.send_log_message(log_node(
LogLevel::Debug,
dataflow_id,
node_id.clone(),
format!("received OutputClosed event for output {output_id:?}"),
))
.await?;
let mut logger = self
.logger
.for_dataflow(dataflow_id)
.for_node(node_id.clone());
logger
.log(
LogLevel::Debug,
Some("daemon".into()),
format!("received OutputClosed event for output {output_id:?}"),
)
.await;
let inner = async {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("send out failed: no running dataflow with ID `{dataflow_id}`")
@@ -680,13 +663,9 @@ impl Daemon {
.await
.wrap_err("failed to handle InputsClosed event sent by coordinator")
{
self.send_log_message(log_node(
LogLevel::Warn,
dataflow_id,
node_id,
format!("{err:?}"),
))
.await?;
logger
.log(LogLevel::Warn, Some("daemon".into()), format!("{err:?}"))
.await;
}
Ok(())
}
@@ -702,6 +681,7 @@ impl Daemon {
spawn_nodes: BTreeSet<NodeId>,
uv: bool,
) -> eyre::Result<()> {
let mut logger = self.logger.for_dataflow(dataflow_id);
let dataflow =
RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
let dataflow = match self.running.entry(dataflow_id) {
@@ -714,7 +694,6 @@ impl Daemon {
}
};
let mut log_messages = Vec::new();
let mut stopped = Vec::new();
// calculate info about mappings
@@ -755,6 +734,7 @@ impl Daemon {
// spawn nodes and set up subscriptions
for node in nodes.into_values() {
let mut logger = logger.reborrow().for_node(node.id.clone());
let local = spawn_nodes.contains(&node.id);
if local {
if node.kind.dynamic() {
@@ -769,6 +749,9 @@ impl Daemon {
.entry(node.id.clone())
.or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
.clone();
logger
.log(LogLevel::Info, Some("daemon".into()), "spawning")
.await;
match spawn::spawn_node(
dataflow_id,
&working_dir,
@@ -778,6 +761,7 @@ impl Daemon {
self.clock.clone(),
node_stderr_most_recent,
uv,
&mut logger,
)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
@@ -786,16 +770,9 @@ impl Daemon {
dataflow.running_nodes.insert(node_id, running_node);
}
Err(err) => {
log_messages.push(LogMessage {
dataflow_id,
node_id: Some(node_id.clone()),
level: LogLevel::Error,
target: None,
module_path: None,
file: None,
line: None,
message: format!("{err:?}"),
});
logger
.log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
.await;
self.dataflow_node_results
.entry(dataflow_id)
.or_default()
@@ -871,10 +848,6 @@ impl Daemon {
self.handle_node_stop(dataflow_id, &node_id).await?;
}
for log_message in log_messages {
self.send_log_message(log_message).await?;
}
Ok(())
}
@@ -948,13 +921,15 @@ impl Daemon {
event_sender,
reply_sender,
} => {
self.send_log_message(log_node(
LogLevel::Info,
dataflow_id,
node_id.clone(),
"node is ready",
))
.await?;
let mut logger = self.logger.for_dataflow(dataflow_id);
logger
.log(
LogLevel::Info,
Some(node_id.clone()),
Some("daemon".into()),
"node is ready",
)
.await;
let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
@@ -975,17 +950,19 @@ impl Daemon {
&mut self.coordinator_connection,
&self.clock,
&mut dataflow.cascading_error_causes,
&mut logger,
)
.await?;
match status {
DataflowStatus::AllNodesReady => {
self.send_log_message(log(
LogLevel::Info,
dataflow_id,
"all nodes are ready, starting dataflow",
))
.await?;
let dataflow = self.running.get_mut(&dataflow_id).unwrap(); // reborrow
logger
.log(
LogLevel::Info,
None,
Some("daemon".into()),
"all nodes are ready, starting dataflow",
)
.await;
dataflow.start(&self.events_tx, &self.clock).await?;
}
DataflowStatus::Pending => {}
@@ -1294,22 +1271,25 @@ impl Daemon {
}
async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
let mut logger = self.logger.for_dataflow(dataflow_id);
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
})?;
let log_messages = dataflow
dataflow
.pending_nodes
.handle_node_stop(
node_id,
&mut self.coordinator_connection,
&self.clock,
&mut dataflow.cascading_error_causes,
&mut logger,
)
.await?;
self.handle_outputs_done(dataflow_id, node_id).await?;
let mut logger = self.logger.for_dataflow(dataflow_id);
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
})?;
@@ -1330,12 +1310,14 @@ impl Daemon {
.clone(),
};
self.send_log_message(log(
LogLevel::Info,
dataflow_id,
format!("dataflow finished on machine `{}`", self.daemon_id),
))
.await?;
logger
.log(
LogLevel::Info,
None,
Some("daemon".into()),
format!("dataflow finished on machine `{}`", self.daemon_id),
)
.await;
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
@@ -1354,10 +1336,6 @@ impl Daemon {
self.running.remove(&dataflow_id);
}
for log_message in log_messages {
self.send_log_message(log_message).await?;
}
Ok(())
}
@@ -1455,13 +1433,17 @@ impl Daemon {
node_id,
exit_status,
} => {
self.send_log_message(log_node(
LogLevel::Debug,
dataflow_id,
node_id.clone(),
format!("handling node stop with exit status {exit_status:?}"),
))
.await?;
let mut logger = self
.logger
.for_dataflow(dataflow_id)
.for_node(node_id.clone());
logger
.log(
LogLevel::Debug,
Some("daemon".into()),
format!("handling node stop with exit status {exit_status:?}"),
)
.await;
let node_result = match exit_status {
NodeExitStatus::Success => Ok(()),
@@ -1478,7 +1460,14 @@ impl Daemon {
let cause = match caused_by_node {
Some(caused_by_node) => {
self.send_log_message(log_node(LogLevel::Info, dataflow_id,node_id.clone(), format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"))).await?;
logger
.log(
LogLevel::Info,
Some("daemon".into()),
format!("marking `{node_id}` as cascading error caused by `{caused_by_node}`")
)
.await;
NodeErrorCause::Cascading { caused_by_node }
}
None if grace_duration_kill => NodeErrorCause::GraceDuration,
@@ -1509,24 +1498,20 @@ impl Daemon {
}
};
self.send_log_message(LogMessage {
dataflow_id,
node_id: Some(node_id.clone()),
level: if node_result.is_ok() {
LogLevel::Info
} else {
LogLevel::Error
},
target: None,
module_path: None,
file: None,
line: None,
message: match &node_result {
Ok(()) => format!("{node_id} finished successfully"),
Err(err) => format!("{err}"),
},
})
.await?;
logger
.log(
if node_result.is_ok() {
LogLevel::Info
} else {
LogLevel::Error
},
Some("daemon".into()),
match &node_result {
Ok(()) => format!("{node_id} finished successfully"),
Err(err) => format!("{err}"),
},
)
.await;
self.dataflow_node_results
.entry(dataflow_id)
@@ -1893,6 +1878,7 @@ impl RunningDataflow {
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
grace_duration: Option<Duration>,
logger: &mut DataflowLogger<'_>,
) -> eyre::Result<()> {
self.pending_nodes
.handle_dataflow_stop(
@@ -1900,6 +1886,7 @@ impl RunningDataflow {
clock,
&mut self.cascading_error_causes,
&self.dynamic_nodes,
logger,
)
.await?;
@@ -2193,34 +2180,3 @@ impl CoreNodeKindExt for CoreNodeKind {
}
}
}
/// Builds a dataflow-level `LogMessage` (no node attribution) targeted at
/// the daemon.
///
/// Source-location fields (`module_path`, `file`, `line`) are left unset
/// because these messages describe daemon-level events, not a specific
/// code location.
fn log(level: LogLevel, dataflow_id: Uuid, message: impl Into<String>) -> LogMessage {
    LogMessage {
        dataflow_id,
        node_id: None,
        level,
        // Fixed typo: target was previously "deamon"; the logger-based
        // call sites in this file consistently use "daemon".
        target: Some("daemon".into()),
        module_path: None,
        file: None,
        line: None,
        message: message.into(),
    }
}
/// Builds a node-level `LogMessage` attributed to `node_id`, targeted at
/// the daemon.
///
/// Source-location fields (`module_path`, `file`, `line`) are left unset
/// because these messages describe daemon-level events, not a specific
/// code location.
fn log_node(
    level: LogLevel,
    dataflow_id: Uuid,
    node_id: NodeId,
    message: impl Into<String>,
) -> LogMessage {
    LogMessage {
        dataflow_id,
        node_id: Some(node_id),
        level,
        // Fixed typo: target was previously "deamon"; the logger-based
        // call sites in this file consistently use "daemon".
        target: Some("daemon".into()),
        module_path: None,
        file: None,
        line: None,
        message: message.into(),
    }
}