diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 827e670a..d4304281 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -744,7 +744,6 @@ impl Daemon { dataflow_descriptor, clock: self.clock.clone(), uv, - repos_in_use: &mut self.repos_in_use, }; // spawn nodes and set up subscriptions @@ -768,7 +767,12 @@ impl Daemon { .log(LogLevel::Info, Some("daemon".into()), "spawning") .await; match spawner - .spawn_node(node, node_stderr_most_recent, &mut logger) + .spawn_node( + node, + node_stderr_most_recent, + &mut logger, + &mut self.repos_in_use, + ) .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) { diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index f2100681..a44b3fdc 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -45,7 +45,8 @@ use tokio::{ use tracing::error; use url::Url; -pub struct Spawner<'a> { +#[derive(Clone)] +pub struct Spawner { pub dataflow_id: DataflowId, pub working_dir: PathBuf, pub daemon_tx: mpsc::Sender>, @@ -53,15 +54,15 @@ pub struct Spawner<'a> { /// clock is required for generating timestamps when dropping messages early because queue is full pub clock: Arc, pub uv: bool, - pub repos_in_use: &'a mut BTreeMap>, } -impl Spawner<'_> { +impl Spawner { pub async fn spawn_node( &mut self, node: ResolvedNode, node_stderr_most_recent: Arc>, logger: &mut NodeLogger<'_>, + repos_in_use: &mut BTreeMap>, ) -> eyre::Result { let dataflow_id = self.dataflow_id; let node_id = node.id.clone(); @@ -111,7 +112,7 @@ impl Spawner<'_> { .await? } dora_message::descriptor::NodeSource::GitBranch { repo, rev } => { - self.spawn_git_node(&n, repo, rev, logger, &node.env) + self.spawn_git_node(&n, repo, rev, logger, &node.env, repos_in_use) .await? } }; @@ -518,6 +519,7 @@ impl Spawner<'_> { rev: &Option, logger: &mut NodeLogger<'_>, node_env: &Option>, + repos_in_use: &mut BTreeMap>, ) -> Result, eyre::Error> { let dataflow_id = self.dataflow_id; let repo_url = Url::parse(repo_addr).context("failed to parse git repository URL")?; @@ -546,7 +548,8 @@ impl Spawner<'_> { } }; let clone_dir = if clone_dir_base.exists() { - let used_by_other_dataflow = self.used_by_other_dataflow(dataflow_id, &clone_dir_base); + let used_by_other_dataflow = + self.used_by_other_dataflow(dataflow_id, &clone_dir_base, repos_in_use); if used_by_other_dataflow { // don't reuse, choose new directory // (TODO reuse if still up to date) @@ -555,7 +558,9 @@ impl Spawner<'_> { let mut i = 1; loop { let new_path = clone_dir_base.with_file_name(format!("{dir_name}-{i}")); - if new_path.exists() && self.used_by_other_dataflow(dataflow_id, &new_path) { + if new_path.exists() + && self.used_by_other_dataflow(dataflow_id, &new_path, repos_in_use) + { i += 1; } else { break new_path; @@ -571,13 +576,13 @@ impl Spawner<'_> { if clone_dir.exists() { let empty = BTreeSet::new(); - let in_use = self.repos_in_use.get(&clone_dir).unwrap_or(&empty); + let in_use = repos_in_use.get(&clone_dir).unwrap_or(&empty); let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id); if used_by_other_dataflow { // TODO allow if still up to date eyre::bail!("clone_dir is already in use by other dataflow") } else { - self.repos_in_use + repos_in_use .entry(clone_dir.clone()) .or_default() .insert(dataflow_id); @@ -594,7 +599,7 @@ impl Spawner<'_> { checkout_tree(&repository, refname)?; } } else { - self.repos_in_use + repos_in_use .entry(clone_dir.clone()) .or_default() .insert(dataflow_id); @@ -636,9 +641,10 @@ impl Spawner<'_> { &mut self, dataflow_id: uuid::Uuid, clone_dir_base: &PathBuf, + repos_in_use: &mut BTreeMap>, ) -> bool { let empty = BTreeSet::new(); - let in_use = self.repos_in_use.get(clone_dir_base).unwrap_or(&empty); + let in_use = repos_in_use.get(clone_dir_base).unwrap_or(&empty); let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id); used_by_other_dataflow }