diff --git a/Cargo.lock b/Cargo.lock
index aa91ce42..5c9c9696 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -538,6 +538,12 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
 
+[[package]]
+name = "base16ct"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce"
+
 [[package]]
 name = "base64"
 version = "0.13.1"
@@ -720,6 +726,27 @@ version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "38fcc2979eff34a4b84e1cf9a1e3da42a7d44b3b690a40cdcb23e3d556cfb2e5"
 
+[[package]]
+name = "bzip2"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8"
+dependencies = [
+ "bzip2-sys",
+ "libc",
+]
+
+[[package]]
+name = "bzip2-sys"
+version = "0.1.11+1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+]
+
 [[package]]
 name = "cache-padded"
 version = "1.3.0"
@@ -1348,6 +1375,7 @@ dependencies = [
  "flume",
  "futures",
  "futures-concurrency",
+ "rosbag",
  "serde",
  "serde_json",
  "serde_yaml 0.8.26",
@@ -2687,6 +2715,26 @@ dependencies = [
  "value-bag",
 ]
 
+[[package]]
+name = "lz4"
+version = "1.24.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1"
+dependencies = [
+ "libc",
+ "lz4-sys",
+]
+
+[[package]]
+name = "lz4-sys"
+version = "1.9.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900"
+dependencies = [
+ "cc",
+ "libc",
+]
+
 [[package]]
 name = "macro_rules_attribute"
 version = "0.1.3"
@@ -2727,6 +2775,15 @@ version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
 
+[[package]]
+name = "memmap2"
+version = "0.5.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "83faa42c0a078c393f6b29d5db232d8be22776a891f8f56e5284faee4a20b327"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "memoffset"
 version = "0.6.5"
@@ -3981,6 +4038,20 @@ dependencies = [
  "cache-padded",
 ]
 
+[[package]]
+name = "rosbag"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4c470a1a13c7daccdf26cf9b1ee22da130ba39541e384b8054df9537b86b2961"
+dependencies = [
+ "base16ct",
+ "byteorder",
+ "bzip2",
+ "log",
+ "lz4",
+ "memmap2",
+]
+
 [[package]]
 name = "rsa"
 version = "0.7.2"
diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml
index c36fae7a..b33053ce 100644
--- a/binaries/daemon/Cargo.toml
+++ b/binaries/daemon/Cargo.toml
@@ -37,3 +37,4 @@ shared-memory-server = { workspace = true }
 ctrlc = "3.2.5"
 bincode = "1.3.3"
 async-trait = "0.1.64"
+rosbag = "0.6.1"
diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index fdb59b39..ec48dd67 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -564,11 +564,9 @@ impl Daemon {
         };
 
         if record {
-            dataflow.recorder = Some(Recorder::new(
-                working_dir.clone(),
-                self.machine_id.clone(),
-                dataflow_id,
-            ));
+            dataflow.recorder = Some(
+                Recorder::new(working_dir.clone(), self.machine_id.clone(), dataflow_id).await?,
+            );
         }
 
         for node in nodes {
@@ -943,37 +941,47 @@ impl Daemon {
 
         dataflow.running_nodes.remove(node_id);
         if dataflow.running_nodes.is_empty() {
-            let result = match self.dataflow_errors.get(&dataflow.id) {
-                None => Ok(()),
-                Some(errors) => {
-                    let mut output = "some nodes failed:".to_owned();
-                    for (node, error) in errors {
-                        use std::fmt::Write;
-                        write!(&mut output, "\n - {node}: {error}").unwrap();
-                    }
-                    Err(output)
+            self.handle_dataflow_finished(dataflow_id).await?;
+        }
+        Ok(())
+    }
+
+    async fn handle_dataflow_finished(&mut self, dataflow_id: Uuid) -> Result<(), eyre::ErrReport> {
+        let Some(dataflow) = self.running.remove(&dataflow_id) else {
+            return Ok(())
+        };
+        if let Some(recorder) = dataflow.recorder {
+            recorder.finish().await?;
+        }
+        let result = match self.dataflow_errors.get(&dataflow.id) {
+            None => Ok(()),
+            Some(errors) => {
+                let mut output = "some nodes failed:".to_owned();
+                for (node, error) in errors {
+                    use std::fmt::Write;
+                    write!(&mut output, "\n - {node}: {error}").unwrap();
                 }
-            };
-            tracing::info!(
-                "Dataflow `{dataflow_id}` finished on machine `{}`",
-                self.machine_id
-            );
-            if let Some(connection) = &mut self.coordinator_connection {
-                let msg = serde_json::to_vec(&Timestamped {
-                    inner: CoordinatorRequest::Event {
-                        machine_id: self.machine_id.clone(),
-                        event: DaemonEvent::AllNodesFinished {
-                            dataflow_id,
-                            result,
-                        },
-                    },
-                    timestamp: self.clock.new_timestamp(),
-                })?;
-                tcp_send(connection, &msg)
-                    .await
-                    .wrap_err("failed to report dataflow finish to dora-coordinator")?;
+                Err(output)
             }
-            self.running.remove(&dataflow_id);
+        };
+        tracing::info!(
+            "Dataflow `{dataflow_id}` finished on machine `{}`",
+            self.machine_id
+        );
+        if let Some(connection) = &mut self.coordinator_connection {
+            let msg = serde_json::to_vec(&Timestamped {
+                inner: CoordinatorRequest::Event {
+                    machine_id: self.machine_id.clone(),
+                    event: DaemonEvent::AllNodesFinished {
+                        dataflow_id,
+                        result,
+                    },
+                },
+                timestamp: self.clock.new_timestamp(),
+            })?;
+            tcp_send(connection, &msg)
+                .await
+                .wrap_err("failed to report dataflow finish to dora-coordinator")?;
         }
         Ok(())
     }
diff --git a/binaries/daemon/src/record.rs b/binaries/daemon/src/record.rs
deleted file mode 100644
index 8d79375c..00000000
--- a/binaries/daemon/src/record.rs
+++ /dev/null
@@ -1,74 +0,0 @@
-use std::path::PathBuf;
-
-use dora_core::{daemon_messages::DataflowId, message::uhlc::Timestamp};
-use eyre::Context;
-use tokio::io::AsyncWriteExt;
-
-use crate::Event;
-
-pub struct Recorder {
-    working_dir: PathBuf,
-    machine_id: String,
-    dataflow_id: DataflowId,
-}
-
-impl Recorder {
-    pub fn new(working_dir: PathBuf, machine_id: String, dataflow_id: DataflowId) -> Self {
-        Self {
-            working_dir,
-            machine_id,
-            dataflow_id,
-        }
-    }
-
-    pub async fn record(&mut self, event: &crate::Event, timestamp: Timestamp) -> eyre::Result<()> {
-        let entry = RecordEntry { timestamp, event };
-        let rendered = serde_json::to_string(&entry).context("failed to serialize record entry")?;
-
-        let record_folder = self.record_folder().await?;
-        let record_file_path = record_folder.join(format!("events-{}.json", self.machine_id));
-        let mut record_file = tokio::fs::OpenOptions::new()
-            .create(true)
-            .append(true)
-            .open(&record_file_path)
-            .await
-            .wrap_err_with(|| {
-                format!(
-                    "failed to open record file at {}",
-                    record_file_path.display()
-                )
-            })?;
-        record_file
-            .write_all(rendered.as_bytes())
-            .await
-            .context("failed to write event to record file")?;
-        record_file
-            .write_all("\n".as_bytes())
-            .await
-            .context("failed to write newline to record file")?;
-
-        Ok(())
-    }
-
-    async fn record_folder(&mut self) -> Result<PathBuf, eyre::ErrReport> {
-        let record_folder = self
-            .working_dir
-            .join("record")
-            .join(self.dataflow_id.to_string());
-        tokio::fs::create_dir_all(&record_folder)
-            .await
-            .wrap_err_with(|| {
-                format!(
-                    "failed to create record folder at {}",
-                    record_folder.display()
-                )
-            })?;
-        Ok(record_folder)
-    }
-}
-
-#[derive(Debug, serde::Serialize)]
-struct RecordEntry<'a> {
-    timestamp: Timestamp,
-    event: &'a Event,
-}
diff --git a/binaries/daemon/src/record/json.rs b/binaries/daemon/src/record/json.rs
new file mode 100644
index 00000000..ed014f2c
--- /dev/null
+++ b/binaries/daemon/src/record/json.rs
@@ -0,0 +1,46 @@
+use std::path::Path;
+
+use dora_core::message::uhlc::Timestamp;
+use eyre::Context;
+use tokio::{fs::File, io::AsyncWriteExt};
+
+use crate::Event;
+
+pub struct JsonFile {
+    file: File,
+}
+
+impl JsonFile {
+    pub async fn new(path: &Path) -> eyre::Result<Self> {
+        let file = tokio::fs::OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(&path)
+            .await
+            .wrap_err_with(|| format!("failed to open record file at {}", path.display()))?;
+        Ok(Self { file })
+    }
+
+    pub async fn record(&mut self, timestamp: Timestamp, event: &Event) -> eyre::Result<()> {
+        let json = format(timestamp, event)?;
+        self.file
+            .write_all(json.as_bytes())
+            .await
+            .context("failed to write event to record file")?;
+        Ok(())
+    }
+}
+
+fn format(
+    timestamp: dora_core::message::uhlc::Timestamp,
+    event: &crate::Event,
+) -> eyre::Result<String> {
+    let entry = RecordEntry { timestamp, event };
+    serde_json::to_string(&entry).context("failed to serialize record entry")
+}
+
+#[derive(Debug, serde::Serialize)]
+struct RecordEntry<'a> {
+    timestamp: Timestamp,
+    event: &'a Event,
+}
diff --git a/binaries/daemon/src/record/mod.rs b/binaries/daemon/src/record/mod.rs
new file mode 100644
index 00000000..03d006d4
--- /dev/null
+++ b/binaries/daemon/src/record/mod.rs
@@ -0,0 +1,63 @@
+use std::path::{Path, PathBuf};
+
+use dora_core::{daemon_messages::DataflowId, message::uhlc::Timestamp};
+use eyre::Context;
+
+use self::{json::JsonFile, rosbag::RosbagFile};
+
+mod json;
+mod rosbag;
+
+pub struct Recorder {
+    json_file: JsonFile,
+    rosbag_file: RosbagFile,
+}
+
+impl Recorder {
+    pub async fn new(
+        working_dir: PathBuf,
+        machine_id: String,
+        dataflow_id: DataflowId,
+    ) -> eyre::Result<Self> {
+        let record_folder = Self::record_folder(&working_dir, dataflow_id).await?;
+
+        let json_file_path = record_folder.join(format!("events-{}.json", machine_id));
+        let json_file = JsonFile::new(&json_file_path).await?;
+
+        let rosbag_file_path = record_folder.join(format!("events-{}.bag", machine_id));
+        let rosbag_file = RosbagFile::new(&rosbag_file_path).await?;
+
+        Ok(Self {
+            json_file,
+            rosbag_file,
+        })
+    }
+
+    pub async fn record(&mut self, event: &crate::Event, timestamp: Timestamp) -> eyre::Result<()> {
+        self.json_file.record(timestamp, event).await?;
+        self.rosbag_file.record(timestamp, event).await?;
+
+        Ok(())
+    }
+
+    pub async fn finish(self) -> eyre::Result<()> {
+        self.rosbag_file.finish().await?;
+        Ok(())
+    }
+
+    async fn record_folder(
+        working_dir: &Path,
+        dataflow_id: DataflowId,
+    ) -> Result<PathBuf, eyre::ErrReport> {
+        let record_folder = working_dir.join("record").join(dataflow_id.to_string());
+        tokio::fs::create_dir_all(&record_folder)
+            .await
+            .wrap_err_with(|| {
+                format!(
+                    "failed to create record folder at {}",
+                    record_folder.display()
+                )
+            })?;
+        Ok(record_folder)
+    }
+}
diff --git a/binaries/daemon/src/record/rosbag.rs b/binaries/daemon/src/record/rosbag.rs
new file mode 100644
index 00000000..0c272eb8
--- /dev/null
+++ b/binaries/daemon/src/record/rosbag.rs
@@ -0,0 +1,90 @@
+use std::path::Path;
+
+use dora_core::message::uhlc::Timestamp;
+use eyre::Context;
+use tokio::{
+    fs::File,
+    io::{AsyncWrite, AsyncWriteExt},
+};
+
+use crate::Event;
+
+pub struct RosbagFile {
+    file: File,
+    record: Record,
+}
+
+impl RosbagFile {
+    pub async fn new(path: &Path) -> eyre::Result<Self> {
+        let mut file = tokio::fs::OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(&path)
+            .await
+            .wrap_err_with(|| format!("failed to open record file at {}", path.display()))?;
+        file.write_all("#ROSBAG V2.0\n".as_bytes())
+            .await
+            .context("failed to write rosbag header")?;
+        Ok(Self {
+            file,
+            record: Record {
+                header: Vec::new(),
+                data: Vec::new(),
+            },
+        })
+    }
+
+    pub async fn record(&mut self, timestamp: Timestamp, event: &Event) -> eyre::Result<()> {
+        tracing::warn!("rosbag recording is not implemented yet");
+        Ok(())
+    }
+
+    pub async fn finish(mut self) -> eyre::Result<()> {
+        self.record.serialize(&mut self.file).await
+    }
+}
+
+struct Record {
+    header: Vec<HeaderField>,
+    data: Vec<u8>,
+}
+
+impl Record {
+    async fn serialize(&self, writer: &mut (impl AsyncWrite + Unpin)) -> eyre::Result<()> {
+        let serialized_header = {
+            let mut buf = Vec::new();
+            for field in &self.header {
+                field.serialize(&mut buf).await?;
+            }
+            buf
+        };
+
+        writer
+            .write_all(&u32::try_from(serialized_header.len())?.to_le_bytes())
+            .await?;
+        writer.write_all(&serialized_header).await?;
+        writer
+            .write_all(&u32::try_from(self.data.len())?.to_le_bytes())
+            .await?;
+        writer.write_all(&self.data).await?;
+
+        Ok(())
+    }
+}
+
+struct HeaderField {
+    name: String,
+    value: Vec<u8>,
+}
+
+impl HeaderField {
+    async fn serialize(&self, writer: &mut (impl AsyncWrite + Unpin)) -> eyre::Result<()> {
+        let len = self.name.len() + self.value.len() + 5;
+        writer.write_all(&u32::try_from(len)?.to_le_bytes()).await?;
+        writer.write_all(self.name.as_bytes()).await?;
+        writer.write_all(&[b'=']).await?;
+        writer.write_all(&self.value).await?;
+
+        Ok(())
+    }
+}
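For orientation, and not part of the diff itself: the framing that `Record::serialize` and `HeaderField::serialize` are building toward is the ROS bag V2.0 record layout, in which a bag file is the `#ROSBAG V2.0\n` magic line followed by records, each record being a little-endian `u32`-length-prefixed header followed by a `u32`-length-prefixed data blob, and the header being a sequence of `name=value` fields that each carry their own `u32` length prefix. Below is a minimal standalone sketch of that framing using plain synchronous I/O; `write_field` and `write_record` are hypothetical helpers for illustration, and the field-length arithmetic follows the published format description (the length prefix of a field covers the name, the `=` separator, and the value).

// Sketch only: ROS bag V2.0 record framing (http://wiki.ros.org/Bags/Format/2.0),
// written with std::io instead of tokio to stay self-contained.
use std::io::{self, Write};

// One header field: <u32 LE length><name>=<value>, where the length covers
// the name, the '=' separator, and the value.
fn write_field(out: &mut impl Write, name: &str, value: &[u8]) -> io::Result<()> {
    let len = u32::try_from(name.len() + 1 + value.len()).expect("field too long");
    out.write_all(&len.to_le_bytes())?;
    out.write_all(name.as_bytes())?;
    out.write_all(b"=")?;
    out.write_all(value)
}

// One record: <u32 LE header length><header fields><u32 LE data length><data>.
fn write_record(out: &mut impl Write, fields: &[(&str, &[u8])], data: &[u8]) -> io::Result<()> {
    // Buffer the header first so its total length can be written up front.
    let mut header: Vec<u8> = Vec::new();
    for &(name, value) in fields {
        write_field(&mut header, name, value)?;
    }
    out.write_all(&u32::try_from(header.len()).expect("header too long").to_le_bytes())?;
    out.write_all(&header)?;
    out.write_all(&u32::try_from(data.len()).expect("data too long").to_le_bytes())?;
    out.write_all(data)
}

In those terms, `RosbagFile::new` writes the magic line and `RosbagFile::finish` flushes the accumulated `Record`, while the per-event encoding into header fields and data is the part the diff leaves as a `tracing::warn!` placeholder.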