From ca917be94e074edf16acd3b850142ffca25468cc Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 16 Jun 2023 16:18:12 +0200 Subject: [PATCH] Add option to record events when spawning dataflows This commit does not implement event logging yet. --- binaries/cli/src/main.rs | 7 +++++++ binaries/coordinator/src/lib.rs | 12 +++++++++++- binaries/coordinator/src/run/mod.rs | 2 ++ binaries/daemon/src/lib.rs | 13 +++++++++++-- binaries/daemon/src/main.rs | 6 +++++- examples/benchmark/run.rs | 2 +- examples/c++-dataflow/run.rs | 2 +- examples/c-dataflow/run.rs | 2 +- examples/multiple-daemons/run.rs | 1 + examples/rust-dataflow-url/run.rs | 2 +- examples/rust-dataflow/run.rs | 2 +- libraries/core/src/daemon_messages.rs | 1 + libraries/core/src/topics.rs | 2 ++ 13 files changed, 45 insertions(+), 9 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 96e4435d..d5893490 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -75,6 +75,9 @@ enum Command { attach: bool, #[clap(long, action)] hot_reload: bool, + /// Whether the events of this dataflow should be recorded. + #[clap(long, action)] + record_events: bool, }, /// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows. Stop { @@ -180,6 +183,7 @@ fn run() -> eyre::Result<()> { name, attach, hot_reload, + record_events, } => { let dataflow_descriptor = Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; @@ -198,6 +202,7 @@ fn run() -> eyre::Result<()> { dataflow_descriptor.clone(), name, working_dir, + record_events, &mut *session, )?; @@ -236,6 +241,7 @@ fn start_dataflow( dataflow: Descriptor, name: Option, local_working_dir: PathBuf, + record_events: bool, session: &mut TcpRequestReplyConnection, ) -> Result { let reply_raw = session @@ -244,6 +250,7 @@ fn start_dataflow( dataflow, name, local_working_dir, + record_events, }) .unwrap(), ) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 72492c70..c21d3dc4 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -323,6 +323,7 @@ async fn start_inner( dataflow, name, local_working_dir, + record_events, } => { let name = name.or_else(|| names::Generator::default().next()); @@ -342,6 +343,7 @@ async fn start_inner( name, &mut daemon_connections, &clock, + record_events, ) .await?; Ok(dataflow) @@ -842,12 +844,20 @@ async fn start_dataflow( name: Option, daemon_connections: &mut HashMap, clock: &HLC, + record_events: bool, ) -> eyre::Result { let SpawnedDataflow { uuid, machines, nodes, - } = spawn_dataflow(dataflow, working_dir, daemon_connections, clock).await?; + } = spawn_dataflow( + dataflow, + working_dir, + daemon_connections, + clock, + record_events, + ) + .await?; Ok(RunningDataflow { uuid, name, diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index e4731d3c..6491486f 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -23,6 +23,7 @@ pub(super) async fn spawn_dataflow( working_dir: PathBuf, daemon_connections: &mut HashMap, clock: &HLC, + record_events: bool, ) -> eyre::Result { dataflow.check(&working_dir)?; @@ -46,6 +47,7 @@ pub(super) async fn spawn_dataflow( nodes: nodes.clone(), machine_listen_ports, dataflow_descriptor: dataflow, + record: record_events, }; let message = serde_json::to_vec(&Timestamped { inner: DaemonCoordinatorEvent::Spawn(spawn_command), diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 0f99de69..28e31704 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -117,7 +117,7 @@ impl Daemon { .map(|_| ()) } - pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<()> { + pub async fn run_dataflow(dataflow_path: &Path, record: bool) -> eyre::Result<()> { let working_dir = dataflow_path .canonicalize() .context("failed to canoncialize dataflow path")? @@ -135,6 +135,7 @@ impl Daemon { nodes, machine_listen_ports: BTreeMap::new(), dataflow_descriptor: descriptor, + record, }; let clock = Arc::new(HLC::default()); @@ -311,6 +312,7 @@ impl Daemon { nodes, machine_listen_ports, dataflow_descriptor, + record, }) => { match dataflow_descriptor.communication.remote { dora_core::config::RemoteCommunicationConfig::Tcp => {} @@ -329,7 +331,7 @@ impl Daemon { } let result = self - .spawn_dataflow(dataflow_id, working_dir, nodes, dataflow_descriptor) + .spawn_dataflow(dataflow_id, working_dir, nodes, dataflow_descriptor, record) .await; if let Err(err) = &result { tracing::error!("{err:?}"); @@ -506,6 +508,7 @@ impl Daemon { working_dir: PathBuf, nodes: Vec, dataflow_descriptor: Descriptor, + record: bool, ) -> eyre::Result<()> { let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone()); let dataflow = match self.running.entry(dataflow_id) { @@ -515,6 +518,8 @@ impl Daemon { } }; + dataflow.record = record; + for node in nodes { let local = node.deploy.machine == self.machine_id; @@ -1275,6 +1280,9 @@ pub struct RunningDataflow { /// /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. empty_set: BTreeSet, + + /// Whether the events of this dataflow should be recorded and saved to disk. + record: bool, } impl RunningDataflow { @@ -1293,6 +1301,7 @@ impl RunningDataflow { _timer_handles: Vec::new(), stop_sent: false, empty_set: BTreeSet::new(), + record: false, } } diff --git a/binaries/daemon/src/main.rs b/binaries/daemon/src/main.rs index 1c40c53e..e57fbc2a 100644 --- a/binaries/daemon/src/main.rs +++ b/binaries/daemon/src/main.rs @@ -28,6 +28,9 @@ pub struct Args { #[clap(long)] pub run_dora_runtime: bool, + + #[clap(long)] + pub record_events: bool, } #[tokio::main] @@ -43,6 +46,7 @@ async fn run() -> eyre::Result<()> { machine_id, coordinator_addr, run_dora_runtime, + record_events, } = clap::Parser::parse(); if run_dora_runtime { @@ -80,7 +84,7 @@ async fn run() -> eyre::Result<()> { Some(dataflow_path) => { tracing::info!("Starting dataflow `{}`", dataflow_path.display()); - Daemon::run_dataflow(&dataflow_path).await + Daemon::run_dataflow(&dataflow_path, record_events).await } None => { Daemon::run( diff --git a/examples/benchmark/run.rs b/examples/benchmark/run.rs index a7d35429..3a516467 100644 --- a/examples/benchmark/run.rs +++ b/examples/benchmark/run.rs @@ -26,7 +26,7 @@ async fn main() -> eyre::Result<()> { let dataflow = Path::new("dataflow.yml"); build_dataflow(dataflow).await?; - dora_daemon::Daemon::run_dataflow(dataflow).await?; + dora_daemon::Daemon::run_dataflow(dataflow, false).await?; Ok(()) } diff --git a/examples/c++-dataflow/run.rs b/examples/c++-dataflow/run.rs index ef34861b..9385633b 100644 --- a/examples/c++-dataflow/run.rs +++ b/examples/c++-dataflow/run.rs @@ -119,7 +119,7 @@ async fn main() -> eyre::Result<()> { let dataflow = Path::new("dataflow.yml").to_owned(); build_package("dora-runtime").await?; - dora_daemon::Daemon::run_dataflow(&dataflow).await?; + dora_daemon::Daemon::run_dataflow(&dataflow, false).await?; Ok(()) } diff --git a/examples/c-dataflow/run.rs b/examples/c-dataflow/run.rs index 2143dddf..e1bf1010 100644 --- a/examples/c-dataflow/run.rs +++ b/examples/c-dataflow/run.rs @@ -36,7 +36,7 @@ async fn main() -> eyre::Result<()> { build_c_operator().await?; let dataflow = Path::new("dataflow.yml").to_owned(); - dora_daemon::Daemon::run_dataflow(&dataflow).await?; + dora_daemon::Daemon::run_dataflow(&dataflow, false).await?; Ok(()) } diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index f1ee34de..6c3e6aa0 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -135,6 +135,7 @@ async fn start_dataflow( dataflow: dataflow_descriptor, local_working_dir: working_dir, name: None, + record_events: false, }, reply_sender, })) diff --git a/examples/rust-dataflow-url/run.rs b/examples/rust-dataflow-url/run.rs index dc27e137..3cd91c9c 100644 --- a/examples/rust-dataflow-url/run.rs +++ b/examples/rust-dataflow-url/run.rs @@ -25,7 +25,7 @@ async fn main() -> eyre::Result<()> { let dataflow = Path::new("dataflow.yml"); build_dataflow(dataflow).await?; - dora_daemon::Daemon::run_dataflow(dataflow).await?; + dora_daemon::Daemon::run_dataflow(dataflow, false).await?; Ok(()) } diff --git a/examples/rust-dataflow/run.rs b/examples/rust-dataflow/run.rs index a7d35429..3a516467 100644 --- a/examples/rust-dataflow/run.rs +++ b/examples/rust-dataflow/run.rs @@ -26,7 +26,7 @@ async fn main() -> eyre::Result<()> { let dataflow = Path::new("dataflow.yml"); build_dataflow(dataflow).await?; - dora_daemon::Daemon::run_dataflow(dataflow).await?; + dora_daemon::Daemon::run_dataflow(dataflow, false).await?; Ok(()) } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 7c0162f9..1c868b68 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -259,4 +259,5 @@ pub struct SpawnDataflowNodes { pub nodes: Vec, pub machine_listen_ports: BTreeMap, pub dataflow_descriptor: Descriptor, + pub record: bool, } diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 658d59ee..9d2a102c 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -27,6 +27,8 @@ pub enum ControlRequest { // TODO: remove this once we figure out deploying of node/operator // binaries from CLI to coordinator/daemon local_working_dir: PathBuf, + /// Whether the events of this dataflow should be recorded. + record_events: bool, }, Reload { dataflow_id: Uuid,