diff --git a/binaries/daemon/src/build/git.rs b/binaries/daemon/src/build/git.rs index 4f8fb795..91bb2922 100644 --- a/binaries/daemon/src/build/git.rs +++ b/binaries/daemon/src/build/git.rs @@ -1,4 +1,4 @@ -use crate::log::NodeLogger; +use crate::log::NodeBuildLogger; use dora_message::{common::LogLevel, descriptor::GitRepoRev, BuildId, DataflowId}; use eyre::{ContextCompat, WrapErr}; use git2::FetchOptions; @@ -139,7 +139,7 @@ pub struct GitFolder { } impl GitFolder { - pub async fn prepare(self, logger: &mut NodeLogger<'_>) -> eyre::Result { + pub async fn prepare(self, logger: &mut NodeBuildLogger<'_>) -> eyre::Result { let GitFolder { reuse } = self; let clone_dir = match reuse { @@ -164,7 +164,6 @@ impl GitFolder { logger .log( LogLevel::Info, - None, format!("fetching changes after copying {}", from.display()), ) .await; @@ -185,7 +184,6 @@ impl GitFolder { logger .log( LogLevel::Info, - None, format!("fetching changes after renaming {}", from.display()), ) .await; @@ -198,7 +196,6 @@ impl GitFolder { logger .log( LogLevel::Info, - None, format!("reusing up-to-date {}", dir.display()), ) .await; @@ -255,7 +252,7 @@ fn rev_str(rev: &Option) -> String { async fn clone_into( repo_addr: Url, clone_dir: &Path, - logger: &mut NodeLogger<'_>, + logger: &mut NodeBuildLogger<'_>, ) -> eyre::Result { if let Some(parent) = clone_dir.parent() { tokio::fs::create_dir_all(parent) @@ -266,7 +263,6 @@ async fn clone_into( logger .log( LogLevel::Info, - None, format!("cloning {repo_addr} into {}", clone_dir.display()), ) .await; diff --git a/binaries/daemon/src/build/mod.rs b/binaries/daemon/src/build/mod.rs index 8867bdfc..54cfc11a 100644 --- a/binaries/daemon/src/build/mod.rs +++ b/binaries/daemon/src/build/mod.rs @@ -1,30 +1,23 @@ pub use git::GitManager; use url::Url; -use std::{ - collections::{BTreeMap, BTreeSet}, - future::Future, - path::PathBuf, - sync::Arc, -}; +use std::{collections::BTreeMap, future::Future, path::PathBuf, sync::Arc}; use dora_core::{ build::run_build_command, - descriptor::{CustomNode, Descriptor, ResolvedNode}, + descriptor::{Descriptor, ResolvedNode}, uhlc::HLC, }; use dora_message::{ common::{LogLevel, Timestamped}, coordinator_to_daemon::GitSource, - daemon_to_node::NodeConfig, - descriptor::{EnvValue, GitRepoRev, ResolvedNodeSource}, - id::NodeId, - BuildId, DataflowId, + descriptor::EnvValue, + BuildId, }; use eyre::Context; use tokio::sync::mpsc; -use crate::{build::git::GitFolder, log::DaemonLogger, Event}; +use crate::{build::git::GitFolder, log::NodeBuildLogger, Event}; mod git; @@ -45,18 +38,10 @@ impl Builder { self, node: ResolvedNode, git: Option, - logger: &mut DaemonLogger, + logger: &mut NodeBuildLogger<'_>, git_manager: &mut GitManager, ) -> eyre::Result>> { - let build_id = self.build_id; - logger - .log_build( - build_id, - LogLevel::Debug, - Some(node.id.clone()), - "building node", - ) - .await; + logger.log(LogLevel::Debug, "building node").await; let prepared_git = if let Some(GitSource { repo, commit_hash }) = git { let repo_url = Url::parse(&repo).context("failed to parse git repository URL")?; @@ -78,214 +63,44 @@ impl Builder { .await .wrap_err("failed to clone logger")?; let task = async move { - self.prepare_node_inner(node, &mut logger, build_id, prepared_git) + self.prepare_node_inner(node, &mut logger, prepared_git) .await }; Ok(task) } async fn prepare_node_inner( - mut self, + self, node: ResolvedNode, - logger: &mut DaemonLogger, - build_id: uuid::Uuid, + logger: &mut NodeBuildLogger<'_>, git_folder: Option, ) -> eyre::Result { - let (command, error_msg) = match &node.kind { + let node_working_dir = match &node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { - let build_dir = match git_folder { + let node_working_dir = match git_folder { Some(git_folder) => git_folder.prepare(logger).await?, None => self.working_dir.clone(), }; if let Some(build) = &n.build { - self.build_node(logger, &node.env, build_dir.clone(), build) - .await?; + build_node(logger, &node.env, node_working_dir.clone(), build, self.uv).await?; } - let mut command = if self.build_only { - None - } else { - path_spawn_command(&build_dir, self.uv, logger, n, true).await? - }; - - if let Some(command) = &mut command { - command.current_dir(&self.working_dir); - command.stdin(Stdio::null()); - - command.env( - "DORA_NODE_CONFIG", - serde_yaml::to_string(&node_config.clone()) - .wrap_err("failed to serialize node config")?, - ); - // Injecting the env variable defined in the `yaml` into - // the node runtime. - if let Some(envs) = &node.env { - for (key, value) in envs { - command.env(key, value.to_string()); - } - } - if let Some(envs) = &n.envs { - // node has some inner env variables -> add them too - for (key, value) in envs { - command.env(key, value.to_string()); - } - } - - // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C - #[cfg(unix)] - command.process_group(0); - - command.env("PYTHONUNBUFFERED", "1"); - command - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - }; - - let error_msg = format!( - "failed to run `{}` with args `{}`", - n.path, - n.args.as_deref().unwrap_or_default(), - ); - (command, error_msg) + node_working_dir } dora_core::descriptor::CoreNodeKind::Runtime(n) => { // run build commands for operator in &n.operators { if let Some(build) = &operator.config.build { - self.build_node(logger, &node.env, self.working_dir.clone(), build) + build_node(logger, &node.env, self.working_dir.clone(), build, self.uv) .await?; } } - - let python_operators: Vec<&OperatorDefinition> = n - .operators - .iter() - .filter(|x| matches!(x.config.source, OperatorSource::Python { .. })) - .collect(); - - let other_operators = n - .operators - .iter() - .any(|x| !matches!(x.config.source, OperatorSource::Python { .. })); - - let mut command = if self.build_only { - None - } else if !python_operators.is_empty() && !other_operators { - // Use python to spawn runtime if there is a python operator - - // TODO: Handle multi-operator runtime once sub-interpreter is supported - if python_operators.len() > 2 { - eyre::bail!( - "Runtime currently only support one Python Operator. - This is because pyo4 sub-interpreter is not yet available. - See: https://github.com/PyO4/pyo3/issues/576" - ); - } - - let python_operator = python_operators - .first() - .context("Runtime had no operators definition.")?; - - if let OperatorSource::Python(PythonSource { - source: _, - conda_env: Some(conda_env), - }) = &python_operator.config.source - { - let conda = which::which("conda").context( - "failed to find `conda`, yet a `conda_env` was defined. Make sure that `conda` is available.", - )?; - let mut command = tokio::process::Command::new(conda); - command.args([ - "run", - "-n", - conda_env, - "python", - "-c", - format!("import dora; dora.start_runtime() # {}", node.id).as_str(), - ]); - Some(command) - } else { - let mut cmd = if self.uv { - let mut cmd = tokio::process::Command::new("uv"); - cmd.arg("run"); - cmd.arg("python"); - tracing::info!( - "spawning: uv run python -uc import dora; dora.start_runtime() # {}", - node.id - ); - cmd - } else { - let python = get_python_path() - .wrap_err("Could not find python path when spawning custom node")?; - tracing::info!( - "spawning: python -uc import dora; dora.start_runtime() # {}", - node.id - ); - - tokio::process::Command::new(python) - }; - // Force python to always flush stdout/stderr buffer - cmd.args([ - "-c", - format!("import dora; dora.start_runtime() # {}", node.id).as_str(), - ]); - Some(cmd) - } - } else if python_operators.is_empty() && other_operators { - let mut cmd = tokio::process::Command::new( - std::env::current_exe() - .wrap_err("failed to get current executable path")?, - ); - cmd.arg("runtime"); - Some(cmd) - } else { - eyre::bail!("Runtime can not mix Python Operator with other type of operator."); - }; - - let runtime_config = RuntimeConfig { - node: node_config.clone(), - operators: n.operators.clone(), - }; - - if let Some(command) = &mut command { - command.current_dir(&self.working_dir); - - command.env( - "DORA_RUNTIME_CONFIG", - serde_yaml::to_string(&runtime_config) - .wrap_err("failed to serialize runtime config")?, - ); - // Injecting the env variable defined in the `yaml` into - // the node runtime. - if let Some(envs) = &node.env { - for (key, value) in envs { - command.env(key, value.to_string()); - } - } - // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C - #[cfg(unix)] - command.process_group(0); - - command - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - }; - let error_msg = format!( - "failed to run runtime {}/{}", - runtime_config.node.dataflow_id, runtime_config.node.node_id - ); - (command, error_msg) + self.working_dir.clone() } }; Ok(PreparedNode { - command, - spawn_error_msg: error_msg, - working_dir: self.working_dir, - dataflow_id, + node_working_dir, node, - node_config, clock: self.clock, daemon_tx: self.daemon_tx, }) @@ -293,21 +108,14 @@ impl Builder { } pub async fn build_node( - node_id: NodeId, - logger: &mut DaemonLogger<'_>, + logger: &mut NodeBuildLogger<'_>, node_env: &Option>, working_dir: PathBuf, build: &String, uv: bool, ) -> eyre::Result<()> { logger - .log( - LogLevel::Info, - None, - Some(node_id), - Some("build".to_owned()), - format!("running build command: `{build}"), - ) + .log(LogLevel::Info, format!("running build command: `{build}")) .await; let build = build.to_owned(); let node_env = node_env.clone(); @@ -322,9 +130,6 @@ pub async fn build_node( logger .log( LogLevel::Info, - None, - Some(node_id), - Some("build command".into()), line.unwrap_or_else(|err| format!("io err: {}", err.kind())), ) .await; @@ -335,12 +140,8 @@ pub async fn build_node( } pub struct PreparedNode { - command: Option, - spawn_error_msg: String, - working_dir: PathBuf, - dataflow_id: DataflowId, + node_working_dir: PathBuf, node: ResolvedNode, - node_config: NodeConfig, clock: Arc, daemon_tx: mpsc::Sender>, } diff --git a/binaries/daemon/src/log.rs b/binaries/daemon/src/log.rs index 10f8716a..176857d0 100644 --- a/binaries/daemon/src/log.rs +++ b/binaries/daemon/src/log.rs @@ -94,6 +94,32 @@ impl<'a> DataflowLogger<'a> { } } +pub struct NodeBuildLogger<'a> { + build_id: BuildId, + node_id: NodeId, + logger: CowMut<'a, DaemonLogger>, +} + +impl NodeBuildLogger<'_> { + pub fn inner(&self) -> &DaemonLogger { + &self.logger + } + + pub async fn log(&mut self, level: LogLevel, message: impl Into) { + self.logger + .log_build(self.build_id, level, Some(self.node_id.clone()), message) + .await + } + + pub async fn try_clone(&self) -> eyre::Result> { + Ok(NodeBuildLogger { + build_id: self.build_id, + node_id: self.node_id.clone(), + logger: CowMut::Owned(self.logger.try_clone().await?), + }) + } +} + pub struct DaemonLogger { daemon_id: DaemonId, logger: Logger, @@ -107,6 +133,14 @@ impl DaemonLogger { } } + pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger { + NodeBuildLogger { + build_id, + node_id, + logger: CowMut::Borrowed(self), + } + } + pub fn inner(&self) -> &Logger { &self.logger }