
Merge pull request #429 from dora-rs/out_dir

Send runs artefacts into a dedicated `out` folder

Tag: v0.3.3-rc1
Author: Philipp Oppermann (committed via GitHub), 1 year ago
Commit: 14bb4fbfb7
GPG signature: no known key found for this signature in database (GPG Key ID: B5690EEEBB952194)
13 changed files with 112 additions and 67 deletions:

   1. .gitignore                                    +1   -1
   2. Cargo.lock                                    +11  -4
   3. Cargo.toml                                    +10  -5
   4. apis/rust/node/src/node/mod.rs                +7   -1
   5. binaries/cli/Cargo.toml                       +1   -1
   6. binaries/coordinator/src/run/mod.rs           +2   -2
   7. binaries/daemon/Cargo.toml                    +1   -1
   8. binaries/daemon/src/lib.rs                    +42  -24
   9. binaries/daemon/src/log.rs                    +4   -3
  10. binaries/daemon/src/spawn.rs                  +9   -8
  11. libraries/core/Cargo.toml                     +1   -1
  12. libraries/core/src/daemon_messages.rs         +2   -2
  13. libraries/extensions/dora-record/src/main.rs  +21  -14
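
Taken together, the changes give every run a dedicated artifact folder under the dataflow's working directory. A sketch of the resulting layout, pieced together from the diffs below (all names are placeholders):

```
<working-dir>/
└── out/
    └── <dataflow-uuid>/          # one folder per run; the ID is now a UUIDv7
        ├── log_<node-id>.txt     # per-node logs written by the daemon
        └── <input-id>.arrow      # Arrow IPC streams written by dora-record
```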

.gitignore  (+1  -1)

```diff
@@ -1,7 +1,7 @@
 # Generated by Cargo
 # will have compiled files and executables
 /target/
-examples/**/*.txt
+out/
 
 # These are backup files generated by rustfmt
 **/*.rs.bk
```

Cargo.lock  (+11  -4)

```diff
@@ -528,6 +528,12 @@ dependencies = [
  "syn 2.0.48",
 ]
 
+[[package]]
+name = "atomic"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
+
 [[package]]
 name = "atomic-waker"
 version = "1.1.2"
@@ -5870,10 +5876,11 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
 
 [[package]]
 name = "uuid"
-version = "1.4.1"
+version = "1.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
+checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a"
 dependencies = [
+ "atomic",
  "getrandom",
  "rand",
  "serde",
@@ -5882,9 +5889,9 @@ dependencies = [
 
 [[package]]
 name = "uuid-macro-internal"
-version = "1.4.1"
+version = "1.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f7e1ba1f333bd65ce3c9f27de592fcbc256dafe3af2717f56d7c87761fbaccf4"
+checksum = "7abb14ae1a50dad63eaa768a458ef43d298cd1bd44951677bd10b732a9ba2a2d"
 dependencies = [
  "proc-macro2",
  "quote",
```

Cargo.toml  (+10  -5)

```diff
@@ -13,10 +13,15 @@ members = [
     "binaries/coordinator",
     "binaries/daemon",
     "binaries/runtime",
-    "examples/rust-dataflow/*",
-    "examples/rust-ros2-dataflow/*",
-    "examples/benchmark/*",
-    "examples/multiple-daemons/*",
+    "examples/rust-dataflow/node",
+    "examples/rust-dataflow/operator",
+    "examples/rust-dataflow/sink",
+    "examples/rust-ros2-dataflow/node",
+    "examples/benchmark/node",
+    "examples/benchmark/sink",
+    "examples/multiple-daemons/node",
+    "examples/multiple-daemons/operator",
+    "examples/multiple-daemons/sink",
     "libraries/arrow-convert",
     "libraries/communication-layer/*",
     "libraries/core",
@@ -87,7 +92,7 @@ dora-tracing = { workspace = true }
 dora-download = { workspace = true }
 dunce = "1.0.2"
 serde_yaml = "0.8.23"
-uuid = { version = "1.2.1", features = ["v4", "serde"] }
+uuid = { version = "1.7", features = ["v7", "serde"] }
 tracing = "0.1.36"
 futures = "0.3.25"
 tokio-stream = "0.1.11"
```

apis/rust/node/src/node/mod.rs  (+7  -1)

```diff
@@ -9,7 +9,7 @@ use aligned_vec::{AVec, ConstAlign};
 use arrow::array::Array;
 use dora_core::{
     config::{DataId, NodeId, NodeRunConfig},
-    daemon_messages::{DataMessage, DropToken, NodeConfig},
+    daemon_messages::{DataMessage, DataflowId, DropToken, NodeConfig},
     descriptor::Descriptor,
     message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters},
 };
@@ -33,6 +33,7 @@ pub const ZERO_COPY_THRESHOLD: usize = 4096;
 
 pub struct DoraNode {
     id: NodeId,
+    dataflow_id: DataflowId,
     node_config: NodeRunConfig,
     control_channel: ControlChannel,
     clock: Arc<uhlc::HLC>,
@@ -89,6 +90,7 @@ impl DoraNode {
 
         let node = Self {
             id: node_id,
+            dataflow_id: dataflow_id,
             node_config: run_config,
             control_channel,
             clock,
@@ -243,6 +245,10 @@ impl DoraNode {
         &self.id
     }
 
+    pub fn dataflow_id(&self) -> &DataflowId {
+        &self.dataflow_id
+    }
+
     pub fn node_config(&self) -> &NodeRunConfig {
         &self.node_config
     }
```
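
The new `dataflow_id()` getter is what lets a node locate the per-run `out` folder for its own artifacts, as dora-record does further down. A minimal sketch of a node using it; the `out/<dataflow-id>` convention is taken from the other diffs in this PR:

```rust
use dora_node_api::DoraNode;

fn main() -> eyre::Result<()> {
    let (node, _events) = DoraNode::init_from_env()?;

    // Derive this run's artifact folder from the dataflow ID,
    // mirroring the `out/<dataflow-id>` convention used in this PR.
    let out_dir = std::path::PathBuf::from("out").join(node.dataflow_id().to_string());
    println!("artifacts for this run go to {}", out_dir.display());
    Ok(())
}
```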


binaries/cli/Cargo.toml  (+1  -1)

```diff
@@ -27,7 +27,7 @@ serde_yaml = "0.9.11"
 webbrowser = "0.8.3"
 serde_json = "1.0.86"
 termcolor = "1.1.3"
-uuid = { version = "1.2.1", features = ["v4", "serde"] }
+uuid = { version = "1.7", features = ["v7", "serde"] }
 inquire = "0.5.2"
 communication-layer-request-reply = { workspace = true }
 notify = "5.1.0"
```


binaries/coordinator/src/run/mod.rs  (+2  -2)

```diff
@@ -15,7 +15,7 @@ use std::{
     collections::{BTreeMap, BTreeSet, HashMap},
     path::PathBuf,
 };
-use uuid::Uuid;
+use uuid::{NoContext, Timestamp, Uuid};
 
 #[tracing::instrument(skip(daemon_connections, clock))]
 pub(super) async fn spawn_dataflow(
@@ -27,7 +27,7 @@ pub(super) async fn spawn_dataflow(
     dataflow.check(&working_dir)?;
 
     let nodes = dataflow.resolve_aliases_and_set_defaults();
-    let uuid = Uuid::new_v4();
+    let uuid = Uuid::new_v7(Timestamp::now(NoContext));
 
     let machines: BTreeSet<_> = nodes.iter().map(|n| n.deploy.machine.clone()).collect();
     let machine_listen_ports = machines
```
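
Switching from `Uuid::new_v4()` to `Uuid::new_v7(Timestamp::now(NoContext))` makes dataflow IDs time-sortable, so the per-run folders under `out/` list in chronological order. A small sketch of the difference, assuming the `uuid` crate with the `v4` and `v7` features enabled:

```rust
use std::{thread::sleep, time::Duration};
use uuid::{NoContext, Timestamp, Uuid};

fn main() {
    // A UUIDv7 starts with a 48-bit Unix-millisecond timestamp, so an ID
    // created in a later millisecond always compares (and sorts) greater.
    let first = Uuid::new_v7(Timestamp::now(NoContext));
    sleep(Duration::from_millis(2)); // cross a millisecond boundary
    let second = Uuid::new_v7(Timestamp::now(NoContext));
    assert!(first < second);

    // A UUIDv4 is random throughout; consecutive IDs have no such ordering.
    println!("{first}\n{second}\n{}", Uuid::new_v4());
}
```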


binaries/daemon/Cargo.toml  (+1  -1)

```diff
@@ -30,7 +30,7 @@ dora-tracing = { workspace = true, optional = true }
 dora-arrow-convert = { workspace = true }
 dora-node-api = { workspace = true }
 serde_yaml = "0.8.23"
-uuid = { version = "1.1.2", features = ["v4"] }
+uuid = { version = "1.7", features = ["v7"] }
 futures = "0.3.25"
 shared-memory-server = { workspace = true }
 bincode = "1.3.3"
```


binaries/daemon/src/lib.rs  (+42  -24)

```diff
@@ -21,7 +21,6 @@ use futures_concurrency::stream::Merge;
 use inter_daemon::InterDaemonConnection;
 use pending::PendingNodes;
 use shared_memory_server::ShmemConf;
-use std::env::temp_dir;
 use std::sync::Arc;
 use std::time::Instant;
 use std::{
@@ -41,7 +40,7 @@ use tokio::sync::oneshot::Sender;
 use tokio::sync::{mpsc, oneshot};
 use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
 use tracing::error;
-use uuid::Uuid;
+use uuid::{NoContext, Timestamp, Uuid};
 
 mod coordinator;
 mod inter_daemon;
@@ -60,6 +59,7 @@ use crate::pending::DataflowStatus;
 
 pub struct Daemon {
     running: HashMap<DataflowId, RunningDataflow>,
+    working_dir: HashMap<DataflowId, PathBuf>,
 
     events_tx: mpsc::Sender<Timestamped<Event>>,
@@ -130,7 +130,7 @@ impl Daemon {
         let nodes = descriptor.resolve_aliases_and_set_defaults();
 
         let spawn_command = SpawnDataflowNodes {
-            dataflow_id: Uuid::new_v4(),
+            dataflow_id: Uuid::new_v7(Timestamp::now(NoContext)),
             working_dir,
             nodes,
             machine_listen_ports: BTreeMap::new(),
@@ -213,6 +213,7 @@ impl Daemon {
         let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
         let daemon = Self {
             running: HashMap::new(),
+            working_dir: HashMap::new(),
             events_tx: dora_events_tx,
             coordinator_connection,
             last_coordinator_heartbeat: Instant::now(),
@@ -371,29 +372,43 @@ impl Daemon {
                 dataflow_id,
                 node_id,
             } => {
-                tokio::spawn(async move {
-                    let logs = async {
-                        let log_dir = temp_dir();
-
-                        let mut file =
-                            File::open(log_dir.join(log::log_path(&dataflow_id, &node_id)))
-                                .await
-                                .wrap_err("Could not open log file")?;
-
-                        let mut contents = vec![];
-                        file.read_to_end(&mut contents)
-                            .await
-                            .wrap_err("Could not read content of log file")?;
-                        Result::<Vec<u8>, eyre::Report>::Ok(contents)
-                    }
-                    .await
-                    .map_err(|err| format!("{err:?}"));
-                    let _ = reply_tx
-                        .send(Some(DaemonCoordinatorReply::Logs(logs)))
-                        .map_err(|_| {
-                            error!("could not send logs reply from daemon to coordinator")
-                        });
-                });
+                match self.working_dir.get(&dataflow_id) {
+                    Some(working_dir) => {
+                        let working_dir = working_dir.clone();
+                        tokio::spawn(async move {
+                            let logs = async {
+                                let mut file =
+                                    File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
+                                        .await
+                                        .wrap_err(format!(
+                                            "Could not open log file: {:#?}",
+                                            log::log_path(&working_dir, &dataflow_id, &node_id)
+                                        ))?;
+
+                                let mut contents = vec![];
+                                file.read_to_end(&mut contents)
+                                    .await
+                                    .wrap_err("Could not read content of log file")?;
+                                Result::<Vec<u8>, eyre::Report>::Ok(contents)
+                            }
+                            .await
+                            .map_err(|err| format!("{err:?}"));
+                            let _ = reply_tx
+                                .send(Some(DaemonCoordinatorReply::Logs(logs)))
+                                .map_err(|_| {
+                                    error!("could not send logs reply from daemon to coordinator")
+                                });
+                        });
+                    }
+                    None => {
+                        tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
+                        let _ = reply_tx.send(None).map_err(|_| {
+                            error!(
+                                "could not send `AllNodesReady` reply from daemon to coordinator"
+                            )
+                        });
+                    }
+                }
                 RunStatus::Continue
             }
             DaemonCoordinatorEvent::ReloadDataflow {
@@ -517,7 +532,10 @@ impl Daemon {
     ) -> eyre::Result<()> {
         let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone());
         let dataflow = match self.running.entry(dataflow_id) {
-            std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow),
+            std::collections::hash_map::Entry::Vacant(entry) => {
+                self.working_dir.insert(dataflow_id, working_dir.clone());
+                entry.insert(dataflow)
+            }
             std::collections::hash_map::Entry::Occupied(_) => {
                 bail!("there is already a running dataflow with ID `{dataflow_id}`")
             }
```


binaries/daemon/src/log.rs  (+4  -3)

```diff
@@ -1,8 +1,9 @@
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 
 use dora_core::config::NodeId;
 use uuid::Uuid;
 
-pub fn log_path(dataflow_id: &Uuid, node_id: &NodeId) -> PathBuf {
-    PathBuf::from(format!("{dataflow_id}-{node_id}.txt"))
+pub fn log_path(working_dir: &Path, dataflow_id: &Uuid, node_id: &NodeId) -> PathBuf {
+    let dataflow_dir = working_dir.join("out").join(dataflow_id.to_string());
+    dataflow_dir.join(format!("log_{node_id}.txt"))
 }
```
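
Node logs thus move from flat `<dataflow-id>-<node-id>.txt` files in the system temp directory to `<working-dir>/out/<dataflow-id>/log_<node-id>.txt`. A standalone sketch of the new scheme, with `NodeId` simplified to `&str` and a made-up working directory and node name:

```rust
use std::path::{Path, PathBuf};
use uuid::{NoContext, Timestamp, Uuid};

// Simplified copy of the new `log_path` above (NodeId swapped for &str).
fn log_path(working_dir: &Path, dataflow_id: &Uuid, node_id: &str) -> PathBuf {
    let dataflow_dir = working_dir.join("out").join(dataflow_id.to_string());
    dataflow_dir.join(format!("log_{node_id}.txt"))
}

fn main() {
    let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
    let path = log_path(Path::new("/tmp/my-dataflow"), &dataflow_id, "webcam");
    // Prints something like /tmp/my-dataflow/out/<uuid>/log_webcam.txt
    println!("{}", path.display());
}
```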

binaries/daemon/src/spawn.rs  (+9  -8)

```diff
@@ -21,8 +21,8 @@ use dora_node_api::{
 };
 use eyre::WrapErr;
 use std::{
-    env::{consts::EXE_EXTENSION, temp_dir},
-    path::Path,
+    env::consts::EXE_EXTENSION,
+    path::{Path, PathBuf},
     process::Stdio,
     sync::Arc,
 };
@@ -217,13 +217,14 @@ pub async fn spawn_node(
         }
     };
 
-    let log_dir = temp_dir();
-
+    let dataflow_dir = PathBuf::from(working_dir.join("out").join(dataflow_id.to_string()));
+    if !dataflow_dir.exists() {
+        std::fs::create_dir_all(&dataflow_dir).context("could not create dataflow_dir")?;
+    }
     let (tx, mut rx) = mpsc::channel(10);
-    let mut file =
-        File::create(&log_dir.join(log::log_path(&dataflow_id, &node_id).with_extension("txt")))
-            .await
-            .expect("Failed to create log file");
+    let mut file = File::create(log::log_path(working_dir, &dataflow_id, &node_id))
+        .await
+        .expect("Failed to create log file");
     let mut child_stdout =
         tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"));
```




libraries/core/Cargo.toml  (+1  -1)

```diff
@@ -14,7 +14,7 @@ serde = { version = "1.0.136", features = ["derive"] }
 serde_yaml = "0.9.11"
 once_cell = "1.13.0"
 which = "5.0.0"
-uuid = { version = "1.2.1", features = ["serde"] }
+uuid = { version = "1.7", features = ["serde", "v7"] }
 dora-message = { workspace = true }
 tracing = "0.1"
 serde-with-expand-env = "1.1.0"
```


libraries/core/src/daemon_messages.rs  (+2  -2)

```diff
@@ -11,7 +11,7 @@ use crate::{
 };
 use aligned_vec::{AVec, ConstAlign};
 use dora_message::{uhlc, Metadata};
-use uuid::Uuid;
+use uuid::{NoContext, Timestamp, Uuid};
 
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
 pub struct NodeConfig {
@@ -178,7 +178,7 @@ pub struct DropToken(Uuid);
 
 impl DropToken {
     pub fn generate() -> Self {
-        Self(Uuid::new_v4())
+        Self(Uuid::new_v7(Timestamp::now(NoContext)))
     }
 }
```




libraries/extensions/dora-record/src/main.rs  (+21  -14)

```diff
@@ -7,18 +7,18 @@ use dora_node_api::{
         },
         buffer::{OffsetBuffer, ScalarBuffer},
         datatypes::{DataType, Field, Schema},
-        ipc::writer::FileWriter,
+        ipc::writer::StreamWriter,
         record_batch::RecordBatch,
     },
     DoraNode, Event, Metadata,
 };
 use dora_tracing::telemetry::deserialize_to_hashmap;
 use eyre::{Context, ContextCompat};
-use std::{collections::HashMap, fs::File, sync::Arc};
+use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc};
 
 fn main() -> eyre::Result<()> {
-    let (_node, mut events) = DoraNode::init_from_env()?;
+    let (node, mut events) = DoraNode::init_from_env()?;
+    let dataflow_id = node.dataflow_id();
     let mut writers = HashMap::new();
     while let Some(event) = events.recv() {
         match event {
@@ -47,23 +47,29 @@ fn main() -> eyre::Result<()> {
                         field_utc_epoch,
                         field_data,
                     ]));
-                    let file = std::fs::File::create(format!("{id}.arrow")).unwrap();
+                    let dataflow_dir = PathBuf::from("out").join(dataflow_id.to_string());
+                    if !dataflow_dir.exists() {
+                        std::fs::create_dir_all(&dataflow_dir)
+                            .context("could not create dataflow_dir")?;
+                    }
+                    let file = std::fs::File::create(dataflow_dir.join(format!("{id}.arrow")))
+                        .context("Couldn't create write file")?;
 
-                    let writer = FileWriter::try_new(file, &schema).unwrap();
+                    let writer = StreamWriter::try_new(file, &schema).unwrap();
                     let mut writer = writer;
-                    write_event(&mut writer, data.into(), &metadata)
+                    write_event(&mut writer, data.into(), &metadata, schema.clone())
                         .context("could not write first record data")?;
-                    writers.insert(id.clone(), writer);
+                    writers.insert(id.clone(), (writer, schema));
                 }
-                Some(writer) => {
-                    write_event(writer, data.into(), &metadata)
+                Some((writer, schema)) => {
+                    write_event(writer, data.into(), &metadata, schema.clone())
                         .context("could not write record data")?;
                 }
             };
         }
         Event::InputClosed { id } => match writers.remove(&id) {
             None => {}
-            Some(mut writer) => writer.finish().context("Could not finish arrow file")?,
+            Some((mut writer, _)) => writer.finish().context("Could not finish arrow file")?,
         },
         _ => {}
     }
@@ -71,7 +77,7 @@ fn main() -> eyre::Result<()> {
 
     let result: eyre::Result<Vec<_>> = writers
         .iter_mut()
-        .map(|(_, writer)| -> eyre::Result<()> {
+        .map(|(_, (writer, _))| -> eyre::Result<()> {
             writer
                 .finish()
                 .context("Could not finish writing arrow file")?;
@@ -85,9 +91,10 @@ fn main() -> eyre::Result<()> {
 
 /// Write a row of data into the writer
 fn write_event(
-    writer: &mut FileWriter<File>,
+    writer: &mut StreamWriter<File>,
     data: Arc<dyn Array>,
     metadata: &Metadata,
+    schema: Arc<Schema>,
 ) -> eyre::Result<()> {
     let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, data.len() as i32]));
     let field = Arc::new(Field::new("item", data.data_type().clone(), true));
@@ -119,7 +126,7 @@ fn write_event(
     let span_id_array = make_array(span_id_array.into());
 
     let record = RecordBatch::try_new(
-        writer.schema().clone(),
+        schema,
         vec![
```

