diff --git a/Cargo.lock b/Cargo.lock index 7a191bcf..00c290c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3006,6 +3006,8 @@ dependencies = [ "dora-message", "dunce", "eyre", + "git2", + "itertools 0.14.0", "log", "once_cell", "schemars", @@ -3015,6 +3017,7 @@ dependencies = [ "serde_yaml 0.9.34+deprecated", "tokio", "tracing", + "url", "uuid 1.16.0", "which", ] diff --git a/binaries/cli/src/command/build/local.rs b/binaries/cli/src/command/build/local.rs index 7e45cee8..1ae19ed4 100644 --- a/binaries/cli/src/command/build/local.rs +++ b/binaries/cli/src/command/build/local.rs @@ -1,14 +1,20 @@ -use std::{collections::BTreeMap, path::PathBuf}; +use std::{ + collections::{BTreeMap, BTreeSet}, + future::Future, + path::PathBuf, +}; use dora_core::{ - build::run_build_command, - descriptor::{Descriptor, NodeExt, SINGLE_OPERATOR_DEFAULT_ID}, + build::{BuildInfo, BuildLogger, Builder, GitManager}, + descriptor::{self, Descriptor, NodeExt, ResolvedNode, SINGLE_OPERATOR_DEFAULT_ID}, }; use dora_message::{ common::GitSource, id::{NodeId, OperatorId}, + BuildId, SessionId, }; use eyre::Context; +use futures::executor::block_on; use crate::session::DataflowSession; @@ -18,69 +24,102 @@ pub fn build_dataflow_locally( dataflow_session: &DataflowSession, working_dir: PathBuf, uv: bool, -) -> eyre::Result<()> { - let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string()); - let (stdout_tx, mut stdout) = tokio::sync::mpsc::channel::>(10); +) -> eyre::Result { + let runtime = tokio::runtime::Runtime::new()?; - tokio::spawn(async move { - while let Some(line) = stdout.recv().await { - println!( - "{}", - line.unwrap_or_else(|err| format!("io err: {}", err.kind())) - ); - } - }); + runtime.block_on(build_dataflow( + dataflow_session.session_id, + working_dir, + nodes, + git_sources, + prev_git_sources, + local_nodes, + uv, + )) +} - for node in dataflow.nodes { - match node.kind()? { - dora_core::descriptor::NodeKind::Standard(_) => { - let Some(build) = node.build.as_deref() else { - continue; - }; - run_build_command(build, &working_dir, uv, &node.env, stdout_tx.clone()) - .with_context(|| { - format!("build command failed for standard node `{}`", node.id) - })? - } - dora_core::descriptor::NodeKind::Runtime(runtime_node) => { - for operator in &runtime_node.operators { - let Some(build) = operator.config.build.as_deref() else { - continue; - }; - run_build_command(build, &working_dir, uv, &node.env, stdout_tx.clone()) - .with_context(|| { - format!( - "build command failed for operator `{}/{}`", - node.id, operator.id - ) - })?; - } - } - dora_core::descriptor::NodeKind::Custom(custom_node) => { - let Some(build) = custom_node.build.as_deref() else { - continue; - }; - run_build_command(build, &working_dir, uv, &node.env, stdout_tx.clone()) - .with_context(|| { - format!("build command failed for custom node `{}`", node.id) - })? +async fn build_dataflow( + session_id: SessionId, + base_working_dir: PathBuf, + nodes: BTreeMap, + git_sources: BTreeMap, + prev_git_sources: BTreeMap, + local_nodes: BTreeSet, + uv: bool, +) -> eyre::Result { + let builder = Builder { + session_id, + base_working_dir, + uv, + }; + + let mut git_manager = GitManager::default(); + + let mut tasks = Vec::new(); + + // build nodes + for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) { + let node_id = node.id.clone(); + let git_source = git_sources.get(&node_id).cloned(); + let prev_git_source = prev_git_sources.get(&node_id).cloned(); + + let task = builder + .clone() + .build_node( + node, + git_source, + prev_git_source, + LocalBuildLogger, + &mut git_manager, + ) + .await + .wrap_err_with(|| format!("failed to build node `{node_id}`"))?; + tasks.push((node_id, task)); + } + + let mut info = BuildInfo { + node_working_dirs: Default::default(), + }; + let mut errors = Vec::new(); + for (node_id, task) in tasks { + match task.await { + Ok(node) => { + info.node_working_dirs + .insert(node_id, node.node_working_dir); } - dora_core::descriptor::NodeKind::Operator(operator) => { - let Some(build) = operator.config.build.as_deref() else { - continue; - }; - run_build_command(build, &working_dir, uv, &node.env, stdout_tx.clone()) - .with_context(|| { - format!( - "build command failed for operator `{}/{}`", - node.id, - operator.id.as_ref().unwrap_or(&default_op_id) - ) - })?; + Err(err) => { + errors.push((node_id, err)); } } } - std::mem::drop(stdout_tx); + if errors.is_empty() { + Ok(info) + } else { + let mut message = "failed to build dataflow:\n".to_owned(); + for (node_id, err) in errors { + message.push_str(&format!("- {node_id}: {err:?}\n-------------------\n\n")); + } + Err(eyre::eyre!(message)) + } +} + +struct LocalBuildLogger; + +impl BuildLogger for LocalBuildLogger { + type Clone = Self; + + fn log_message( + &mut self, + level: log::Level, + message: impl Into + Send, + ) -> impl Future + Send { + async move { + let message: String = message.into(); + println!("{level}: \t{message}"); + } + } - Ok(()) + fn try_clone(&self) -> impl Future> + Send { + async { Ok(LocalBuildLogger) } + } } diff --git a/binaries/cli/src/command/build/mod.rs b/binaries/cli/src/command/build/mod.rs index abf2ac32..fff1d452 100644 --- a/binaries/cli/src/command/build/mod.rs +++ b/binaries/cli/src/command/build/mod.rs @@ -3,7 +3,7 @@ use dora_core::{ descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt}, topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST}, }; -use dora_message::descriptor::NodeSource; +use dora_message::{descriptor::NodeSource, BuildId}; use eyre::Context; use std::collections::BTreeMap; @@ -79,7 +79,7 @@ pub fn build( .parent() .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); - build_dataflow_locally( + let build_info = build_dataflow_locally( dataflow_descriptor, &git_sources, &dataflow_session, @@ -88,6 +88,9 @@ pub fn build( )?; dataflow_session.git_sources = git_sources; + // generate a random BuildId and store the associated build info + dataflow_session.build_id = Some(BuildId::generate()); + dataflow_session.local_build = Some(build_info); dataflow_session .write_out_for_dataflow(&dataflow_path) .context("failed to write out dataflow session file")?; @@ -122,8 +125,14 @@ pub fn build( coordinator_socket(coordinator_addr, coordinator_port), log::LevelFilter::Info, )?; + + dataflow_session.build_id = Some(build_id); + dataflow_session.local_build = None; + dataflow_session + .write_out_for_dataflow(&dataflow_path) + .context("failed to write out dataflow session file")?; } - } + }; Ok(()) } diff --git a/binaries/cli/src/command/run.rs b/binaries/cli/src/command/run.rs index 616a63b1..df01d16e 100644 --- a/binaries/cli/src/command/run.rs +++ b/binaries/cli/src/command/run.rs @@ -15,6 +15,7 @@ pub fn run(dataflow: String, uv: bool) -> Result<(), eyre::Error> { let result = rt.block_on(Daemon::run_dataflow( &dataflow_path, dataflow_session.build_id, + dataflow_session.local_build, dataflow_session.session_id, uv, ))?; diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index 278f9e90..ec28062e 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -508,7 +508,8 @@ fn run_cli(args: Args) -> eyre::Result<()> { let dataflow_session = DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?; - let result = Daemon::run_dataflow(&dataflow_path, dataflow_session.build_id, dataflow_session.session_id, false).await?; + let result = Daemon::run_dataflow(&dataflow_path, + dataflow_session.build_id, dataflow_session.local_build, dataflow_session.session_id, false).await?; handle_dataflow_result(result, None) } None => { diff --git a/binaries/cli/src/session.rs b/binaries/cli/src/session.rs index 0edb8011..29609e54 100644 --- a/binaries/cli/src/session.rs +++ b/binaries/cli/src/session.rs @@ -3,6 +3,7 @@ use std::{ path::{Path, PathBuf}, }; +use dora_core::build::BuildInfo; use dora_message::{common::GitSource, id::NodeId, BuildId, SessionId}; use eyre::{Context, ContextCompat}; @@ -11,6 +12,7 @@ pub struct DataflowSession { pub build_id: Option, pub session_id: SessionId, pub git_sources: BTreeMap, + pub local_build: Option, } impl Default for DataflowSession { @@ -19,6 +21,7 @@ impl Default for DataflowSession { build_id: None, session_id: SessionId::generate(), git_sources: Default::default(), + local_build: Default::default(), } } } diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 32a278e9..db44f102 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -2,6 +2,7 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; use crossbeam::queue::ArrayQueue; use dora_core::{ + build::{self, BuildInfo, GitManager}, config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId}, descriptor::{ read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode, @@ -62,7 +63,6 @@ use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tracing::{error, warn}; use uuid::{NoContext, Timestamp, Uuid}; -mod build; mod coordinator; mod local_listener; mod log; @@ -76,10 +76,7 @@ use dora_tracing::telemetry::serialize_context; #[cfg(feature = "telemetry")] use tracing_opentelemetry::OpenTelemetrySpanExt; -use crate::{ - build::{BuildInfo, GitManager}, - pending::DataflowStatus, -}; +use crate::pending::DataflowStatus; const STDERR_LOG_LINES: usize = 10; @@ -156,6 +153,7 @@ impl Daemon { None, clock, Some(remote_daemon_events_tx), + Default::default(), ) .await .map(|_| ()) @@ -164,6 +162,7 @@ impl Daemon { pub async fn run_dataflow( dataflow_path: &Path, build_id: Option, + local_build: Option, session_id: SessionId, uv: bool, ) -> eyre::Result { @@ -218,6 +217,16 @@ impl Daemon { Some(exit_when_done), clock.clone(), None, + if let Some(local_build) = local_build { + let Some(build_id) = build_id else { + bail!("no build_id, but local_build set") + }; + let mut builds = BTreeMap::new(); + builds.insert(build_id, local_build); + builds + } else { + Default::default() + }, ); let spawn_result = reply_rx @@ -249,6 +258,7 @@ impl Daemon { exit_when_done: Option>, clock: Arc, remote_daemon_events_tx: Option>>>, + builds: BTreeMap, ) -> eyre::Result { let coordinator_connection = match coordinator_addr { Some(addr) => { @@ -313,7 +323,7 @@ impl Daemon { zenoh_session, remote_daemon_events_tx, git_manager: Default::default(), - builds: Default::default(), + builds, sessions: Default::default(), }; @@ -888,9 +898,6 @@ impl Daemon { let builder = build::Builder { session_id, base_working_dir, - daemon_tx: self.events_tx.clone(), - dataflow_descriptor, - clock: self.clock.clone(), uv, }; @@ -906,13 +913,18 @@ impl Daemon { let git_source = git_sources.get(&node_id).cloned(); let prev_git_source = prev_git_sources.get(&node_id).cloned(); + let logger_cloned = logger + .try_clone_impl() + .await + .wrap_err("failed to clone logger")?; + match builder .clone() .build_node( node, git_source, prev_git_source, - &mut logger, + logger_cloned, &mut self.git_manager, ) .await @@ -926,9 +938,7 @@ impl Daemon { }); } Err(err) => { - self.logger - .log_build(build_id, LogLevel::Error, Some(node_id), format!("{err:?}")) - .await; + logger.log(LogLevel::Error, format!("{err:?}")).await; return Err(err); } } diff --git a/binaries/daemon/src/log.rs b/binaries/daemon/src/log.rs index 176857d0..c5fe171a 100644 --- a/binaries/daemon/src/log.rs +++ b/binaries/daemon/src/log.rs @@ -4,7 +4,7 @@ use std::{ sync::Arc, }; -use dora_core::{config::NodeId, uhlc}; +use dora_core::{build::BuildLogger, config::NodeId, uhlc}; use dora_message::{ common::{DaemonId, LogLevel, LogMessage, Timestamped}, daemon_to_coordinator::{CoordinatorRequest, DaemonEvent}, @@ -101,17 +101,13 @@ pub struct NodeBuildLogger<'a> { } impl NodeBuildLogger<'_> { - pub fn inner(&self) -> &DaemonLogger { - &self.logger - } - pub async fn log(&mut self, level: LogLevel, message: impl Into) { self.logger .log_build(self.build_id, level, Some(self.node_id.clone()), message) .await } - pub async fn try_clone(&self) -> eyre::Result> { + pub async fn try_clone_impl(&self) -> eyre::Result> { Ok(NodeBuildLogger { build_id: self.build_id, node_id: self.node_id.clone(), @@ -120,6 +116,22 @@ impl NodeBuildLogger<'_> { } } +impl BuildLogger for NodeBuildLogger<'_> { + type Clone = NodeBuildLogger<'static>; + + fn log_message( + &mut self, + level: LogLevel, + message: impl Into + Send, + ) -> impl std::future::Future + Send { + self.log(level, message) + } + + fn try_clone(&self) -> impl std::future::Future> + Send { + self.try_clone_impl() + } +} + pub struct DaemonLogger { daemon_id: DaemonId, logger: Logger, diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 7d9233b1..bb6cb6ec 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -19,8 +19,11 @@ which = "5.0.0" uuid = { version = "1.7", features = ["serde", "v7"] } tracing = "0.1" serde-with-expand-env = "1.1.0" -tokio = { version = "1.24.1", features = ["fs", "process", "sync"] } +tokio = { version = "1.24.1", features = ["fs", "process", "sync", "rt"] } schemars = "0.8.19" serde_json = "1.0.117" log = { version = "0.4.21", features = ["serde"] } dunce = "1.0.5" +url = "2.5.4" +git2 = { version = "0.18.0", features = ["vendored-openssl"] } +itertools = "0.14" diff --git a/libraries/core/src/build.rs b/libraries/core/src/build/build_command.rs similarity index 100% rename from libraries/core/src/build.rs rename to libraries/core/src/build/build_command.rs diff --git a/binaries/daemon/src/build/git.rs b/libraries/core/src/build/git.rs similarity index 97% rename from binaries/daemon/src/build/git.rs rename to libraries/core/src/build/git.rs index ca92373e..d62dfd94 100644 --- a/binaries/daemon/src/build/git.rs +++ b/libraries/core/src/build/git.rs @@ -1,4 +1,4 @@ -use crate::log::NodeBuildLogger; +use crate::build::BuildLogger; use dora_message::{common::LogLevel, descriptor::GitRepoRev, DataflowId, SessionId}; use eyre::{ContextCompat, WrapErr}; use git2::FetchOptions; @@ -142,7 +142,7 @@ pub struct GitFolder { } impl GitFolder { - pub async fn prepare(self, logger: &mut NodeBuildLogger<'_>) -> eyre::Result { + pub async fn prepare(self, logger: &mut impl BuildLogger) -> eyre::Result { let GitFolder { reuse } = self; let clone_dir = match reuse { @@ -165,7 +165,7 @@ impl GitFolder { .context("failed to copy repo clone")?; logger - .log( + .log_message( LogLevel::Info, format!("fetching changes after copying {}", from.display()), ) @@ -185,7 +185,7 @@ impl GitFolder { .context("failed to rename repo clone")?; logger - .log( + .log_message( LogLevel::Info, format!("fetching changes after renaming {}", from.display()), ) @@ -197,7 +197,7 @@ impl GitFolder { } ReuseOptions::Reuse { dir } => { logger - .log( + .log_message( LogLevel::Info, format!("reusing up-to-date {}", dir.display()), ) @@ -244,7 +244,7 @@ fn rev_str(rev: &Option) -> String { async fn clone_into( repo_addr: Url, clone_dir: &Path, - logger: &mut NodeBuildLogger<'_>, + logger: &mut impl BuildLogger, ) -> eyre::Result { if let Some(parent) = clone_dir.parent() { tokio::fs::create_dir_all(parent) @@ -253,7 +253,7 @@ async fn clone_into( } logger - .log( + .log_message( LogLevel::Info, format!("cloning {repo_addr} into {}", clone_dir.display()), ) @@ -310,7 +310,7 @@ async fn fetch_changes( fn checkout_tree(repository: &git2::Repository, commit_hash: &str) -> eyre::Result<()> { let (object, reference) = repository - .revparse_ext(&commit_hash) + .revparse_ext(commit_hash) .context("failed to parse ref")?; repository .checkout_tree(&object, None) diff --git a/libraries/core/src/build/logger.rs b/libraries/core/src/build/logger.rs new file mode 100644 index 00000000..d683bcd4 --- /dev/null +++ b/libraries/core/src/build/logger.rs @@ -0,0 +1,15 @@ +use std::future::Future; + +use dora_message::common::LogLevel; + +pub trait BuildLogger: Send { + type Clone: BuildLogger + 'static; + + fn log_message( + &mut self, + level: LogLevel, + message: impl Into + Send, + ) -> impl Future + Send; + + fn try_clone(&self) -> impl Future> + Send; +} diff --git a/binaries/daemon/src/build/mod.rs b/libraries/core/src/build/mod.rs similarity index 75% rename from binaries/daemon/src/build/mod.rs rename to libraries/core/src/build/mod.rs index 2bff3bea..373b3d8c 100644 --- a/binaries/daemon/src/build/mod.rs +++ b/libraries/core/src/build/mod.rs @@ -1,34 +1,30 @@ pub use git::GitManager; +pub use logger::BuildLogger; + use url::Url; -use std::{collections::BTreeMap, future::Future, path::PathBuf, sync::Arc}; +use std::{collections::BTreeMap, future::Future, path::PathBuf}; -use dora_core::{ - build::run_build_command, - descriptor::{Descriptor, ResolvedNode}, - uhlc::HLC, -}; +use crate::descriptor::ResolvedNode; use dora_message::{ - common::{GitSource, LogLevel, Timestamped}, - descriptor::EnvValue, + common::{GitSource, LogLevel}, + descriptor::{CoreNodeKind, EnvValue}, id::NodeId, SessionId, }; use eyre::Context; -use tokio::sync::mpsc; -use crate::{build::git::GitFolder, log::NodeBuildLogger, Event}; +use build_command::run_build_command; +use git::GitFolder; +mod build_command; mod git; +mod logger; #[derive(Clone)] pub struct Builder { pub session_id: SessionId, pub base_working_dir: PathBuf, - pub daemon_tx: mpsc::Sender>, - pub dataflow_descriptor: Descriptor, - /// clock is required for generating timestamps when dropping messages early because queue is full - pub clock: Arc, pub uv: bool, } @@ -38,10 +34,10 @@ impl Builder { node: ResolvedNode, git: Option, prev_git: Option, - logger: &mut NodeBuildLogger<'_>, + mut logger: impl BuildLogger, git_manager: &mut GitManager, ) -> eyre::Result>> { - logger.log(LogLevel::Debug, "building node").await; + logger.log_message(LogLevel::Debug, "building node").await; let prepared_git = if let Some(GitSource { repo, commit_hash }) = git { let repo_url = Url::parse(&repo).context("failed to parse git repository URL")?; @@ -59,10 +55,6 @@ impl Builder { None }; - let mut logger = logger - .try_clone() - .await - .wrap_err("failed to clone logger")?; let task = async move { self.build_node_inner(node, &mut logger, prepared_git).await }; Ok(task) } @@ -70,11 +62,11 @@ impl Builder { async fn build_node_inner( self, node: ResolvedNode, - logger: &mut NodeBuildLogger<'_>, + logger: &mut impl BuildLogger, git_folder: Option, ) -> eyre::Result { let node_working_dir = match &node.kind { - dora_core::descriptor::CoreNodeKind::Custom(n) => { + CoreNodeKind::Custom(n) => { let node_working_dir = match git_folder { Some(git_folder) => git_folder.prepare(logger).await?, None => self.base_working_dir, @@ -85,7 +77,7 @@ impl Builder { } node_working_dir } - dora_core::descriptor::CoreNodeKind::Runtime(n) => { + CoreNodeKind::Runtime(n) => { // run build commands for operator in &n.operators { if let Some(build) = &operator.config.build { @@ -106,15 +98,15 @@ impl Builder { } } -pub async fn build_node( - logger: &mut NodeBuildLogger<'_>, +async fn build_node( + logger: &mut impl BuildLogger, node_env: &Option>, working_dir: PathBuf, build: &String, uv: bool, ) -> eyre::Result<()> { logger - .log(LogLevel::Info, format!("running build command: `{build}")) + .log_message(LogLevel::Info, format!("running build command: `{build}")) .await; let build = build.to_owned(); let node_env = node_env.clone(); @@ -127,7 +119,7 @@ pub async fn build_node( tokio::spawn(async move { while let Some(line) = stdout.recv().await { logger - .log( + .log_message( LogLevel::Info, line.unwrap_or_else(|err| format!("io err: {}", err.kind())), ) @@ -142,7 +134,7 @@ pub struct BuiltNode { pub node_working_dir: PathBuf, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct BuildInfo { pub node_working_dirs: BTreeMap, } diff --git a/libraries/core/src/git.rs b/libraries/core/src/git.rs new file mode 100644 index 00000000..e69de29b diff --git a/libraries/core/src/lib.rs b/libraries/core/src/lib.rs index 90f2c564..f18a6059 100644 --- a/libraries/core/src/lib.rs +++ b/libraries/core/src/lib.rs @@ -9,6 +9,7 @@ pub use dora_message::{config, uhlc}; pub mod build; pub mod descriptor; +pub mod git; pub mod metadata; pub mod topics;