Philipp Oppermann committed 7 months ago
commit baa163f335
9 changed files with 66 additions and 24 deletions:

  1. Cargo.lock (+1, -0)
  2. binaries/coordinator/src/lib.rs (+22, -1)
  3. binaries/daemon/Cargo.toml (+1, -0)
  4. binaries/daemon/src/build/git.rs (+14, -7)
  5. binaries/daemon/src/build/mod.rs (+5, -5)
  6. binaries/daemon/src/lib.rs (+11, -3)
  7. libraries/message/src/cli_to_coordinator.rs (+4, -1)
  8. libraries/message/src/common.rs (+6, -0)
  9. libraries/message/src/coordinator_to_daemon.rs (+2, -7)

Cargo.lock (+1, -0)

@@ -3039,6 +3039,7 @@ dependencies = [
"futures",
"futures-concurrency",
"git2",
"itertools 0.14.0",
"serde_json",
"serde_yaml 0.8.26",
"shared-memory-server",


binaries/coordinator/src/lib.rs (+22, -1)

@@ -10,7 +10,7 @@ use dora_core::{
 };
 use dora_message::{
     cli_to_coordinator::ControlRequest,
-    common::DaemonId,
+    common::{DaemonId, GitSource},
     coordinator_to_cli::{
         ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
         DataflowStatus, LogLevel, LogMessage,
@@ -412,6 +412,8 @@ async fn start_inner(
         match request {
             ControlRequest::Build {
                 dataflow,
+                git_sources,
+                prev_git_sources,
                 local_working_dir,
                 uv,
             } => {
@@ -421,6 +423,8 @@ async fn start_inner(
                 let result = build_dataflow(
                     build_id,
                     dataflow,
+                    git_sources,
+                    prev_git_sources,
                     local_working_dir,
                     &clock,
                     uv,
@@ -1178,6 +1182,8 @@ async fn retrieve_logs(
 async fn build_dataflow(
     build_id: BuildId,
     dataflow: Descriptor,
+    git_sources: BTreeMap<NodeId, GitSource>,
+    prev_git_sources: BTreeMap<NodeId, GitSource>,
     working_dir: PathBuf,
     clock: &HLC,
     uv: bool,
@@ -1185,6 +1191,15 @@ async fn build_dataflow(
 ) -> eyre::Result<()> {
     let nodes = dataflow.resolve_aliases_and_set_defaults()?;

+    let mut git_sources_by_daemon = git_sources
+        .into_iter()
+        .into_grouping_map_by(|(id, _)| nodes.get(id).and_then(|n| n.deploy.machine.as_ref()))
+        .collect();
+    let mut prev_git_sources_by_daemon = prev_git_sources
+        .into_iter()
+        .into_grouping_map_by(|(id, _)| nodes.get(id).and_then(|n| n.deploy.machine.as_ref()))
+        .collect();
+
     let nodes_by_daemon = nodes.values().into_group_map_by(|n| &n.deploy.machine);

     let mut daemons = BTreeSet::new();
@@ -1198,6 +1213,12 @@ async fn build_dataflow(
                 build_id,
                 working_dir: working_dir.clone(),
                 nodes: nodes.clone(),
+                git_sources: git_sources_by_daemon
+                    .remove(&machine.as_ref())
+                    .unwrap_or_default(),
+                prev_git_sources: prev_git_sources_by_daemon
+                    .remove(&machine.as_ref())
+                    .unwrap_or_default(),
                 dataflow_descriptor: dataflow.clone(),
                 nodes_on_machine,
                 uv,
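
For context on the grouping above, here is a minimal, self-contained sketch of splitting per-node entries by their target machine with itertools. It uses into_group_map_by, a close relative of the into_grouping_map_by(...).collect() chain in this hunk; the node names, commit hashes, and machine assignments are invented for illustration.

use itertools::Itertools;
use std::collections::BTreeMap;

fn main() {
    // Hypothetical node -> machine assignment; stands in for the
    // nodes.get(id).and_then(|n| n.deploy.machine.as_ref()) lookup above.
    let machine_of: BTreeMap<&str, Option<&str>> =
        [("camera", Some("robot-1")), ("plot", None)].into();

    // Per-node commit hashes, playing the role of the git_sources map.
    let git_sources: BTreeMap<&str, &str> =
        [("camera", "abc123"), ("plot", "def456")].into();

    // Group the (node, source) pairs by the machine that will build them;
    // into_group_map_by returns a HashMap<machine, Vec<(node, source)>>.
    let by_machine = git_sources
        .into_iter()
        .into_group_map_by(|(id, _)| machine_of.get(id).copied().flatten());

    for (machine, sources) in &by_machine {
        println!("{machine:?} -> {sources:?}");
    }
}

Nodes without an explicit machine simply end up under the None key in this sketch.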


binaries/daemon/Cargo.toml (+1, -0)

@@ -47,3 +47,4 @@ zenoh = "1.1.1"
url = "2.5.4"
git2 = { version = "0.18.0", features = ["vendored-openssl"] }
dunce = "1.0.5"
itertools = "0.14"

binaries/daemon/src/build/git.rs (+14, -7)

@@ -2,6 +2,7 @@ use crate::log::NodeBuildLogger;
 use dora_message::{common::LogLevel, descriptor::GitRepoRev, BuildId, DataflowId};
 use eyre::{ContextCompat, WrapErr};
 use git2::FetchOptions;
+use itertools::Itertools;
 use std::{
     collections::{BTreeMap, BTreeSet},
     path::{Path, PathBuf},
@@ -30,17 +31,23 @@ impl GitManager {
     pub fn choose_clone_dir(
         &mut self,
         build_id: uuid::Uuid,
-        previous_build: Option<uuid::Uuid>,
         repo_url: Url,
         commit_hash: String,
+        prev_commit_hash: Option<String>,
         target_dir: &Path,
     ) -> eyre::Result<GitFolder> {
         let clone_dir = Self::clone_dir_path(&target_dir, build_id, &repo_url, &commit_hash)?;

-        if self.clones_in_use.contains_key(&clone_dir) {
-            // The directory is currently in use by another dataflow. This should never
-            // happen as we generate a new build ID on every build.
-            eyre::bail!("clone_dir is already in use by other dataflow")
+        if let Some(using) = self.clones_in_use.get(&clone_dir) {
+            if !using.is_empty() {
+                // The directory is currently in use by another dataflow. Rebuilding
+                // while a dataflow is running could lead to unintended behavior.
+                eyre::bail!(
+                    "the build directory is still in use by the following \
+                    dataflows, please stop them before rebuilding: {}",
+                    using.iter().join(", ")
+                )
+            }
         }

         let reuse = if self.clone_dir_ready(build_id, &clone_dir) {
@@ -50,10 +57,10 @@ impl GitManager {
             ReuseOptions::Reuse {
                 dir: clone_dir.clone(),
             }
-        } else if let Some(previous_build_id) = previous_build {
+        } else if let Some(previous_commit_hash) = prev_commit_hash {
             // we might be able to update a previous clone
             let prev_clone_dir =
-                Self::clone_dir_path(&target_dir, previous_build_id, &repo_url, &commit_hash)?;
+                Self::clone_dir_path(&target_dir, build_id, &repo_url, &previous_commit_hash)?;

             if self
                 .clones_in_use
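
As a side note on the reworked in-use guard above: a stripped-down sketch, assuming clones_in_use maps each clone directory to the set of dataflow IDs currently using it (the concrete value type in GitManager may differ). It shows how Itertools::join turns that set into the error message.

use itertools::Itertools;
use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};

// Hypothetical stand-in for the guard at the top of choose_clone_dir:
// refuse to rebuild while any dataflow still uses the clone directory.
fn ensure_clone_dir_free(
    clones_in_use: &BTreeMap<PathBuf, BTreeSet<String>>,
    clone_dir: &Path,
) -> Result<(), String> {
    if let Some(using) = clones_in_use.get(clone_dir) {
        if !using.is_empty() {
            return Err(format!(
                "the build directory is still in use by the following \
                 dataflows, please stop them before rebuilding: {}",
                using.iter().join(", ")
            ));
        }
    }
    Ok(())
}

fn main() {
    let mut in_use = BTreeMap::new();
    in_use.insert(
        PathBuf::from("build/repo-abc123"),
        BTreeSet::from(["dataflow-1".to_string()]),
    );
    // Reports that dataflow-1 still uses the directory.
    println!("{:?}", ensure_clone_dir_free(&in_use, Path::new("build/repo-abc123")));
}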


binaries/daemon/src/build/mod.rs (+5, -5)

@@ -9,8 +9,7 @@ use dora_core::{
     uhlc::HLC,
 };
 use dora_message::{
-    common::{LogLevel, Timestamped},
-    coordinator_to_daemon::GitSource,
+    common::{GitSource, LogLevel, Timestamped},
     descriptor::EnvValue,
     BuildId,
 };
@@ -24,7 +23,6 @@ mod git;
 #[derive(Clone)]
 pub struct Builder {
     pub build_id: BuildId,
-    pub prev_build_id: Option<BuildId>,
     pub working_dir: PathBuf,
     pub daemon_tx: mpsc::Sender<Timestamped<Event>>,
     pub dataflow_descriptor: Descriptor,
@@ -38,6 +36,7 @@ impl Builder {
         self,
         node: ResolvedNode,
         git: Option<GitSource>,
+        prev_git: Option<GitSource>,
         logger: &mut NodeBuildLogger<'_>,
         git_manager: &mut GitManager,
     ) -> eyre::Result<impl Future<Output = eyre::Result<PreparedNode>>> {
@@ -46,11 +45,12 @@ impl Builder {
         let prepared_git = if let Some(GitSource { repo, commit_hash }) = git {
             let repo_url = Url::parse(&repo).context("failed to parse git repository URL")?;
             let target_dir = self.working_dir.join("build");
+            let prev_hash = prev_git.filter(|p| p.repo == repo).map(|p| p.commit_hash);
             let git_folder = git_manager.choose_clone_dir(
                 self.build_id,
-                self.prev_build_id,
                 repo_url,
-                commit_hash.clone(),
+                commit_hash,
+                prev_hash,
                 &target_dir,
             )?;
             Some(git_folder)


binaries/daemon/src/lib.rs (+11, -3)

@@ -480,6 +480,7 @@ impl Daemon {
                 working_dir,
                 nodes,
                 git_sources,
+                prev_git_sources,
                 dataflow_descriptor,
                 nodes_on_machine,
                 uv,
@@ -501,6 +502,7 @@ impl Daemon {
                 working_dir,
                 nodes,
                 git_sources,
+                prev_git_sources,
                 dataflow_descriptor,
                 nodes_on_machine,
                 uv,
@@ -835,17 +837,16 @@ impl Daemon {
     async fn build_dataflow(
         &mut self,
         build_id: uuid::Uuid,
-        prev_build_id: Option<uuid::Uuid>,
         working_dir: PathBuf,
         nodes: BTreeMap<NodeId, ResolvedNode>,
         git_sources: BTreeMap<NodeId, GitSource>,
+        prev_git_sources: BTreeMap<NodeId, GitSource>,
         dataflow_descriptor: Descriptor,
         local_nodes: BTreeSet<NodeId>,
         uv: bool,
     ) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
         let builder = build::Builder {
             build_id,
-            prev_build_id,
             working_dir,
             daemon_tx: self.events_tx.clone(),
             dataflow_descriptor,
@@ -863,10 +864,17 @@ impl Daemon {
             let mut logger = self.logger.for_node_build(build_id, node_id.clone());
             logger.log(LogLevel::Info, "building").await;
             let git_source = git_sources.get(&node_id).cloned();
+            let prev_git_source = prev_git_sources.get(&node_id).cloned();

             match builder
                 .clone()
-                .prepare_node(node, git_source, &mut logger, &mut self.git_manager)
+                .prepare_node(
+                    node,
+                    git_source,
+                    prev_git_source,
+                    &mut logger,
+                    &mut self.git_manager,
+                )
                 .await
                 .wrap_err_with(|| format!("failed to build node `{node_id}`"))
             {


libraries/message/src/cli_to_coordinator.rs (+4, -1)

@@ -1,8 +1,9 @@
-use std::{path::PathBuf, time::Duration};
+use std::{collections::BTreeMap, path::PathBuf, time::Duration};

 use uuid::Uuid;

 use crate::{
+    common::GitSource,
     descriptor::Descriptor,
     id::{NodeId, OperatorId},
 };
@@ -11,6 +12,8 @@ use crate::{
 pub enum ControlRequest {
     Build {
         dataflow: Descriptor,
+        git_sources: BTreeMap<NodeId, GitSource>,
+        prev_git_sources: BTreeMap<NodeId, GitSource>,
         // TODO: remove this once we figure out deploying of node/operator
         // binaries from CLI to coordinator/daemon
         local_working_dir: PathBuf,


libraries/message/src/common.rs (+6, -0)

@@ -240,3 +240,9 @@ impl std::fmt::Display for DaemonId {
write!(f, "{}", self.uuid)
}
}

#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct GitSource {
pub repo: String,
pub commit_hash: String,
}
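
To make the new message fields concrete, here is a minimal sketch of the two per-node maps (git_sources and prev_git_sources) that now travel with a build request. The repo URL, commit hash, and node name are invented, a plain String stands in for dora's NodeId, and the struct is redeclared locally so the sketch compiles on its own.

use std::collections::BTreeMap;

// Local copy of the GitSource struct added above, so the sketch is self-contained.
#[derive(Debug, Clone, PartialEq, Eq)]
struct GitSource {
    repo: String,
    commit_hash: String,
}

fn main() {
    // Sources for the current build, keyed by node ID.
    let mut git_sources: BTreeMap<String, GitSource> = BTreeMap::new();
    git_sources.insert(
        "camera".into(),
        GitSource {
            repo: "https://github.com/example/nodes.git".into(),
            commit_hash: "abc123".into(),
        },
    );

    // Sources from the previous build; the daemon compares the two per repo to
    // decide whether an existing clone can be updated instead of re-cloned.
    let prev_git_sources: BTreeMap<String, GitSource> = git_sources.clone();

    assert_eq!(git_sources, prev_git_sources);
}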

libraries/message/src/coordinator_to_daemon.rs (+2, -7)

@@ -7,7 +7,7 @@ use std::{
 use uuid::Uuid;

 use crate::{
-    common::DaemonId,
+    common::{DaemonId, GitSource},
     descriptor::{Descriptor, ResolvedNode},
     id::{NodeId, OperatorId},
     DataflowId,
@@ -64,17 +64,12 @@ pub struct BuildDataflowNodes {
     pub working_dir: PathBuf,
     pub nodes: BTreeMap<NodeId, ResolvedNode>,
     pub git_sources: BTreeMap<NodeId, GitSource>,
+    pub prev_git_sources: BTreeMap<NodeId, GitSource>,
     pub dataflow_descriptor: Descriptor,
     pub nodes_on_machine: BTreeSet<NodeId>,
     pub uv: bool,
 }

-#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
-pub struct GitSource {
-    pub repo: String,
-    pub commit_hash: String,
-}
-
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 pub struct SpawnDataflowNodes {
     pub build_id: Option<Uuid>,

