diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index 28e31704..f3cb9fea 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -2,7 +2,7 @@ use coordinator::CoordinatorEvent;
 use dora_core::config::{Input, OperatorId};
 use dora_core::coordinator_messages::CoordinatorRequest;
 use dora_core::daemon_messages::{Data, InterDaemonEvent, Timestamped};
-use dora_core::message::uhlc::{self, HLC};
+use dora_core::message::uhlc::{self, Timestamp, HLC};
 use dora_core::message::MetadataParameters;
 use dora_core::{
     config::{DataId, InputMapping, NodeId},
@@ -32,7 +32,7 @@ use std::{
 };
 use tcp_utils::tcp_send;
 use tokio::fs::File;
-use tokio::io::AsyncReadExt;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::net::TcpStream;
 use tokio::sync::mpsc::UnboundedSender;
 use tokio::sync::oneshot::Sender;
@@ -250,6 +250,14 @@ impl Daemon {
                 tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
             }
 
+            if let Err(err) = self
+                .record_event(&inner, timestamp)
+                .await
+                .context("failed to record event")
+            {
+                tracing::warn!("{err:?}");
+            };
+
             match inner {
                 Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                     let status = self.handle_coordinator_event(event, reply_tx).await?;
@@ -300,6 +308,73 @@ impl Daemon {
         Ok(self.dataflow_errors)
     }
 
+    async fn record_event(&mut self, event: &Event, timestamp: Timestamp) -> eyre::Result<()> {
+        let dataflow_id = match &event {
+            Event::Node { dataflow_id, .. } => dataflow_id,
+            Event::Coordinator(CoordinatorEvent { event, .. }) => match event {
+                DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes { dataflow_id, .. }) => {
+                    dataflow_id
+                }
+                DaemonCoordinatorEvent::AllNodesReady { dataflow_id, .. } => dataflow_id,
+                DaemonCoordinatorEvent::StopDataflow { dataflow_id } => dataflow_id,
+                DaemonCoordinatorEvent::ReloadDataflow { dataflow_id, .. } => dataflow_id,
+                DaemonCoordinatorEvent::Logs { dataflow_id, .. } => dataflow_id,
+                DaemonCoordinatorEvent::Destroy | DaemonCoordinatorEvent::Heartbeat => {
+                    return Ok(())
+                }
+            },
+            Event::Daemon(event) => match event {
+                InterDaemonEvent::Output { dataflow_id, .. } => dataflow_id,
+                InterDaemonEvent::InputsClosed { dataflow_id, .. } => dataflow_id,
+            },
+            Event::Dora(event) => match event {
+                DoraEvent::Timer { dataflow_id, .. } => dataflow_id,
+                DoraEvent::SpawnedNodeResult { dataflow_id, .. } => dataflow_id,
+            },
+            Event::HeartbeatInterval | Event::CtrlC => return Ok(()),
+        };
+        let Some(dataflow) = self.running.get_mut(dataflow_id) else {
+            bail!("no running dataflow with id `{dataflow_id}`");
+        };
+        if !dataflow.record {
+            // event recording is disabled
+            return Ok(());
+        }
+
+        let rendered = format!("at {timestamp}: {event:?}\n\n");
+
+        let record_folder = dataflow
+            .working_dir
+            .join("record")
+            .join(dataflow.id.to_string());
+        tokio::fs::create_dir_all(&record_folder)
+            .await
+            .wrap_err_with(|| {
+                format!(
+                    "failed to create record folder at {}",
+                    record_folder.display()
+                )
+            })?;
+        let record_file_path = record_folder.join("events.txt");
+        let mut record_file = tokio::fs::OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(&record_file_path)
+            .await
+            .wrap_err_with(|| {
+                format!(
+                    "failed to open record file at {}",
+                    record_file_path.display()
+                )
+            })?;
+        record_file
+            .write_all(rendered.as_bytes())
+            .await
+            .context("failed to write event to record file")?;
+
+        Ok(())
+    }
+
     async fn handle_coordinator_event(
         &mut self,
         event: DaemonCoordinatorEvent,
@@ -510,7 +585,8 @@ impl Daemon {
         dataflow_descriptor: Descriptor,
         record: bool,
     ) -> eyre::Result<()> {
-        let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone());
+        let dataflow =
+            RunningDataflow::new(dataflow_id, self.machine_id.clone(), working_dir.clone());
         let dataflow = match self.running.entry(dataflow_id) {
             std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow),
             std::collections::hash_map::Entry::Occupied(_) => {
@@ -1258,6 +1334,8 @@ fn close_input(
 
 pub struct RunningDataflow {
     id: Uuid,
+    working_dir: PathBuf,
+
     /// Local nodes that are not started yet
     pending_nodes: PendingNodes,
 
@@ -1286,9 +1364,10 @@ pub struct RunningDataflow {
 }
 
 impl RunningDataflow {
-    fn new(dataflow_id: Uuid, machine_id: String) -> RunningDataflow {
+    fn new(dataflow_id: Uuid, machine_id: String, working_dir: PathBuf) -> RunningDataflow {
         Self {
             id: dataflow_id,
+            working_dir,
             pending_nodes: PendingNodes::new(dataflow_id, machine_id),
             subscribe_channels: HashMap::new(),
             drop_channels: HashMap::new(),
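---

As a usage illustration (not part of the patch): with `record` enabled, each event is appended to `<working_dir>/record/<dataflow-uuid>/events.txt` as `at {timestamp}: {event:?}` followed by a blank line. Below is a minimal sketch of reading such a log back; the working directory and dataflow UUID are hypothetical placeholders.

```rust
use std::path::Path;

fn main() -> std::io::Result<()> {
    // Hypothetical values; in practice these come from the spawned dataflow.
    let working_dir = Path::new("/tmp/my-dataflow");
    let dataflow_id = "00000000-0000-0000-0000-000000000000";

    // Path layout matches record_event above:
    // <working_dir>/record/<dataflow-uuid>/events.txt
    let record_file = working_dir
        .join("record")
        .join(dataflow_id)
        .join("events.txt");

    let contents = std::fs::read_to_string(&record_file)?;
    // record_event writes "at {timestamp}: {event:?}\n\n", so entries are
    // separated by blank lines.
    for entry in contents.split("\n\n").filter(|e| !e.trim().is_empty()) {
        println!("{entry}");
    }
    Ok(())
}
```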