| Author | SHA1 | Message | Date |
|---|---|---|---|
|
|
ca180b78dc
|
Start implementing recording to rosbag files | 2 years ago |
|
|
b0e51c44cb
|
Record events as json | 2 years ago |
|
|
f0dd167287
|
Refactor: Move event recording to separate `Recorder` type | 2 years ago |
|
|
3cf33f72a6
|
Include machine ID in event record file
In case multiple daemons are running in the same working directory. |
2 years ago |
|
|
e29c58d403
|
Save daemon events to text file if recording is enabled | 2 years ago |
|
|
ca917be94e
|
Add option to record events when spawning dataflows
This commit does not implement event logging yet. |
2 years ago |
| @@ -538,6 +538,12 @@ version = "1.1.0" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" | |||
| [[package]] | |||
| name = "base16ct" | |||
| version = "0.1.1" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" | |||
| [[package]] | |||
| name = "base64" | |||
| version = "0.13.1" | |||
| @@ -720,6 +726,27 @@ version = "1.2.0" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "38fcc2979eff34a4b84e1cf9a1e3da42a7d44b3b690a40cdcb23e3d556cfb2e5" | |||
| [[package]] | |||
| name = "bzip2" | |||
| version = "0.4.4" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8" | |||
| dependencies = [ | |||
| "bzip2-sys", | |||
| "libc", | |||
| ] | |||
| [[package]] | |||
| name = "bzip2-sys" | |||
| version = "0.1.11+1.0.8" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" | |||
| dependencies = [ | |||
| "cc", | |||
| "libc", | |||
| "pkg-config", | |||
| ] | |||
| [[package]] | |||
| name = "cache-padded" | |||
| version = "1.3.0" | |||
| @@ -1348,6 +1375,7 @@ dependencies = [ | |||
| "flume", | |||
| "futures", | |||
| "futures-concurrency", | |||
| "rosbag", | |||
| "serde", | |||
| "serde_json", | |||
| "serde_yaml 0.8.26", | |||
| @@ -2687,6 +2715,26 @@ dependencies = [ | |||
| "value-bag", | |||
| ] | |||
| [[package]] | |||
| name = "lz4" | |||
| version = "1.24.0" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1" | |||
| dependencies = [ | |||
| "libc", | |||
| "lz4-sys", | |||
| ] | |||
| [[package]] | |||
| name = "lz4-sys" | |||
| version = "1.9.4" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" | |||
| dependencies = [ | |||
| "cc", | |||
| "libc", | |||
| ] | |||
| [[package]] | |||
| name = "macro_rules_attribute" | |||
| version = "0.1.3" | |||
| @@ -2727,6 +2775,15 @@ version = "2.5.0" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" | |||
| [[package]] | |||
| name = "memmap2" | |||
| version = "0.5.10" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "83faa42c0a078c393f6b29d5db232d8be22776a891f8f56e5284faee4a20b327" | |||
| dependencies = [ | |||
| "libc", | |||
| ] | |||
| [[package]] | |||
| name = "memoffset" | |||
| version = "0.6.5" | |||
| @@ -3981,6 +4038,20 @@ dependencies = [ | |||
| "cache-padded", | |||
| ] | |||
| [[package]] | |||
| name = "rosbag" | |||
| version = "0.6.1" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "4c470a1a13c7daccdf26cf9b1ee22da130ba39541e384b8054df9537b86b2961" | |||
| dependencies = [ | |||
| "base16ct", | |||
| "byteorder", | |||
| "bzip2", | |||
| "log", | |||
| "lz4", | |||
| "memmap2", | |||
| ] | |||
| [[package]] | |||
| name = "rsa" | |||
| version = "0.7.2" | |||
| @@ -75,6 +75,9 @@ enum Command { | |||
| attach: bool, | |||
| #[clap(long, action)] | |||
| hot_reload: bool, | |||
| /// Whether the events of this dataflow should be recorded. | |||
| #[clap(long, action)] | |||
| record_events: bool, | |||
| }, | |||
| /// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows. | |||
| Stop { | |||
| @@ -180,6 +183,7 @@ fn run() -> eyre::Result<()> { | |||
| name, | |||
| attach, | |||
| hot_reload, | |||
| record_events, | |||
| } => { | |||
| let dataflow_descriptor = | |||
| Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; | |||
| @@ -198,6 +202,7 @@ fn run() -> eyre::Result<()> { | |||
| dataflow_descriptor.clone(), | |||
| name, | |||
| working_dir, | |||
| record_events, | |||
| &mut *session, | |||
| )?; | |||
| @@ -236,6 +241,7 @@ fn start_dataflow( | |||
| dataflow: Descriptor, | |||
| name: Option<String>, | |||
| local_working_dir: PathBuf, | |||
| record_events: bool, | |||
| session: &mut TcpRequestReplyConnection, | |||
| ) -> Result<Uuid, eyre::ErrReport> { | |||
| let reply_raw = session | |||
| @@ -244,6 +250,7 @@ fn start_dataflow( | |||
| dataflow, | |||
| name, | |||
| local_working_dir, | |||
| record_events, | |||
| }) | |||
| .unwrap(), | |||
| ) | |||
| @@ -323,6 +323,7 @@ async fn start_inner( | |||
| dataflow, | |||
| name, | |||
| local_working_dir, | |||
| record_events, | |||
| } => { | |||
| let name = name.or_else(|| names::Generator::default().next()); | |||
| @@ -342,6 +343,7 @@ async fn start_inner( | |||
| name, | |||
| &mut daemon_connections, | |||
| &clock, | |||
| record_events, | |||
| ) | |||
| .await?; | |||
| Ok(dataflow) | |||
| @@ -842,12 +844,20 @@ async fn start_dataflow( | |||
| name: Option<String>, | |||
| daemon_connections: &mut HashMap<String, DaemonConnection>, | |||
| clock: &HLC, | |||
| record_events: bool, | |||
| ) -> eyre::Result<RunningDataflow> { | |||
| let SpawnedDataflow { | |||
| uuid, | |||
| machines, | |||
| nodes, | |||
| } = spawn_dataflow(dataflow, working_dir, daemon_connections, clock).await?; | |||
| } = spawn_dataflow( | |||
| dataflow, | |||
| working_dir, | |||
| daemon_connections, | |||
| clock, | |||
| record_events, | |||
| ) | |||
| .await?; | |||
| Ok(RunningDataflow { | |||
| uuid, | |||
| name, | |||
| @@ -23,6 +23,7 @@ pub(super) async fn spawn_dataflow( | |||
| working_dir: PathBuf, | |||
| daemon_connections: &mut HashMap<String, DaemonConnection>, | |||
| clock: &HLC, | |||
| record_events: bool, | |||
| ) -> eyre::Result<SpawnedDataflow> { | |||
| dataflow.check(&working_dir)?; | |||
| @@ -46,6 +47,7 @@ pub(super) async fn spawn_dataflow( | |||
| nodes: nodes.clone(), | |||
| machine_listen_ports, | |||
| dataflow_descriptor: dataflow, | |||
| record: record_events, | |||
| }; | |||
| let message = serde_json::to_vec(&Timestamped { | |||
| inner: DaemonCoordinatorEvent::Spawn(spawn_command), | |||
| @@ -37,3 +37,4 @@ shared-memory-server = { workspace = true } | |||
| ctrlc = "3.2.5" | |||
| bincode = "1.3.3" | |||
| async-trait = "0.1.64" | |||
| rosbag = "0.6.1" | |||
| @@ -15,9 +15,11 @@ use tokio::{ | |||
| }; | |||
| use tokio_stream::{wrappers::ReceiverStream, Stream}; | |||
| #[derive(Debug)] | |||
| #[derive(Debug, serde::Serialize)] | |||
| pub struct CoordinatorEvent { | |||
| #[serde(flatten)] | |||
| pub event: DaemonCoordinatorEvent, | |||
| #[serde(skip)] | |||
| pub reply_tx: oneshot::Sender<Option<DaemonCoordinatorReply>>, | |||
| } | |||
| @@ -2,7 +2,7 @@ use coordinator::CoordinatorEvent; | |||
| use dora_core::config::{Input, OperatorId}; | |||
| use dora_core::coordinator_messages::CoordinatorRequest; | |||
| use dora_core::daemon_messages::{Data, InterDaemonEvent, Timestamped}; | |||
| use dora_core::message::uhlc::{self, HLC}; | |||
| use dora_core::message::uhlc::{self, Timestamp, HLC}; | |||
| use dora_core::message::MetadataParameters; | |||
| use dora_core::{ | |||
| config::{DataId, InputMapping, NodeId}, | |||
| @@ -18,6 +18,7 @@ use futures::{future, stream, FutureExt, TryFutureExt}; | |||
| use futures_concurrency::stream::Merge; | |||
| use inter_daemon::InterDaemonConnection; | |||
| use pending::PendingNodes; | |||
| use record::Recorder; | |||
| use shared_memory_server::ShmemConf; | |||
| use std::env::temp_dir; | |||
| use std::sync::Arc; | |||
| @@ -46,6 +47,7 @@ mod inter_daemon; | |||
| mod log; | |||
| mod node_communication; | |||
| mod pending; | |||
| mod record; | |||
| mod spawn; | |||
| mod tcp_utils; | |||
| @@ -117,7 +119,7 @@ impl Daemon { | |||
| .map(|_| ()) | |||
| } | |||
| pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<()> { | |||
| pub async fn run_dataflow(dataflow_path: &Path, record: bool) -> eyre::Result<()> { | |||
| let working_dir = dataflow_path | |||
| .canonicalize() | |||
| .context("failed to canoncialize dataflow path")? | |||
| @@ -135,6 +137,7 @@ impl Daemon { | |||
| nodes, | |||
| machine_listen_ports: BTreeMap::new(), | |||
| dataflow_descriptor: descriptor, | |||
| record, | |||
| }; | |||
| let clock = Arc::new(HLC::default()); | |||
| @@ -249,6 +252,14 @@ impl Daemon { | |||
| tracing::warn!("failed to update HLC with incoming event timestamp: {err}"); | |||
| } | |||
| if let Err(err) = self | |||
| .record_event(&inner, timestamp) | |||
| .await | |||
| .context("failed to record event") | |||
| { | |||
| tracing::warn!("{err:?}"); | |||
| }; | |||
| match inner { | |||
| Event::Coordinator(CoordinatorEvent { event, reply_tx }) => { | |||
| let status = self.handle_coordinator_event(event, reply_tx).await?; | |||
| @@ -299,6 +310,41 @@ impl Daemon { | |||
| Ok(self.dataflow_errors) | |||
| } | |||
| async fn record_event(&mut self, event: &Event, timestamp: Timestamp) -> eyre::Result<()> { | |||
| let dataflow_id = match &event { | |||
| Event::Node { dataflow_id, .. } => dataflow_id, | |||
| Event::Coordinator(CoordinatorEvent { event, .. }) => match event { | |||
| DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes { dataflow_id, .. }) => { | |||
| dataflow_id | |||
| } | |||
| DaemonCoordinatorEvent::AllNodesReady { dataflow_id, .. } => dataflow_id, | |||
| DaemonCoordinatorEvent::StopDataflow { dataflow_id } => dataflow_id, | |||
| DaemonCoordinatorEvent::ReloadDataflow { dataflow_id, .. } => dataflow_id, | |||
| DaemonCoordinatorEvent::Logs { dataflow_id, .. } => dataflow_id, | |||
| DaemonCoordinatorEvent::Destroy | DaemonCoordinatorEvent::Heartbeat => { | |||
| return Ok(()) | |||
| } | |||
| }, | |||
| Event::Daemon(event) => match event { | |||
| InterDaemonEvent::Output { dataflow_id, .. } => dataflow_id, | |||
| InterDaemonEvent::InputsClosed { dataflow_id, .. } => dataflow_id, | |||
| }, | |||
| Event::Dora(event) => match event { | |||
| DoraEvent::Timer { dataflow_id, .. } => dataflow_id, | |||
| DoraEvent::SpawnedNodeResult { dataflow_id, .. } => dataflow_id, | |||
| }, | |||
| Event::HeartbeatInterval | Event::CtrlC => return Ok(()), | |||
| }; | |||
| let Some(dataflow) = self.running.get_mut(dataflow_id) else { | |||
| bail!("no running dataflow with id `{dataflow_id}`"); | |||
| }; | |||
| if let Some(recorder) = &mut dataflow.recorder { | |||
| recorder.record(event, timestamp).await?; | |||
| } | |||
| Ok(()) | |||
| } | |||
| async fn handle_coordinator_event( | |||
| &mut self, | |||
| event: DaemonCoordinatorEvent, | |||
| @@ -311,6 +357,7 @@ impl Daemon { | |||
| nodes, | |||
| machine_listen_ports, | |||
| dataflow_descriptor, | |||
| record, | |||
| }) => { | |||
| match dataflow_descriptor.communication.remote { | |||
| dora_core::config::RemoteCommunicationConfig::Tcp => {} | |||
| @@ -329,7 +376,7 @@ impl Daemon { | |||
| } | |||
| let result = self | |||
| .spawn_dataflow(dataflow_id, working_dir, nodes, dataflow_descriptor) | |||
| .spawn_dataflow(dataflow_id, working_dir, nodes, dataflow_descriptor, record) | |||
| .await; | |||
| if let Err(err) = &result { | |||
| tracing::error!("{err:?}"); | |||
| @@ -506,6 +553,7 @@ impl Daemon { | |||
| working_dir: PathBuf, | |||
| nodes: Vec<ResolvedNode>, | |||
| dataflow_descriptor: Descriptor, | |||
| record: bool, | |||
| ) -> eyre::Result<()> { | |||
| let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone()); | |||
| let dataflow = match self.running.entry(dataflow_id) { | |||
| @@ -515,6 +563,12 @@ impl Daemon { | |||
| } | |||
| }; | |||
| if record { | |||
| dataflow.recorder = Some( | |||
| Recorder::new(working_dir.clone(), self.machine_id.clone(), dataflow_id).await?, | |||
| ); | |||
| } | |||
| for node in nodes { | |||
| let local = node.deploy.machine == self.machine_id; | |||
| @@ -887,37 +941,47 @@ impl Daemon { | |||
| dataflow.running_nodes.remove(node_id); | |||
| if dataflow.running_nodes.is_empty() { | |||
| let result = match self.dataflow_errors.get(&dataflow.id) { | |||
| None => Ok(()), | |||
| Some(errors) => { | |||
| let mut output = "some nodes failed:".to_owned(); | |||
| for (node, error) in errors { | |||
| use std::fmt::Write; | |||
| write!(&mut output, "\n - {node}: {error}").unwrap(); | |||
| } | |||
| Err(output) | |||
| self.handle_dataflow_finished(dataflow_id).await?; | |||
| } | |||
| Ok(()) | |||
| } | |||
| async fn handle_dataflow_finished(&mut self, dataflow_id: Uuid) -> Result<(), eyre::ErrReport> { | |||
| let Some(dataflow) = self.running.remove(&dataflow_id) else { | |||
| return Ok(()) | |||
| }; | |||
| if let Some(recorder) = dataflow.recorder { | |||
| recorder.finish().await?; | |||
| } | |||
| let result = match self.dataflow_errors.get(&dataflow.id) { | |||
| None => Ok(()), | |||
| Some(errors) => { | |||
| let mut output = "some nodes failed:".to_owned(); | |||
| for (node, error) in errors { | |||
| use std::fmt::Write; | |||
| write!(&mut output, "\n - {node}: {error}").unwrap(); | |||
| } | |||
| }; | |||
| tracing::info!( | |||
| "Dataflow `{dataflow_id}` finished on machine `{}`", | |||
| self.machine_id | |||
| ); | |||
| if let Some(connection) = &mut self.coordinator_connection { | |||
| let msg = serde_json::to_vec(&Timestamped { | |||
| inner: CoordinatorRequest::Event { | |||
| machine_id: self.machine_id.clone(), | |||
| event: DaemonEvent::AllNodesFinished { | |||
| dataflow_id, | |||
| result, | |||
| }, | |||
| }, | |||
| timestamp: self.clock.new_timestamp(), | |||
| })?; | |||
| tcp_send(connection, &msg) | |||
| .await | |||
| .wrap_err("failed to report dataflow finish to dora-coordinator")?; | |||
| Err(output) | |||
| } | |||
| self.running.remove(&dataflow_id); | |||
| }; | |||
| tracing::info!( | |||
| "Dataflow `{dataflow_id}` finished on machine `{}`", | |||
| self.machine_id | |||
| ); | |||
| if let Some(connection) = &mut self.coordinator_connection { | |||
| let msg = serde_json::to_vec(&Timestamped { | |||
| inner: CoordinatorRequest::Event { | |||
| machine_id: self.machine_id.clone(), | |||
| event: DaemonEvent::AllNodesFinished { | |||
| dataflow_id, | |||
| result, | |||
| }, | |||
| }, | |||
| timestamp: self.clock.new_timestamp(), | |||
| })?; | |||
| tcp_send(connection, &msg) | |||
| .await | |||
| .wrap_err("failed to report dataflow finish to dora-coordinator")?; | |||
| } | |||
| Ok(()) | |||
| } | |||
| @@ -1253,6 +1317,7 @@ fn close_input( | |||
| pub struct RunningDataflow { | |||
| id: Uuid, | |||
| /// Local nodes that are not started yet | |||
| pending_nodes: PendingNodes, | |||
| @@ -1275,6 +1340,9 @@ pub struct RunningDataflow { | |||
| /// | |||
| /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. | |||
| empty_set: BTreeSet<DataId>, | |||
| /// Whether the events of this dataflow should be recorded and saved to disk. | |||
| recorder: Option<Recorder>, | |||
| } | |||
| impl RunningDataflow { | |||
| @@ -1293,6 +1361,7 @@ impl RunningDataflow { | |||
| _timer_handles: Vec::new(), | |||
| stop_sent: false, | |||
| empty_set: BTreeSet::new(), | |||
| recorder: None, | |||
| } | |||
| } | |||
| @@ -1404,7 +1473,7 @@ struct DropTokenInformation { | |||
| pending_nodes: BTreeSet<NodeId>, | |||
| } | |||
| #[derive(Debug)] | |||
| #[derive(Debug, serde::Serialize)] | |||
| pub enum Event { | |||
| Node { | |||
| dataflow_id: DataflowId, | |||
| @@ -1424,21 +1493,27 @@ impl From<DoraEvent> for Event { | |||
| } | |||
| } | |||
| #[derive(Debug)] | |||
| #[derive(Debug, serde::Serialize)] | |||
| pub enum DaemonNodeEvent { | |||
| OutputsDone { | |||
| #[serde(skip)] | |||
| reply_sender: oneshot::Sender<DaemonReply>, | |||
| }, | |||
| Subscribe { | |||
| #[serde(skip)] | |||
| event_sender: UnboundedSender<Timestamped<daemon_messages::NodeEvent>>, | |||
| #[serde(skip)] | |||
| reply_sender: oneshot::Sender<DaemonReply>, | |||
| }, | |||
| SubscribeDrop { | |||
| #[serde(skip)] | |||
| event_sender: UnboundedSender<Timestamped<daemon_messages::NodeDropEvent>>, | |||
| #[serde(skip)] | |||
| reply_sender: oneshot::Sender<DaemonReply>, | |||
| }, | |||
| CloseOutputs { | |||
| outputs: Vec<dora_core::config::DataId>, | |||
| #[serde(skip)] | |||
| reply_sender: oneshot::Sender<DaemonReply>, | |||
| }, | |||
| SendOut { | |||
| @@ -1450,11 +1525,12 @@ pub enum DaemonNodeEvent { | |||
| tokens: Vec<DropToken>, | |||
| }, | |||
| EventStreamDropped { | |||
| #[serde(skip)] | |||
| reply_sender: oneshot::Sender<DaemonReply>, | |||
| }, | |||
| } | |||
| #[derive(Debug)] | |||
| #[derive(Debug, serde::Serialize, serde::Deserialize)] | |||
| pub enum DoraEvent { | |||
| Timer { | |||
| dataflow_id: DataflowId, | |||
| @@ -1468,10 +1544,10 @@ pub enum DoraEvent { | |||
| }, | |||
| } | |||
| #[derive(Debug)] | |||
| #[derive(Debug, serde::Serialize, serde::Deserialize)] | |||
| pub enum NodeExitStatus { | |||
| Success, | |||
| IoError(io::Error), | |||
| IoError(String), | |||
| ExitCode(i32), | |||
| Signal(i32), | |||
| Unknown, | |||
| @@ -1496,7 +1572,7 @@ impl From<Result<std::process::ExitStatus, io::Error>> for NodeExitStatus { | |||
| Self::Unknown | |||
| } | |||
| } | |||
| Err(err) => Self::IoError(err), | |||
| Err(err) => Self::IoError(err.to_string()), | |||
| } | |||
| } | |||
| } | |||
| @@ -28,6 +28,9 @@ pub struct Args { | |||
| #[clap(long)] | |||
| pub run_dora_runtime: bool, | |||
| #[clap(long)] | |||
| pub record_events: bool, | |||
| } | |||
| #[tokio::main] | |||
| @@ -43,6 +46,7 @@ async fn run() -> eyre::Result<()> { | |||
| machine_id, | |||
| coordinator_addr, | |||
| run_dora_runtime, | |||
| record_events, | |||
| } = clap::Parser::parse(); | |||
| if run_dora_runtime { | |||
| @@ -80,7 +84,7 @@ async fn run() -> eyre::Result<()> { | |||
| Some(dataflow_path) => { | |||
| tracing::info!("Starting dataflow `{}`", dataflow_path.display()); | |||
| Daemon::run_dataflow(&dataflow_path).await | |||
| Daemon::run_dataflow(&dataflow_path, record_events).await | |||
| } | |||
| None => { | |||
| Daemon::run( | |||
| @@ -0,0 +1,46 @@ | |||
| use std::path::Path; | |||
| use dora_core::message::uhlc::Timestamp; | |||
| use eyre::Context; | |||
| use tokio::{fs::File, io::AsyncWriteExt}; | |||
| use crate::Event; | |||
| pub struct JsonFile { | |||
| file: File, | |||
| } | |||
| impl JsonFile { | |||
| pub async fn new(path: &Path) -> eyre::Result<Self> { | |||
| let file = tokio::fs::OpenOptions::new() | |||
| .create(true) | |||
| .append(true) | |||
| .open(&path) | |||
| .await | |||
| .wrap_err_with(|| format!("failed to open record file at {}", path.display()))?; | |||
| Ok(Self { file }) | |||
| } | |||
| pub async fn record(&mut self, timestamp: Timestamp, event: &Event) -> eyre::Result<()> { | |||
| let json = format(timestamp, event)?; | |||
| self.file | |||
| .write_all(json.as_bytes()) | |||
| .await | |||
| .context("failed to write event to record file")?; | |||
| Ok(()) | |||
| } | |||
| } | |||
| fn format( | |||
| timestamp: dora_core::message::uhlc::Timestamp, | |||
| event: &crate::Event, | |||
| ) -> eyre::Result<String> { | |||
| let entry = RecordEntry { timestamp, event }; | |||
| serde_json::to_string(&entry).context("failed to serialize record entry") | |||
| } | |||
| #[derive(Debug, serde::Serialize)] | |||
| struct RecordEntry<'a> { | |||
| timestamp: Timestamp, | |||
| event: &'a Event, | |||
| } | |||
| @@ -0,0 +1,63 @@ | |||
| use std::path::{Path, PathBuf}; | |||
| use dora_core::{daemon_messages::DataflowId, message::uhlc::Timestamp}; | |||
| use eyre::Context; | |||
| use self::{json::JsonFile, rosbag::RosbagFile}; | |||
| mod json; | |||
| mod rosbag; | |||
| pub struct Recorder { | |||
| json_file: JsonFile, | |||
| rosbag_file: RosbagFile, | |||
| } | |||
| impl Recorder { | |||
| pub async fn new( | |||
| working_dir: PathBuf, | |||
| machine_id: String, | |||
| dataflow_id: DataflowId, | |||
| ) -> eyre::Result<Self> { | |||
| let record_folder = Self::record_folder(&working_dir, dataflow_id).await?; | |||
| let json_file_path = record_folder.join(format!("events-{}.json", machine_id)); | |||
| let json_file = JsonFile::new(&json_file_path).await?; | |||
| let rosbag_file_path = record_folder.join(format!("events-{}.bag", machine_id)); | |||
| let rosbag_file = RosbagFile::new(&rosbag_file_path).await?; | |||
| Ok(Self { | |||
| json_file, | |||
| rosbag_file, | |||
| }) | |||
| } | |||
| pub async fn record(&mut self, event: &crate::Event, timestamp: Timestamp) -> eyre::Result<()> { | |||
| self.json_file.record(timestamp, event).await?; | |||
| self.rosbag_file.record(timestamp, event).await?; | |||
| Ok(()) | |||
| } | |||
| pub async fn finish(self) -> eyre::Result<()> { | |||
| self.rosbag_file.finish().await?; | |||
| Ok(()) | |||
| } | |||
| async fn record_folder( | |||
| working_dir: &Path, | |||
| dataflow_id: DataflowId, | |||
| ) -> Result<PathBuf, eyre::ErrReport> { | |||
| let record_folder = working_dir.join("record").join(dataflow_id.to_string()); | |||
| tokio::fs::create_dir_all(&record_folder) | |||
| .await | |||
| .wrap_err_with(|| { | |||
| format!( | |||
| "failed to create record folder at {}", | |||
| record_folder.display() | |||
| ) | |||
| })?; | |||
| Ok(record_folder) | |||
| } | |||
| } | |||
| @@ -0,0 +1,90 @@ | |||
| use std::path::Path; | |||
| use dora_core::message::uhlc::Timestamp; | |||
| use eyre::Context; | |||
| use tokio::{ | |||
| fs::File, | |||
| io::{AsyncWrite, AsyncWriteExt}, | |||
| }; | |||
| use crate::Event; | |||
| pub struct RosbagFile { | |||
| file: File, | |||
| record: Record, | |||
| } | |||
| impl RosbagFile { | |||
| pub async fn new(path: &Path) -> eyre::Result<Self> { | |||
| let mut file = tokio::fs::OpenOptions::new() | |||
| .create(true) | |||
| .append(true) | |||
| .open(&path) | |||
| .await | |||
| .wrap_err_with(|| format!("failed to open record file at {}", path.display()))?; | |||
| file.write_all("#ROSBAG V2.0\n".as_bytes()) | |||
| .await | |||
| .context("failed to write rosbag header")?; | |||
| Ok(Self { | |||
| file, | |||
| record: Record { | |||
| header: Vec::new(), | |||
| data: Vec::new(), | |||
| }, | |||
| }) | |||
| } | |||
| pub async fn record(&mut self, timestamp: Timestamp, event: &Event) -> eyre::Result<()> { | |||
| tracing::warn!("rosbag recording is not implemented yet"); | |||
| Ok(()) | |||
| } | |||
| pub async fn finish(mut self) -> eyre::Result<()> { | |||
| self.record.serialize(&mut self.file).await | |||
| } | |||
| } | |||
| struct Record { | |||
| header: Vec<HeaderField>, | |||
| data: Vec<u8>, | |||
| } | |||
| impl Record { | |||
| async fn serialize(&self, writer: &mut (impl AsyncWrite + Unpin)) -> eyre::Result<()> { | |||
| let serialized_header = { | |||
| let mut buf = Vec::new(); | |||
| for field in &self.header { | |||
| field.serialize(&mut buf).await?; | |||
| } | |||
| buf | |||
| }; | |||
| writer | |||
| .write_all(&u32::try_from(serialized_header.len())?.to_le_bytes()) | |||
| .await?; | |||
| writer.write_all(&serialized_header).await?; | |||
| writer | |||
| .write_all(&u32::try_from(self.data.len())?.to_le_bytes()) | |||
| .await?; | |||
| writer.write_all(&self.data).await?; | |||
| Ok(()) | |||
| } | |||
| } | |||
| struct HeaderField { | |||
| name: String, | |||
| value: Vec<u8>, | |||
| } | |||
| impl HeaderField { | |||
| async fn serialize(&self, writer: &mut (impl AsyncWrite + Unpin)) -> eyre::Result<()> { | |||
| let len = self.name.len() + self.value.len() + 5; | |||
| writer.write_all(&u32::try_from(len)?.to_le_bytes()).await?; | |||
| writer.write_all(self.name.as_bytes()).await?; | |||
| writer.write_all(&[b'=']).await?; | |||
| writer.write_all(&self.value).await?; | |||
| Ok(()) | |||
| } | |||
| } | |||
| @@ -26,7 +26,7 @@ async fn main() -> eyre::Result<()> { | |||
| let dataflow = Path::new("dataflow.yml"); | |||
| build_dataflow(dataflow).await?; | |||
| dora_daemon::Daemon::run_dataflow(dataflow).await?; | |||
| dora_daemon::Daemon::run_dataflow(dataflow, false).await?; | |||
| Ok(()) | |||
| } | |||
| @@ -119,7 +119,7 @@ async fn main() -> eyre::Result<()> { | |||
| let dataflow = Path::new("dataflow.yml").to_owned(); | |||
| build_package("dora-runtime").await?; | |||
| dora_daemon::Daemon::run_dataflow(&dataflow).await?; | |||
| dora_daemon::Daemon::run_dataflow(&dataflow, false).await?; | |||
| Ok(()) | |||
| } | |||
| @@ -36,7 +36,7 @@ async fn main() -> eyre::Result<()> { | |||
| build_c_operator().await?; | |||
| let dataflow = Path::new("dataflow.yml").to_owned(); | |||
| dora_daemon::Daemon::run_dataflow(&dataflow).await?; | |||
| dora_daemon::Daemon::run_dataflow(&dataflow, false).await?; | |||
| Ok(()) | |||
| } | |||
| @@ -135,6 +135,7 @@ async fn start_dataflow( | |||
| dataflow: dataflow_descriptor, | |||
| local_working_dir: working_dir, | |||
| name: None, | |||
| record_events: false, | |||
| }, | |||
| reply_sender, | |||
| })) | |||
| @@ -25,7 +25,7 @@ async fn main() -> eyre::Result<()> { | |||
| let dataflow = Path::new("dataflow.yml"); | |||
| build_dataflow(dataflow).await?; | |||
| dora_daemon::Daemon::run_dataflow(dataflow).await?; | |||
| dora_daemon::Daemon::run_dataflow(dataflow, false).await?; | |||
| Ok(()) | |||
| } | |||
| @@ -26,7 +26,7 @@ async fn main() -> eyre::Result<()> { | |||
| let dataflow = Path::new("dataflow.yml"); | |||
| build_dataflow(dataflow).await?; | |||
| dora_daemon::Daemon::run_dataflow(dataflow).await?; | |||
| dora_daemon::Daemon::run_dataflow(dataflow, false).await?; | |||
| Ok(()) | |||
| } | |||
| @@ -259,4 +259,5 @@ pub struct SpawnDataflowNodes { | |||
| pub nodes: Vec<ResolvedNode>, | |||
| pub machine_listen_ports: BTreeMap<String, SocketAddr>, | |||
| pub dataflow_descriptor: Descriptor, | |||
| pub record: bool, | |||
| } | |||
| @@ -27,6 +27,8 @@ pub enum ControlRequest { | |||
| // TODO: remove this once we figure out deploying of node/operator | |||
| // binaries from CLI to coordinator/daemon | |||
| local_working_dir: PathBuf, | |||
| /// Whether the events of this dataflow should be recorded. | |||
| record_events: bool, | |||
| }, | |||
| Reload { | |||
| dataflow_id: Uuid, | |||