From bcb861932c1461f306268ba55c1bab943885a882 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 23 Apr 2025 19:04:28 +0200 Subject: [PATCH] Make `dora build` command behave like `dora start` without spawning Instead of running all the build commands directly, run them on their intended target machines through the coordinator. This commit is a breaking change because a coordinator connection is now required for `dora build`. --- binaries/cli/src/build.rs | 67 --------- binaries/cli/src/lib.rs | 133 +++++++++++------- binaries/coordinator/src/lib.rs | 28 +++- binaries/coordinator/src/run/mod.rs | 17 ++- binaries/daemon/src/lib.rs | 5 + binaries/daemon/src/spawn.rs | 15 +- examples/multiple-daemons/run.rs | 1 + libraries/message/src/cli_to_coordinator.rs | 1 + .../message/src/coordinator_to_daemon.rs | 1 + 9 files changed, 145 insertions(+), 123 deletions(-) delete mode 100644 binaries/cli/src/build.rs diff --git a/binaries/cli/src/build.rs b/binaries/cli/src/build.rs deleted file mode 100644 index 7783615b..00000000 --- a/binaries/cli/src/build.rs +++ /dev/null @@ -1,67 +0,0 @@ -use dora_core::{ - build::run_build_command, - config::OperatorId, - descriptor::{Descriptor, DescriptorExt, NodeExt, SINGLE_OPERATOR_DEFAULT_ID}, -}; -use eyre::Context; - -use crate::resolve_dataflow; - -pub fn build(dataflow: String, uv: bool) -> eyre::Result<()> { - let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; - let descriptor = Descriptor::blocking_read(&dataflow)?; - let dataflow_absolute = if dataflow.is_relative() { - std::env::current_dir().unwrap().join(dataflow) - } else { - dataflow.to_owned() - }; - let working_dir = dataflow_absolute.parent().unwrap(); - - let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string()); - - for node in descriptor.nodes { - match node.kind()? { - dora_core::descriptor::NodeKind::Standard(_) => { - if let Some(build) = &node.build { - run_build_command(build, working_dir, uv, &node.env).with_context(|| { - format!("build command failed for standard node `{}`", node.id) - })? - } - } - dora_core::descriptor::NodeKind::Runtime(runtime_node) => { - for operator in &runtime_node.operators { - if let Some(build) = &operator.config.build { - run_build_command(build, working_dir, uv, &node.env).with_context( - || { - format!( - "build command failed for operator `{}/{}`", - node.id, operator.id - ) - }, - )?; - } - } - } - dora_core::descriptor::NodeKind::Custom(custom_node) => { - if let Some(build) = &custom_node.build { - run_build_command(build, working_dir, uv, &node.env).with_context(|| { - format!("build command failed for custom node `{}`", node.id) - })? - } - } - dora_core::descriptor::NodeKind::Operator(operator) => { - if let Some(build) = &operator.config.build { - run_build_command(build, working_dir, uv, &node.env).with_context(|| { - format!( - "build command failed for operator `{}/{}`", - node.id, - operator.id.as_ref().unwrap_or(&default_op_id) - ) - })? - } - } - } - } - - Ok(()) -} diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index 1357fa78..2b667b0d 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -33,7 +33,6 @@ use tracing::level_filters::LevelFilter; use uuid::Uuid; mod attach; -pub(crate) mod build; mod check; mod formatting; mod graph; @@ -83,6 +82,12 @@ enum Command { /// Path to the dataflow descriptor file #[clap(value_name = "PATH")] dataflow: String, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, // Use UV to build nodes. #[clap(long, action)] uv: bool, @@ -366,8 +371,13 @@ fn run(args: Args) -> eyre::Result<()> { } => { graph::create(dataflow, mermaid, open)?; } - Command::Build { dataflow, uv } => { - build::build(dataflow, uv)?; + Command::Build { + dataflow, + coordinator_addr, + coordinator_port, + uv, + } => { + start_dataflow(dataflow, None, coordinator_addr, coordinator_port, uv, true)?; } Command::New { args, @@ -419,26 +429,15 @@ fn run(args: Args) -> eyre::Result<()> { hot_reload, uv, } => { - let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; - let dataflow_descriptor = - Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; - let working_dir = dataflow - .canonicalize() - .context("failed to canonicalize dataflow path")? - .parent() - .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? - .to_owned(); - - let coordinator_socket = (coordinator_addr, coordinator_port).into(); - let mut session = connect_to_coordinator(coordinator_socket) - .wrap_err("failed to connect to dora coordinator")?; - let dataflow_id = start_dataflow( - dataflow_descriptor.clone(), - name, - working_dir, - &mut *session, - uv, - )?; + let (dataflow, dataflow_descriptor, coordinator_socket, mut session, dataflow_id) = + start_dataflow( + dataflow, + name, + coordinator_addr, + coordinator_port, + uv, + false, + )?; let attach = match (attach, detach) { (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), @@ -613,34 +612,72 @@ fn run(args: Args) -> eyre::Result<()> { } fn start_dataflow( - dataflow: Descriptor, + dataflow: String, name: Option, - local_working_dir: PathBuf, - session: &mut TcpRequestReplyConnection, + coordinator_addr: IpAddr, + coordinator_port: u16, uv: bool, -) -> Result { - let reply_raw = session - .request( - &serde_json::to_vec(&ControlRequest::Start { - dataflow, - name, - local_working_dir, - uv, - }) - .unwrap(), - ) - .wrap_err("failed to send start dataflow message")?; - - let result: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match result { - ControlRequestReply::DataflowStarted { uuid } => { - eprintln!("{uuid}"); - Ok(uuid) + build_only: bool, +) -> Result< + ( + PathBuf, + Descriptor, + SocketAddr, + Box, + Uuid, + ), + eyre::Error, +> { + let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; + let dataflow_descriptor = + Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; + let working_dir = dataflow + .canonicalize() + .context("failed to canonicalize dataflow path")? + .parent() + .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? + .to_owned(); + let coordinator_socket = (coordinator_addr, coordinator_port).into(); + let mut session = connect_to_coordinator(coordinator_socket) + .wrap_err("failed to connect to dora coordinator")?; + let dataflow_id = { + let dataflow = dataflow_descriptor.clone(); + let session: &mut TcpRequestReplyConnection = &mut *session; + let reply_raw = session + .request( + &serde_json::to_vec(&ControlRequest::Start { + dataflow, + name, + local_working_dir: working_dir, + uv, + build_only, + }) + .unwrap(), + ) + .wrap_err("failed to send start dataflow message")?; + + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowStarted { uuid } => { + if build_only { + eprintln!("dataflow build successful"); + } else { + eprintln!("{uuid}"); + } + uuid + } + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected start dataflow reply: {other:?}"), } - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected start dataflow reply: {other:?}"), - } + }; + Ok(( + dataflow, + dataflow_descriptor, + coordinator_socket, + session, + dataflow_id, + )) } fn stop_dataflow_interactive( diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 2bb2a9fc..422043ca 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -394,6 +394,7 @@ async fn start_inner( name, local_working_dir, uv, + build_only, } => { let name = name.or_else(|| names::Generator::default().next()); @@ -414,6 +415,7 @@ async fn start_inner( &mut daemon_connections, &clock, uv, + build_only, ) .await?; Ok(dataflow) @@ -716,13 +718,23 @@ async fn start_inner( match result { Ok(()) => { if dataflow.pending_spawn_results.is_empty() { - tracing::info!("successfully spawned dataflow `{dataflow_id}`"); + tracing::info!( + "successfully {} dataflow `{dataflow_id}`", + if dataflow.build_only { + "built" + } else { + "spawned" + } + ); if let Some(reply_tx) = dataflow.spawn_result_tx.take() { let _ = reply_tx.send(Ok(ControlRequestReply::DataflowStarted { uuid: dataflow_id, })); } + if dataflow.build_only { + running_dataflows.remove(&dataflow_id); + } } } Err(err) => { @@ -846,6 +858,8 @@ struct RunningDataflow { pending_spawn_results: BTreeSet, spawn_result_tx: Option>>, + + build_only: bool, } struct ArchivedDataflow { @@ -1045,12 +1059,21 @@ async fn start_dataflow( daemon_connections: &mut DaemonConnections, clock: &HLC, uv: bool, + build_only: bool, ) -> eyre::Result { let SpawnedDataflow { uuid, daemons, nodes, - } = spawn_dataflow(dataflow, working_dir, daemon_connections, clock, uv).await?; + } = spawn_dataflow( + dataflow, + working_dir, + daemon_connections, + clock, + uv, + build_only, + ) + .await?; Ok(RunningDataflow { uuid, name, @@ -1066,6 +1089,7 @@ async fn start_dataflow( log_subscribers: Vec::new(), pending_spawn_results: daemons, spawn_result_tx: None, + build_only, }) } diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 09c2b1a4..425f0213 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -26,6 +26,7 @@ pub(super) async fn spawn_dataflow( daemon_connections: &mut DaemonConnections, clock: &HLC, uv: bool, + build_only: bool, ) -> eyre::Result { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); @@ -36,7 +37,8 @@ pub(super) async fn spawn_dataflow( for (machine, nodes_on_machine) in &nodes_by_daemon { let spawn_nodes = nodes_on_machine.iter().map(|n| n.id.clone()).collect(); tracing::debug!( - "Spawning dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})" + "{} dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})", + if build_only { "Building" } else { "Spawning" } ); let spawn_command = SpawnDataflowNodes { @@ -46,6 +48,7 @@ pub(super) async fn spawn_dataflow( dataflow_descriptor: dataflow.clone(), spawn_nodes, uv, + build_only, }; let message = serde_json::to_vec(&Timestamped { inner: DaemonCoordinatorEvent::Spawn(spawn_command), @@ -54,11 +57,19 @@ pub(super) async fn spawn_dataflow( let daemon_id = spawn_dataflow_on_machine(daemon_connections, machine.as_deref(), &message) .await - .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine:?}`"))?; + .wrap_err_with(|| { + format!( + "failed to {} dataflow on machine `{machine:?}`", + if build_only { "build" } else { "spawn" } + ) + })?; daemons.insert(daemon_id); } - tracing::info!("successfully triggered dataflow spawn `{uuid}`"); + tracing::info!( + "successfully triggered dataflow {} `{uuid}`", + if build_only { "build" } else { "spawn" } + ); Ok(SpawnedDataflow { uuid, diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index a79a344c..cc618d5f 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -167,6 +167,7 @@ impl Daemon { nodes, dataflow_descriptor: descriptor, uv, + build_only: false, }; let clock = Arc::new(HLC::default()); @@ -470,6 +471,7 @@ impl Daemon { dataflow_descriptor, spawn_nodes, uv, + build_only, }) => { match dataflow_descriptor.communication.remote { dora_core::config::RemoteCommunicationConfig::Tcp => {} @@ -490,6 +492,7 @@ impl Daemon { dataflow_descriptor, spawn_nodes, uv, + build_only, ) .await; let (trigger_result, result_task) = match result { @@ -762,6 +765,7 @@ impl Daemon { dataflow_descriptor: Descriptor, spawn_nodes: BTreeSet, uv: bool, + build_only: bool, ) -> eyre::Result>> { let mut logger = self.logger.for_dataflow(dataflow_id); let dataflow = @@ -821,6 +825,7 @@ impl Daemon { dataflow_descriptor, clock: self.clock.clone(), uv, + build_only, }; let mut tasks = Vec::new(); diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 3927eb3c..b066975d 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -56,6 +56,7 @@ pub struct Spawner { /// clock is required for generating timestamps when dropping messages early because queue is full pub clock: Arc, pub uv: bool, + pub build_only: bool, } impl Spawner { @@ -148,8 +149,12 @@ impl Spawner { self.build_node(logger, &node.env, self.working_dir.clone(), build) .await?; } - spawn_command_from_path(&self.working_dir, self.uv, logger, &n, true) - .await? + if self.build_only { + None + } else { + spawn_command_from_path(&self.working_dir, self.uv, logger, &n, true) + .await? + } } dora_message::descriptor::NodeSource::GitBranch { repo, rev } => { self.spawn_git_node(&n, repo, rev, logger, &node.env, prepared_git.unwrap()) @@ -680,7 +685,11 @@ impl Spawner { self.build_node(logger, node_env, clone_dir.clone(), build) .await?; } - spawn_command_from_path(&clone_dir, self.uv, logger, node, true).await + if self.build_only { + Ok(None) + } else { + spawn_command_from_path(&clone_dir, self.uv, logger, node, true).await + } } async fn build_node( diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 64410a8a..17b4765f 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -143,6 +143,7 @@ async fn start_dataflow( local_working_dir: working_dir, name: None, uv: false, + build_only: false, }, reply_sender, })) diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs index 1b62fd58..ab91f449 100644 --- a/libraries/message/src/cli_to_coordinator.rs +++ b/libraries/message/src/cli_to_coordinator.rs @@ -16,6 +16,7 @@ pub enum ControlRequest { // binaries from CLI to coordinator/daemon local_working_dir: PathBuf, uv: bool, + build_only: bool, }, Reload { dataflow_id: Uuid, diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs index 482f0042..8c68a6ca 100644 --- a/libraries/message/src/coordinator_to_daemon.rs +++ b/libraries/message/src/coordinator_to_daemon.rs @@ -63,4 +63,5 @@ pub struct SpawnDataflowNodes { pub dataflow_descriptor: Descriptor, pub spawn_nodes: BTreeSet, pub uv: bool, + pub build_only: bool, }