diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index d4304281..1d1e5656 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -38,6 +38,7 @@ use socket_stream_utils::socket_stream_send; use spawn::Spawner; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, + future::Future, net::SocketAddr, path::{Path, PathBuf}, pin::pin, @@ -54,6 +55,7 @@ use tokio::{ mpsc::{self, UnboundedSender}, oneshot::{self, Sender}, }, + task::JoinSet, }; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tracing::{error, warn}; @@ -384,6 +386,24 @@ impl Daemon { Event::DaemonError(err) => { tracing::error!("Daemon error: {err:?}"); } + Event::SpawnNodeResult { + dataflow_id, + node_id, + result, + } => match result { + Ok(running_node) => { + if let Some(dataflow) = self.running.get_mut(&dataflow_id) { + dataflow.running_nodes.insert(node_id, running_node); + } + } + Err(error) => { + self.dataflow_node_results + .entry(dataflow_id) + .or_default() + .insert(node_id.clone(), Err(error)); + self.handle_node_stop(dataflow_id, &node_id).await?; + } + }, } } @@ -441,10 +461,17 @@ impl Daemon { if let Err(err) = &result { tracing::error!("{err:?}"); } - let reply = - DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}"))); - let _ = reply_tx.send(Some(reply)).map_err(|_| { - error!("could not send `SpawnResult` reply from daemon to coordinator") + tokio::spawn(async move { + let result = match result { + Err(err) => Err(err), + Ok(task) => task.await, + }; + let reply = DaemonCoordinatorReply::SpawnResult( + result.map_err(|err| format!("{err:?}")), + ); + let _ = reply_tx.send(Some(reply)).map_err(|_| { + error!("could not send `SpawnResult` reply from daemon to coordinator") + }); }); RunStatus::Continue } @@ -685,7 +712,7 @@ impl Daemon { dataflow_descriptor: Descriptor, spawn_nodes: BTreeSet, uv: bool, - ) -> eyre::Result<()> { + ) -> eyre::Result>> { let mut logger = self.logger.for_dataflow(dataflow_id); let dataflow = RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor); @@ -737,7 +764,7 @@ impl Daemon { } } - let mut spawner = Spawner { + let spawner = Spawner { dataflow_id, working_dir, daemon_tx: self.events_tx.clone(), @@ -746,6 +773,8 @@ impl Daemon { uv, }; + let mut tasks = JoinSet::new(); + // spawn nodes and set up subscriptions for node in nodes.into_values() { let mut logger = logger.reborrow().for_node(node.id.clone()); @@ -767,6 +796,7 @@ impl Daemon { .log(LogLevel::Info, Some("daemon".into()), "spawning") .await; match spawner + .clone() .spawn_node( node, node_stderr_most_recent, @@ -776,8 +806,43 @@ impl Daemon { .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) { - Ok(running_node) => { - dataflow.running_nodes.insert(node_id, running_node); + Ok(spawn_task) => { + let events_tx = self.events_tx.clone(); + let clock = self.clock.clone(); + tasks.spawn(async move { + let result = spawn_task.await.unwrap_or_else(|err| { + Err(eyre!("failed to join spawn task: {err}")) + }); + let (node_spawn_result, success) = match result { + Ok(node) => (Ok(node), Ok(())), + Err(err) => { + let node_err = NodeError { + timestamp: clock.new_timestamp(), + cause: NodeErrorCause::Other { + stderr: format!("spawn failed: {err:?}"), + }, + exit_status: NodeExitStatus::Unknown, + }; + (Err(node_err), Err(err)) + } + }; + let send_result = events_tx + .send(Timestamped { + inner: Event::SpawnNodeResult { + dataflow_id, + node_id, + result: node_spawn_result, + }, + timestamp: clock.new_timestamp(), + }) + .await; + if send_result.is_err() { + tracing::error!( + "failed to send SpawnNodeResult to main daemon task" + ) + } + success + }); } Err(err) => { logger @@ -858,7 +923,11 @@ impl Daemon { self.handle_node_stop(dataflow_id, &node_id).await?; } - Ok(()) + let spawn_result = async move { + let result: eyre::Result<()> = tasks.join_all().await.into_iter().collect(); + result + }; + Ok(spawn_result) } async fn handle_dynamic_node_event( @@ -1713,7 +1782,7 @@ fn close_input( } #[derive(Debug)] -struct RunningNode { +pub struct RunningNode { pid: Option, node_config: NodeConfig, } @@ -2017,6 +2086,11 @@ pub enum Event { CtrlC, SecondCtrlC, DaemonError(eyre::Report), + SpawnNodeResult { + dataflow_id: DataflowId, + node_id: NodeId, + result: Result, + }, } impl From for Event { diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index a44b3fdc..883ceb7d 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -10,8 +10,8 @@ use dora_core::{ build::run_build_command, config::DataId, descriptor::{ - resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource, - ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, SHELL_SOURCE, + resolve_path, source_is_url, CustomNode, Descriptor, OperatorDefinition, OperatorSource, + PythonSource, ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, SHELL_SOURCE, }, get_python_path, uhlc::HLC, @@ -58,12 +58,12 @@ pub struct Spawner { impl Spawner { pub async fn spawn_node( - &mut self, + mut self, node: ResolvedNode, node_stderr_most_recent: Arc>, logger: &mut NodeLogger<'_>, repos_in_use: &mut BTreeMap>, - ) -> eyre::Result { + ) -> eyre::Result>> { let dataflow_id = self.dataflow_id; let node_id = node.id.clone(); logger @@ -87,9 +87,6 @@ impl Spawner { self.clock.clone(), ) .await?; - let send_stdout_to = node - .send_stdout_as() - .context("Could not resolve `send_stdout_as` configuration")?; let node_config = NodeConfig { dataflow_id, @@ -100,6 +97,47 @@ impl Spawner { dynamic: node.kind.dynamic(), }; + let prepared_git = if let dora_core::descriptor::CoreNodeKind::Custom(CustomNode { + source: dora_message::descriptor::NodeSource::GitBranch { repo, rev }, + .. + }) = &node.kind + { + Some(self.prepare_git_node(repo, rev, repos_in_use).await?) + } else { + None + }; + + let mut logger = logger + .try_clone() + .await + .wrap_err("failed to clone logger")?; + let task = async move { + self.spawn_node_inner( + node, + &mut logger, + dataflow_id, + node_config, + prepared_git, + node_stderr_most_recent, + ) + .await + }; + Ok(tokio::spawn(task)) + } + + async fn spawn_node_inner( + &mut self, + node: ResolvedNode, + logger: &mut NodeLogger<'_>, + dataflow_id: uuid::Uuid, + node_config: NodeConfig, + prepared_git: Option, + node_stderr_most_recent: Arc>, + ) -> Result { + let send_stdout_to = node + .send_stdout_as() + .context("Could not resolve `send_stdout_as` configuration")?; + let mut child = match node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { let command = match &n.source { @@ -112,7 +150,7 @@ impl Spawner { .await? } dora_message::descriptor::NodeSource::GitBranch { repo, rev } => { - self.spawn_git_node(&n, repo, rev, logger, &node.env, repos_in_use) + self.spawn_git_node(&n, repo, rev, logger, &node.env, prepared_git.unwrap()) .await? } }; @@ -442,6 +480,7 @@ impl Spawner { .try_clone() .await .context("failed to clone logger")?; + // Log to file stream. tokio::spawn(async move { while let Some(message) = rx.recv().await { @@ -512,24 +551,16 @@ impl Spawner { Ok(running_node) } - async fn spawn_git_node( + async fn prepare_git_node( &mut self, - node: &dora_core::descriptor::CustomNode, repo_addr: &String, rev: &Option, - logger: &mut NodeLogger<'_>, - node_env: &Option>, repos_in_use: &mut BTreeMap>, - ) -> Result, eyre::Error> { + ) -> eyre::Result { let dataflow_id = self.dataflow_id; let repo_url = Url::parse(repo_addr).context("failed to parse git repository URL")?; let target_dir = self.working_dir.join("build"); - let rev_str = rev_str(rev); - let refname = rev.clone().map(|rev| match rev { - GitRepoRev::Branch(branch) => format!("refs/remotes/origin/{branch}"), - GitRepoRev::Tag(tag) => format!("refs/tags/{tag}"), - GitRepoRev::Rev(rev) => rev, - }); + let clone_dir_base = { let base = { let mut path = @@ -574,7 +605,7 @@ impl Spawner { }; let clone_dir = dunce::simplified(&clone_dir).to_owned(); - if clone_dir.exists() { + let reuse = if clone_dir.exists() { let empty = BTreeSet::new(); let in_use = repos_in_use.get(&clone_dir).unwrap_or(&empty); let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id); @@ -582,27 +613,50 @@ impl Spawner { // TODO allow if still up to date eyre::bail!("clone_dir is already in use by other dataflow") } else { - repos_in_use - .entry(clone_dir.clone()) - .or_default() - .insert(dataflow_id); - logger - .log( - LogLevel::Info, - None, - format!("reusing {repo_addr}{rev_str}"), - ) - .await; - let refname_cloned = refname.clone(); - let clone_dir = clone_dir.clone(); - let repository = fetch_changes(clone_dir, refname_cloned).await?; - checkout_tree(&repository, refname)?; + true } } else { - repos_in_use - .entry(clone_dir.clone()) - .or_default() - .insert(dataflow_id); + false + }; + repos_in_use + .entry(clone_dir.clone()) + .or_default() + .insert(dataflow_id); + + Ok(PreparedGit { clone_dir, reuse }) + } + + async fn spawn_git_node( + &mut self, + node: &dora_core::descriptor::CustomNode, + repo_addr: &String, + rev: &Option, + logger: &mut NodeLogger<'_>, + node_env: &Option>, + prepared: PreparedGit, + ) -> Result, eyre::Error> { + let PreparedGit { clone_dir, reuse } = prepared; + + let rev_str = rev_str(rev); + let refname = rev.clone().map(|rev| match rev { + GitRepoRev::Branch(branch) => format!("refs/remotes/origin/{branch}"), + GitRepoRev::Tag(tag) => format!("refs/tags/{tag}"), + GitRepoRev::Rev(rev) => rev, + }); + + if reuse { + logger + .log( + LogLevel::Info, + None, + format!("reusing {repo_addr}{rev_str}"), + ) + .await; + let refname_cloned = refname.clone(); + let clone_dir = clone_dir.clone(); + let repository = fetch_changes(clone_dir, refname_cloned).await?; + checkout_tree(&repository, refname)?; + } else { let repository = clone_into(repo_addr, rev, &clone_dir, logger).await?; checkout_tree(&repository, refname)?; }; @@ -849,3 +903,8 @@ async fn spawn_command_from_path( Ok(Some(cmd)) } + +struct PreparedGit { + clone_dir: PathBuf, + reuse: bool, +}