From c7473182ad6f5fbe7e5b0e0456974048b6e6d99e Mon Sep 17 00:00:00 2001
From: Philipp Oppermann <dev@phil-opp.com>
Date: Wed, 4 Jun 2025 17:08:33 +0200
Subject: [PATCH] Wip

---
 binaries/coordinator/src/lib.rs                |  93 +++++++++++++--
 binaries/coordinator/src/listener.rs           |  15 +++
 binaries/coordinator/src/run/mod.rs            |  10 +-
 binaries/daemon/src/build/git.rs               |  17 ++-
 binaries/daemon/src/build/mod.rs               |  22 ++--
 binaries/daemon/src/lib.rs                     | 112 ++++++++++++------
 binaries/daemon/src/log.rs                     |   8 +-
 examples/multiple-daemons/run.rs               |   1 -
 libraries/message/src/cli_to_coordinator.rs    |  27 +++--
 libraries/message/src/common.rs                |   4 +-
 libraries/message/src/coordinator_to_cli.rs    |   1 +
 .../message/src/coordinator_to_daemon.rs       |  24 +++-
 .../message/src/daemon_to_coordinator.rs       |  10 +-
 libraries/message/src/lib.rs                   |   1 +
 14 files changed, 254 insertions(+), 91 deletions(-)

diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs
index 9b9ec085..04e90252 100644
--- a/binaries/coordinator/src/lib.rs
+++ b/binaries/coordinator/src/lib.rs
@@ -20,7 +20,7 @@ use dora_message::{
     },
     daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
     descriptor::{Descriptor, ResolvedNode},
-    BuildId,
+    BuildId, DataflowId, SessionId,
 };
 use eyre::{bail, eyre, ContextCompat, Result, WrapErr};
 use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt};
@@ -33,6 +33,7 @@ use std::{
         btree_map::{Entry, OccupiedEntry},
         BTreeMap, BTreeSet, HashMap,
     },
+    env::current_dir,
     net::SocketAddr,
     path::PathBuf,
     sync::Arc,
@@ -202,13 +203,15 @@ async fn start_inner(
 
     let mut events = (abortable_events, daemon_events).merge();
 
-    let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
-    let mut dataflow_results: HashMap<Uuid, BTreeMap<DaemonId, DataflowDaemonResult>> =
+    let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();
+
+    let mut running_dataflows: HashMap<DataflowId, RunningDataflow> = HashMap::new();
+    let mut dataflow_results: HashMap<DataflowId, BTreeMap<DaemonId, DataflowDaemonResult>> =
         HashMap::new();
-    let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
+    let mut archived_dataflows: HashMap<DataflowId, ArchivedDataflow> = HashMap::new();
     let mut daemon_connections = DaemonConnections::default();
-    let mut build_log_subscribers: BTreeMap<BuildId, LogSubscriber> = Default::default();
+    let mut build_log_subscribers: BTreeMap<SessionId, LogSubscriber> = Default::default();
 
     while let Some(event) = events.next().await {
         // used below for measuring the event handling duration
@@ -411,6 +414,7 @@ async fn start_inner(
                 } => {
                     match request {
                         ControlRequest::Build {
+                            session_id,
                             dataflow,
                             git_sources,
                             prev_git_sources,
@@ -418,9 +422,10 @@ async fn start_inner(
                             uv,
                         } => {
                             // assign a random build id
-                            let session_id = BuildId::new_v4();
+                            let build_id = BuildId::new_v4();
 
                             let result = build_dataflow(
+                                build_id,
                                 session_id,
                                 dataflow,
                                 git_sources,
@@ -443,6 +448,7 @@ async fn start_inner(
                                 }
                             }
                         ControlRequest::Start {
+                            build_id,
                             session_id,
                             dataflow,
                             name,
@@ -462,6 +468,7 @@ async fn start_inner(
                                 }
                             }
                             let dataflow = start_dataflow(
+                                build_id,
                                 session_id,
                                 dataflow,
                                 local_working_dir,
@@ -779,6 +786,42 @@ async fn start_inner(
                 tracing::info!("Daemon `{daemon_id}` exited");
                 daemon_connections.remove(&daemon_id);
             }
+            Event::DataflowBuildResult {
+                build_id,
+                session_id,
+                daemon_id,
+                result,
+            } => match running_builds.get_mut(&build_id) {
+                Some(build) => {
+                    build.pending_build_results.remove(&daemon_id);
+                    match result {
+                        Ok(()) => {}
+                        Err(err) => {
+                            build.errors.push(format!("{err:?}"));
+                        }
+                    };
+                    if build.pending_build_results.is_empty() {
+                        tracing::info!("dataflow build finished: `{build_id}`");
+                        let build = running_builds.remove(&build_id).unwrap();
+                        let result = if build.errors.is_empty() {
+                            Ok(())
+                        } else {
+                            Err(format!("build failed: {}", build.errors.join("\n\n")))
+                        };
+
+                        let _ = build.build_result_sender.send(Ok(
+                            ControlRequestReply::DataflowBuildFinished {
+                                build_id,
+                                session_id,
+                                result,
+                            },
+                        ));
+                    }
+                }
+                None => {
+                    tracing::warn!("received DataflowBuildResult, but no matching build in `running_builds` map");
+                }
+            },
             Event::DataflowSpawnResult {
                 dataflow_id,
                 daemon_id,
                 result,
@@ -836,7 +879,7 @@ async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage)
 
 async fn send_log_message_to_subscriber(
     message: &LogMessage,
-    mut subscriber: OccupiedEntry<'_, BuildId, LogSubscriber>,
+    mut subscriber: OccupiedEntry<'_, SessionId, LogSubscriber>,
 ) {
     let send_result = tokio::time::timeout(
         Duration::from_millis(100),
@@ -916,6 +959,19 @@ async fn send_heartbeat_message(
         .wrap_err("failed to send heartbeat message to daemon")
 }
 
+struct RunningBuild {
+    build_id: BuildId,
+    /// The IDs of the daemons that the build is running on.
+    daemons: BTreeSet<DaemonId>,
+
+    errors: Vec<String>,
+    build_result_sender: tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>,
+
+    log_subscribers: Vec<LogSubscriber>,
+
+    pending_build_results: BTreeSet<DaemonId>,
+}
+
 struct RunningDataflow {
     name: Option<String>,
     uuid: Uuid,
@@ -1180,11 +1236,12 @@ async fn retrieve_logs(
 
 #[tracing::instrument(skip(daemon_connections, clock))]
 async fn build_dataflow(
-    session_id: BuildId,
+    build_id: BuildId,
+    session_id: SessionId,
     dataflow: Descriptor,
     git_sources: BTreeMap<NodeId, GitSource>,
     prev_git_sources: BTreeMap<NodeId, GitSource>,
-    working_dir: PathBuf,
+    local_working_dir: Option<PathBuf>,
     clock: &HLC,
     uv: bool,
     daemon_connections: &mut DaemonConnections,
@@ -1210,8 +1267,9 @@ async fn build_dataflow(
     );
 
     let build_command = BuildDataflowNodes {
+        build_id,
         session_id,
-        working_dir: working_dir.clone(),
+        local_working_dir: local_working_dir.clone(),
         nodes: nodes.clone(),
         git_sources: git_sources_by_daemon
             .remove(&machine.as_ref())
@@ -1278,9 +1336,10 @@ async fn build_dataflow_on_machine(
 }
 
 async fn start_dataflow(
-    session_id: Option<BuildId>,
+    build_id: Option<BuildId>,
+    session_id: SessionId,
     dataflow: Descriptor,
-    working_dir: PathBuf,
+    local_working_dir: Option<PathBuf>,
     name: Option<String>,
    daemon_connections: &mut DaemonConnections,
     clock: &HLC,
@@ -1291,9 +1350,10 @@ async fn start_dataflow(
         daemons,
         nodes,
     } = spawn_dataflow(
+        build_id,
         session_id,
         dataflow,
-        working_dir,
+        local_working_dir,
         daemon_connections,
         clock,
         uv,
@@ -1388,6 +1448,12 @@ pub enum Event {
     DaemonExit {
         daemon_id: dora_message::common::DaemonId,
     },
+    DataflowBuildResult {
+        build_id: BuildId,
+        session_id: SessionId,
+        daemon_id: DaemonId,
+        result: eyre::Result<()>,
+    },
     DataflowSpawnResult {
         dataflow_id: uuid::Uuid,
         daemon_id: DaemonId,
@@ -1417,6 +1483,7 @@ impl Event {
             Event::CtrlC => "CtrlC",
             Event::Log(_) => "Log",
             Event::DaemonExit { .. } => "DaemonExit",
+            Event::DataflowBuildResult { .. } => "DataflowBuildResult",
             Event::DataflowSpawnResult { .. } => "DataflowSpawnResult",
         }
     }
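Note on the `Event::DataflowBuildResult` handling above: each daemon reports exactly one result per build, so the handler is a fan-in that completes once `pending_build_results` is empty and joins all collected errors into a single failure. A minimal sketch of that aggregation pattern, with simplified and purely illustrative types (plain `String` daemon IDs, no channels or eyre):

    use std::collections::BTreeSet;

    struct PendingBuild {
        /// Daemon IDs we still expect a result from.
        pending: BTreeSet<String>,
        /// Errors collected from daemons that already reported.
        errors: Vec<String>,
    }

    impl PendingBuild {
        /// Records one daemon's result. Returns the overall outcome once the
        /// last daemon has reported, and `None` while results are pending.
        fn record(
            &mut self,
            daemon_id: &str,
            result: Result<(), String>,
        ) -> Option<Result<(), String>> {
            self.pending.remove(daemon_id);
            if let Err(err) = result {
                self.errors.push(err);
            }
            if !self.pending.is_empty() {
                return None;
            }
            Some(if self.errors.is_empty() {
                Ok(())
            } else {
                Err(format!("build failed: {}", self.errors.join("\n\n")))
            })
        }
    }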
} => "DataflowSpawnResult", } } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 39e17bca..41516aff 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -112,6 +112,21 @@ pub async fn handle_connection( break; } } + DaemonEvent::BuildResult { + build_id, + session_id, + result, + } => { + let event = Event::DataflowBuildResult { + build_id, + session_id, + daemon_id, + result: result.map_err(|err| eyre::eyre!(err)), + }; + if events_tx.send(event).await.is_err() { + break; + } + } DaemonEvent::SpawnResult { dataflow_id, result, diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 7de7ea91..b0ecda1f 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -10,7 +10,7 @@ use dora_message::{ daemon_to_coordinator::DaemonCoordinatorReply, descriptor::{Descriptor, ResolvedNode}, id::NodeId, - BuildId, + SessionId, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use itertools::Itertools; @@ -22,9 +22,10 @@ use uuid::{NoContext, Timestamp, Uuid}; #[tracing::instrument(skip(daemon_connections, clock))] pub(super) async fn spawn_dataflow( - session_id: Option, + build_id: Option, + session_id: SessionId, dataflow: Descriptor, - working_dir: PathBuf, + local_working_dir: Option, daemon_connections: &mut DaemonConnections, clock: &HLC, uv: bool, @@ -42,9 +43,10 @@ pub(super) async fn spawn_dataflow( ); let spawn_command = SpawnDataflowNodes { + build_id, session_id, dataflow_id: uuid, - working_dir: working_dir.clone(), + local_working_dir: local_working_dir.clone(), nodes: nodes.clone(), dataflow_descriptor: dataflow.clone(), spawn_nodes, diff --git a/binaries/daemon/src/build/git.rs b/binaries/daemon/src/build/git.rs index bfe0dcb5..fcce5dc7 100644 --- a/binaries/daemon/src/build/git.rs +++ b/binaries/daemon/src/build/git.rs @@ -1,5 +1,5 @@ use crate::log::NodeBuildLogger; -use dora_message::{common::LogLevel, descriptor::GitRepoRev, BuildId, DataflowId}; +use dora_message::{common::LogLevel, descriptor::GitRepoRev, DataflowId, SessionId}; use eyre::{ContextCompat, WrapErr}; use git2::FetchOptions; use itertools::Itertools; @@ -15,7 +15,7 @@ pub struct GitManager { /// Directories that are currently in use by running dataflows. clones_in_use: BTreeMap>, /// Builds that are prepared, but not done yet. 
diff --git a/binaries/daemon/src/build/git.rs b/binaries/daemon/src/build/git.rs
index bfe0dcb5..fcce5dc7 100644
--- a/binaries/daemon/src/build/git.rs
+++ b/binaries/daemon/src/build/git.rs
@@ -1,5 +1,5 @@
 use crate::log::NodeBuildLogger;
-use dora_message::{common::LogLevel, descriptor::GitRepoRev, BuildId, DataflowId};
+use dora_message::{common::LogLevel, descriptor::GitRepoRev, DataflowId, SessionId};
 use eyre::{ContextCompat, WrapErr};
 use git2::FetchOptions;
 use itertools::Itertools;
@@ -15,7 +15,7 @@ pub struct GitManager {
     /// Directories that are currently in use by running dataflows.
     clones_in_use: BTreeMap<PathBuf, BTreeSet<DataflowId>>,
     /// Builds that are prepared, but not done yet.
-    prepared_builds: BTreeMap<BuildId, PreparedBuild>,
+    prepared_builds: BTreeMap<SessionId, PreparedBuild>,
     reuse_for: BTreeMap<PathBuf, PathBuf>,
 }
 
@@ -36,7 +36,7 @@ impl GitManager {
         prev_commit_hash: Option<String>,
         target_dir: &Path,
     ) -> eyre::Result<GitFolder> {
-        let clone_dir = Self::clone_dir_path(&target_dir, session_id, &repo_url, &commit_hash)?;
+        let clone_dir = Self::clone_dir_path(&target_dir, &repo_url, &commit_hash)?;
 
         if let Some(using) = self.clones_in_use.get(&clone_dir) {
             if !using.is_empty() {
@@ -60,7 +60,7 @@ impl GitManager {
         } else if let Some(previous_commit_hash) = prev_commit_hash {
             // we might be able to update a previous clone
             let prev_clone_dir =
-                Self::clone_dir_path(&target_dir, session_id, &repo_url, &previous_commit_hash)?;
+                Self::clone_dir_path(&target_dir, &repo_url, &previous_commit_hash)?;
 
             if self
                 .clones_in_use
@@ -109,7 +109,7 @@ impl GitManager {
             .unwrap_or(false)
     }
 
-    pub fn clone_dir_ready(&self, session_id: BuildId, dir: &Path) -> bool {
+    pub fn clone_dir_ready(&self, session_id: SessionId, dir: &Path) -> bool {
         self.prepared_builds
             .get(&session_id)
             .map(|p| p.planned_clone_dirs.contains(dir))
@@ -117,7 +117,7 @@ impl GitManager {
             || dir.exists()
     }
 
-    pub fn register_ready_clone_dir(&mut self, session_id: BuildId, dir: PathBuf) -> bool {
+    pub fn register_ready_clone_dir(&mut self, session_id: SessionId, dir: PathBuf) -> bool {
         self.prepared_builds
             .entry(session_id)
             .or_default()
@@ -127,13 +127,10 @@ impl GitManager {
 
     fn clone_dir_path(
         base_dir: &Path,
-        session_id: BuildId,
         repo_url: &Url,
         commit_hash: &String,
     ) -> eyre::Result<PathBuf> {
-        let mut path = base_dir
-            .join(&session_id.to_string())
-            .join(repo_url.host_str().context("git URL has no hostname")?);
+        let mut path = base_dir.join(repo_url.host_str().context("git URL has no hostname")?);
         path.extend(repo_url.path_segments().context("no path in git URL")?);
         let path = path.join(commit_hash);
         Ok(dunce::simplified(&path).to_owned())
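The `clone_dir_path` change above removes the per-session path component, so a clone is addressed purely by host, repository path, and commit hash and can be reused across build sessions (with `clones_in_use` and `reuse_for` guarding concurrent access). A simplified re-implementation for illustration, using `Option` instead of `eyre` and omitting the `dunce` normalization; the URL and hash in the usage comment are made up:

    use std::path::{Path, PathBuf};
    use url::Url;

    fn clone_dir(base_dir: &Path, repo_url: &Url, commit_hash: &str) -> Option<PathBuf> {
        // <base>/<host>/<repo path segments>/<commit hash>
        let mut path = base_dir.join(repo_url.host_str()?);
        path.extend(repo_url.path_segments()?);
        Some(path.join(commit_hash))
    }

    // clone_dir("<base>/git", "https://github.com/example/repo", "4f2d9c1"):
    //   before this patch: <base>/git/<session-id>/github.com/example/repo/4f2d9c1
    //   after this patch:  <base>/git/github.com/example/repo/4f2d9c1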
diff --git a/binaries/daemon/src/build/mod.rs b/binaries/daemon/src/build/mod.rs
index a96d4129..76013529 100644
--- a/binaries/daemon/src/build/mod.rs
+++ b/binaries/daemon/src/build/mod.rs
@@ -11,7 +11,7 @@ use dora_core::{
 use dora_message::{
     common::{GitSource, LogLevel, Timestamped},
     descriptor::EnvValue,
-    BuildId,
+    SessionId,
 };
 use eyre::Context;
 use tokio::sync::mpsc;
@@ -22,8 +22,8 @@ mod git;
 
 #[derive(Clone)]
 pub struct Builder {
-    pub session_id: BuildId,
-    pub working_dir: PathBuf,
+    pub session_id: SessionId,
+    pub base_working_dir: PathBuf,
     pub daemon_tx: mpsc::Sender<Timestamped<Event>>,
     pub dataflow_descriptor: Descriptor,
     /// clock is required for generating timestamps when dropping messages early because queue is full
@@ -44,7 +44,7 @@ impl Builder {
         let prepared_git = if let Some(GitSource { repo, commit_hash }) = git {
             let repo_url = Url::parse(&repo).context("failed to parse git repository URL")?;
 
-            let target_dir = self.working_dir.join("build");
+            let target_dir = self.base_working_dir.join("git");
             let prev_hash = prev_git.filter(|p| p.repo == repo).map(|p| p.commit_hash);
             let git_folder = git_manager.choose_clone_dir(
                 self.session_id,
@@ -79,7 +79,7 @@ impl Builder {
             dora_core::descriptor::CoreNodeKind::Custom(n) => {
                 let node_working_dir = match git_folder {
                     Some(git_folder) => git_folder.prepare(logger).await?,
-                    None => self.working_dir.join(self.session_id.to_string()),
+                    None => self.base_working_dir,
                 };
 
                 if let Some(build) = &n.build {
@@ -91,11 +91,17 @@ impl Builder {
                 // run build commands
                 for operator in &n.operators {
                     if let Some(build) = &operator.config.build {
-                        build_node(logger, &node.env, self.working_dir.clone(), build, self.uv)
-                            .await?;
+                        build_node(
+                            logger,
+                            &node.env,
+                            self.base_working_dir.clone(),
+                            build,
+                            self.uv,
+                        )
+                        .await?;
                     }
                 }
-                self.working_dir.clone()
+                self.base_working_dir.clone()
             }
         };
         Ok(PreparedNode {
diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index 75496a63..b6dc24f6 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -12,7 +12,8 @@ use dora_core::{
 };
 use dora_message::{
     common::{
-        DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
+        DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
+        NodeExitStatus,
     },
     coordinator_to_cli::DataflowResult,
     coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes},
@@ -24,7 +25,7 @@ use dora_message::{
     descriptor::NodeSource,
     metadata::{self, ArrowTypeInfo},
     node_to_daemon::{DynamicNodeEvent, Timestamped},
-    DataflowId,
+    BuildId, DataflowId, SessionId,
 };
 use dora_node_api::{arrow::datatypes::DataType, Parameter};
 use eyre::{bail, eyre, Context, ContextCompat, Result};
@@ -38,6 +39,7 @@ use socket_stream_utils::socket_stream_send;
 use spawn::Spawner;
 use std::{
     collections::{BTreeMap, BTreeSet, HashMap},
+    env::current_dir,
     future::Future,
     net::SocketAddr,
     path::{Path, PathBuf},
@@ -154,7 +156,13 @@ impl Daemon {
             .map(|_| ())
     }
 
-    pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> {
+    pub async fn run_dataflow(
+        dataflow_path: &Path,
+        build_id: Option<BuildId>,
+        session_id: SessionId,
+        local_working_dir: Option<PathBuf>,
+        uv: bool,
+    ) -> eyre::Result<DataflowResult> {
         let working_dir = dataflow_path
             .canonicalize()
             .context("failed to canonicalize dataflow path")?
@@ -168,9 +176,10 @@ impl Daemon {
         let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
 
         let spawn_command = SpawnDataflowNodes {
+            build_id,
             session_id,
             dataflow_id,
-            working_dir,
+            local_working_dir,
             spawn_nodes: nodes.keys().cloned().collect(),
             nodes,
             dataflow_descriptor: descriptor,
@@ -419,21 +428,26 @@ impl Daemon {
                     .await?;
                 }
             },
-            Event::BuildDataflowResult { session_id, result } => {
+            Event::BuildDataflowResult {
+                build_id,
+                session_id,
+                result,
+            } => {
                 if let Some(connection) = &mut self.coordinator_connection {
                     let msg = serde_json::to_vec(&Timestamped {
                         inner: CoordinatorRequest::Event {
                             daemon_id: self.daemon_id.clone(),
-                            event: DaemonEvent::SpawnResult {
-                                dataflow_id,
+                            event: DaemonEvent::BuildResult {
+                                build_id,
+                                session_id,
                                 result: result.map_err(|err| format!("{err:?}")),
                             },
                         },
                         timestamp: self.clock.new_timestamp(),
                     })?;
-                    socket_stream_send(connection, &msg)
-                        .await
-                        .wrap_err("failed to send Exit message to dora-coordinator")?;
+                    socket_stream_send(connection, &msg).await.wrap_err(
+                        "failed to send BuildDataflowResult message to dora-coordinator",
+                    )?;
                 }
             }
             Event::SpawnDataflowResult {
@@ -451,9 +465,9 @@ impl Daemon {
                         },
                         timestamp: self.clock.new_timestamp(),
                     })?;
-                    socket_stream_send(connection, &msg)
-                        .await
-                        .wrap_err("failed to send Exit message to dora-coordinator")?;
+                    socket_stream_send(connection, &msg).await.wrap_err(
+                        "failed to send SpawnDataflowResult message to dora-coordinator",
+                    )?;
                 }
             }
         }
@@ -491,8 +505,9 @@ impl Daemon {
     ) -> eyre::Result<RunStatus> {
         let status = match event {
             DaemonCoordinatorEvent::Build(BuildDataflowNodes {
+                build_id,
                 session_id,
-                working_dir,
+                local_working_dir,
                 nodes,
                 git_sources,
                 prev_git_sources,
@@ -504,17 +519,12 @@ impl Daemon {
                     dora_core::config::RemoteCommunicationConfig::Tcp => {}
                 }
 
-                // Use the working directory if it exists, otherwise use the working directory where the daemon is spawned
-                let working_dir = if working_dir.exists() {
-                    working_dir
-                } else {
-                    std::env::current_dir().wrap_err("failed to get current working dir")?
-                };
+                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;
 
                 let result = self
                     .build_dataflow(
                         session_id,
-                        working_dir,
+                        base_working_dir,
                         nodes,
                         git_sources,
                         prev_git_sources,
@@ -538,6 +548,7 @@ impl Daemon {
                 tokio::spawn(async move {
                     let message = Timestamped {
                         inner: Event::BuildDataflowResult {
+                            build_id,
                             session_id,
                             result: result_task.await,
                         },
@@ -557,9 +568,10 @@ impl Daemon {
                 RunStatus::Continue
             }
             DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
+                build_id,
                 session_id,
                 dataflow_id,
-                working_dir,
+                local_working_dir,
                 nodes,
                 dataflow_descriptor,
                 spawn_nodes,
@@ -569,18 +581,13 @@ impl Daemon {
                     dora_core::config::RemoteCommunicationConfig::Tcp => {}
                 }
 
-                // Use the working directory if it exists, otherwise use the working directory where the daemon is spawned
-                let working_dir = if working_dir.exists() {
-                    working_dir
-                } else {
-                    std::env::current_dir().wrap_err("failed to get current working dir")?
-                };
+                let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;
 
                 let result = self
                     .spawn_dataflow(
-                        session_id,
+                        build_id,
                         dataflow_id,
-                        working_dir,
+                        base_working_dir,
                         nodes,
                         dataflow_descriptor,
                         spawn_nodes,
@@ -852,7 +859,7 @@ impl Daemon {
     async fn build_dataflow(
         &mut self,
         session_id: uuid::Uuid,
-        working_dir: PathBuf,
+        base_working_dir: PathBuf,
         nodes: BTreeMap<NodeId, ResolvedNode>,
         git_sources: BTreeMap<NodeId, GitSource>,
         prev_git_sources: BTreeMap<NodeId, GitSource>,
@@ -862,7 +869,7 @@ impl Daemon {
     ) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
         let builder = build::Builder {
             session_id,
-            working_dir,
+            base_working_dir,
             daemon_tx: self.events_tx.clone(),
             dataflow_descriptor,
             clock: self.clock.clone(),
@@ -945,9 +952,9 @@ impl Daemon {
 
     async fn spawn_dataflow(
         &mut self,
-        session_id: Option<BuildId>,
-        dataflow_id: uuid::Uuid,
-        working_dir: PathBuf,
+        build_id: Option<BuildId>,
+        dataflow_id: DataflowId,
+        base_working_dir: PathBuf,
         nodes: BTreeMap<NodeId, ResolvedNode>,
         dataflow_descriptor: Descriptor,
         spawn_nodes: BTreeSet<NodeId>,
@@ -963,7 +970,8 @@ impl Daemon {
             RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
         let dataflow = match self.running.entry(dataflow_id) {
             std::collections::hash_map::Entry::Vacant(entry) => {
-                self.working_dir.insert(dataflow_id, working_dir.clone());
+                self.working_dir
+                    .insert(dataflow_id, base_working_dir.clone());
                 entry.insert(dataflow)
             }
             std::collections::hash_map::Entry::Occupied(_) => {
@@ -1971,6 +1979,34 @@ impl Daemon {
         }
         Ok(RunStatus::Continue)
     }
+
+    fn base_working_dir(
+        &self,
+        local_working_dir: Option<PathBuf>,
+        session_id: Uuid,
+    ) -> eyre::Result<PathBuf> {
+        match local_working_dir {
+            Some(working_dir) => {
+                // check that working directory exists
+                if working_dir.exists() {
+                    Ok(working_dir)
+                } else {
+                    bail!(
+                        "working directory does not exist: {}",
+                        working_dir.display(),
+                    )
+                }
+            }
+            None => {
+                // use subfolder of daemon working dir
+                let daemon_working_dir =
+                    current_dir().context("failed to get daemon working dir")?;
+                Ok(daemon_working_dir
+                    .join("_work")
+                    .join(session_id.to_string()))
+            }
+        }
+    }
 }
 
 async fn set_up_event_stream(
@@ -2445,7 +2481,8 @@ pub enum Event {
         result: Result<RunningNode, NodeError>,
     },
     BuildDataflowResult {
-        session_id: Uuid,
+        build_id: BuildId,
+        session_id: SessionId,
         result: eyre::Result<()>,
     },
     SpawnDataflowResult {
@@ -2473,6 +2510,7 @@ impl Event {
             Event::SecondCtrlC => "SecondCtrlC",
             Event::DaemonError(_) => "DaemonError",
             Event::SpawnNodeResult { .. } => "SpawnNodeResult",
+            Event::BuildDataflowResult { .. } => "BuildDataflowResult",
             Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
         }
     }
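The new `base_working_dir` helper replaces the two duplicated fallback blocks in the Build and Spawn arms, and it also tightens the behavior: an explicitly passed directory that does not exist is now a hard error, whereas the old code silently fell back to the daemon's current directory. A behavior sketch, with hypothetical paths and IDs:

    // base_working_dir(Some("/data/flow".into()), session_id)
    //   => Ok("/data/flow")    only if the directory exists; bails otherwise
    // base_working_dir(None, session_id)
    //   => Ok("<daemon cwd>/_work/<session_id>")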
} => "SpawnNodeResult", + Event::BuildDataflowResult { .. } => "BuildDataflowResult", Event::SpawnDataflowResult { .. } => "SpawnDataflowResult", } } diff --git a/binaries/daemon/src/log.rs b/binaries/daemon/src/log.rs index cdcacd31..6b7add12 100644 --- a/binaries/daemon/src/log.rs +++ b/binaries/daemon/src/log.rs @@ -8,7 +8,7 @@ use dora_core::{config::NodeId, uhlc}; use dora_message::{ common::{DaemonId, LogLevel, LogMessage, Timestamped}, daemon_to_coordinator::{CoordinatorRequest, DaemonEvent}, - BuildId, + SessionId, }; use eyre::Context; use tokio::net::TcpStream; @@ -95,7 +95,7 @@ impl<'a> DataflowLogger<'a> { } pub struct NodeBuildLogger<'a> { - session_id: BuildId, + session_id: SessionId, node_id: NodeId, logger: CowMut<'a, DaemonLogger>, } @@ -133,7 +133,7 @@ impl DaemonLogger { } } - pub fn for_node_build(&mut self, session_id: BuildId, node_id: NodeId) -> NodeBuildLogger { + pub fn for_node_build(&mut self, session_id: SessionId, node_id: NodeId) -> NodeBuildLogger { NodeBuildLogger { session_id, node_id, @@ -170,7 +170,7 @@ impl DaemonLogger { pub async fn log_build( &mut self, - session_id: BuildId, + session_id: SessionId, level: LogLevel, node_id: Option, message: impl Into, diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 130d43c1..d9e37e59 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -146,7 +146,6 @@ async fn start_dataflow( local_working_dir: working_dir, name: None, uv: false, - build_only: false, }, reply_sender, })) diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs index 50e7258c..b1f2a7bb 100644 --- a/libraries/message/src/cli_to_coordinator.rs +++ b/libraries/message/src/cli_to_coordinator.rs @@ -6,26 +6,39 @@ use crate::{ common::GitSource, descriptor::Descriptor, id::{NodeId, OperatorId}, + SessionId, }; #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum ControlRequest { Build { + session_id: SessionId, dataflow: Descriptor, git_sources: BTreeMap, prev_git_sources: BTreeMap, - // TODO: remove this once we figure out deploying of node/operator - // binaries from CLI to coordinator/daemon - local_working_dir: PathBuf, + /// Allows overwriting the base working dir when CLI and daemon are + /// running on the same machine. + /// + /// Must not be used for multi-machine dataflows. + /// + /// Note that nodes with git sources still use a subdirectory of + /// the base working dir. + local_working_dir: Option, uv: bool, }, Start { - session_id: Option, + build_id: Option, + session_id: Uuid, dataflow: Descriptor, name: Option, - // TODO: remove this once we figure out deploying of node/operator - // binaries from CLI to coordinator/daemon - local_working_dir: PathBuf, + /// Allows overwriting the base working dir when CLI and daemon are + /// running on the same machine. + /// + /// Must not be used for multi-machine dataflows. + /// + /// Note that nodes with git sources still use a subdirectory of + /// the base working dir. 
diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs
index cf8976fe..5308fba9 100644
--- a/libraries/message/src/common.rs
+++ b/libraries/message/src/common.rs
@@ -5,14 +5,14 @@ use aligned_vec::{AVec, ConstAlign};
 use eyre::Context as _;
 use uuid::Uuid;
 
-use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId};
+use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, DataflowId, SessionId};
 
 pub use log::Level as LogLevel;
 
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 #[must_use]
 pub struct LogMessage {
-    pub session_id: Option<BuildId>,
+    pub session_id: Option<SessionId>,
     pub dataflow_id: Option<DataflowId>,
     pub node_id: Option<NodeId>,
     pub daemon_id: Option<DaemonId>,
diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs
index fd0e733a..161a5d63 100644
--- a/libraries/message/src/coordinator_to_cli.rs
+++ b/libraries/message/src/coordinator_to_cli.rs
@@ -13,6 +13,7 @@ pub enum ControlRequestReply {
         session_id: Uuid,
     },
     DataflowBuildFinished {
+        build_id: Uuid,
         session_id: Uuid,
         result: Result<(), String>,
     },
diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs
index 3f2e2500..95638084 100644
--- a/libraries/message/src/coordinator_to_daemon.rs
+++ b/libraries/message/src/coordinator_to_daemon.rs
@@ -10,7 +10,7 @@ use crate::{
     common::{DaemonId, GitSource},
     descriptor::{Descriptor, ResolvedNode},
     id::{NodeId, OperatorId},
-    DataflowId,
+    BuildId, DataflowId, SessionId,
 };
 
 pub use crate::common::Timestamped;
@@ -60,8 +60,16 @@ pub enum DaemonCoordinatorEvent {
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 pub struct BuildDataflowNodes {
+    pub build_id: BuildId,
     pub session_id: Uuid,
-    pub working_dir: PathBuf,
+    /// Allows overwriting the base working dir when CLI and daemon are
+    /// running on the same machine.
+    ///
+    /// Must not be used for multi-machine dataflows.
+    ///
+    /// Note that nodes with git sources still use a subdirectory of
+    /// the base working dir.
+    pub local_working_dir: Option<PathBuf>,
     pub nodes: BTreeMap<NodeId, ResolvedNode>,
     pub git_sources: BTreeMap<NodeId, GitSource>,
     pub prev_git_sources: BTreeMap<NodeId, GitSource>,
@@ -72,9 +80,17 @@ pub struct BuildDataflowNodes {
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 pub struct SpawnDataflowNodes {
-    pub session_id: Option<BuildId>,
+    pub build_id: Option<BuildId>,
+    pub session_id: SessionId,
     pub dataflow_id: DataflowId,
-    pub working_dir: PathBuf,
+    /// Allows overwriting the base working dir when CLI and daemon are
+    /// running on the same machine.
+    ///
+    /// Must not be used for multi-machine dataflows.
+    ///
+    /// Note that nodes with git sources still use a subdirectory of
+    /// the base working dir.
+    pub local_working_dir: Option<PathBuf>,
     pub nodes: BTreeMap<NodeId, ResolvedNode>,
     pub dataflow_descriptor: Descriptor,
     pub spawn_nodes: BTreeSet<NodeId>,
diff --git a/libraries/message/src/daemon_to_coordinator.rs b/libraries/message/src/daemon_to_coordinator.rs
index 6e97e3ae..5ccdb174 100644
--- a/libraries/message/src/daemon_to_coordinator.rs
+++ b/libraries/message/src/daemon_to_coordinator.rs
@@ -3,7 +3,10 @@ use std::collections::BTreeMap;
 pub use crate::common::{
     DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped,
 };
-use crate::{common::DaemonId, current_crate_version, id::NodeId, versions_compatible, DataflowId};
+use crate::{
+    common::DaemonId, current_crate_version, id::NodeId, versions_compatible, BuildId, DataflowId,
+    SessionId,
+};
 
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
 pub enum CoordinatorRequest {
@@ -46,6 +49,11 @@ impl DaemonRegisterRequest {
 
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
 pub enum DaemonEvent {
+    BuildResult {
+        build_id: BuildId,
+        session_id: SessionId,
+        result: Result<(), String>,
+    },
     SpawnResult {
         dataflow_id: DataflowId,
         result: Result<(), String>,
diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs
index 3cca90ad..429b8217 100644
--- a/libraries/message/src/lib.rs
+++ b/libraries/message/src/lib.rs
@@ -26,6 +26,7 @@ pub use arrow_data;
 pub use arrow_schema;
 
 pub type DataflowId = uuid::Uuid;
+pub type SessionId = uuid::Uuid;
 pub type BuildId = uuid::Uuid;
 
 fn current_crate_version() -> semver::Version {
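A final note on the new alias: `SessionId`, `BuildId`, and `DataflowId` are all plain `uuid::Uuid`, so the compiler cannot catch one being passed where another is expected (for example, `SessionId::new_v4()` type-checks fine as a build ID). If stronger guarantees are wanted later, serde-compatible newtypes would turn such mix-ups into type errors; a sketch only, not part of this patch:

    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
    pub struct SessionId(uuid::Uuid);

    impl SessionId {
        pub fn new_v4() -> Self {
            Self(uuid::Uuid::new_v4())
        }
    }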