diff --git a/.gitignore b/.gitignore
index 59491033..fd8cde4d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,7 @@
 # Generated by Cargo
 # will have compiled files and executables
 /target/
-
+examples/**/*.txt
 
 # These are backup files generated by rustfmt
 **/*.rs.bk
diff --git a/Cargo.lock b/Cargo.lock
index 0b8b121e..19f5f4d7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -528,6 +528,12 @@ dependencies = [
  "syn 2.0.48",
 ]
 
+[[package]]
+name = "atomic"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
+
 [[package]]
 name = "atomic-waker"
 version = "1.1.2"
@@ -5870,10 +5876,11 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
 [[package]]
 name = "uuid"
-version = "1.4.1"
+version = "1.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
+checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a"
 dependencies = [
+ "atomic",
 "getrandom",
 "rand",
 "serde",
 ]
@@ -5882,9 +5889,9 @@
 
 [[package]]
 name = "uuid-macro-internal"
-version = "1.4.1"
+version = "1.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f7e1ba1f333bd65ce3c9f27de592fcbc256dafe3af2717f56d7c87761fbaccf4"
+checksum = "7abb14ae1a50dad63eaa768a458ef43d298cd1bd44951677bd10b732a9ba2a2d"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/Cargo.toml b/Cargo.toml
index 34fe4fbb..0852cb85 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,10 +13,15 @@ members = [
     "binaries/coordinator",
     "binaries/daemon",
     "binaries/runtime",
-    "examples/rust-dataflow/*",
-    "examples/rust-ros2-dataflow/*",
-    "examples/benchmark/*",
-    "examples/multiple-daemons/*",
+    "examples/rust-dataflow/node",
+    "examples/rust-dataflow/operator",
+    "examples/rust-dataflow/sink",
+    "examples/rust-ros2-dataflow/node",
+    "examples/benchmark/node",
+    "examples/benchmark/sink",
+    "examples/multiple-daemons/node",
+    "examples/multiple-daemons/operator",
+    "examples/multiple-daemons/sink",
     "libraries/arrow-convert",
     "libraries/communication-layer/*",
     "libraries/core",
@@ -87,7 +92,7 @@ dora-tracing = { workspace = true }
 dora-download = { workspace = true }
 dunce = "1.0.2"
 serde_yaml = "0.8.23"
-uuid = { version = "1.2.1", features = ["v4", "serde"] }
+uuid = { version = "1.7", features = ["v7", "serde"] }
 tracing = "0.1.36"
 futures = "0.3.25"
 tokio-stream = "0.1.11"
diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs
index d8e159b2..d300f58f 100644
--- a/apis/rust/node/src/node/mod.rs
+++ b/apis/rust/node/src/node/mod.rs
@@ -9,7 +9,7 @@ use aligned_vec::{AVec, ConstAlign};
 use arrow::array::Array;
 use dora_core::{
     config::{DataId, NodeId, NodeRunConfig},
-    daemon_messages::{DataMessage, DropToken, NodeConfig},
+    daemon_messages::{DataMessage, DataflowId, DropToken, NodeConfig},
     descriptor::Descriptor,
     message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters},
 };
@@ -33,6 +33,7 @@ pub const ZERO_COPY_THRESHOLD: usize = 4096;
 
 pub struct DoraNode {
     id: NodeId,
+    dataflow_id: DataflowId,
     node_config: NodeRunConfig,
     control_channel: ControlChannel,
     clock: Arc<uhlc::HLC>,
@@ -89,6 +90,7 @@
 
         let node = Self {
             id: node_id,
+            dataflow_id: dataflow_id,
             node_config: run_config,
             control_channel,
             clock,
@@ -243,6 +245,10 @@
         &self.id
     }
 
+    pub fn dataflow_id(&self) -> &DataflowId {
+        &self.dataflow_id
+    }
+
     pub fn node_config(&self) -> &NodeRunConfig {
         &self.node_config
     }
diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml
index 9f8f4e51..69ecbe9b 100644
--- a/binaries/cli/Cargo.toml
+++ b/binaries/cli/Cargo.toml
@@ -27,7 +27,7 @@ serde_yaml = "0.9.11"
 webbrowser = "0.8.3"
 serde_json = "1.0.86"
 termcolor = "1.1.3"
-uuid = { version = "1.2.1", features = ["v4", "serde"] }
+uuid = { version = "1.7", features = ["v7", "serde"] }
 inquire = "0.5.2"
 communication-layer-request-reply = { workspace = true }
 notify = "5.1.0"
diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs
index e4731d3c..0d830184 100644
--- a/binaries/coordinator/src/run/mod.rs
+++ b/binaries/coordinator/src/run/mod.rs
@@ -15,7 +15,7 @@ use std::{
     collections::{BTreeMap, BTreeSet, HashMap},
     path::PathBuf,
 };
-use uuid::Uuid;
+use uuid::{NoContext, Timestamp, Uuid};
 
 #[tracing::instrument(skip(daemon_connections, clock))]
 pub(super) async fn spawn_dataflow(
@@ -27,7 +27,7 @@
     dataflow.check(&working_dir)?;
 
     let nodes = dataflow.resolve_aliases_and_set_defaults();
-    let uuid = Uuid::new_v4();
+    let uuid = Uuid::new_v7(Timestamp::now(NoContext));
 
     let machines: BTreeSet<_> = nodes.iter().map(|n| n.deploy.machine.clone()).collect();
     let machine_listen_ports = machines
diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml
index 2d1690a8..8086394e 100644
--- a/binaries/daemon/Cargo.toml
+++ b/binaries/daemon/Cargo.toml
@@ -30,7 +30,7 @@ dora-tracing = { workspace = true, optional = true }
 dora-arrow-convert = { workspace = true }
 dora-node-api = { workspace = true }
 serde_yaml = "0.8.23"
-uuid = { version = "1.1.2", features = ["v4"] }
+uuid = { version = "1.7", features = ["v7"] }
 futures = "0.3.25"
 shared-memory-server = { workspace = true }
 bincode = "1.3.3"
diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index a66ec83c..9865d0eb 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -21,7 +21,6 @@ use futures_concurrency::stream::Merge;
 use inter_daemon::InterDaemonConnection;
 use pending::PendingNodes;
 use shared_memory_server::ShmemConf;
-use std::env::temp_dir;
 use std::sync::Arc;
 use std::time::Instant;
 use std::{
@@ -41,7 +40,7 @@ use tokio::sync::oneshot::Sender;
 use tokio::sync::{mpsc, oneshot};
 use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
 use tracing::error;
-use uuid::Uuid;
+use uuid::{NoContext, Timestamp, Uuid};
 
 mod coordinator;
 mod inter_daemon;
@@ -60,6 +59,7 @@
 use crate::pending::DataflowStatus;
 
 pub struct Daemon {
     running: HashMap<DataflowId, RunningDataflow>,
+    working_dir: HashMap<DataflowId, PathBuf>,
 
     events_tx: mpsc::Sender<Timestamped<Event>>,
@@ -130,7 +130,7 @@
         let nodes = descriptor.resolve_aliases_and_set_defaults();
 
         let spawn_command = SpawnDataflowNodes {
-            dataflow_id: Uuid::new_v4(),
+            dataflow_id: Uuid::new_v7(Timestamp::now(NoContext)),
             working_dir,
             nodes,
             machine_listen_ports: BTreeMap::new(),
@@ -213,6 +213,7 @@
         let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
         let daemon = Self {
             running: HashMap::new(),
+            working_dir: HashMap::new(),
             events_tx: dora_events_tx,
             coordinator_connection,
             last_coordinator_heartbeat: Instant::now(),
@@ -371,29 +372,43 @@
                 dataflow_id,
                 node_id,
             } => {
-                tokio::spawn(async move {
-                    let logs = async {
-                        let log_dir = temp_dir();
-
-                        let mut file =
-                            File::open(log_dir.join(log::log_path(&dataflow_id, &node_id)))
-                                .await
-                                .wrap_err("Could not open log file")?;
-
-                        let mut contents = vec![];
-                        file.read_to_end(&mut contents)
-                            .await
-                            .wrap_err("Could not read content of log file")?;
-                        Result::<Vec<u8>, eyre::Report>::Ok(contents)
-                    }
-                    .await
-                    .map_err(|err| format!("{err:?}"));
-                    let _ = reply_tx
-                        .send(Some(DaemonCoordinatorReply::Logs(logs)))
-                        .map_err(|_| {
-                            error!("could not send logs reply from daemon to coordinator")
-                        });
-                });
+                match self.working_dir.get(&dataflow_id) {
+                    Some(working_dir) => {
+                        let working_dir = working_dir.clone();
+                        tokio::spawn(async move {
+                            let logs = async {
+                                let mut file =
+                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
+                                        .await
+                                        .wrap_err(format!(
+                                            "Could not open log file: {:#?}",
+                                            log::log_path(&working_dir, &dataflow_id, &node_id)
+                                        ))?;
+
+                                let mut contents = vec![];
+                                file.read_to_end(&mut contents)
+                                    .await
+                                    .wrap_err("Could not read content of log file")?;
+                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
+                            }
+                            .await
+                            .map_err(|err| format!("{err:?}"));
+                            let _ = reply_tx
+                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
+                                .map_err(|_| {
+                                    error!("could not send logs reply from daemon to coordinator")
+                                });
+                        });
+                    }
+                    None => {
+                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
+                        let _ = reply_tx.send(None).map_err(|_| {
+                            error!("could not send `Logs` reply from daemon to coordinator")
+                        });
+                    }
+                }
                 RunStatus::Continue
             }
             DaemonCoordinatorEvent::ReloadDataflow {
@@ -517,7 +532,10 @@
     ) -> eyre::Result<()> {
         let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone());
         let dataflow = match self.running.entry(dataflow_id) {
-            std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow),
+            std::collections::hash_map::Entry::Vacant(entry) => {
+                self.working_dir.insert(dataflow_id, working_dir.clone());
+                entry.insert(dataflow)
+            }
             std::collections::hash_map::Entry::Occupied(_) => {
                 bail!("there is already a running dataflow with ID `{dataflow_id}`")
             }
diff --git a/binaries/daemon/src/log.rs b/binaries/daemon/src/log.rs
index 62119994..55368a23 100644
--- a/binaries/daemon/src/log.rs
+++ b/binaries/daemon/src/log.rs
@@ -1,8 +1,9 @@
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 
 use dora_core::config::NodeId;
 use uuid::Uuid;
 
-pub fn log_path(dataflow_id: &Uuid, node_id: &NodeId) -> PathBuf {
-    PathBuf::from(format!("{dataflow_id}-{node_id}.txt"))
+pub fn log_path(working_dir: &Path, dataflow_id: &Uuid, node_id: &NodeId) -> PathBuf {
+    let dataflow_dir = working_dir.join("out").join(dataflow_id.to_string());
+    dataflow_dir.join(format!("log_{node_id}.txt"))
 }
diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs
index 7a204bd4..1681c0ad 100644
--- a/binaries/daemon/src/spawn.rs
+++ b/binaries/daemon/src/spawn.rs
@@ -21,8 +21,8 @@ use dora_node_api::{
 };
 use eyre::WrapErr;
 use std::{
-    env::{consts::EXE_EXTENSION, temp_dir},
-    path::Path,
+    env::consts::EXE_EXTENSION,
+    path::{Path, PathBuf},
     process::Stdio,
     sync::Arc,
 };
@@ -217,13 +217,14 @@
         }
     };
 
-    let log_dir = temp_dir();
-
+    let dataflow_dir = PathBuf::from(working_dir.join("out").join(dataflow_id.to_string()));
+    if !dataflow_dir.exists() {
+        std::fs::create_dir_all(&dataflow_dir).context("could not create dataflow_dir")?;
+    }
     let (tx, mut rx) = mpsc::channel(10);
-    let mut file =
-        File::create(&log_dir.join(log::log_path(&dataflow_id, &node_id).with_extension("txt")))
-            .await
-            .expect("Failed to create log file");
+    let mut file = File::create(log::log_path(working_dir, &dataflow_id, &node_id))
+        .await
+        .expect("Failed to create log file");
     let mut child_stdout =
         tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"));
stdout")); diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 25263f65..fc1f7749 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -14,7 +14,7 @@ serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.9.11" once_cell = "1.13.0" which = "5.0.0" -uuid = { version = "1.2.1", features = ["serde"] } +uuid = { version = "1.7", features = ["serde", "v7"] } dora-message = { workspace = true } tracing = "0.1" serde-with-expand-env = "1.1.0" diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 623ea9c0..8712a511 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -11,7 +11,7 @@ use crate::{ }; use aligned_vec::{AVec, ConstAlign}; use dora_message::{uhlc, Metadata}; -use uuid::Uuid; +use uuid::{NoContext, Timestamp, Uuid}; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct NodeConfig { @@ -178,7 +178,7 @@ pub struct DropToken(Uuid); impl DropToken { pub fn generate() -> Self { - Self(Uuid::new_v4()) + Self(Uuid::new_v7(Timestamp::now(NoContext))) } } diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs index 2924fc4d..a67b8f28 100644 --- a/libraries/extensions/dora-record/src/main.rs +++ b/libraries/extensions/dora-record/src/main.rs @@ -7,18 +7,18 @@ use dora_node_api::{ }, buffer::{OffsetBuffer, ScalarBuffer}, datatypes::{DataType, Field, Schema}, - ipc::writer::FileWriter, + ipc::writer::StreamWriter, record_batch::RecordBatch, }, DoraNode, Event, Metadata, }; use dora_tracing::telemetry::deserialize_to_hashmap; use eyre::{Context, ContextCompat}; -use std::{collections::HashMap, fs::File, sync::Arc}; +use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc}; fn main() -> eyre::Result<()> { - let (_node, mut events) = DoraNode::init_from_env()?; - + let (node, mut events) = DoraNode::init_from_env()?; + let dataflow_id = node.dataflow_id(); let mut writers = HashMap::new(); while let Some(event) = events.recv() { match event { @@ -47,23 +47,29 @@ fn main() -> eyre::Result<()> { field_utc_epoch, field_data, ])); - let file = std::fs::File::create(format!("{id}.arrow")).unwrap(); + let dataflow_dir = PathBuf::from("out").join(dataflow_id.to_string()); + if !dataflow_dir.exists() { + std::fs::create_dir_all(&dataflow_dir) + .context("could not create dataflow_dir")?; + } + let file = std::fs::File::create(dataflow_dir.join(format!("{id}.arrow"))) + .context("Couldn't create write file")?; - let writer = FileWriter::try_new(file, &schema).unwrap(); + let writer = StreamWriter::try_new(file, &schema).unwrap(); let mut writer = writer; - write_event(&mut writer, data.into(), &metadata) + write_event(&mut writer, data.into(), &metadata, schema.clone()) .context("could not write first record data")?; - writers.insert(id.clone(), writer); + writers.insert(id.clone(), (writer, schema)); } - Some(writer) => { - write_event(writer, data.into(), &metadata) + Some((writer, schema)) => { + write_event(writer, data.into(), &metadata, schema.clone()) .context("could not write record data")?; } }; } Event::InputClosed { id } => match writers.remove(&id) { None => {} - Some(mut writer) => writer.finish().context("Could not finish arrow file")?, + Some((mut writer, _)) => writer.finish().context("Could not finish arrow file")?, }, _ => {} } @@ -71,7 +77,7 @@ fn main() -> eyre::Result<()> { let result: eyre::Result> = writers .iter_mut() - .map(|(_, writer)| -> eyre::Result<()> { + .map(|(_, 
             writer
                 .finish()
                 .context("Could not finish writing arrow file")?;
@@ -85,9 +91,10 @@
 
 /// Write a row of data into the writer
 fn write_event(
-    writer: &mut FileWriter<File>,
+    writer: &mut StreamWriter<File>,
     data: Arc<dyn Array>,
     metadata: &Metadata,
+    schema: Arc<Schema>,
 ) -> eyre::Result<()> {
     let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, data.len() as i32]));
     let field = Arc::new(Field::new("item", data.data_type().clone(), true));
@@ -119,7 +126,7 @@
     let span_id_array = make_array(span_id_array.into());
 
     let record = RecordBatch::try_new(
-        writer.schema().clone(),
+        schema,
         vec![
             trace_id_array,
             span_id_array,
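
Note on the `Uuid::new_v4()` → `Uuid::new_v7(Timestamp::now(NoContext))` changes above: v7 UUIDs lead with a 48-bit millisecond Unix timestamp, so dataflow IDs now sort by creation time. A minimal standalone sketch, assuming only `uuid = { version = "1.7", features = ["v7"] }` as pinned in this diff:

```rust
use uuid::{NoContext, Timestamp, Uuid};

fn main() {
    // Two IDs created a few milliseconds apart...
    let first = Uuid::new_v7(Timestamp::now(NoContext));
    std::thread::sleep(std::time::Duration::from_millis(5));
    let second = Uuid::new_v7(Timestamp::now(NoContext));

    // ...compare in creation order, even as plain strings, so per-dataflow
    // `out/<uuid>` directories list chronologically. (`NoContext` skips the
    // monotonicity counter, so ordering is only guaranteed across distinct
    // milliseconds; v4 IDs gave no ordering at all.)
    assert!(first < second);
    assert!(first.to_string() < second.to_string());
}
```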
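The new `DoraNode::dataflow_id()` accessor is what lets dora-record write into the same per-run `out/<dataflow-id>/` directory that the daemon's `log_path` uses. A hedged sketch of any other node doing the same; it assumes `eyre` as an extra dependency and, like dora-record above, relies on the node being spawned with the dataflow's working directory as its current directory:

```rust
use dora_node_api::DoraNode;
use std::path::PathBuf;

fn main() -> eyre::Result<()> {
    let (node, _events) = DoraNode::init_from_env()?;

    // Same layout the daemon uses for log files: out/<dataflow-id>/...
    let run_dir = PathBuf::from("out").join(node.dataflow_id().to_string());
    std::fs::create_dir_all(&run_dir)?;

    // Illustrative: per-run artifacts land next to this node's log file.
    std::fs::write(run_dir.join("notes.txt"), "per-run output\n")?;
    Ok(())
}
```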
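dora-record also swaps Arrow's `FileWriter` for `StreamWriter`. The stream format has no footer, so recordings must be read back with a stream reader (a `FileReader` would fail for lack of the file-format footer), and batches flushed before a crash should generally remain decodable even if `finish()` never ran. A read-back sketch using the `arrow` crate's `StreamReader`; the path is a placeholder:

```rust
use arrow::ipc::reader::StreamReader;
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Recordings now live under out/<dataflow-id>/<output-id>.arrow.
    let file = File::open("out/<dataflow-id>/image.arrow")?;

    // Stream format => StreamReader; no projection (read all columns).
    let reader = StreamReader::try_new(file, None)?;
    for batch in reader {
        let batch = batch?;
        println!("{} rows, {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}
```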