@@ -2,7 +2,7 @@ use coordinator::CoordinatorEvent;
 use dora_core::config::{Input, OperatorId};
 use dora_core::coordinator_messages::CoordinatorRequest;
 use dora_core::daemon_messages::{Data, InterDaemonEvent, Timestamped};
-use dora_core::message::uhlc::{self, HLC};
+use dora_core::message::uhlc::{self, Timestamp, HLC};
 use dora_core::message::MetadataParameters;
 use dora_core::{
     config::{DataId, InputMapping, NodeId},
@@ -32,7 +32,7 @@ use std::{
 };
 use tcp_utils::tcp_send;
 use tokio::fs::File;
-use tokio::io::AsyncReadExt;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::net::TcpStream;
 use tokio::sync::mpsc::UnboundedSender;
 use tokio::sync::oneshot::Sender;
@@ -250,6 +250,14 @@ impl Daemon {
                 tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
             }
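+            // Persist the event before it is handled; a failed write is only logged
+            // and never interrupts the daemon's event loop.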
+            if let Err(err) = self
+                .record_event(&inner, timestamp)
+                .await
+                .context("failed to record event")
+            {
+                tracing::warn!("{err:?}");
+            };
+
             match inner {
                 Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
                     let status = self.handle_coordinator_event(event, reply_tx).await?;
@@ -300,6 +308,73 @@ impl Daemon {
         Ok(self.dataflow_errors)
     }
 
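+    /// Appends a debug rendering of `event`, tagged with its HLC `timestamp`, to the
+    /// `events.txt` file in the dataflow's record folder. Events that are not tied to a
+    /// dataflow, and dataflows started without recording enabled, are skipped.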
+    async fn record_event(&mut self, event: &Event, timestamp: Timestamp) -> eyre::Result<()> {
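+        // Determine which dataflow the event belongs to; daemon-wide events
+        // (heartbeats, Ctrl-C, destroy) carry no dataflow id and are not recorded.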
+        let dataflow_id = match &event {
+            Event::Node { dataflow_id, .. } => dataflow_id,
+            Event::Coordinator(CoordinatorEvent { event, .. }) => match event {
+                DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes { dataflow_id, .. }) => {
+                    dataflow_id
+                }
+                DaemonCoordinatorEvent::AllNodesReady { dataflow_id, .. } => dataflow_id,
+                DaemonCoordinatorEvent::StopDataflow { dataflow_id } => dataflow_id,
+                DaemonCoordinatorEvent::ReloadDataflow { dataflow_id, .. } => dataflow_id,
+                DaemonCoordinatorEvent::Logs { dataflow_id, .. } => dataflow_id,
+                DaemonCoordinatorEvent::Destroy | DaemonCoordinatorEvent::Heartbeat => {
+                    return Ok(())
+                }
+            },
+            Event::Daemon(event) => match event {
+                InterDaemonEvent::Output { dataflow_id, .. } => dataflow_id,
+                InterDaemonEvent::InputsClosed { dataflow_id, .. } => dataflow_id,
+            },
+            Event::Dora(event) => match event {
+                DoraEvent::Timer { dataflow_id, .. } => dataflow_id,
+                DoraEvent::SpawnedNodeResult { dataflow_id, .. } => dataflow_id,
+            },
+            Event::HeartbeatInterval | Event::CtrlC => return Ok(()),
+        };
+        let Some(dataflow) = self.running.get_mut(dataflow_id) else {
+            bail!("no running dataflow with id `{dataflow_id}`");
+        };
+        if !dataflow.record {
+            // event recording is disabled
+            return Ok(());
+        }
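+
+        // Render the event via its `Debug` implementation, prefixed with the HLC timestamp.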
+        let rendered = format!("at {timestamp}: {event:?}\n\n");
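+        // The record file lives at `<working_dir>/record/<dataflow_id>/events.txt`.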
+        let record_folder = dataflow
+            .working_dir
+            .join("record")
+            .join(dataflow.id.to_string());
+        tokio::fs::create_dir_all(&record_folder)
+            .await
+            .wrap_err_with(|| {
+                format!(
+                    "failed to create record folder at {}",
+                    record_folder.display()
+                )
+            })?;
+        let record_file_path = record_folder.join("events.txt");
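+        // Open the record file in append mode, creating it on first use.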
+        let mut record_file = tokio::fs::OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(&record_file_path)
+            .await
+            .wrap_err_with(|| {
+                format!(
+                    "failed to open record file at {}",
+                    record_file_path.display()
+                )
+            })?;
+        record_file
+            .write_all(rendered.as_bytes())
+            .await
+            .context("failed to write event to record file")?;
+        Ok(())
+    }
+
     async fn handle_coordinator_event(
         &mut self,
         event: DaemonCoordinatorEvent,
@@ -510,7 +585,8 @@ impl Daemon {
         dataflow_descriptor: Descriptor,
         record: bool,
     ) -> eyre::Result<()> {
-        let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone());
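+        // Hand the dataflow's working directory to the running-dataflow state;
+        // `record_event` later resolves the record folder relative to it.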
+        let dataflow =
+            RunningDataflow::new(dataflow_id, self.machine_id.clone(), working_dir.clone());
         let dataflow = match self.running.entry(dataflow_id) {
             std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow),
             std::collections::hash_map::Entry::Occupied(_) => {
@@ -1258,6 +1334,8 @@ fn close_input(
 pub struct RunningDataflow {
     id: Uuid,
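+    /// Working directory of the dataflow; the `record` folder is created inside it.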
+    working_dir: PathBuf,
     /// Local nodes that are not started yet
     pending_nodes: PendingNodes,
@@ -1286,9 +1364,10 @@ pub struct RunningDataflow {
 }
 
 impl RunningDataflow {
-    fn new(dataflow_id: Uuid, machine_id: String) -> RunningDataflow {
+    fn new(dataflow_id: Uuid, machine_id: String, working_dir: PathBuf) -> RunningDataflow {
         Self {
             id: dataflow_id,
+            working_dir,
             pending_nodes: PendingNodes::new(dataflow_id, machine_id),
             subscribe_channels: HashMap::new(),
             drop_channels: HashMap::new(),