From 188813cd5734fea031a8fc4cee42c6926790876e Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 18 Apr 2023 15:40:59 +0200 Subject: [PATCH] Make coordinator usable in examples Allow passing runtime path via coordinator --- Cargo.lock | 1 + Cargo.toml | 1 + binaries/coordinator/src/lib.rs | 21 ++++++++++++--------- binaries/coordinator/src/run/mod.rs | 6 ++++-- binaries/daemon/src/lib.rs | 13 +++++++++++-- libraries/core/src/daemon_messages.rs | 1 + 6 files changed, 30 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 724e17e6..42a22b17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1152,6 +1152,7 @@ dependencies = [ "futures", "serde_yaml 0.8.26", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", "uuid", diff --git a/Cargo.toml b/Cargo.toml index abdee5f6..6b8ac1bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ uuid = { version = "1.2.1", features = ["v4", "serde"] } tracing = "0.1.36" tracing-subscriber = "0.3.15" futures = "0.3.25" +tokio-stream = "0.1.11" [[example]] name = "c-dataflow" diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index d28aa604..d38e97b7 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -2,7 +2,7 @@ use crate::{ run::spawn_dataflow, tcp_utils::{tcp_receive, tcp_send}, }; -use control::ControlEvent; +pub use control::ControlEvent; use dora_core::{ config::{CommunicationConfig, DataId, NodeId, OperatorId}, coordinator_messages::RegisterResult, @@ -41,9 +41,6 @@ pub struct Args { #[clap(long)] pub port: Option, - #[clap(long)] - pub run_dataflow: Option, - #[clap(long)] pub dora_runtime_path: Option, } @@ -70,7 +67,7 @@ pub async fn start( .port(); let mut tasks = FuturesUnordered::new(); let future = async move { - start_inner(listener, &tasks, external_events).await?; + start_inner(listener, &tasks, external_events, args.dora_runtime_path).await?; tracing::debug!("coordinator main loop finished, waiting on spawned tasks"); while let Some(join_result) = tasks.next().await { @@ -88,6 +85,7 @@ async fn start_inner( listener: TcpListener, tasks: &FuturesUnordered>, external_events: impl Stream + Unpin, + runtime_path: Option, ) -> eyre::Result<()> { let new_daemon_connections = TcpListenerStream::new(listener).map(|c| { c.map(Event::NewDaemonConnection) @@ -278,9 +276,13 @@ async fn start_inner( bail!("there is already a running dataflow with name `{name}`"); } } - let dataflow = - start_dataflow(&dataflow_path, name, &mut daemon_connections) - .await?; + let dataflow = start_dataflow( + &dataflow_path, + name, + runtime_path.clone(), + &mut daemon_connections, + ) + .await?; Ok(dataflow) }; inner.await.map(|dataflow| { @@ -617,13 +619,14 @@ async fn reload_dataflow( async fn start_dataflow( path: &Path, name: Option, + runtime_path: Option, daemon_connections: &mut HashMap, ) -> eyre::Result { let SpawnedDataflow { uuid, communication_config, machines, - } = spawn_dataflow(path, daemon_connections).await?; + } = spawn_dataflow(path, runtime_path, daemon_connections).await?; Ok(RunningDataflow { uuid, name, diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 9e199e46..6d40f908 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -8,7 +8,7 @@ use dora_core::{ use eyre::{bail, eyre, ContextCompat, WrapErr}; use std::{ collections::{BTreeSet, HashMap}, - path::Path, + path::{Path, PathBuf}, }; use tokio::net::TcpStream; use uuid::Uuid; @@ -16,6 +16,7 @@ use uuid::Uuid; #[tracing::instrument(skip(daemon_connections))] pub async fn spawn_dataflow( dataflow_path: &Path, + runtime_path: Option, daemon_connections: &mut HashMap, ) -> eyre::Result { let descriptor = Descriptor::read(dataflow_path).await.wrap_err_with(|| { @@ -24,7 +25,7 @@ pub async fn spawn_dataflow( dataflow_path.display() ) })?; - descriptor.check(dataflow_path, None)?; + descriptor.check(dataflow_path, runtime_path.clone())?; let working_dir = dataflow_path .canonicalize() .context("failed to canoncialize dataflow path")? @@ -48,6 +49,7 @@ pub async fn spawn_dataflow( working_dir, nodes, daemon_communication: descriptor.daemon_config, + runtime_path, }; let message = serde_json::to_vec(&DaemonCoordinatorEvent::Spawn(spawn_command))?; diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 30260a83..be3bc053 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -103,6 +103,7 @@ impl Daemon { working_dir, nodes, daemon_communication: descriptor.daemon_config, + runtime_path: dora_runtime_path.clone(), }; let exit_when_done = spawn_command @@ -254,9 +255,16 @@ impl Daemon { working_dir, nodes, daemon_communication, + runtime_path, }) => { let result = self - .spawn_dataflow(dataflow_id, working_dir, nodes, daemon_communication) + .spawn_dataflow( + dataflow_id, + working_dir, + nodes, + runtime_path.as_deref(), + daemon_communication, + ) .await; if let Err(err) = &result { tracing::error!("{err:?}"); @@ -350,6 +358,7 @@ impl Daemon { dataflow_id: uuid::Uuid, working_dir: PathBuf, nodes: Vec, + runtime_path: Option<&Path>, daemon_communication_config: DaemonCommunicationConfig, ) -> eyre::Result<()> { let dataflow = RunningDataflow::new(dataflow_id); @@ -405,7 +414,7 @@ impl Daemon { node, self.events_tx.clone(), daemon_communication_config, - self.dora_runtime_path.as_deref(), + runtime_path.or(self.dora_runtime_path.as_deref()), ) .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?; diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 2be0c6a0..268ddae8 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -232,6 +232,7 @@ pub struct SpawnDataflowNodes { pub working_dir: PathBuf, pub nodes: Vec, pub daemon_communication: DaemonCommunicationConfig, + pub runtime_path: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]