From 17f75c04d588e5888f840c455eecaffb1df257d8 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 25 Apr 2025 16:29:07 +0200 Subject: [PATCH] Wait until all nodes on daemon have been built before spawning --- binaries/daemon/src/lib.rs | 161 +++++++--- binaries/daemon/src/spawn.rs | 599 +++++++++++++++++++---------------- 2 files changed, 452 insertions(+), 308 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 582a018c..7abe7ec5 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -772,7 +772,12 @@ impl Daemon { uv: bool, build_only: bool, ) -> eyre::Result>> { - let mut logger = self.logger.for_dataflow(dataflow_id); + let mut logger = self + .logger + .for_dataflow(dataflow_id) + .try_clone() + .await + .context("failed to clone logger")?; let dataflow = RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor); let dataflow = match self.running.entry(dataflow_id) { @@ -857,7 +862,7 @@ impl Daemon { .await; match spawner .clone() - .spawn_node( + .prepare_node( node, node_stderr_most_recent, &mut logger, @@ -867,39 +872,7 @@ impl Daemon { .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) { Ok(result) => { - let events_tx = self.events_tx.clone(); - let clock = self.clock.clone(); - tasks.push(async move { - let (node_spawn_result, success) = match result.await { - Ok(node) => (Ok(node), Ok(())), - Err(err) => { - let node_err = NodeError { - timestamp: clock.new_timestamp(), - cause: NodeErrorCause::Other { - stderr: format!("spawn failed: {err:?}"), - }, - exit_status: NodeExitStatus::Unknown, - }; - (Err(node_err), Err(err)) - } - }; - let send_result = events_tx - .send(Timestamped { - inner: Event::SpawnNodeResult { - dataflow_id, - node_id, - result: node_spawn_result, - }, - timestamp: clock.new_timestamp(), - }) - .await; - if send_result.is_err() { - tracing::error!( - "failed to send SpawnNodeResult to main daemon task" - ) - } - success - }); + tasks.push((node_id, result)); } Err(err) => { logger @@ -980,16 +953,122 @@ impl Daemon { self.handle_node_stop(dataflow_id, &node_id).await?; } - let spawn_result = async move { - for task in tasks { - task.await?; - } - Ok(()) - }; + let spawn_result = Self::spawn_prepared_nodes( + dataflow_id, + logger, + tasks, + self.events_tx.clone(), + self.clock.clone(), + ); Ok(spawn_result) } + async fn spawn_prepared_nodes( + dataflow_id: Uuid, + mut logger: DataflowLogger<'_>, + tasks: Vec<( + NodeId, + impl Future>, + )>, + events_tx: mpsc::Sender>, + clock: Arc, + ) -> eyre::Result<()> { + let node_result = |node_id, result| Timestamped { + inner: Event::SpawnNodeResult { + dataflow_id, + node_id, + result, + }, + timestamp: clock.new_timestamp(), + }; + let mut failed_to_prepare = None; + let mut prepared_nodes = Vec::new(); + for (node_id, task) in tasks { + match task.await { + Ok(node) => prepared_nodes.push(node), + Err(err) => { + if failed_to_prepare.is_none() { + failed_to_prepare = Some(node_id.clone()); + } + let node_err: NodeError = NodeError { + timestamp: clock.new_timestamp(), + cause: NodeErrorCause::Other { + stderr: format!("preparing for spawn failed: {err:?}"), + }, + exit_status: NodeExitStatus::Unknown, + }; + let send_result = events_tx.send(node_result(node_id, Err(node_err))).await; + if send_result.is_err() { + tracing::error!("failed to send SpawnNodeResult to main daemon task") + } + } + } + } + + // once all nodes are prepared, do the actual spawning + if let Some(failed_node) = failed_to_prepare { + // don't spawn any nodes when an error occurred before + for node in prepared_nodes { + let err = NodeError { + timestamp: clock.new_timestamp(), + cause: NodeErrorCause::Cascading { + caused_by_node: failed_node.clone(), + }, + exit_status: NodeExitStatus::Unknown, + }; + let send_result = events_tx + .send(node_result(node.node_id().clone(), Err(err))) + .await; + if send_result.is_err() { + tracing::error!("failed to send SpawnNodeResult to main daemon task") + } + } + Err(eyre!("failed to prepare node {failed_node}")) + } else { + let mut spawn_result = Ok(()); + + logger + .log( + LogLevel::Info, + None, + Some("dora daemon".into()), + "finished building nodes, spawning...", + ) + .await; + + // spawn the nodes + for node in prepared_nodes { + let node_id = node.node_id().clone(); + let mut logger = logger.reborrow().for_node(node_id.clone()); + let result = node.spawn(&mut logger).await; + let node_spawn_result = match result { + Ok(node) => Ok(node), + Err(err) => { + let node_err = NodeError { + timestamp: clock.new_timestamp(), + cause: NodeErrorCause::Other { + stderr: format!("spawn failed: {err:?}"), + }, + exit_status: NodeExitStatus::Unknown, + }; + if spawn_result.is_ok() { + spawn_result = Err(err.wrap_err(format!("failed to spawn {node_id}"))); + } + Err(node_err) + } + }; + let send_result = events_tx + .send(node_result(node_id, node_spawn_result)) + .await; + if send_result.is_err() { + tracing::error!("failed to send SpawnNodeResult to main daemon task") + } + } + spawn_result + } + } + async fn handle_dynamic_node_event( &mut self, event: DynamicNodeEventWrapper, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index b066975d..f47233d8 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -22,6 +22,7 @@ use dora_message::{ daemon_to_coordinator::{DataMessage, NodeExitStatus, Timestamped}, daemon_to_node::{NodeConfig, RuntimeConfig}, descriptor::{EnvValue, GitRepoRev}, + id::NodeId, DataflowId, }; use dora_node_api::{ @@ -60,13 +61,13 @@ pub struct Spawner { } impl Spawner { - pub async fn spawn_node( + pub async fn prepare_node( mut self, node: ResolvedNode, node_stderr_most_recent: Arc>, logger: &mut NodeLogger<'_>, repos_in_use: &mut BTreeMap>, - ) -> eyre::Result>> { + ) -> eyre::Result>> { let dataflow_id = self.dataflow_id; let node_id = node.id.clone(); logger @@ -115,7 +116,7 @@ impl Spawner { .await .wrap_err("failed to clone logger")?; let task = async move { - self.spawn_node_inner( + self.prepare_node_inner( node, &mut logger, dataflow_id, @@ -128,22 +129,18 @@ impl Spawner { Ok(task) } - async fn spawn_node_inner( - &mut self, + async fn prepare_node_inner( + mut self, node: ResolvedNode, logger: &mut NodeLogger<'_>, dataflow_id: uuid::Uuid, node_config: NodeConfig, prepared_git: Option, node_stderr_most_recent: Arc>, - ) -> Result { - let send_stdout_to = node - .send_stdout_as() - .context("Could not resolve `send_stdout_as` configuration")?; - - let mut child = match node.kind { + ) -> eyre::Result { + let (command, error_msg) = match &node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { - let command = match &n.source { + let mut command = match &n.source { dora_message::descriptor::NodeSource::Local => { if let Some(build) = &n.build { self.build_node(logger, &node.env, self.working_dir.clone(), build) @@ -152,63 +149,71 @@ impl Spawner { if self.build_only { None } else { - spawn_command_from_path(&self.working_dir, self.uv, logger, &n, true) - .await? + path_spawn_command(&self.working_dir, self.uv, logger, &n, true).await? } } dora_message::descriptor::NodeSource::GitBranch { repo, rev } => { - self.spawn_git_node(&n, repo, rev, logger, &node.env, prepared_git.unwrap()) - .await? + self.git_node_spawn_command( + &n, + repo, + rev, + logger, + &node.env, + prepared_git.unwrap(), + ) + .await? } }; - let Some(mut command) = command else { - return Ok(RunningNode { - pid: None, - node_config, - }); - }; + if let Some(command) = &mut command { + command.current_dir(&self.working_dir); + command.stdin(Stdio::null()); + + command.env( + "DORA_NODE_CONFIG", + serde_yaml::to_string(&node_config.clone()) + .wrap_err("failed to serialize node config")?, + ); + // Injecting the env variable defined in the `yaml` into + // the node runtime. + if let Some(envs) = &node.env { + for (key, value) in envs { + command.env(key, value.to_string()); + } + } + if let Some(envs) = &n.envs { + // node has some inner env variables -> add them too + for (key, value) in envs { + command.env(key, value.to_string()); + } + } - command.current_dir(&self.working_dir); - command.stdin(Stdio::null()); + // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C + #[cfg(unix)] + command.process_group(0); - command.env( - "DORA_NODE_CONFIG", - serde_yaml::to_string(&node_config.clone()) - .wrap_err("failed to serialize node config")?, + command.env("PYTHONUNBUFFERED", "1"); + command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + }; + + let error_msg = format!( + "failed to run `{}` with args `{}`", + n.path, + n.args.as_deref().unwrap_or_default(), ); - // Injecting the env variable defined in the `yaml` into - // the node runtime. - if let Some(envs) = node.env { - for (key, value) in envs { - command.env(key, value.to_string()); - } - } - if let Some(envs) = n.envs { - // node has some inner env variables -> add them too - for (key, value) in envs { - command.env(key, value.to_string()); + (command, error_msg) + } + dora_core::descriptor::CoreNodeKind::Runtime(n) => { + // run build commands + for operator in &n.operators { + if let Some(build) = &operator.config.build { + self.build_node(logger, &node.env, self.working_dir.clone(), build) + .await?; } } - // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C - #[cfg(unix)] - command.process_group(0); - - command.env("PYTHONUNBUFFERED", "1"); - command - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .wrap_err_with(move || { - format!( - "failed to run `{}` with args `{}`", - n.path, - n.args.as_deref().unwrap_or_default(), - ) - })? - } - dora_core::descriptor::CoreNodeKind::Runtime(n) => { let python_operators: Vec<&OperatorDefinition> = n .operators .iter() @@ -220,7 +225,9 @@ impl Spawner { .iter() .any(|x| !matches!(x.config.source, OperatorSource::Python { .. })); - let mut command = if !python_operators.is_empty() && !other_operators { + let mut command = if self.build_only { + None + } else if !python_operators.is_empty() && !other_operators { // Use python to spawn runtime if there is a python operator // TODO: Handle multi-operator runtime once sub-interpreter is supported @@ -253,7 +260,7 @@ impl Spawner { "-c", format!("import dora; dora.start_runtime() # {}", node.id).as_str(), ]); - command + Some(command) } else { let mut cmd = if self.uv { let mut cmd = tokio::process::Command::new("uv"); @@ -279,7 +286,7 @@ impl Spawner { "-c", format!("import dora; dora.start_runtime() # {}", node.id).as_str(), ]); - cmd + Some(cmd) } } else if python_operators.is_empty() && other_operators { let mut cmd = tokio::process::Command::new( @@ -287,41 +294,256 @@ impl Spawner { .wrap_err("failed to get current executable path")?, ); cmd.arg("runtime"); - cmd + Some(cmd) } else { eyre::bail!("Runtime can not mix Python Operator with other type of operator."); }; - command.current_dir(&self.working_dir); let runtime_config = RuntimeConfig { node: node_config.clone(), - operators: n.operators, + operators: n.operators.clone(), }; - command.env( - "DORA_RUNTIME_CONFIG", - serde_yaml::to_string(&runtime_config) - .wrap_err("failed to serialize runtime config")?, + + if let Some(command) = &mut command { + command.current_dir(&self.working_dir); + + command.env( + "DORA_RUNTIME_CONFIG", + serde_yaml::to_string(&runtime_config) + .wrap_err("failed to serialize runtime config")?, + ); + // Injecting the env variable defined in the `yaml` into + // the node runtime. + if let Some(envs) = &node.env { + for (key, value) in envs { + command.env(key, value.to_string()); + } + } + // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C + #[cfg(unix)] + command.process_group(0); + + command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + }; + let error_msg = format!( + "failed to run runtime {}/{}", + runtime_config.node.dataflow_id, runtime_config.node.node_id ); - // Injecting the env variable defined in the `yaml` into - // the node runtime. - if let Some(envs) = node.env { - for (key, value) in envs { - command.env(key, value.to_string()); + (command, error_msg) + } + }; + Ok(PreparedNode { + command, + spawn_error_msg: error_msg, + working_dir: self.working_dir, + dataflow_id, + node, + node_config, + clock: self.clock, + daemon_tx: self.daemon_tx, + node_stderr_most_recent, + }) + } + + async fn prepare_git_node( + &mut self, + repo_addr: &String, + rev: &Option, + repos_in_use: &mut BTreeMap>, + ) -> eyre::Result { + let dataflow_id = self.dataflow_id; + let repo_url = Url::parse(repo_addr).context("failed to parse git repository URL")?; + let target_dir = self.working_dir.join("build"); + + let clone_dir_base = { + let base = { + let mut path = + target_dir.join(repo_url.host_str().context("git URL has no hostname")?); + + path.extend(repo_url.path_segments().context("no path in git URL")?); + path + }; + match rev { + None => base, + Some(rev) => match rev { + GitRepoRev::Branch(branch) => base.join("branch").join(branch), + GitRepoRev::Tag(tag) => base.join("tag").join(tag), + GitRepoRev::Rev(rev) => base.join("rev").join(rev), + }, + } + }; + let clone_dir = if clone_dir_exists(&clone_dir_base, repos_in_use) { + let used_by_other_dataflow = + self.used_by_other_dataflow(dataflow_id, &clone_dir_base, repos_in_use); + if used_by_other_dataflow { + // don't reuse, choose new directory + // (TODO reuse if still up to date) + + let dir_name = clone_dir_base.file_name().unwrap().to_str().unwrap(); + let mut i = 1; + loop { + let new_path = clone_dir_base.with_file_name(format!("{dir_name}-{i}")); + if clone_dir_exists(&new_path, repos_in_use) + && self.used_by_other_dataflow(dataflow_id, &new_path, repos_in_use) + { + i += 1; + } else { + break new_path; } } - // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C - #[cfg(unix)] - command.process_group(0); - - command - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .wrap_err(format!( - "failed to run runtime {}/{}", - runtime_config.node.dataflow_id, runtime_config.node.node_id - ))? + } else { + clone_dir_base + } + } else { + clone_dir_base + }; + let clone_dir = dunce::simplified(&clone_dir).to_owned(); + + let (reuse, checkout) = if clone_dir_exists(&clone_dir, repos_in_use) { + let empty = BTreeSet::new(); + let in_use = repos_in_use.get(&clone_dir).unwrap_or(&empty); + let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id); + if used_by_other_dataflow { + // TODO allow if still up to date + eyre::bail!("clone_dir is already in use by other dataflow") + } else if in_use.is_empty() { + (true, true) + } else { + (true, false) + } + } else { + (false, true) + }; + repos_in_use + .entry(clone_dir.clone()) + .or_default() + .insert(dataflow_id); + + Ok(PreparedGit { + clone_dir, + reuse, + checkout, + }) + } + + async fn git_node_spawn_command( + &mut self, + node: &dora_core::descriptor::CustomNode, + repo_addr: &String, + rev: &Option, + logger: &mut NodeLogger<'_>, + node_env: &Option>, + prepared: PreparedGit, + ) -> Result, eyre::Error> { + let PreparedGit { + clone_dir, + reuse, + checkout, + } = prepared; + + let rev_str = rev_str(rev); + let refname = rev.clone().map(|rev| match rev { + GitRepoRev::Branch(branch) => format!("refs/remotes/origin/{branch}"), + GitRepoRev::Tag(tag) => format!("refs/tags/{tag}"), + GitRepoRev::Rev(rev) => rev, + }); + + if reuse { + logger + .log( + LogLevel::Info, + None, + format!("reusing {repo_addr}{rev_str}"), + ) + .await; + let refname_cloned = refname.clone(); + let clone_dir = clone_dir.clone(); + let repository = fetch_changes(clone_dir, refname_cloned).await?; + if checkout { + checkout_tree(&repository, refname)?; + } + } else { + let repository = clone_into(repo_addr, rev, &clone_dir, logger).await?; + if checkout { + checkout_tree(&repository, refname)?; + } + }; + if let Some(build) = &node.build { + self.build_node(logger, node_env, clone_dir.clone(), build) + .await?; + } + if self.build_only { + Ok(None) + } else { + path_spawn_command(&clone_dir, self.uv, logger, node, true).await + } + } + + fn used_by_other_dataflow( + &mut self, + dataflow_id: uuid::Uuid, + clone_dir_base: &PathBuf, + repos_in_use: &mut BTreeMap>, + ) -> bool { + let empty = BTreeSet::new(); + let in_use = repos_in_use.get(clone_dir_base).unwrap_or(&empty); + let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id); + used_by_other_dataflow + } + + async fn build_node( + &mut self, + logger: &mut NodeLogger<'_>, + node_env: &Option>, + working_dir: PathBuf, + build: &String, + ) -> Result<(), eyre::Error> { + logger + .log( + LogLevel::Info, + None, + format!("running build command: `{build}"), + ) + .await; + let build = build.to_owned(); + let uv = self.uv; + let node_env = node_env.clone(); + let task = tokio::task::spawn_blocking(move || { + run_build_command(&build, &working_dir, uv, &node_env).context("build command failed") + }); + task.await??; + Ok(()) + } +} + +pub struct PreparedNode { + command: Option, + spawn_error_msg: String, + working_dir: PathBuf, + dataflow_id: DataflowId, + node: ResolvedNode, + node_config: NodeConfig, + clock: Arc, + daemon_tx: mpsc::Sender>, + node_stderr_most_recent: Arc>, +} + +impl PreparedNode { + pub fn node_id(&self) -> &NodeId { + &self.node.id + } + + pub async fn spawn(mut self, logger: &mut NodeLogger<'_>) -> eyre::Result { + let mut child = match &mut self.command { + Some(command) => command.spawn().wrap_err(self.spawn_error_msg)?, + None => { + return Ok(RunningNode { + pid: None, + node_config: self.node_config, + }) } }; @@ -336,22 +558,29 @@ impl Spawner { ) .await; - let dataflow_dir: PathBuf = self.working_dir.join("out").join(dataflow_id.to_string()); + let dataflow_dir: PathBuf = self + .working_dir + .join("out") + .join(self.dataflow_id.to_string()); if !dataflow_dir.exists() { std::fs::create_dir_all(&dataflow_dir).context("could not create dataflow_dir")?; } let (tx, mut rx) = mpsc::channel(10); - let mut file = File::create(log::log_path(&self.working_dir, &dataflow_id, &node.id)) - .await - .expect("Failed to create log file"); + let mut file = File::create(log::log_path( + &self.working_dir, + &self.dataflow_id, + &self.node.id, + )) + .await + .expect("Failed to create log file"); let mut child_stdout = tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout")); let running_node = RunningNode { pid: Some(pid), - node_config, + node_config: self.node_config, }; let stdout_tx = tx.clone(); - let node_id = node.id.clone(); + let node_id = self.node.id.clone(); // Stdout listener stream tokio::spawn(async move { let mut buffer = String::new(); @@ -411,7 +640,7 @@ impl Spawner { // Stderr listener stream let stderr_tx = tx.clone(); - let node_id = node.id.clone(); + let node_id = self.node.id.clone(); let uhlc = self.clock.clone(); let daemon_tx_log = self.daemon_tx.clone(); tokio::spawn(async move { @@ -447,7 +676,7 @@ impl Spawner { buffer.push_str(&new); - node_stderr_most_recent.force_push(new); + self.node_stderr_most_recent.force_push(new); // send the buffered lines let lines = std::mem::take(&mut buffer); @@ -458,10 +687,11 @@ impl Spawner { } }); - let node_id = node.id.clone(); + let node_id = self.node.id.clone(); let (log_finish_tx, log_finish_rx) = oneshot::channel(); let clock = self.clock.clone(); let daemon_tx = self.daemon_tx.clone(); + let dataflow_id = self.dataflow_id; tokio::spawn(async move { let exit_status = NodeExitStatus::from(child.wait().await); let _ = log_finish_rx.await; @@ -478,7 +708,7 @@ impl Spawner { let _ = daemon_tx.send(event).await; }); - let node_id = node.id.clone(); + let node_id = self.node.id.clone(); let daemon_id = logger.inner().inner().daemon_id().clone(); let mut cloned_logger = logger .inner() @@ -488,6 +718,11 @@ impl Spawner { .await .context("failed to clone logger")?; + let send_stdout_to = self + .node + .send_stdout_as() + .context("Could not resolve `send_stdout_as` configuration")?; + // Log to file stream. tokio::spawn(async move { while let Some(message) = rx.recv().await { @@ -557,176 +792,6 @@ impl Spawner { }); Ok(running_node) } - - async fn prepare_git_node( - &mut self, - repo_addr: &String, - rev: &Option, - repos_in_use: &mut BTreeMap>, - ) -> eyre::Result { - let dataflow_id = self.dataflow_id; - let repo_url = Url::parse(repo_addr).context("failed to parse git repository URL")?; - let target_dir = self.working_dir.join("build"); - - let clone_dir_base = { - let base = { - let mut path = - target_dir.join(repo_url.host_str().context("git URL has no hostname")?); - - path.extend(repo_url.path_segments().context("no path in git URL")?); - path - }; - match rev { - None => base, - Some(rev) => match rev { - GitRepoRev::Branch(branch) => base.join("branch").join(branch), - GitRepoRev::Tag(tag) => base.join("tag").join(tag), - GitRepoRev::Rev(rev) => base.join("rev").join(rev), - }, - } - }; - let clone_dir = if clone_dir_exists(&clone_dir_base, repos_in_use) { - let used_by_other_dataflow = - self.used_by_other_dataflow(dataflow_id, &clone_dir_base, repos_in_use); - if used_by_other_dataflow { - // don't reuse, choose new directory - // (TODO reuse if still up to date) - - let dir_name = clone_dir_base.file_name().unwrap().to_str().unwrap(); - let mut i = 1; - loop { - let new_path = clone_dir_base.with_file_name(format!("{dir_name}-{i}")); - if clone_dir_exists(&new_path, repos_in_use) - && self.used_by_other_dataflow(dataflow_id, &new_path, repos_in_use) - { - i += 1; - } else { - break new_path; - } - } - } else { - clone_dir_base - } - } else { - clone_dir_base - }; - let clone_dir = dunce::simplified(&clone_dir).to_owned(); - - let (reuse, checkout) = if clone_dir_exists(&clone_dir, repos_in_use) { - let empty = BTreeSet::new(); - let in_use = repos_in_use.get(&clone_dir).unwrap_or(&empty); - let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id); - if used_by_other_dataflow { - // TODO allow if still up to date - eyre::bail!("clone_dir is already in use by other dataflow") - } else if in_use.is_empty() { - (true, true) - } else { - (true, false) - } - } else { - (false, true) - }; - repos_in_use - .entry(clone_dir.clone()) - .or_default() - .insert(dataflow_id); - - Ok(PreparedGit { - clone_dir, - reuse, - checkout, - }) - } - - async fn spawn_git_node( - &mut self, - node: &dora_core::descriptor::CustomNode, - repo_addr: &String, - rev: &Option, - logger: &mut NodeLogger<'_>, - node_env: &Option>, - prepared: PreparedGit, - ) -> Result, eyre::Error> { - let PreparedGit { - clone_dir, - reuse, - checkout, - } = prepared; - - let rev_str = rev_str(rev); - let refname = rev.clone().map(|rev| match rev { - GitRepoRev::Branch(branch) => format!("refs/remotes/origin/{branch}"), - GitRepoRev::Tag(tag) => format!("refs/tags/{tag}"), - GitRepoRev::Rev(rev) => rev, - }); - - if reuse { - logger - .log( - LogLevel::Info, - None, - format!("reusing {repo_addr}{rev_str}"), - ) - .await; - let refname_cloned = refname.clone(); - let clone_dir = clone_dir.clone(); - let repository = fetch_changes(clone_dir, refname_cloned).await?; - if checkout { - checkout_tree(&repository, refname)?; - } - } else { - let repository = clone_into(repo_addr, rev, &clone_dir, logger).await?; - if checkout { - checkout_tree(&repository, refname)?; - } - }; - if let Some(build) = &node.build { - self.build_node(logger, node_env, clone_dir.clone(), build) - .await?; - } - if self.build_only { - Ok(None) - } else { - spawn_command_from_path(&clone_dir, self.uv, logger, node, true).await - } - } - - async fn build_node( - &mut self, - logger: &mut NodeLogger<'_>, - node_env: &Option>, - working_dir: PathBuf, - build: &String, - ) -> Result<(), eyre::Error> { - logger - .log( - LogLevel::Info, - None, - format!("running build command: `{build}"), - ) - .await; - let build = build.to_owned(); - let uv = self.uv; - let node_env = node_env.clone(); - let task = tokio::task::spawn_blocking(move || { - run_build_command(&build, &working_dir, uv, &node_env).context("build command failed") - }); - task.await??; - Ok(()) - } - - fn used_by_other_dataflow( - &mut self, - dataflow_id: uuid::Uuid, - clone_dir_base: &PathBuf, - repos_in_use: &mut BTreeMap>, - ) -> bool { - let empty = BTreeSet::new(); - let in_use = repos_in_use.get(clone_dir_base).unwrap_or(&empty); - let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id); - used_by_other_dataflow - } } fn rev_str(rev: &Option) -> String { @@ -832,7 +897,7 @@ fn checkout_tree(repository: &git2::Repository, refname: Option) -> eyre Ok(()) } -async fn spawn_command_from_path( +async fn path_spawn_command( working_dir: &Path, uv: bool, logger: &mut NodeLogger<'_>,