From baa163f335a00aee91b5fbc0c54c498af1516998 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 4 Jun 2025 16:10:50 +0200 Subject: [PATCH] Wip --- Cargo.lock | 1 + binaries/coordinator/src/lib.rs | 23 ++++++++++++++++++- binaries/daemon/Cargo.toml | 1 + binaries/daemon/src/build/git.rs | 21 +++++++++++------ binaries/daemon/src/build/mod.rs | 10 ++++---- binaries/daemon/src/lib.rs | 14 ++++++++--- libraries/message/src/cli_to_coordinator.rs | 5 +++- libraries/message/src/common.rs | 6 +++++ .../message/src/coordinator_to_daemon.rs | 9 ++------ 9 files changed, 66 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 919ab7e4..b89617ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3039,6 +3039,7 @@ dependencies = [ "futures", "futures-concurrency", "git2", + "itertools 0.14.0", "serde_json", "serde_yaml 0.8.26", "shared-memory-server", diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index e7ed3d83..da431dd5 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -10,7 +10,7 @@ use dora_core::{ }; use dora_message::{ cli_to_coordinator::ControlRequest, - common::DaemonId, + common::{DaemonId, GitSource}, coordinator_to_cli::{ ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult, DataflowStatus, LogLevel, LogMessage, @@ -412,6 +412,8 @@ async fn start_inner( match request { ControlRequest::Build { dataflow, + git_sources, + prev_git_sources, local_working_dir, uv, } => { @@ -421,6 +423,8 @@ async fn start_inner( let result = build_dataflow( build_id, dataflow, + git_sources, + prev_git_sources, local_working_dir, &clock, uv, @@ -1178,6 +1182,8 @@ async fn retrieve_logs( async fn build_dataflow( build_id: BuildId, dataflow: Descriptor, + git_sources: BTreeMap, + prev_git_sources: BTreeMap, working_dir: PathBuf, clock: &HLC, uv: bool, @@ -1185,6 +1191,15 @@ async fn build_dataflow( ) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; + let mut git_sources_by_daemon = git_sources + .into_iter() + .into_grouping_map_by(|(id, _)| nodes.get(id).and_then(|n| n.deploy.machine.as_ref())) + .collect(); + let mut prev_git_sources_by_daemon = prev_git_sources + .into_iter() + .into_grouping_map_by(|(id, _)| nodes.get(id).and_then(|n| n.deploy.machine.as_ref())) + .collect(); + let nodes_by_daemon = nodes.values().into_group_map_by(|n| &n.deploy.machine); let mut daemons = BTreeSet::new(); @@ -1198,6 +1213,12 @@ async fn build_dataflow( build_id, working_dir: working_dir.clone(), nodes: nodes.clone(), + git_sources: git_sources_by_daemon + .remove(&machine.as_ref()) + .unwrap_or_default(), + prev_git_sources: prev_git_sources_by_daemon + .remove(&machine.as_ref()) + .unwrap_or_default(), dataflow_descriptor: dataflow.clone(), nodes_on_machine, uv, diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 6b9f7381..6dd2a2c3 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -47,3 +47,4 @@ zenoh = "1.1.1" url = "2.5.4" git2 = { version = "0.18.0", features = ["vendored-openssl"] } dunce = "1.0.5" +itertools = "0.14" diff --git a/binaries/daemon/src/build/git.rs b/binaries/daemon/src/build/git.rs index 91bb2922..0e6dbbe7 100644 --- a/binaries/daemon/src/build/git.rs +++ b/binaries/daemon/src/build/git.rs @@ -2,6 +2,7 @@ use crate::log::NodeBuildLogger; use dora_message::{common::LogLevel, descriptor::GitRepoRev, BuildId, DataflowId}; use eyre::{ContextCompat, WrapErr}; use git2::FetchOptions; +use itertools::Itertools; use std::{ collections::{BTreeMap, BTreeSet}, path::{Path, PathBuf}, @@ -30,17 +31,23 @@ impl GitManager { pub fn choose_clone_dir( &mut self, build_id: uuid::Uuid, - previous_build: Option, repo_url: Url, commit_hash: String, + prev_commit_hash: Option, target_dir: &Path, ) -> eyre::Result { let clone_dir = Self::clone_dir_path(&target_dir, build_id, &repo_url, &commit_hash)?; - if self.clones_in_use.contains_key(&clone_dir) { - // The directory is currently in use by another dataflow. This should never - // happen as we generate a new build ID on every build. - eyre::bail!("clone_dir is already in use by other dataflow") + if let Some(using) = self.clones_in_use.get(&clone_dir) { + if !using.is_empty() { + // The directory is currently in use by another dataflow. Rebuilding + // while a dataflow is running could lead to unintended behavior. + eyre::bail!( + "the build directory is still in use by the following \ + dataflows, please stop them before rebuilding: {}", + using.iter().join(", ") + ) + } } let reuse = if self.clone_dir_ready(build_id, &clone_dir) { @@ -50,10 +57,10 @@ impl GitManager { ReuseOptions::Reuse { dir: clone_dir.clone(), } - } else if let Some(previous_build_id) = previous_build { + } else if let Some(previous_commit_hash) = prev_commit_hash { // we might be able to update a previous clone let prev_clone_dir = - Self::clone_dir_path(&target_dir, previous_build_id, &repo_url, &commit_hash)?; + Self::clone_dir_path(&target_dir, build_id, &repo_url, &previous_commit_hash)?; if self .clones_in_use diff --git a/binaries/daemon/src/build/mod.rs b/binaries/daemon/src/build/mod.rs index 6aa1df34..0df5d977 100644 --- a/binaries/daemon/src/build/mod.rs +++ b/binaries/daemon/src/build/mod.rs @@ -9,8 +9,7 @@ use dora_core::{ uhlc::HLC, }; use dora_message::{ - common::{LogLevel, Timestamped}, - coordinator_to_daemon::GitSource, + common::{GitSource, LogLevel, Timestamped}, descriptor::EnvValue, BuildId, }; @@ -24,7 +23,6 @@ mod git; #[derive(Clone)] pub struct Builder { pub build_id: BuildId, - pub prev_build_id: Option, pub working_dir: PathBuf, pub daemon_tx: mpsc::Sender>, pub dataflow_descriptor: Descriptor, @@ -38,6 +36,7 @@ impl Builder { self, node: ResolvedNode, git: Option, + prev_git: Option, logger: &mut NodeBuildLogger<'_>, git_manager: &mut GitManager, ) -> eyre::Result>> { @@ -46,11 +45,12 @@ impl Builder { let prepared_git = if let Some(GitSource { repo, commit_hash }) = git { let repo_url = Url::parse(&repo).context("failed to parse git repository URL")?; let target_dir = self.working_dir.join("build"); + let prev_hash = prev_git.filter(|p| p.repo == repo).map(|p| p.commit_hash); let git_folder = git_manager.choose_clone_dir( self.build_id, - self.prev_build_id, repo_url, - commit_hash.clone(), + commit_hash, + prev_hash, &target_dir, )?; Some(git_folder) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 11690984..1f7fe82c 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -480,6 +480,7 @@ impl Daemon { working_dir, nodes, git_sources, + prev_git_sources, dataflow_descriptor, nodes_on_machine, uv, @@ -501,6 +502,7 @@ impl Daemon { working_dir, nodes, git_sources, + prev_git_sources, dataflow_descriptor, nodes_on_machine, uv, @@ -835,17 +837,16 @@ impl Daemon { async fn build_dataflow( &mut self, build_id: uuid::Uuid, - prev_build_id: Option, working_dir: PathBuf, nodes: BTreeMap, git_sources: BTreeMap, + prev_git_sources: BTreeMap, dataflow_descriptor: Descriptor, local_nodes: BTreeSet, uv: bool, ) -> eyre::Result>> { let builder = build::Builder { build_id, - prev_build_id, working_dir, daemon_tx: self.events_tx.clone(), dataflow_descriptor, @@ -863,10 +864,17 @@ impl Daemon { let mut logger = self.logger.for_node_build(build_id, node_id.clone()); logger.log(LogLevel::Info, "building").await; let git_source = git_sources.get(&node_id).cloned(); + let prev_git_source = prev_git_sources.get(&node_id).cloned(); match builder .clone() - .prepare_node(node, git_source, &mut logger, &mut self.git_manager) + .prepare_node( + node, + git_source, + prev_git_source, + &mut logger, + &mut self.git_manager, + ) .await .wrap_err_with(|| format!("failed to build node `{node_id}`")) { diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs index d2019aa9..c77cc6ee 100644 --- a/libraries/message/src/cli_to_coordinator.rs +++ b/libraries/message/src/cli_to_coordinator.rs @@ -1,8 +1,9 @@ -use std::{path::PathBuf, time::Duration}; +use std::{collections::BTreeMap, path::PathBuf, time::Duration}; use uuid::Uuid; use crate::{ + common::GitSource, descriptor::Descriptor, id::{NodeId, OperatorId}, }; @@ -11,6 +12,8 @@ use crate::{ pub enum ControlRequest { Build { dataflow: Descriptor, + git_sources: BTreeMap, + prev_git_sources: BTreeMap, // TODO: remove this once we figure out deploying of node/operator // binaries from CLI to coordinator/daemon local_working_dir: PathBuf, diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index 7fb8a41c..83591811 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -240,3 +240,9 @@ impl std::fmt::Display for DaemonId { write!(f, "{}", self.uuid) } } + +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] +pub struct GitSource { + pub repo: String, + pub commit_hash: String, +} diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs index edadc776..0becd47e 100644 --- a/libraries/message/src/coordinator_to_daemon.rs +++ b/libraries/message/src/coordinator_to_daemon.rs @@ -7,7 +7,7 @@ use std::{ use uuid::Uuid; use crate::{ - common::DaemonId, + common::{DaemonId, GitSource}, descriptor::{Descriptor, ResolvedNode}, id::{NodeId, OperatorId}, DataflowId, @@ -64,17 +64,12 @@ pub struct BuildDataflowNodes { pub working_dir: PathBuf, pub nodes: BTreeMap, pub git_sources: BTreeMap, + pub prev_git_sources: BTreeMap, pub dataflow_descriptor: Descriptor, pub nodes_on_machine: BTreeSet, pub uv: bool, } -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] -pub struct GitSource { - pub repo: String, - pub commit_hash: String, -} - #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct SpawnDataflowNodes { pub build_id: Option,