Compare commits

...

6 Commits

Author             SHA1        Message                                                      Date
Philipp Oppermann  ca180b78dc  Start implementing recording to rosbag files                 2 years ago
Philipp Oppermann  b0e51c44cb  Record events as json                                        2 years ago
Philipp Oppermann  f0dd167287  Refactor: Move event recording to separate `Recorder` type   2 years ago
Philipp Oppermann  3cf33f72a6  Include machine ID in event record file                      2 years ago
Philipp Oppermann  e29c58d403  Save daemon events to text file if recording is enabled      2 years ago
Philipp Oppermann  ca917be94e  Add option to record events when spawning dataflows          2 years ago
19 changed files with 422 additions and 46 deletions
1.  Cargo.lock                                +71  -0
2.  binaries/cli/src/main.rs                  +7   -0
3.  binaries/coordinator/src/lib.rs           +11  -1
4.  binaries/coordinator/src/run/mod.rs       +2   -0
5.  binaries/daemon/Cargo.toml                +1   -0
6.  binaries/daemon/src/coordinator.rs        +3   -1
7.  binaries/daemon/src/lib.rs                +114 -38
8.  binaries/daemon/src/main.rs               +5   -1
9.  binaries/daemon/src/record/json.rs        +46  -0
10. binaries/daemon/src/record/mod.rs         +63  -0
11. binaries/daemon/src/record/rosbag.rs      +90  -0
12. examples/benchmark/run.rs                 +1   -1
13. examples/c++-dataflow/run.rs              +1   -1
14. examples/c-dataflow/run.rs                +1   -1
15. examples/multiple-daemons/run.rs          +1   -0
16. examples/rust-dataflow-url/run.rs         +1   -1
17. examples/rust-dataflow/run.rs             +1   -1
18. libraries/core/src/daemon_messages.rs     +1   -0
19. libraries/core/src/topics.rs              +2   -0

Cargo.lock  +71 -0

@@ -538,6 +538,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"

[[package]]
name = "base16ct"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce"

[[package]]
name = "base64"
version = "0.13.1"
@@ -720,6 +726,27 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38fcc2979eff34a4b84e1cf9a1e3da42a7d44b3b690a40cdcb23e3d556cfb2e5"

[[package]]
name = "bzip2"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8"
dependencies = [
"bzip2-sys",
"libc",
]

[[package]]
name = "bzip2-sys"
version = "0.1.11+1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc"
dependencies = [
"cc",
"libc",
"pkg-config",
]

[[package]]
name = "cache-padded"
version = "1.3.0"
@@ -1348,6 +1375,7 @@ dependencies = [
"flume",
"futures",
"futures-concurrency",
"rosbag",
"serde",
"serde_json",
"serde_yaml 0.8.26",
@@ -2687,6 +2715,26 @@ dependencies = [
"value-bag",
]

[[package]]
name = "lz4"
version = "1.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1"
dependencies = [
"libc",
"lz4-sys",
]

[[package]]
name = "lz4-sys"
version = "1.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900"
dependencies = [
"cc",
"libc",
]

[[package]]
name = "macro_rules_attribute"
version = "0.1.3"
@@ -2727,6 +2775,15 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"

[[package]]
name = "memmap2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83faa42c0a078c393f6b29d5db232d8be22776a891f8f56e5284faee4a20b327"
dependencies = [
"libc",
]

[[package]]
name = "memoffset"
version = "0.6.5"
@@ -3981,6 +4038,20 @@ dependencies = [
"cache-padded",
]

[[package]]
name = "rosbag"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c470a1a13c7daccdf26cf9b1ee22da130ba39541e384b8054df9537b86b2961"
dependencies = [
"base16ct",
"byteorder",
"bzip2",
"log",
"lz4",
"memmap2",
]

[[package]]
name = "rsa"
version = "0.7.2"


binaries/cli/src/main.rs  +7 -0

@@ -75,6 +75,9 @@ enum Command {
attach: bool,
#[clap(long, action)]
hot_reload: bool,
/// Whether the events of this dataflow should be recorded.
#[clap(long, action)]
record_events: bool,
},
/// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows.
Stop {
@@ -180,6 +183,7 @@ fn run() -> eyre::Result<()> {
name,
attach,
hot_reload,
record_events,
} => {
let dataflow_descriptor =
Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
@@ -198,6 +202,7 @@ fn run() -> eyre::Result<()> {
dataflow_descriptor.clone(),
name,
working_dir,
record_events,
&mut *session,
)?;

@@ -236,6 +241,7 @@ fn start_dataflow(
dataflow: Descriptor,
name: Option<String>,
local_working_dir: PathBuf,
record_events: bool,
session: &mut TcpRequestReplyConnection,
) -> Result<Uuid, eyre::ErrReport> {
let reply_raw = session
@@ -244,6 +250,7 @@ fn start_dataflow(
dataflow,
name,
local_working_dir,
record_events,
})
.unwrap(),
)
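
The new `record_events` field is declared with `#[clap(long, action)]`, so it surfaces as a boolean `--record-events` switch that defaults to false. A minimal, self-contained sketch of this clap pattern (the `Args` struct here is a hypothetical stand-in for the real `Command::Start` variant):

```rust
use clap::Parser;

#[derive(Parser)]
struct Args {
    /// Whether the events of this dataflow should be recorded.
    #[clap(long, action)]
    record_events: bool,
}

fn main() {
    // Passing `--record-events` flips the flag; omitting it leaves the default false.
    let args = Args::parse_from(["dora", "--record-events"]);
    assert!(args.record_events);
}
```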


binaries/coordinator/src/lib.rs  +11 -1

@@ -323,6 +323,7 @@ async fn start_inner(
dataflow,
name,
local_working_dir,
record_events,
} => {
let name = name.or_else(|| names::Generator::default().next());

@@ -342,6 +343,7 @@ async fn start_inner(
name,
&mut daemon_connections,
&clock,
record_events,
)
.await?;
Ok(dataflow)
@@ -842,12 +844,20 @@ async fn start_dataflow(
name: Option<String>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
clock: &HLC,
record_events: bool,
) -> eyre::Result<RunningDataflow> {
let SpawnedDataflow {
uuid,
machines,
nodes,
} = spawn_dataflow(dataflow, working_dir, daemon_connections, clock).await?;
} = spawn_dataflow(
dataflow,
working_dir,
daemon_connections,
clock,
record_events,
)
.await?;
Ok(RunningDataflow {
uuid,
name,


binaries/coordinator/src/run/mod.rs  +2 -0

@@ -23,6 +23,7 @@ pub(super) async fn spawn_dataflow(
working_dir: PathBuf,
daemon_connections: &mut HashMap<String, DaemonConnection>,
clock: &HLC,
record_events: bool,
) -> eyre::Result<SpawnedDataflow> {
dataflow.check(&working_dir)?;

@@ -46,6 +47,7 @@ pub(super) async fn spawn_dataflow(
nodes: nodes.clone(),
machine_listen_ports,
dataflow_descriptor: dataflow,
record: record_events,
};
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Spawn(spawn_command),


binaries/daemon/Cargo.toml  +1 -0

@@ -37,3 +37,4 @@ shared-memory-server = { workspace = true }
ctrlc = "3.2.5"
bincode = "1.3.3"
async-trait = "0.1.64"
rosbag = "0.6.1"

binaries/daemon/src/coordinator.rs  +3 -1

@@ -15,9 +15,11 @@ use tokio::{
};
use tokio_stream::{wrappers::ReceiverStream, Stream};

#[derive(Debug)]
#[derive(Debug, serde::Serialize)]
pub struct CoordinatorEvent {
#[serde(flatten)]
pub event: DaemonCoordinatorEvent,
#[serde(skip)]
pub reply_tx: oneshot::Sender<Option<DaemonCoordinatorReply>>,
}
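
The two serde attributes do the interesting work here: `#[serde(flatten)]` inlines the inner event's fields into the recorded entry, and `#[serde(skip)]` omits the reply channel, which has no meaningful serialized form. A hedged sketch of the pattern with simplified types (names here are illustrative, not from the diff):

```rust
use tokio::sync::oneshot;

#[derive(serde::Serialize)]
struct Inner {
    kind: String,
}

#[derive(serde::Serialize)]
struct EventEnvelope {
    #[serde(flatten)]
    event: Inner,
    #[serde(skip)]
    reply_tx: Option<oneshot::Sender<()>>,
}

fn main() {
    let e = EventEnvelope {
        event: Inner { kind: "Heartbeat".into() },
        reply_tx: None,
    };
    // Prints {"kind":"Heartbeat"}: the channel is skipped, the event inlined.
    println!("{}", serde_json::to_string(&e).unwrap());
}
```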



binaries/daemon/src/lib.rs  +114 -38

@@ -2,7 +2,7 @@ use coordinator::CoordinatorEvent;
use dora_core::config::{Input, OperatorId};
use dora_core::coordinator_messages::CoordinatorRequest;
use dora_core::daemon_messages::{Data, InterDaemonEvent, Timestamped};
use dora_core::message::uhlc::{self, HLC};
use dora_core::message::uhlc::{self, Timestamp, HLC};
use dora_core::message::MetadataParameters;
use dora_core::{
config::{DataId, InputMapping, NodeId},
@@ -18,6 +18,7 @@ use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
use inter_daemon::InterDaemonConnection;
use pending::PendingNodes;
use record::Recorder;
use shared_memory_server::ShmemConf;
use std::env::temp_dir;
use std::sync::Arc;
@@ -46,6 +47,7 @@ mod inter_daemon;
mod log;
mod node_communication;
mod pending;
mod record;
mod spawn;
mod tcp_utils;

@@ -117,7 +119,7 @@ impl Daemon {
.map(|_| ())
}

pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<()> {
pub async fn run_dataflow(dataflow_path: &Path, record: bool) -> eyre::Result<()> {
let working_dir = dataflow_path
.canonicalize()
.context("failed to canoncialize dataflow path")?
@@ -135,6 +137,7 @@ impl Daemon {
nodes,
machine_listen_ports: BTreeMap::new(),
dataflow_descriptor: descriptor,
record,
};

let clock = Arc::new(HLC::default());
@@ -249,6 +252,14 @@ impl Daemon {
tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
}

if let Err(err) = self
.record_event(&inner, timestamp)
.await
.context("failed to record event")
{
tracing::warn!("{err:?}");
};

match inner {
Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
let status = self.handle_coordinator_event(event, reply_tx).await?;
@@ -299,6 +310,41 @@ impl Daemon {
Ok(self.dataflow_errors)
}

async fn record_event(&mut self, event: &Event, timestamp: Timestamp) -> eyre::Result<()> {
let dataflow_id = match &event {
Event::Node { dataflow_id, .. } => dataflow_id,
Event::Coordinator(CoordinatorEvent { event, .. }) => match event {
DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes { dataflow_id, .. }) => {
dataflow_id
}
DaemonCoordinatorEvent::AllNodesReady { dataflow_id, .. } => dataflow_id,
DaemonCoordinatorEvent::StopDataflow { dataflow_id } => dataflow_id,
DaemonCoordinatorEvent::ReloadDataflow { dataflow_id, .. } => dataflow_id,
DaemonCoordinatorEvent::Logs { dataflow_id, .. } => dataflow_id,
DaemonCoordinatorEvent::Destroy | DaemonCoordinatorEvent::Heartbeat => {
return Ok(())
}
},
Event::Daemon(event) => match event {
InterDaemonEvent::Output { dataflow_id, .. } => dataflow_id,
InterDaemonEvent::InputsClosed { dataflow_id, .. } => dataflow_id,
},
Event::Dora(event) => match event {
DoraEvent::Timer { dataflow_id, .. } => dataflow_id,
DoraEvent::SpawnedNodeResult { dataflow_id, .. } => dataflow_id,
},
Event::HeartbeatInterval | Event::CtrlC => return Ok(()),
};
let Some(dataflow) = self.running.get_mut(dataflow_id) else {
bail!("no running dataflow with id `{dataflow_id}`");
};
if let Some(recorder) = &mut dataflow.recorder {
recorder.record(event, timestamp).await?;
}

Ok(())
}

async fn handle_coordinator_event(
&mut self,
event: DaemonCoordinatorEvent,
@@ -311,6 +357,7 @@ impl Daemon {
nodes,
machine_listen_ports,
dataflow_descriptor,
record,
}) => {
match dataflow_descriptor.communication.remote {
dora_core::config::RemoteCommunicationConfig::Tcp => {}
@@ -329,7 +376,7 @@ impl Daemon {
}

let result = self
.spawn_dataflow(dataflow_id, working_dir, nodes, dataflow_descriptor)
.spawn_dataflow(dataflow_id, working_dir, nodes, dataflow_descriptor, record)
.await;
if let Err(err) = &result {
tracing::error!("{err:?}");
@@ -506,6 +553,7 @@ impl Daemon {
working_dir: PathBuf,
nodes: Vec<ResolvedNode>,
dataflow_descriptor: Descriptor,
record: bool,
) -> eyre::Result<()> {
let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone());
let dataflow = match self.running.entry(dataflow_id) {
@@ -515,6 +563,12 @@ impl Daemon {
}
};

if record {
dataflow.recorder = Some(
Recorder::new(working_dir.clone(), self.machine_id.clone(), dataflow_id).await?,
);
}

for node in nodes {
let local = node.deploy.machine == self.machine_id;

@@ -887,37 +941,47 @@ impl Daemon {

dataflow.running_nodes.remove(node_id);
if dataflow.running_nodes.is_empty() {
let result = match self.dataflow_errors.get(&dataflow.id) {
None => Ok(()),
Some(errors) => {
let mut output = "some nodes failed:".to_owned();
for (node, error) in errors {
use std::fmt::Write;
write!(&mut output, "\n - {node}: {error}").unwrap();
}
Err(output)
self.handle_dataflow_finished(dataflow_id).await?;
}
Ok(())
}

async fn handle_dataflow_finished(&mut self, dataflow_id: Uuid) -> Result<(), eyre::ErrReport> {
let Some(dataflow) = self.running.remove(&dataflow_id) else {
return Ok(())
};
if let Some(recorder) = dataflow.recorder {
recorder.finish().await?;
}
let result = match self.dataflow_errors.get(&dataflow.id) {
None => Ok(()),
Some(errors) => {
let mut output = "some nodes failed:".to_owned();
for (node, error) in errors {
use std::fmt::Write;
write!(&mut output, "\n - {node}: {error}").unwrap();
}
};
tracing::info!(
"Dataflow `{dataflow_id}` finished on machine `{}`",
self.machine_id
);
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesFinished {
dataflow_id,
result,
},
},
timestamp: self.clock.new_timestamp(),
})?;
tcp_send(connection, &msg)
.await
.wrap_err("failed to report dataflow finish to dora-coordinator")?;
Err(output)
}
self.running.remove(&dataflow_id);
};
tracing::info!(
"Dataflow `{dataflow_id}` finished on machine `{}`",
self.machine_id
);
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesFinished {
dataflow_id,
result,
},
},
timestamp: self.clock.new_timestamp(),
})?;
tcp_send(connection, &msg)
.await
.wrap_err("failed to report dataflow finish to dora-coordinator")?;
}
Ok(())
}
@@ -1253,6 +1317,7 @@ fn close_input(

pub struct RunningDataflow {
id: Uuid,

/// Local nodes that are not started yet
pending_nodes: PendingNodes,

@@ -1275,6 +1340,9 @@ pub struct RunningDataflow {
///
/// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable.
empty_set: BTreeSet<DataId>,

/// Whether the events of this dataflow should be recorded and saved to disk.
recorder: Option<Recorder>,
}

impl RunningDataflow {
@@ -1293,6 +1361,7 @@ impl RunningDataflow {
_timer_handles: Vec::new(),
stop_sent: false,
empty_set: BTreeSet::new(),
recorder: None,
}
}

@@ -1404,7 +1473,7 @@ struct DropTokenInformation {
pending_nodes: BTreeSet<NodeId>,
}

#[derive(Debug)]
#[derive(Debug, serde::Serialize)]
pub enum Event {
Node {
dataflow_id: DataflowId,
@@ -1424,21 +1493,27 @@ impl From<DoraEvent> for Event {
}
}

#[derive(Debug)]
#[derive(Debug, serde::Serialize)]
pub enum DaemonNodeEvent {
OutputsDone {
#[serde(skip)]
reply_sender: oneshot::Sender<DaemonReply>,
},
Subscribe {
#[serde(skip)]
event_sender: UnboundedSender<Timestamped<daemon_messages::NodeEvent>>,
#[serde(skip)]
reply_sender: oneshot::Sender<DaemonReply>,
},
SubscribeDrop {
#[serde(skip)]
event_sender: UnboundedSender<Timestamped<daemon_messages::NodeDropEvent>>,
#[serde(skip)]
reply_sender: oneshot::Sender<DaemonReply>,
},
CloseOutputs {
outputs: Vec<dora_core::config::DataId>,
#[serde(skip)]
reply_sender: oneshot::Sender<DaemonReply>,
},
SendOut {
@@ -1450,11 +1525,12 @@ pub enum DaemonNodeEvent {
tokens: Vec<DropToken>,
},
EventStreamDropped {
#[serde(skip)]
reply_sender: oneshot::Sender<DaemonReply>,
},
}

#[derive(Debug)]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum DoraEvent {
Timer {
dataflow_id: DataflowId,
@@ -1468,10 +1544,10 @@ pub enum DoraEvent {
},
}

#[derive(Debug)]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum NodeExitStatus {
Success,
IoError(io::Error),
IoError(String),
ExitCode(i32),
Signal(i32),
Unknown,
@@ -1496,7 +1572,7 @@ impl From<Result<std::process::ExitStatus, io::Error>> for NodeExitStatus {
Self::Unknown
}
}
Err(err) => Self::IoError(err),
Err(err) => Self::IoError(err.to_string()),
}
}
}
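
Because `DoraEvent` now derives both `Serialize` and `Deserialize`, the `io::Error` payload of `NodeExitStatus::IoError` had to become a `String`: `io::Error` implements neither trait. A compact illustration of why the conversion is needed (enum shortened; not from the diff):

```rust
use std::io;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
enum ExitStatus {
    Success,
    // Was `IoError(io::Error)`, which cannot derive the serde traits.
    IoError(String),
}

fn main() {
    let err = io::Error::new(io::ErrorKind::NotFound, "node binary missing");
    let status = ExitStatus::IoError(err.to_string());
    println!("{}", serde_json::to_string(&status).unwrap());
}
```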


binaries/daemon/src/main.rs  +5 -1

@@ -28,6 +28,9 @@ pub struct Args {

#[clap(long)]
pub run_dora_runtime: bool,

#[clap(long)]
pub record_events: bool,
}

#[tokio::main]
@@ -43,6 +46,7 @@ async fn run() -> eyre::Result<()> {
machine_id,
coordinator_addr,
run_dora_runtime,
record_events,
} = clap::Parser::parse();

if run_dora_runtime {
@@ -80,7 +84,7 @@ async fn run() -> eyre::Result<()> {
Some(dataflow_path) => {
tracing::info!("Starting dataflow `{}`", dataflow_path.display());

Daemon::run_dataflow(&dataflow_path).await
Daemon::run_dataflow(&dataflow_path, record_events).await
}
None => {
Daemon::run(


binaries/daemon/src/record/json.rs  +46 -0

@@ -0,0 +1,46 @@
use std::path::Path;

use dora_core::message::uhlc::Timestamp;
use eyre::Context;
use tokio::{fs::File, io::AsyncWriteExt};

use crate::Event;

pub struct JsonFile {
file: File,
}

impl JsonFile {
pub async fn new(path: &Path) -> eyre::Result<Self> {
let file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await
.wrap_err_with(|| format!("failed to open record file at {}", path.display()))?;
Ok(Self { file })
}

pub async fn record(&mut self, timestamp: Timestamp, event: &Event) -> eyre::Result<()> {
let json = format(timestamp, event)?;
self.file
.write_all(json.as_bytes())
.await
.context("failed to write event to record file")?;
Ok(())
}
}

fn format(
timestamp: dora_core::message::uhlc::Timestamp,
event: &crate::Event,
) -> eyre::Result<String> {
let entry = RecordEntry { timestamp, event };
serde_json::to_string(&entry).context("failed to serialize record entry")
}

#[derive(Debug, serde::Serialize)]
struct RecordEntry<'a> {
timestamp: Timestamp,
event: &'a Event,
}
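
Note that `JsonFile::record` appends each serialized `RecordEntry` directly, with no newline or other separator, so the record file is a stream of concatenated JSON objects. serde_json's streaming deserializer handles exactly this shape; a small read-back sketch (the input literal is a made-up example, not real recorded output):

```rust
use serde_json::{Deserializer, Value};

fn main() {
    // Two concatenated JSON objects, as `JsonFile::record` would produce them.
    let recorded = r#"{"n":1}{"n":2}"#;
    for entry in Deserializer::from_str(recorded).into_iter::<Value>() {
        println!("{}", entry.expect("malformed record entry"));
    }
}
```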

binaries/daemon/src/record/mod.rs  +63 -0

@@ -0,0 +1,63 @@
use std::path::{Path, PathBuf};

use dora_core::{daemon_messages::DataflowId, message::uhlc::Timestamp};
use eyre::Context;

use self::{json::JsonFile, rosbag::RosbagFile};

mod json;
mod rosbag;

pub struct Recorder {
json_file: JsonFile,
rosbag_file: RosbagFile,
}

impl Recorder {
pub async fn new(
working_dir: PathBuf,
machine_id: String,
dataflow_id: DataflowId,
) -> eyre::Result<Self> {
let record_folder = Self::record_folder(&working_dir, dataflow_id).await?;

let json_file_path = record_folder.join(format!("events-{}.json", machine_id));
let json_file = JsonFile::new(&json_file_path).await?;

let rosbag_file_path = record_folder.join(format!("events-{}.bag", machine_id));
let rosbag_file = RosbagFile::new(&rosbag_file_path).await?;

Ok(Self {
json_file,
rosbag_file,
})
}

pub async fn record(&mut self, event: &crate::Event, timestamp: Timestamp) -> eyre::Result<()> {
self.json_file.record(timestamp, event).await?;
self.rosbag_file.record(timestamp, event).await?;

Ok(())
}

pub async fn finish(self) -> eyre::Result<()> {
self.rosbag_file.finish().await?;
Ok(())
}

async fn record_folder(
working_dir: &Path,
dataflow_id: DataflowId,
) -> Result<PathBuf, eyre::ErrReport> {
let record_folder = working_dir.join("record").join(dataflow_id.to_string());
tokio::fs::create_dir_all(&record_folder)
.await
.wrap_err_with(|| {
format!(
"failed to create record folder at {}",
record_folder.display()
)
})?;
Ok(record_folder)
}
}
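
Given a working directory, machine ID, and dataflow UUID, the recorder writes into `<working_dir>/record/<dataflow_id>/`. A sketch of the resulting paths, computed the way `Recorder::new` does (all input values are examples):

```rust
use std::path::Path;

fn main() {
    let working_dir = Path::new("/tmp/my-dataflow");
    let dataflow_id = "6a1d6da0-0000-0000-0000-000000000000"; // example UUID
    let machine_id = "default"; // example machine ID

    let folder = working_dir.join("record").join(dataflow_id);
    println!("{}", folder.join(format!("events-{machine_id}.json")).display());
    println!("{}", folder.join(format!("events-{machine_id}.bag")).display());
}
```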

binaries/daemon/src/record/rosbag.rs  +90 -0

@@ -0,0 +1,90 @@
use std::path::Path;

use dora_core::message::uhlc::Timestamp;
use eyre::Context;
use tokio::{
fs::File,
io::{AsyncWrite, AsyncWriteExt},
};

use crate::Event;

pub struct RosbagFile {
file: File,
record: Record,
}

impl RosbagFile {
pub async fn new(path: &Path) -> eyre::Result<Self> {
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await
.wrap_err_with(|| format!("failed to open record file at {}", path.display()))?;
file.write_all("#ROSBAG V2.0\n".as_bytes())
.await
.context("failed to write rosbag header")?;
Ok(Self {
file,
record: Record {
header: Vec::new(),
data: Vec::new(),
},
})
}

pub async fn record(&mut self, timestamp: Timestamp, event: &Event) -> eyre::Result<()> {
tracing::warn!("rosbag recording is not implemented yet");
Ok(())
}

pub async fn finish(mut self) -> eyre::Result<()> {
self.record.serialize(&mut self.file).await
}
}

struct Record {
header: Vec<HeaderField>,
data: Vec<u8>,
}

impl Record {
async fn serialize(&self, writer: &mut (impl AsyncWrite + Unpin)) -> eyre::Result<()> {
let serialized_header = {
let mut buf = Vec::new();
for field in &self.header {
field.serialize(&mut buf).await?;
}
buf
};

writer
.write_all(&u32::try_from(serialized_header.len())?.to_le_bytes())
.await?;
writer.write_all(&serialized_header).await?;
writer
.write_all(&u32::try_from(self.data.len())?.to_le_bytes())
.await?;
writer.write_all(&self.data).await?;

Ok(())
}
}

struct HeaderField {
name: String,
value: Vec<u8>,
}

impl HeaderField {
async fn serialize(&self, writer: &mut (impl AsyncWrite + Unpin)) -> eyre::Result<()> {
let len = self.name.len() + self.value.len() + 5;
writer.write_all(&u32::try_from(len)?.to_le_bytes()).await?;
writer.write_all(self.name.as_bytes()).await?;
writer.write_all(&[b'=']).await?;
writer.write_all(&self.value).await?;

Ok(())
}
}
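
`Record::serialize` follows the rosbag v2.0 record layout: a 4-byte little-endian header length, the header bytes (each field itself length-prefixed as `name=value`), then a 4-byte little-endian data length and the data. A synchronous sketch of the outer framing (the helper name is mine, not from the diff):

```rust
// Hypothetical helper mirroring the framing in `Record::serialize`:
// u32 header length, header bytes, u32 data length, data bytes.
fn frame_record(header: &[u8], data: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    out.extend_from_slice(&u32::try_from(header.len()).unwrap().to_le_bytes());
    out.extend_from_slice(header);
    out.extend_from_slice(&u32::try_from(data.len()).unwrap().to_le_bytes());
    out.extend_from_slice(data);
    out
}

fn main() {
    let framed = frame_record(b"type=ex", b"payload");
    // 4 (length) + 7 (header) + 4 (length) + 7 (data) bytes in total.
    assert_eq!(framed.len(), 4 + 7 + 4 + 7);
}
```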

examples/benchmark/run.rs  +1 -1

@@ -26,7 +26,7 @@ async fn main() -> eyre::Result<()> {
let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

dora_daemon::Daemon::run_dataflow(dataflow).await?;
dora_daemon::Daemon::run_dataflow(dataflow, false).await?;

Ok(())
}


examples/c++-dataflow/run.rs  +1 -1

@@ -119,7 +119,7 @@ async fn main() -> eyre::Result<()> {

let dataflow = Path::new("dataflow.yml").to_owned();
build_package("dora-runtime").await?;
dora_daemon::Daemon::run_dataflow(&dataflow).await?;
dora_daemon::Daemon::run_dataflow(&dataflow, false).await?;

Ok(())
}


examples/c-dataflow/run.rs  +1 -1

@@ -36,7 +36,7 @@ async fn main() -> eyre::Result<()> {
build_c_operator().await?;

let dataflow = Path::new("dataflow.yml").to_owned();
dora_daemon::Daemon::run_dataflow(&dataflow).await?;
dora_daemon::Daemon::run_dataflow(&dataflow, false).await?;

Ok(())
}


examples/multiple-daemons/run.rs  +1 -0

@@ -135,6 +135,7 @@ async fn start_dataflow(
dataflow: dataflow_descriptor,
local_working_dir: working_dir,
name: None,
record_events: false,
},
reply_sender,
}))


examples/rust-dataflow-url/run.rs  +1 -1

@@ -25,7 +25,7 @@ async fn main() -> eyre::Result<()> {
let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

dora_daemon::Daemon::run_dataflow(dataflow).await?;
dora_daemon::Daemon::run_dataflow(dataflow, false).await?;

Ok(())
}


examples/rust-dataflow/run.rs  +1 -1

@@ -26,7 +26,7 @@ async fn main() -> eyre::Result<()> {
let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

dora_daemon::Daemon::run_dataflow(dataflow).await?;
dora_daemon::Daemon::run_dataflow(dataflow, false).await?;

Ok(())
}


libraries/core/src/daemon_messages.rs  +1 -0

@@ -259,4 +259,5 @@ pub struct SpawnDataflowNodes {
pub nodes: Vec<ResolvedNode>,
pub machine_listen_ports: BTreeMap<String, SocketAddr>,
pub dataflow_descriptor: Descriptor,
pub record: bool,
}

libraries/core/src/topics.rs  +2 -0

@@ -27,6 +27,8 @@ pub enum ControlRequest {
// TODO: remove this once we figure out deploying of node/operator
// binaries from CLI to coordinator/daemon
local_working_dir: PathBuf,
/// Whether the events of this dataflow should be recorded.
record_events: bool,
},
Reload {
dataflow_id: Uuid,

