diff --git a/.gitignore b/.gitignore index 2bab6ed3..ade7dc46 100644 --- a/.gitignore +++ b/.gitignore @@ -34,7 +34,7 @@ __pycache__/ # Distribution / packaging .Python -build/ +/build/ develop-eggs/ dist/ downloads/ @@ -179,4 +179,4 @@ out/ #Miscellaneous yolo.yml -~* \ No newline at end of file +~* diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index 1bbc6cc0..c1004489 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -633,11 +633,11 @@ fn run(args: Args) -> eyre::Result<()> { } fn start_dataflow( + build_id: Option, dataflow: String, name: Option, coordinator_socket: SocketAddr, uv: bool, - build_only: bool, ) -> Result<(PathBuf, Descriptor, Box, Uuid), eyre::Error> { let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; let dataflow_descriptor = @@ -656,11 +656,11 @@ fn start_dataflow( let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Start { + build_id, dataflow, name, local_working_dir: working_dir, uv, - build_only, }) .unwrap(), ) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 1302662c..e7ed3d83 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -5,6 +5,7 @@ use crate::{ pub use control::ControlEvent; use dora_core::{ config::{NodeId, OperatorId}, + descriptor::DescriptorExt, uhlc::{self, HLC}, }; use dora_message::{ @@ -14,17 +15,24 @@ use dora_message::{ ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult, DataflowStatus, LogLevel, LogMessage, }, - coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped}, + coordinator_to_daemon::{ + BuildDataflowNodes, DaemonCoordinatorEvent, RegisterResult, Timestamped, + }, daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult}, descriptor::{Descriptor, ResolvedNode}, + BuildId, }; use eyre::{bail, eyre, ContextCompat, Result, WrapErr}; use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt}; use futures_concurrency::stream::Merge; +use itertools::Itertools; use log_subscriber::LogSubscriber; use run::SpawnedDataflow; use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, + collections::{ + btree_map::{Entry, OccupiedEntry}, + BTreeMap, BTreeSet, HashMap, + }, net::SocketAddr, path::PathBuf, sync::Arc, @@ -200,6 +208,8 @@ async fn start_inner( let mut archived_dataflows: HashMap = HashMap::new(); let mut daemon_connections = DaemonConnections::default(); + let mut build_log_subscribers: BTreeMap = Default::default(); + while let Some(event) = events.next().await { // used below for measuring the event handling duration let start = Instant::now(); @@ -353,7 +363,8 @@ async fn start_inner( send_log_message( &mut finished_dataflow, &LogMessage { - dataflow_id, + build_id: None, + dataflow_id: Some(dataflow_id), node_id: None, daemon_id: None, level: LogLevel::Info, @@ -399,12 +410,40 @@ async fn start_inner( reply_sender, } => { match request { + ControlRequest::Build { + dataflow, + local_working_dir, + uv, + } => { + // assign a random build id + let build_id = BuildId::new_v4(); + + let result = build_dataflow( + build_id, + dataflow, + local_working_dir, + &clock, + uv, + &mut daemon_connections, + ) + .await; + match result { + Ok(()) => { + let _ = reply_sender.send(Ok( + ControlRequestReply::DataflowBuildTriggered { build_id }, + )); + } + Err(err) => { + let _ = reply_sender.send(Err(err)); + } + } + } ControlRequest::Start { + build_id, dataflow, name, local_working_dir, uv, - 
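Note on the `ControlRequest` changes above: the old `build_only` flag is replaced by a dedicated `Build` request, for which the coordinator assigns a fresh `BuildId` (a random v4 UUID), and `Start` now optionally references such an id. Below is a minimal standalone sketch of this two-phase shape; the enum is a simplified stand-in for the real `cli_to_coordinator::ControlRequest`, not the actual crate type.

```rust
// Simplified mirror of the new two-phase protocol (assumed shapes, not the
// real dora-message types). Needs `serde` (derive), `serde_json`, and
// `uuid` with the "v4" and "serde" features.
use serde::{Deserialize, Serialize};
use uuid::Uuid;

type BuildId = Uuid;

#[derive(Debug, Serialize, Deserialize)]
enum ControlRequest {
    /// Phase one: trigger a build; the coordinator replies with a fresh id.
    Build { uv: bool },
    /// Phase two: start the dataflow, optionally reusing a previous build.
    Start { build_id: Option<BuildId>, uv: bool },
}

fn main() {
    // Coordinator side: a random v4 id is assigned per `Build` request.
    let build_id: BuildId = Uuid::new_v4();

    // CLI side: reference that id when starting.
    let start = ControlRequest::Start {
        build_id: Some(build_id),
        uv: false,
    };
    println!("{}", serde_json::to_string(&start).unwrap());
}
```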
build_only, } => { let name = name.or_else(|| names::Generator::default().next()); @@ -419,13 +458,13 @@ async fn start_inner( } } let dataflow = start_dataflow( + build_id, dataflow, local_working_dir, name, &mut daemon_connections, &clock, uv, - build_only, ) .await?; Ok(dataflow) @@ -721,8 +760,15 @@ async fn start_inner( } } Event::Log(message) => { - if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) { - send_log_message(dataflow, &message).await; + if let Some(dataflow_id) = &message.dataflow_id { + if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) { + send_log_message(dataflow, &message).await; + } + } + if let Some(build_id) = message.build_id { + if let Entry::Occupied(subscriber) = build_log_subscribers.entry(build_id) { + send_log_message_to_subscriber(&message, subscriber).await; + } } } Event::DaemonExit { daemon_id } => { @@ -739,21 +785,10 @@ async fn start_inner( match result { Ok(()) => { if dataflow.pending_spawn_results.is_empty() { - tracing::info!( - "successfully {} dataflow `{dataflow_id}`", - if dataflow.build_only { - "built" - } else { - "spawned" - } - ); + tracing::info!("successfully spawned dataflow `{dataflow_id}`",); dataflow.spawn_result.set_result(Ok( ControlRequestReply::DataflowSpawned { uuid: dataflow_id }, )); - - if dataflow.build_only { - running_dataflows.remove(&dataflow_id); - } } } Err(err) => { @@ -795,6 +830,24 @@ async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage) dataflow.log_subscribers.retain(|s| !s.is_closed()); } +async fn send_log_message_to_subscriber( + message: &LogMessage, + mut subscriber: OccupiedEntry<'_, BuildId, LogSubscriber>, +) { + let send_result = tokio::time::timeout( + Duration::from_millis(100), + subscriber.get_mut().send_message(message), + ); + + if send_result.await.is_err() { + subscriber.get_mut().close(); + } + + if subscriber.get_mut().is_closed() { + subscriber.remove(); + } +} + fn dataflow_result( results: &BTreeMap, dataflow_uuid: Uuid, @@ -875,8 +928,6 @@ struct RunningDataflow { log_subscribers: Vec, pending_spawn_results: BTreeSet, - - build_only: bool, } pub enum SpawnResult { @@ -1123,26 +1174,108 @@ async fn retrieve_logs( reply_logs.map_err(|err| eyre!(err)) } +#[tracing::instrument(skip(daemon_connections, clock))] +async fn build_dataflow( + build_id: BuildId, + dataflow: Descriptor, + working_dir: PathBuf, + clock: &HLC, + uv: bool, + daemon_connections: &mut DaemonConnections, +) -> eyre::Result<()> { + let nodes = dataflow.resolve_aliases_and_set_defaults()?; + + let nodes_by_daemon = nodes.values().into_group_map_by(|n| &n.deploy.machine); + + let mut daemons = BTreeSet::new(); + for (machine, nodes_on_machine) in &nodes_by_daemon { + let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect(); + tracing::debug!( + "Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})" + ); + + let build_command = BuildDataflowNodes { + build_id, + working_dir: working_dir.clone(), + nodes: nodes.clone(), + dataflow_descriptor: dataflow.clone(), + nodes_on_machine, + uv, + }; + let message = serde_json::to_vec(&Timestamped { + inner: DaemonCoordinatorEvent::Build(build_command), + timestamp: clock.new_timestamp(), + })?; + + let daemon_id = build_dataflow_on_machine(daemon_connections, machine.as_deref(), &message) + .await + .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?; + daemons.insert(daemon_id); + } + + tracing::info!("successfully triggered dataflow build 
`{build_id}`",); + + Ok(()) +} + +async fn build_dataflow_on_machine( + daemon_connections: &mut DaemonConnections, + machine: Option<&str>, + message: &[u8], +) -> Result { + let daemon_id = match machine { + Some(machine) => daemon_connections + .get_matching_daemon_id(machine) + .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))? + .clone(), + None => daemon_connections + .unnamed() + .next() + .wrap_err("no unnamed daemon connections")? + .clone(), + }; + + let daemon_connection = daemon_connections + .get_mut(&daemon_id) + .wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?; + tcp_send(&mut daemon_connection.stream, message) + .await + .wrap_err("failed to send build message to daemon")?; + + let reply_raw = tcp_receive(&mut daemon_connection.stream) + .await + .wrap_err("failed to receive build reply from daemon")?; + match serde_json::from_slice(&reply_raw) + .wrap_err("failed to deserialize build reply from daemon")? + { + DaemonCoordinatorReply::TriggerBuildResult(result) => result + .map_err(|e| eyre!(e)) + .wrap_err("daemon returned an error")?, + _ => bail!("unexpected reply"), + } + Ok(daemon_id) +} + async fn start_dataflow( + build_id: Option, dataflow: Descriptor, working_dir: PathBuf, name: Option, daemon_connections: &mut DaemonConnections, clock: &HLC, uv: bool, - build_only: bool, ) -> eyre::Result { let SpawnedDataflow { uuid, daemons, nodes, } = spawn_dataflow( + build_id, dataflow, working_dir, daemon_connections, clock, uv, - build_only, ) .await?; Ok(RunningDataflow { @@ -1160,7 +1293,6 @@ async fn start_dataflow( stop_reply_senders: Vec::new(), log_subscribers: Vec::new(), pending_spawn_results: daemons, - build_only, }) } diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 425f0213..d77d39e2 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -10,6 +10,7 @@ use dora_message::{ daemon_to_coordinator::DaemonCoordinatorReply, descriptor::{Descriptor, ResolvedNode}, id::NodeId, + BuildId, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use itertools::Itertools; @@ -21,12 +22,12 @@ use uuid::{NoContext, Timestamp, Uuid}; #[tracing::instrument(skip(daemon_connections, clock))] pub(super) async fn spawn_dataflow( + build_id: Option, dataflow: Descriptor, working_dir: PathBuf, daemon_connections: &mut DaemonConnections, clock: &HLC, uv: bool, - build_only: bool, ) -> eyre::Result { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); @@ -37,18 +38,17 @@ pub(super) async fn spawn_dataflow( for (machine, nodes_on_machine) in &nodes_by_daemon { let spawn_nodes = nodes_on_machine.iter().map(|n| n.id.clone()).collect(); tracing::debug!( - "{} dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})", - if build_only { "Building" } else { "Spawning" } + "Spawning dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})" ); let spawn_command = SpawnDataflowNodes { + build_id, dataflow_id: uuid, working_dir: working_dir.clone(), nodes: nodes.clone(), dataflow_descriptor: dataflow.clone(), spawn_nodes, uv, - build_only, }; let message = serde_json::to_vec(&Timestamped { inner: DaemonCoordinatorEvent::Spawn(spawn_command), @@ -57,19 +57,11 @@ pub(super) async fn spawn_dataflow( let daemon_id = spawn_dataflow_on_machine(daemon_connections, machine.as_deref(), &message) .await - .wrap_err_with(|| { - format!( - "failed to {} dataflow on machine `{machine:?}`", - if 
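The `build_dataflow` function above fans the build out per machine: the resolved nodes are grouped by their `deploy.machine` and one `BuildDataflowNodes` message is sent to each matching daemon. A self-contained sketch of that grouping step, using `itertools::Itertools::into_group_map_by` on a stand-in node type:

```rust
// Standalone sketch of the per-machine fan-out in `build_dataflow`.
// `Node` is a stand-in; the real code groups `ResolvedNode`s by
// `n.deploy.machine`. Requires the `itertools` crate.
use itertools::Itertools;

#[derive(Debug)]
struct Node {
    id: String,
    machine: Option<String>, // None = handled by an unnamed daemon
}

fn main() {
    let nodes = vec![
        Node { id: "camera".into(), machine: Some("robot".into()) },
        Node { id: "planner".into(), machine: Some("desktop".into()) },
        Node { id: "logger".into(), machine: None },
        Node { id: "motor".into(), machine: Some("robot".into()) },
    ];

    // One entry per machine, each holding the nodes to build there.
    let by_machine = nodes.iter().into_group_map_by(|n| n.machine.clone());

    for (machine, nodes_on_machine) in &by_machine {
        let ids: Vec<&str> = nodes_on_machine.iter().map(|n| n.id.as_str()).collect();
        // The real code serializes one `BuildDataflowNodes` message here.
        println!("machine {machine:?} -> nodes {ids:?}");
    }
}
```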
build_only { "build" } else { "spawn" } - ) - })?; + .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine:?}`"))?; daemons.insert(daemon_id); } - tracing::info!( - "successfully triggered dataflow {} `{uuid}`", - if build_only { "build" } else { "spawn" } - ); + tracing::info!("successfully triggered dataflow spawn `{uuid}`",); Ok(SpawnedDataflow { uuid, diff --git a/binaries/daemon/src/build/git.rs b/binaries/daemon/src/build/git.rs new file mode 100644 index 00000000..cfff37b5 --- /dev/null +++ b/binaries/daemon/src/build/git.rs @@ -0,0 +1,345 @@ +use crate::log::NodeLogger; +use dora_message::{common::LogLevel, descriptor::GitRepoRev, BuildId, DataflowId}; +use eyre::{ContextCompat, WrapErr}; +use git2::FetchOptions; +use std::{ + collections::{BTreeMap, BTreeSet}, + path::{Path, PathBuf}, +}; +use url::Url; +use uuid::Uuid; + +#[derive(Default)] +pub struct GitManager { + /// Directories that are currently in use by running dataflows. + clones_in_use: BTreeMap>, + /// Builds that are prepared, but not done yet. + prepared_builds: BTreeMap, + reuse_for: BTreeMap, +} + +#[derive(Default)] +struct PreparedBuild { + /// Clone dirs that will be created during the build process. + /// + /// This allows subsequent nodes to reuse the dirs. + planned_clone_dirs: BTreeSet, +} + +impl GitManager { + pub fn choose_clone_dir( + &mut self, + build_id: uuid::Uuid, + previous_build: Option, + repo_addr: String, + commit_hash: String, + target_dir: &Path, + ) -> eyre::Result { + let repo_url = Url::parse(&repo_addr).context("failed to parse git repository URL")?; + let clone_dir = Self::clone_dir_path(&target_dir, build_id, &repo_url, &commit_hash)?; + + if self.clones_in_use.contains_key(&clone_dir) { + // The directory is currently in use by another dataflow. This should never + // happen as we generate a new build ID on every build. + eyre::bail!("clone_dir is already in use by other dataflow") + } + + let reuse = if self.clone_dir_ready(build_id, &clone_dir) { + // The directory already contains a checkout of the commit we're interested in. + // So we can simply reuse the directory without doing any additional git + // operations. 
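For orientation, a standalone sketch of the per-build clone layout that `clone_dir_path` (defined a little further down) computes, roughly `<target>/<build-id>/<host>/<repo path>/<commit>`. The sample URL and commit here are assumptions:

```rust
// Sketch of the per-build clone directory layout computed by
// `clone_dir_path`. Requires the `url` crate and `uuid` with "v4".
use std::path::PathBuf;
use url::Url;
use uuid::Uuid;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let build_id = Uuid::new_v4();
    let repo_url = Url::parse("https://github.com/dora-rs/dora.git")?;
    let commit_hash = "e31b2a34";

    let mut path = PathBuf::from("build").join(build_id.to_string());
    path.push(repo_url.host_str().ok_or("git URL has no hostname")?);
    path.extend(repo_url.path_segments().ok_or("no path in git URL")?);
    let path = path.join(commit_hash);

    // e.g. build/<build-id>/github.com/dora-rs/dora.git/e31b2a34
    println!("{}", path.display());
    Ok(())
}
```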
+ ReuseOptions::Reuse { + dir: clone_dir.clone(), + } + } else if let Some(previous_build_id) = previous_build { + // we might be able to update a previous clone + let prev_clone_dir = + Self::clone_dir_path(&target_dir, previous_build_id, &repo_url, &commit_hash)?; + + if self + .clones_in_use + .get(&prev_clone_dir) + .map(|ids| !ids.is_empty()) + .unwrap_or(false) + { + // previous clone is still in use -> we cannot rename it, but we can copy it + ReuseOptions::CopyAndFetch { + from: prev_clone_dir, + target_dir: clone_dir.clone(), + commit_hash, + } + } else if prev_clone_dir.exists() { + // there is an unused previous clone that is not in use -> rename it + ReuseOptions::RenameAndFetch { + from: prev_clone_dir, + target_dir: clone_dir.clone(), + commit_hash, + } + } else { + // no existing clone associated with previous build id + ReuseOptions::NewClone { + target_dir: clone_dir.clone(), + repo_url, + commit_hash, + } + } + } else { + // no previous build that we can reuse + ReuseOptions::NewClone { + target_dir: clone_dir.clone(), + repo_url, + commit_hash, + } + }; + self.register_ready_clone_dir(build_id, clone_dir); + + Ok(GitFolder { reuse }) + } + + pub fn in_use(&self, dir: &Path) -> bool { + self.clones_in_use + .get(dir) + .map(|ids| !ids.is_empty()) + .unwrap_or(false) + } + + pub fn clone_dir_ready(&self, build_id: BuildId, dir: &Path) -> bool { + self.prepared_builds + .get(&build_id) + .map(|p| p.planned_clone_dirs.contains(dir)) + .unwrap_or(false) + || dir.exists() + } + + pub fn register_ready_clone_dir(&mut self, build_id: BuildId, dir: PathBuf) -> bool { + self.prepared_builds + .entry(build_id) + .or_default() + .planned_clone_dirs + .insert(dir) + } + + fn clone_dir_path( + base_dir: &Path, + build_id: BuildId, + repo_url: &Url, + commit_hash: &String, + ) -> eyre::Result { + let mut path = base_dir + .join(&build_id.to_string()) + .join(repo_url.host_str().context("git URL has no hostname")?); + path.extend(repo_url.path_segments().context("no path in git URL")?); + let path = path.join(commit_hash); + Ok(dunce::simplified(&path).to_owned()) + } +} + +pub struct GitFolder { + /// Specifies whether an existing repo should be reused. 
+ reuse: ReuseOptions, +} + +impl GitFolder { + pub async fn prepare(self, logger: &mut NodeLogger<'_>) -> eyre::Result { + let GitFolder { reuse } = self; + + let clone_dir = match reuse { + ReuseOptions::NewClone { + target_dir, + repo_url, + commit_hash, + } => { + let repository = clone_into(repo_url, &target_dir, logger).await?; + checkout_tree(&repository, &commit_hash)?; + target_dir + } + ReuseOptions::CopyAndFetch { + from, + target_dir, + commit_hash, + } => { + tokio::fs::copy(&from, &target_dir) + .await + .context("failed to copy repo clone")?; + + logger + .log( + LogLevel::Info, + None, + format!("fetching changes after copying {}", from.display()), + ) + .await; + + let repository = fetch_changes(&target_dir, None).await?; + checkout_tree(&repository, &commit_hash)?; + target_dir + } + ReuseOptions::RenameAndFetch { + from, + target_dir, + commit_hash, + } => { + tokio::fs::rename(&from, &target_dir) + .await + .context("failed to rename repo clone")?; + + logger + .log( + LogLevel::Info, + None, + format!("fetching changes after renaming {}", from.display()), + ) + .await; + + let repository = fetch_changes(&target_dir, None).await?; + checkout_tree(&repository, &commit_hash)?; + target_dir + } + ReuseOptions::Reuse { dir } => { + logger + .log( + LogLevel::Info, + None, + format!("reusing up-to-date {}", dir.display()), + ) + .await; + dir + } + }; + Ok(clone_dir) + } +} + +fn used_by_other_dataflow( + dataflow_id: uuid::Uuid, + clone_dir_base: &PathBuf, + repos_in_use: &mut BTreeMap>, +) -> bool { + let empty = BTreeSet::new(); + let in_use = repos_in_use.get(clone_dir_base).unwrap_or(&empty); + let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id); + used_by_other_dataflow +} + +enum ReuseOptions { + /// Create a new clone of the repository. + NewClone { + target_dir: PathBuf, + repo_url: Url, + commit_hash: String, + }, + /// Reuse an existing up-to-date clone of the repository. + Reuse { dir: PathBuf }, + /// Copy an older clone of the repository and fetch changes, then reuse it. + CopyAndFetch { + from: PathBuf, + target_dir: PathBuf, + commit_hash: String, + }, + /// Rename an older clone of the repository and fetch changes, then reuse it. 
+ RenameAndFetch { + from: PathBuf, + target_dir: PathBuf, + commit_hash: String, + }, +} + +fn rev_str(rev: &Option) -> String { + match rev { + Some(GitRepoRev::Branch(branch)) => format!(" (branch {branch})"), + Some(GitRepoRev::Tag(tag)) => format!(" (tag {tag})"), + Some(GitRepoRev::Rev(rev)) => format!(" (rev {rev})"), + None => String::new(), + } +} + +async fn clone_into( + repo_addr: Url, + clone_dir: &Path, + logger: &mut NodeLogger<'_>, +) -> eyre::Result { + if let Some(parent) = clone_dir.parent() { + tokio::fs::create_dir_all(parent) + .await + .context("failed to create parent directory for git clone")?; + } + + logger + .log( + LogLevel::Info, + None, + format!("cloning {repo_addr} into {}", clone_dir.display()), + ) + .await; + let clone_dir = clone_dir.to_owned(); + let task = tokio::task::spawn_blocking(move || { + let mut builder = git2::build::RepoBuilder::new(); + let mut fetch_options = git2::FetchOptions::new(); + fetch_options.download_tags(git2::AutotagOption::All); + builder.fetch_options(fetch_options); + builder + .clone(repo_addr.as_str(), &clone_dir) + .context("failed to clone repo") + }); + let repo = task.await??; + Ok(repo) +} + +async fn fetch_changes( + repo_dir: &Path, + refname: Option, +) -> Result { + let repo_dir = repo_dir.to_owned(); + let fetch_changes = tokio::task::spawn_blocking(move || { + let repository = git2::Repository::open(&repo_dir).context("failed to open git repo")?; + + { + let mut remote = repository + .find_remote("origin") + .context("failed to find remote `origin` in repo")?; + remote + .connect(git2::Direction::Fetch) + .context("failed to connect to remote")?; + let default_branch = remote + .default_branch() + .context("failed to get default branch for remote")?; + let fetch = match &refname { + Some(refname) => refname, + None => default_branch + .as_str() + .context("failed to read default branch as string")?, + }; + let mut fetch_options = FetchOptions::new(); + fetch_options.download_tags(git2::AutotagOption::All); + remote + .fetch(&[&fetch], Some(&mut fetch_options), None) + .context("failed to fetch from git repo")?; + } + Result::<_, eyre::Error>::Ok(repository) + }); + let repository = fetch_changes.await??; + Ok(repository) +} + +fn checkout_tree(repository: &git2::Repository, commit_hash: &str) -> eyre::Result<()> { + let (object, reference) = repository + .revparse_ext(&commit_hash) + .context("failed to parse ref")?; + repository + .checkout_tree(&object, None) + .context("failed to checkout ref")?; + match reference { + Some(reference) => repository + .set_head(reference.name().context("failed to get reference_name")?) 
+ .context("failed to set head")?, + None => repository + .set_head_detached(object.id()) + .context("failed to set detached head")?, + } + + Ok(()) +} + +fn clone_dir_exists(dir: &PathBuf, repos_in_use: &BTreeMap>) -> bool { + repos_in_use.contains_key(dir) || dir.exists() +} diff --git a/binaries/daemon/src/build/mod.rs b/binaries/daemon/src/build/mod.rs new file mode 100644 index 00000000..52f9d96a --- /dev/null +++ b/binaries/daemon/src/build/mod.rs @@ -0,0 +1,347 @@ +pub use git::GitManager; + +use std::{ + collections::{BTreeMap, BTreeSet}, + future::Future, + path::PathBuf, + sync::Arc, +}; + +use dora_core::{ + build::run_build_command, + descriptor::{CustomNode, Descriptor, ResolvedNode}, + uhlc::HLC, +}; +use dora_message::{ + common::{LogLevel, Timestamped}, + daemon_to_node::NodeConfig, + descriptor::{EnvValue, GitRepoRev, ResolvedNodeSource}, + id::NodeId, + BuildId, DataflowId, +}; +use eyre::Context; +use tokio::sync::mpsc; + +use crate::{build::git::GitFolder, log::DaemonLogger, Event}; + +mod git; + +#[derive(Clone)] +pub struct Builder { + pub build_id: BuildId, + pub prev_build_id: Option, + pub working_dir: PathBuf, + pub daemon_tx: mpsc::Sender>, + pub dataflow_descriptor: Descriptor, + /// clock is required for generating timestamps when dropping messages early because queue is full + pub clock: Arc, + pub uv: bool, +} + +impl Builder { + pub async fn prepare_node( + self, + node: ResolvedNode, + logger: &mut DaemonLogger, + git_manager: &mut GitManager, + ) -> eyre::Result>> { + let build_id = self.build_id; + let node_id = node.id.clone(); + logger + .log_build( + build_id, + LogLevel::Debug, + Some(node.id.clone()), + "building node", + ) + .await; + + let prepared_git = if let dora_core::descriptor::CoreNodeKind::Custom { + source: ResolvedNodeSource::GitCommit { repo, commit_hash }, + .. + } = &node.kind + { + let target_dir = self.working_dir.join("build"); + let git_folder = git_manager.choose_clone_dir( + self.build_id, + self.prev_build_id, + repo.clone(), + commit_hash.clone(), + &target_dir, + )?; + Some(git_folder) + } else { + None + }; + + let mut logger = logger + .try_clone() + .await + .wrap_err("failed to clone logger")?; + let task = async move { + self.prepare_node_inner(node, &mut logger, build_id, prepared_git) + .await + }; + Ok(task) + } + + async fn prepare_node_inner( + mut self, + node: ResolvedNode, + logger: &mut DaemonLogger, + build_id: uuid::Uuid, + git_folder: Option, + ) -> eyre::Result { + let (command, error_msg) = match &node.kind { + dora_core::descriptor::CoreNodeKind::Custom(n) => { + let build_dir = match git_folder { + Some(git_folder) => git_folder.prepare(logger).await?, + None => self.working_dir.clone(), + }; + + if let Some(build) = &n.build { + self.build_node(logger, &node.env, build_dir.clone(), build) + .await?; + } + let mut command = if self.build_only { + None + } else { + path_spawn_command(&build_dir, self.uv, logger, n, true).await? + }; + + if let Some(command) = &mut command { + command.current_dir(&self.working_dir); + command.stdin(Stdio::null()); + + command.env( + "DORA_NODE_CONFIG", + serde_yaml::to_string(&node_config.clone()) + .wrap_err("failed to serialize node config")?, + ); + // Injecting the env variable defined in the `yaml` into + // the node runtime. 
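The git helpers above (`clone_into`, `fetch_changes`, `checkout_tree`) amount to a clone-then-checkout sequence that leaves HEAD detached for pinned commits. A condensed, self-contained variant with the `git2` crate; error handling is simplified compared to the real helpers:

```rust
// Condensed sketch of the clone + detached-checkout sequence used by the
// git helpers above. Requires the `git2` crate.
use git2::{build::RepoBuilder, AutotagOption, FetchOptions, Repository};
use std::path::Path;

fn clone_and_checkout(url: &str, dir: &Path, commit: &str) -> Result<Repository, git2::Error> {
    let mut fetch_options = FetchOptions::new();
    fetch_options.download_tags(AutotagOption::All);

    let repo = RepoBuilder::new().fetch_options(fetch_options).clone(url, dir)?;

    {
        // Resolve the pinned revision and check out its tree.
        let (object, reference) = repo.revparse_ext(commit)?;
        repo.checkout_tree(&object, None)?;
        match reference {
            // A named ref (branch/tag) becomes the new HEAD...
            Some(r) => repo.set_head(r.name().expect("non-UTF-8 ref name"))?,
            // ...while a bare commit hash leaves HEAD detached.
            None => repo.set_head_detached(object.id())?,
        }
    }
    Ok(repo)
}

fn main() {
    // Example usage (requires network access):
    // let repo = clone_and_checkout("https://github.com/dora-rs/dora.git",
    //     std::path::Path::new("/tmp/dora-clone"), "e31b2a34").unwrap();
}
```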
+ if let Some(envs) = &node.env { + for (key, value) in envs { + command.env(key, value.to_string()); + } + } + if let Some(envs) = &n.envs { + // node has some inner env variables -> add them too + for (key, value) in envs { + command.env(key, value.to_string()); + } + } + + // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C + #[cfg(unix)] + command.process_group(0); + + command.env("PYTHONUNBUFFERED", "1"); + command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + }; + + let error_msg = format!( + "failed to run `{}` with args `{}`", + n.path, + n.args.as_deref().unwrap_or_default(), + ); + (command, error_msg) + } + dora_core::descriptor::CoreNodeKind::Runtime(n) => { + // run build commands + for operator in &n.operators { + if let Some(build) = &operator.config.build { + self.build_node(logger, &node.env, self.working_dir.clone(), build) + .await?; + } + } + + let python_operators: Vec<&OperatorDefinition> = n + .operators + .iter() + .filter(|x| matches!(x.config.source, OperatorSource::Python { .. })) + .collect(); + + let other_operators = n + .operators + .iter() + .any(|x| !matches!(x.config.source, OperatorSource::Python { .. })); + + let mut command = if self.build_only { + None + } else if !python_operators.is_empty() && !other_operators { + // Use python to spawn runtime if there is a python operator + + // TODO: Handle multi-operator runtime once sub-interpreter is supported + if python_operators.len() > 2 { + eyre::bail!( + "Runtime currently only support one Python Operator. + This is because pyo4 sub-interpreter is not yet available. + See: https://github.com/PyO4/pyo3/issues/576" + ); + } + + let python_operator = python_operators + .first() + .context("Runtime had no operators definition.")?; + + if let OperatorSource::Python(PythonSource { + source: _, + conda_env: Some(conda_env), + }) = &python_operator.config.source + { + let conda = which::which("conda").context( + "failed to find `conda`, yet a `conda_env` was defined. 
Make sure that `conda` is available.", )?; + let mut command = tokio::process::Command::new(conda); + command.args([ + "run", + "-n", + conda_env, + "python", + "-c", + format!("import dora; dora.start_runtime() # {}", node.id).as_str(), + ]); + Some(command) + } else { + let mut cmd = if self.uv { + let mut cmd = tokio::process::Command::new("uv"); + cmd.arg("run"); + cmd.arg("python"); + tracing::info!( + "spawning: uv run python -uc import dora; dora.start_runtime() # {}", + node.id + ); + cmd + } else { + let python = get_python_path() + .wrap_err("Could not find python path when spawning custom node")?; + tracing::info!( + "spawning: python -uc import dora; dora.start_runtime() # {}", + node.id + ); + + tokio::process::Command::new(python) + }; + // Force python to always flush stdout/stderr buffer + cmd.args([ + "-c", + format!("import dora; dora.start_runtime() # {}", node.id).as_str(), + ]); + Some(cmd) + } + } else if python_operators.is_empty() && other_operators { + let mut cmd = tokio::process::Command::new( + std::env::current_exe() + .wrap_err("failed to get current executable path")?, + ); + cmd.arg("runtime"); + Some(cmd) + } else { + eyre::bail!("Runtime can not mix Python Operator with other type of operator."); + }; + + let runtime_config = RuntimeConfig { + node: node_config.clone(), + operators: n.operators.clone(), + }; + + if let Some(command) = &mut command { + command.current_dir(&self.working_dir); + + command.env( + "DORA_RUNTIME_CONFIG", + serde_yaml::to_string(&runtime_config) + .wrap_err("failed to serialize runtime config")?, + ); + // Injecting the env variable defined in the `yaml` into + // the node runtime. + if let Some(envs) = &node.env { + for (key, value) in envs { + command.env(key, value.to_string()); + } + } + // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C + #[cfg(unix)] + command.process_group(0); + + command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + }; + let error_msg = format!( + "failed to run runtime {}/{}", + runtime_config.node.dataflow_id, runtime_config.node.node_id + ); + (command, error_msg) + } + }; + Ok(PreparedNode { + command, + spawn_error_msg: error_msg, + working_dir: self.working_dir, + dataflow_id, + node, + node_config, + clock: self.clock, + daemon_tx: self.daemon_tx, + }) + } +} + +pub async fn build_node( + node_id: NodeId, + logger: &mut DaemonLogger<'_>, + node_env: &Option<BTreeMap<String, EnvValue>>, + working_dir: PathBuf, + build: &String, + uv: bool, +) -> eyre::Result<()> { + logger + .log( + LogLevel::Info, + None, + Some(node_id.clone()), + Some("build".to_owned()), + format!("running build command: `{build}`"), + ) + .await; + let build = build.to_owned(); + let node_env = node_env.clone(); + let mut logger = logger.try_clone().await.context("failed to clone logger")?; + let (stdout_tx, mut stdout) = tokio::sync::mpsc::channel(10); + let task = tokio::task::spawn_blocking(move || { + run_build_command(&build, &working_dir, uv, &node_env, stdout_tx) + .context("build command failed") + }); + tokio::spawn(async move { + while let Some(line) = stdout.recv().await { + logger + .log( + LogLevel::Info, + None, + Some(node_id.clone()), + Some("build command".into()), + line.unwrap_or_else(|err| format!("io err: {}", err.kind())), + ) + .await; + } + }); + task.await??; + Ok(()) +} + +pub struct PreparedNode { + command: Option<tokio::process::Command>, + spawn_error_msg: String, + working_dir: PathBuf, + dataflow_id: DataflowId, + node: ResolvedNode, + node_config: NodeConfig, + clock: Arc<HLC>, +
daemon_tx: mpsc::Sender>, +} diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 364ca6ae..347c0a55 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -15,7 +15,7 @@ use dora_message::{ DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus, }, coordinator_to_cli::DataflowResult, - coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes}, + coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes}, daemon_to_coordinator::{ CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult, }, @@ -60,6 +60,7 @@ use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tracing::{error, warn}; use uuid::{NoContext, Timestamp, Uuid}; +mod build; mod coordinator; mod local_listener; mod log; @@ -73,7 +74,7 @@ use dora_tracing::telemetry::serialize_context; #[cfg(feature = "telemetry")] use tracing_opentelemetry::OpenTelemetrySpanExt; -use crate::pending::DataflowStatus; +use crate::{build::GitManager, pending::DataflowStatus}; const STDERR_LOG_LINES: usize = 10; @@ -101,7 +102,7 @@ pub struct Daemon { logger: DaemonLogger, - repos_in_use: BTreeMap>, + git_manager: GitManager, } type DaemonRunResult = BTreeMap>>; @@ -167,13 +168,13 @@ impl Daemon { let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext)); let spawn_command = SpawnDataflowNodes { + build_id, dataflow_id, working_dir, spawn_nodes: nodes.keys().cloned().collect(), nodes, dataflow_descriptor: descriptor, uv, - build_only: false, }; let clock = Arc::new(HLC::default()); @@ -298,7 +299,7 @@ impl Daemon { clock, zenoh_session, remote_daemon_events_tx, - repos_in_use: Default::default(), + git_manager: Default::default(), }; let dora_events = ReceiverStream::new(dora_events_rx); @@ -421,11 +422,7 @@ impl Daemon { Event::SpawnDataflowResult { dataflow_id, result, - build_only, } => { - if build_only { - self.running.remove(&dataflow_id); - } if let Some(connection) = &mut self.coordinator_connection { let msg = serde_json::to_vec(&Timestamped { inner: CoordinatorRequest::Event { @@ -476,14 +473,76 @@ impl Daemon { reply_tx: Sender>, ) -> eyre::Result { let status = match event { + DaemonCoordinatorEvent::Build(BuildDataflowNodes { + build_id, + working_dir, + nodes, + dataflow_descriptor, + nodes_on_machine, + uv, + }) => { + match dataflow_descriptor.communication.remote { + dora_core::config::RemoteCommunicationConfig::Tcp => {} + } + + // Use the working directory if it exists, otherwise use the working directory where the daemon is spawned + let working_dir = if working_dir.exists() { + working_dir + } else { + std::env::current_dir().wrap_err("failed to get current working dir")? 
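The surrounding `Build` handler follows an ack-then-report pattern: it replies to the coordinator with `TriggerBuildResult` right away and lets a spawned task deliver the eventual `BuildDataflowResult` as an event. A minimal tokio sketch of that pattern with stand-in types:

```rust
// Minimal sketch of the ack-then-report pattern used by the daemon's
// `Build` handler: reply immediately, deliver the real result later.
// Stand-in types; requires `tokio` with "rt", "macros", "sync", "time".
use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
enum Event {
    BuildDataflowResult { build_id: u64, result: Result<(), String> },
}

#[tokio::main]
async fn main() {
    let (events_tx, mut events_rx) = mpsc::channel::<Event>(8);
    let (ack_tx, ack_rx) = oneshot::channel::<Result<(), String>>();

    let build_id = 1;
    tokio::spawn(async move {
        // 1. Ack immediately so the coordinator is not blocked on the build.
        let _ = ack_tx.send(Ok(()));

        // 2. Run the (here: simulated) build and report the outcome.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        let _ = events_tx
            .send(Event::BuildDataflowResult { build_id, result: Ok(()) })
            .await;
    });

    println!("ack: {:?}", ack_rx.await.unwrap());
    println!("result: {:?}", events_rx.recv().await.unwrap());
}
```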
+ }; + + let result = self + .build_dataflow( + build_id, + working_dir, + nodes, + dataflow_descriptor, + nodes_on_machine, + uv, + ) + .await; + let (trigger_result, result_task) = match result { + Ok(result_task) => (Ok(()), Some(result_task)), + Err(err) => (Err(format!("{err:?}")), None), + }; + let reply = DaemonCoordinatorReply::TriggerBuildResult(trigger_result); + let _ = reply_tx.send(Some(reply)).map_err(|_| { + error!("could not send `TriggerBuildResult` reply from daemon to coordinator") + }); + + let result_tx = self.events_tx.clone(); + let clock = self.clock.clone(); + if let Some(result_task) = result_task { + tokio::spawn(async move { + let message = Timestamped { + inner: Event::BuildDataflowResult { + build_id, + result: result_task.await, + }, + timestamp: clock.new_timestamp(), + }; + let _ = result_tx + .send(message) + .map_err(|_| { + error!( + "could not send `BuildResult` reply from daemon to coordinator" + ) + }) + .await; + }); + } + + RunStatus::Continue + } DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes { + build_id, dataflow_id, working_dir, nodes, dataflow_descriptor, spawn_nodes, uv, - build_only, }) => { match dataflow_descriptor.communication.remote { dora_core::config::RemoteCommunicationConfig::Tcp => {} @@ -498,13 +557,13 @@ impl Daemon { let result = self .spawn_dataflow( + build_id, dataflow_id, working_dir, nodes, dataflow_descriptor, spawn_nodes, uv, - build_only, ) .await; let (trigger_result, result_task) = match result { @@ -524,7 +583,6 @@ impl Daemon { inner: Event::SpawnDataflowResult { dataflow_id, result: result_task.await, - build_only, }, timestamp: clock.new_timestamp(), }; @@ -770,15 +828,87 @@ impl Daemon { } } + async fn build_dataflow( + &mut self, + build_id: uuid::Uuid, + working_dir: PathBuf, + nodes: BTreeMap, + dataflow_descriptor: Descriptor, + local_nodes: BTreeSet, + uv: bool, + ) -> eyre::Result>> { + let mut tasks = Vec::new(); + + // spawn nodes and set up subscriptions + for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) { + let dynamic_node = node.kind.dynamic(); + + let node_id = node.id.clone(); + self.logger + .log_build(build_id, LogLevel::Info, None, "building") + .await; + match spawner + .clone() + .prepare_node( + node, + node_stderr_most_recent, + &mut logger, + &mut self.repos_in_use, + ) + .await + .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) + { + Ok(result) => { + tasks.push(NodePrepareTask { + node_id, + task: result, + dynamic_node, + }); + } + Err(err) => { + logger + .log(LogLevel::Error, Some("daemon".into()), format!("{err:?}")) + .await; + self.dataflow_node_results + .entry(dataflow_id) + .or_default() + .insert( + node_id.clone(), + Err(NodeError { + timestamp: self.clock.new_timestamp(), + cause: NodeErrorCause::FailedToSpawn(format!("{err:?}")), + exit_status: NodeExitStatus::Unknown, + }), + ); + stopped.push((node_id.clone(), dynamic_node)); + } + } + } + for (node_id, dynamic) in stopped { + self.handle_node_stop(dataflow_id, &node_id, dynamic) + .await?; + } + + let spawn_result = Self::spawn_prepared_nodes( + dataflow_id, + logger, + tasks, + self.events_tx.clone(), + self.clock.clone(), + ); + + Ok(spawn_result) + } + async fn spawn_dataflow( &mut self, + build_id: Option, dataflow_id: uuid::Uuid, working_dir: PathBuf, nodes: BTreeMap, dataflow_descriptor: Descriptor, spawn_nodes: BTreeSet, uv: bool, - build_only: bool, ) -> eyre::Result>> { let mut logger = self .logger @@ -843,7 +973,6 @@ impl Daemon { dataflow_descriptor, clock: 
self.clock.clone(), uv, - build_only, }; let mut tasks = Vec::new(); @@ -2272,10 +2401,13 @@ pub enum Event { dynamic_node: bool, result: Result, }, + BuildDataflowResult { + build_id: Uuid, + result: eyre::Result<()>, + }, SpawnDataflowResult { dataflow_id: Uuid, result: eyre::Result<()>, - build_only: bool, }, } diff --git a/binaries/daemon/src/log.rs b/binaries/daemon/src/log.rs index 26488361..10f8716a 100644 --- a/binaries/daemon/src/log.rs +++ b/binaries/daemon/src/log.rs @@ -8,6 +8,7 @@ use dora_core::{config::NodeId, uhlc}; use dora_message::{ common::{DaemonId, LogLevel, LogMessage, Timestamped}, daemon_to_coordinator::{CoordinatorRequest, DaemonEvent}, + BuildId, }; use eyre::Context; use tokio::net::TcpStream; @@ -81,7 +82,7 @@ impl<'a> DataflowLogger<'a> { message: impl Into, ) { self.logger - .log(level, self.dataflow_id, node_id, target, message) + .log(level, Some(self.dataflow_id), node_id, target, message) .await } @@ -113,12 +114,13 @@ impl DaemonLogger { pub async fn log( &mut self, level: LogLevel, - dataflow_id: Uuid, + dataflow_id: Option, node_id: Option, target: Option, message: impl Into, ) { let message = LogMessage { + build_id: None, daemon_id: Some(self.daemon_id.clone()), dataflow_id, node_id, @@ -132,6 +134,28 @@ impl DaemonLogger { self.logger.log(message).await } + pub async fn log_build( + &mut self, + build_id: BuildId, + level: LogLevel, + node_id: Option, + message: impl Into, + ) { + let message = LogMessage { + build_id: Some(build_id), + daemon_id: Some(self.daemon_id.clone()), + dataflow_id: None, + node_id, + level, + target: Some("build".into()), + module_path: None, + file: None, + line: None, + message: message.into(), + }; + self.logger.log(message).await + } + pub(crate) fn daemon_id(&self) -> &DaemonId { &self.daemon_id } @@ -181,7 +205,8 @@ impl Logger { match message.level { LogLevel::Error => { tracing::error!( - dataflow_id = message.dataflow_id.to_string(), + build_id = ?message.build_id.map(|id| id.to_string()), + dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), node_id = ?message.node_id.map(|id| id.to_string()), target = message.target, module_path = message.module_path, @@ -193,7 +218,8 @@ impl Logger { } LogLevel::Warn => { tracing::warn!( - dataflow_id = message.dataflow_id.to_string(), + build_id = ?message.build_id.map(|id| id.to_string()), + dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), node_id = ?message.node_id.map(|id| id.to_string()), target = message.target, module_path = message.module_path, @@ -205,7 +231,8 @@ impl Logger { } LogLevel::Info => { tracing::info!( - dataflow_id = message.dataflow_id.to_string(), + build_id = ?message.build_id.map(|id| id.to_string()), + dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), node_id = ?message.node_id.map(|id| id.to_string()), target = message.target, module_path = message.module_path, @@ -217,7 +244,8 @@ impl Logger { } LogLevel::Debug => { tracing::debug!( - dataflow_id = message.dataflow_id.to_string(), + build_id = ?message.build_id.map(|id| id.to_string()), + dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), node_id = ?message.node_id.map(|id| id.to_string()), target = message.target, module_path = message.module_path, diff --git a/binaries/daemon/src/spawn/git.rs b/binaries/daemon/src/spawn/git.rs deleted file mode 100644 index 9803d1f2..00000000 --- a/binaries/daemon/src/spawn/git.rs +++ /dev/null @@ -1,286 +0,0 @@ -use crate::log::NodeLogger; -use dora_message::{common::LogLevel, descriptor::GitRepoRev, DataflowId}; -use 
eyre::{ContextCompat, WrapErr}; -use git2::FetchOptions; -use std::{ - collections::{BTreeMap, BTreeSet}, - path::{Path, PathBuf}, -}; -use url::Url; -use uuid::Uuid; - -pub struct GitFolder { - /// The URL of the git repository. - repo_addr: String, - /// The branch, tag, or git revision to checkout. - rev: Option, - /// The directory that should contain the checked-out repository. - clone_dir: PathBuf, - /// Specifies whether an existing repo should be reused. - reuse: ReuseOptions, -} - -impl GitFolder { - pub fn choose_clone_dir( - dataflow_id: uuid::Uuid, - repo_addr: String, - rev: Option, - target_dir: &Path, - repos_in_use: &mut BTreeMap>, - ) -> eyre::Result { - let repo_url = Url::parse(&repo_addr).context("failed to parse git repository URL")?; - - let base_dir = { - let base = { - let mut path = - target_dir.join(repo_url.host_str().context("git URL has no hostname")?); - - path.extend(repo_url.path_segments().context("no path in git URL")?); - path - }; - match &rev { - None => base, - Some(rev) => match rev { - GitRepoRev::Branch(branch) => base.join("branch").join(branch), - GitRepoRev::Tag(tag) => base.join("tag").join(tag), - GitRepoRev::Rev(rev) => base.join("rev").join(rev), - }, - } - }; - let clone_dir = if clone_dir_exists(&base_dir, repos_in_use) { - let used_by_other = used_by_other_dataflow(dataflow_id, &base_dir, repos_in_use); - if used_by_other { - // don't reuse, choose new directory - // (TODO reuse if still up to date) - - let dir_name = base_dir.file_name().unwrap().to_str().unwrap(); - let mut i = 1; - loop { - let new_path = base_dir.with_file_name(format!("{dir_name}-{i}")); - if clone_dir_exists(&new_path, repos_in_use) - && used_by_other_dataflow(dataflow_id, &new_path, repos_in_use) - { - i += 1; - } else { - break new_path; - } - } - } else { - base_dir - } - } else { - base_dir - }; - let clone_dir = dunce::simplified(&clone_dir).to_owned(); - - let reuse = if clone_dir_exists(&clone_dir, repos_in_use) { - let empty = BTreeSet::new(); - let in_use = repos_in_use.get(&clone_dir).unwrap_or(&empty); - let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id); - if used_by_other_dataflow { - // The directory is currently in use by another dataflow. We currently don't - // support reusing the same clone across multiple dataflow runs. Above, we - // choose a new directory if we detect such a case. So this `if` branch - // should never be reached. - eyre::bail!("clone_dir is already in use by other dataflow") - } else if in_use.is_empty() { - // The cloned repo is not used by any dataflow, so we can safely reuse it. However, - // the clone might be still on an older commit, so we need to do a `git fetch` - // before we reuse it. - ReuseOptions::ReuseAfterFetch - } else { - // This clone is already used for another node of this dataflow. We will do a - // `git fetch` operation for the first node of this dataflow, so we don't need - // to do it again for other nodes of the dataflow. So we can simply reuse the - // directory without doing any additional git operations. 
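Both the removed code here and the new `GitManager` track which dataflows currently use a clone directory via a map from directory to id set (`repos_in_use` before, `clones_in_use` now). A small standalone sketch of that bookkeeping; the type names are stand-ins:

```rust
// Standalone sketch of the clone-usage bookkeeping shared by the old
// `repos_in_use` map and the new `GitManager::clones_in_use` field.
use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};

type DataflowId = u64; // stand-in; the real code uses a Uuid

#[derive(Default)]
struct CloneTracker {
    in_use: BTreeMap<PathBuf, BTreeSet<DataflowId>>,
}

impl CloneTracker {
    fn acquire(&mut self, dir: PathBuf, dataflow: DataflowId) {
        self.in_use.entry(dir).or_default().insert(dataflow);
    }

    fn release(&mut self, dir: &Path, dataflow: DataflowId) {
        if let Some(users) = self.in_use.get_mut(dir) {
            users.remove(&dataflow);
            if users.is_empty() {
                // No users left: the directory may be reused or fetched into.
                self.in_use.remove(dir);
            }
        }
    }

    fn used_by_other(&self, dir: &Path, dataflow: DataflowId) -> bool {
        self.in_use
            .get(dir)
            .map(|users| users.iter().any(|&id| id != dataflow))
            .unwrap_or(false)
    }
}

fn main() {
    let mut tracker = CloneTracker::default();
    let dir = PathBuf::from("build/github.com/dora-rs/dora.git");
    tracker.acquire(dir.clone(), 1);
    assert!(tracker.used_by_other(&dir, 2));
    tracker.release(&dir, 1);
    assert!(!tracker.used_by_other(&dir, 2));
}
```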
- ReuseOptions::Reuse - } - } else { - ReuseOptions::NewClone - }; - repos_in_use - .entry(clone_dir.clone()) - .or_default() - .insert(dataflow_id); - - Ok(GitFolder { - clone_dir, - reuse, - repo_addr, - rev, - }) - } - - pub async fn prepare(self, logger: &mut NodeLogger<'_>) -> eyre::Result { - let GitFolder { - clone_dir, - reuse, - repo_addr, - rev, - } = self; - - let rev_str = rev_str(&rev); - let refname = rev.clone().map(|rev| match rev { - GitRepoRev::Branch(branch) => format!("refs/remotes/origin/{branch}"), - GitRepoRev::Tag(tag) => format!("refs/tags/{tag}"), - GitRepoRev::Rev(rev) => rev, - }); - - match reuse { - ReuseOptions::NewClone => { - let repository = clone_into(&repo_addr, &rev, &clone_dir, logger).await?; - checkout_tree(&repository, refname)?; - } - ReuseOptions::ReuseAfterFetch => { - logger - .log( - LogLevel::Info, - None, - format!("fetching changes and reusing {repo_addr}{rev_str}"), - ) - .await; - let refname_cloned = refname.clone(); - let clone_dir = clone_dir.clone(); - let repository = fetch_changes(clone_dir, refname_cloned).await?; - checkout_tree(&repository, refname)?; - } - ReuseOptions::Reuse => { - logger - .log( - LogLevel::Info, - None, - format!("reusing up-to-date {repo_addr}{rev_str}"), - ) - .await; - } - }; - Ok(clone_dir) - } -} - -fn used_by_other_dataflow( - dataflow_id: uuid::Uuid, - clone_dir_base: &PathBuf, - repos_in_use: &mut BTreeMap>, -) -> bool { - let empty = BTreeSet::new(); - let in_use = repos_in_use.get(clone_dir_base).unwrap_or(&empty); - let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id); - used_by_other_dataflow -} - -enum ReuseOptions { - /// Create a new clone of the repository. - NewClone, - /// Reuse an existing up-to-date clone of the repository. - Reuse, - /// Update an older clone of the repository, then reuse it. 
- ReuseAfterFetch, -} - -fn rev_str(rev: &Option) -> String { - match rev { - Some(GitRepoRev::Branch(branch)) => format!(" (branch {branch})"), - Some(GitRepoRev::Tag(tag)) => format!(" (tag {tag})"), - Some(GitRepoRev::Rev(rev)) => format!(" (rev {rev})"), - None => String::new(), - } -} - -async fn clone_into( - repo_addr: &String, - rev: &Option, - clone_dir: &Path, - logger: &mut NodeLogger<'_>, -) -> eyre::Result { - if let Some(parent) = clone_dir.parent() { - tokio::fs::create_dir_all(parent) - .await - .context("failed to create parent directory for git clone")?; - } - - let rev_str = rev_str(rev); - logger - .log( - LogLevel::Info, - None, - format!("cloning {repo_addr}{rev_str} into {}", clone_dir.display()), - ) - .await; - let rev: Option = rev.clone(); - let clone_into = clone_dir.to_owned(); - let repo_addr = repo_addr.clone(); - let task = tokio::task::spawn_blocking(move || { - let mut builder = git2::build::RepoBuilder::new(); - let mut fetch_options = git2::FetchOptions::new(); - fetch_options.download_tags(git2::AutotagOption::All); - builder.fetch_options(fetch_options); - if let Some(GitRepoRev::Branch(branch)) = &rev { - builder.branch(branch); - } - builder - .clone(&repo_addr, &clone_into) - .context("failed to clone repo") - }); - let repo = task.await??; - Ok(repo) -} - -async fn fetch_changes( - repo_dir: PathBuf, - refname: Option, -) -> Result { - let fetch_changes = tokio::task::spawn_blocking(move || { - let repository = git2::Repository::open(&repo_dir).context("failed to open git repo")?; - - { - let mut remote = repository - .find_remote("origin") - .context("failed to find remote `origin` in repo")?; - remote - .connect(git2::Direction::Fetch) - .context("failed to connect to remote")?; - let default_branch = remote - .default_branch() - .context("failed to get default branch for remote")?; - let fetch = match &refname { - Some(refname) => refname, - None => default_branch - .as_str() - .context("failed to read default branch as string")?, - }; - let mut fetch_options = FetchOptions::new(); - fetch_options.download_tags(git2::AutotagOption::All); - remote - .fetch(&[&fetch], Some(&mut fetch_options), None) - .context("failed to fetch from git repo")?; - } - Result::<_, eyre::Error>::Ok(repository) - }); - let repository = fetch_changes.await??; - Ok(repository) -} - -fn checkout_tree(repository: &git2::Repository, refname: Option) -> eyre::Result<()> { - if let Some(refname) = refname { - let (object, reference) = repository - .revparse_ext(&refname) - .context("failed to parse ref")?; - repository - .checkout_tree(&object, None) - .context("failed to checkout ref")?; - match reference { - Some(reference) => repository - .set_head(reference.name().context("failed to get reference_name")?) 
- .context("failed to set head")?, - None => repository - .set_head_detached(object.id()) - .context("failed to set detached head")?, - } - } - Ok(()) -} - -fn clone_dir_exists(dir: &PathBuf, repos_in_use: &BTreeMap>) -> bool { - repos_in_use.contains_key(dir) || dir.exists() -} diff --git a/binaries/daemon/src/spawn/mod.rs b/binaries/daemon/src/spawn/mod.rs index 9bf15360..5fc41a9d 100644 --- a/binaries/daemon/src/spawn/mod.rs +++ b/binaries/daemon/src/spawn/mod.rs @@ -46,8 +46,6 @@ use tokio::{ }; use tracing::error; -mod git; - #[derive(Clone)] pub struct Spawner { pub dataflow_id: DataflowId, @@ -57,333 +55,6 @@ pub struct Spawner { /// clock is required for generating timestamps when dropping messages early because queue is full pub clock: Arc, pub uv: bool, - pub build_only: bool, -} - -impl Spawner { - pub async fn prepare_node( - self, - node: ResolvedNode, - node_stderr_most_recent: Arc>, - logger: &mut NodeLogger<'_>, - repos_in_use: &mut BTreeMap>, - ) -> eyre::Result>> { - let dataflow_id = self.dataflow_id; - let node_id = node.id.clone(); - logger - .log( - LogLevel::Debug, - Some("daemon::spawner".into()), - "spawning node", - ) - .await; - - let queue_sizes = node_inputs(&node) - .into_iter() - .map(|(k, v)| (k, v.queue_size.unwrap_or(10))) - .collect(); - let daemon_communication = spawn_listener_loop( - &dataflow_id, - &node_id, - &self.daemon_tx, - self.dataflow_descriptor.communication.local, - queue_sizes, - self.clock.clone(), - ) - .await?; - - let node_config = NodeConfig { - dataflow_id, - node_id: node_id.clone(), - run_config: node.kind.run_config(), - daemon_communication, - dataflow_descriptor: self.dataflow_descriptor.clone(), - dynamic: node.kind.dynamic(), - }; - - let prepared_git = if let dora_core::descriptor::CoreNodeKind::Custom(CustomNode { - source: dora_message::descriptor::NodeSource::GitBranch { repo, rev }, - .. - }) = &node.kind - { - let target_dir = self.working_dir.join("build"); - let git_folder = GitFolder::choose_clone_dir( - self.dataflow_id, - repo.clone(), - rev.clone(), - &target_dir, - repos_in_use, - )?; - Some(git_folder) - } else { - None - }; - - let mut logger = logger - .try_clone() - .await - .wrap_err("failed to clone logger")?; - let task = async move { - self.prepare_node_inner( - node, - &mut logger, - dataflow_id, - node_config, - prepared_git, - node_stderr_most_recent, - ) - .await - }; - Ok(task) - } - - async fn prepare_node_inner( - mut self, - node: ResolvedNode, - logger: &mut NodeLogger<'_>, - dataflow_id: uuid::Uuid, - node_config: NodeConfig, - git_folder: Option, - node_stderr_most_recent: Arc>, - ) -> eyre::Result { - let (command, error_msg) = match &node.kind { - dora_core::descriptor::CoreNodeKind::Custom(n) => { - let build_dir = match git_folder { - Some(git_folder) => git_folder.prepare(logger).await?, - None => self.working_dir.clone(), - }; - - if let Some(build) = &n.build { - self.build_node(logger, &node.env, build_dir.clone(), build) - .await?; - } - let mut command = if self.build_only { - None - } else { - path_spawn_command(&build_dir, self.uv, logger, n, true).await? - }; - - if let Some(command) = &mut command { - command.current_dir(&self.working_dir); - command.stdin(Stdio::null()); - - command.env( - "DORA_NODE_CONFIG", - serde_yaml::to_string(&node_config.clone()) - .wrap_err("failed to serialize node config")?, - ); - // Injecting the env variable defined in the `yaml` into - // the node runtime. 
- if let Some(envs) = &node.env { - for (key, value) in envs { - command.env(key, value.to_string()); - } - } - if let Some(envs) = &n.envs { - // node has some inner env variables -> add them too - for (key, value) in envs { - command.env(key, value.to_string()); - } - } - - // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C - #[cfg(unix)] - command.process_group(0); - - command.env("PYTHONUNBUFFERED", "1"); - command - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - }; - - let error_msg = format!( - "failed to run `{}` with args `{}`", - n.path, - n.args.as_deref().unwrap_or_default(), - ); - (command, error_msg) - } - dora_core::descriptor::CoreNodeKind::Runtime(n) => { - // run build commands - for operator in &n.operators { - if let Some(build) = &operator.config.build { - self.build_node(logger, &node.env, self.working_dir.clone(), build) - .await?; - } - } - - let python_operators: Vec<&OperatorDefinition> = n - .operators - .iter() - .filter(|x| matches!(x.config.source, OperatorSource::Python { .. })) - .collect(); - - let other_operators = n - .operators - .iter() - .any(|x| !matches!(x.config.source, OperatorSource::Python { .. })); - - let mut command = if self.build_only { - None - } else if !python_operators.is_empty() && !other_operators { - // Use python to spawn runtime if there is a python operator - - // TODO: Handle multi-operator runtime once sub-interpreter is supported - if python_operators.len() > 2 { - eyre::bail!( - "Runtime currently only support one Python Operator. - This is because pyo4 sub-interpreter is not yet available. - See: https://github.com/PyO4/pyo3/issues/576" - ); - } - - let python_operator = python_operators - .first() - .context("Runtime had no operators definition.")?; - - if let OperatorSource::Python(PythonSource { - source: _, - conda_env: Some(conda_env), - }) = &python_operator.config.source - { - let conda = which::which("conda").context( - "failed to find `conda`, yet a `conda_env` was defined. 
Make sure that `conda` is available.", - )?; - let mut command = tokio::process::Command::new(conda); - command.args([ - "run", - "-n", - conda_env, - "python", - "-c", - format!("import dora; dora.start_runtime() # {}", node.id).as_str(), - ]); - Some(command) - } else { - let mut cmd = if self.uv { - let mut cmd = tokio::process::Command::new("uv"); - cmd.arg("run"); - cmd.arg("python"); - tracing::info!( - "spawning: uv run python -uc import dora; dora.start_runtime() # {}", - node.id - ); - cmd - } else { - let python = get_python_path() - .wrap_err("Could not find python path when spawning custom node")?; - tracing::info!( - "spawning: python -uc import dora; dora.start_runtime() # {}", - node.id - ); - - tokio::process::Command::new(python) - }; - // Force python to always flush stdout/stderr buffer - cmd.args([ - "-c", - format!("import dora; dora.start_runtime() # {}", node.id).as_str(), - ]); - Some(cmd) - } - } else if python_operators.is_empty() && other_operators { - let mut cmd = tokio::process::Command::new( - std::env::current_exe() - .wrap_err("failed to get current executable path")?, - ); - cmd.arg("runtime"); - Some(cmd) - } else { - eyre::bail!("Runtime can not mix Python Operator with other type of operator."); - }; - - let runtime_config = RuntimeConfig { - node: node_config.clone(), - operators: n.operators.clone(), - }; - - if let Some(command) = &mut command { - command.current_dir(&self.working_dir); - - command.env( - "DORA_RUNTIME_CONFIG", - serde_yaml::to_string(&runtime_config) - .wrap_err("failed to serialize runtime config")?, - ); - // Injecting the env variable defined in the `yaml` into - // the node runtime. - if let Some(envs) = &node.env { - for (key, value) in envs { - command.env(key, value.to_string()); - } - } - // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C - #[cfg(unix)] - command.process_group(0); - - command - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - }; - let error_msg = format!( - "failed to run runtime {}/{}", - runtime_config.node.dataflow_id, runtime_config.node.node_id - ); - (command, error_msg) - } - }; - Ok(PreparedNode { - command, - spawn_error_msg: error_msg, - working_dir: self.working_dir, - dataflow_id, - node, - node_config, - clock: self.clock, - daemon_tx: self.daemon_tx, - node_stderr_most_recent, - }) - } - - async fn build_node( - &mut self, - logger: &mut NodeLogger<'_>, - node_env: &Option>, - working_dir: PathBuf, - build: &String, - ) -> Result<(), eyre::Error> { - logger - .log( - LogLevel::Info, - None, - format!("running build command: `{build}"), - ) - .await; - let build = build.to_owned(); - let uv = self.uv; - let node_env = node_env.clone(); - let mut logger = logger.try_clone().await.context("failed to clone logger")?; - let (stdout_tx, mut stdout) = tokio::sync::mpsc::channel(10); - let task = tokio::task::spawn_blocking(move || { - run_build_command(&build, &working_dir, uv, &node_env, stdout_tx) - .context("build command failed") - }); - tokio::spawn(async move { - while let Some(line) = stdout.recv().await { - logger - .log( - LogLevel::Info, - Some("build command".into()), - line.unwrap_or_else(|err| format!("io err: {}", err.kind())), - ) - .await; - } - }); - task.await??; - Ok(()) - } } pub struct PreparedNode { @@ -642,7 +313,8 @@ impl PreparedNode { cloned_logger .log(LogMessage { daemon_id: Some(daemon_id.clone()), - dataflow_id, + dataflow_id: Some(dataflow_id), + build_id: None, level: LogLevel::Info, node_id: 
Some(node_id.clone()), target: Some("stdout".into()), diff --git a/examples/rust-dataflow-git/dataflow.yml b/examples/rust-dataflow-git/dataflow.yml index f4bca5df..f1fa7512 100644 --- a/examples/rust-dataflow-git/dataflow.yml +++ b/examples/rust-dataflow-git/dataflow.yml @@ -2,8 +2,8 @@ nodes: - id: rust-node git: https://github.com/dora-rs/dora.git rev: e31b2a34 # pinned commit, update this when changing the message crate - build: cargo build -p rust-dataflow-example-node - path: target/debug/rust-dataflow-example-node + build: cargo build --manifest-path build/github.com/dora-rs/dora.git/e31b2a34/Cargo.toml -p rust-dataflow-example-node + path: build/github.com/dora-rs/dora.git/e31b2a34/target/debug/rust-dataflow-example-node inputs: tick: dora/timer/millis/10 outputs: diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs index 456bb1bd..d2019aa9 100644 --- a/libraries/message/src/cli_to_coordinator.rs +++ b/libraries/message/src/cli_to_coordinator.rs @@ -9,14 +9,21 @@ use crate::{ #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum ControlRequest { + Build { + dataflow: Descriptor, + // TODO: remove this once we figure out deploying of node/operator + // binaries from CLI to coordinator/daemon + local_working_dir: PathBuf, + uv: bool, + }, Start { + build_id: Option, dataflow: Descriptor, name: Option, // TODO: remove this once we figure out deploying of node/operator // binaries from CLI to coordinator/daemon local_working_dir: PathBuf, uv: bool, - build_only: bool, }, WaitForSpawn { dataflow_id: Uuid, diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index 015b163e..7fb8a41c 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -5,14 +5,15 @@ use aligned_vec::{AVec, ConstAlign}; use eyre::Context as _; use uuid::Uuid; -use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, DataflowId}; +use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId}; pub use log::Level as LogLevel; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[must_use] pub struct LogMessage { - pub dataflow_id: DataflowId, + pub build_id: Option, + pub dataflow_id: Option, pub node_id: Option, pub daemon_id: Option, pub level: LogLevel, diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs index 87eb7ae7..f26e0b24 100644 --- a/libraries/message/src/coordinator_to_cli.rs +++ b/libraries/message/src/coordinator_to_cli.rs @@ -9,10 +9,26 @@ use crate::{common::DaemonId, id::NodeId}; pub enum ControlRequestReply { Error(String), CoordinatorStopped, - DataflowStartTriggered { uuid: Uuid }, - DataflowSpawned { uuid: Uuid }, - DataflowReloaded { uuid: Uuid }, - DataflowStopped { uuid: Uuid, result: DataflowResult }, + DataflowBuildTriggered { + build_id: Uuid, + }, + DataflowBuildFinished { + build_id: Uuid, + result: Result<(), String>, + }, + DataflowStartTriggered { + uuid: Uuid, + }, + DataflowSpawned { + uuid: Uuid, + }, + DataflowReloaded { + uuid: Uuid, + }, + DataflowStopped { + uuid: Uuid, + result: DataflowResult, + }, DataflowList(DataflowList), DestroyOk, DaemonConnected(bool), diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs index 8c68a6ca..43440a18 100644 --- a/libraries/message/src/coordinator_to_daemon.rs +++ b/libraries/message/src/coordinator_to_daemon.rs @@ -4,6 +4,8 @@ use std::{ time::Duration, }; +use uuid::Uuid; + use crate::{ 
common::DaemonId, descriptor::{Descriptor, ResolvedNode}, @@ -33,6 +35,7 @@ impl RegisterResult { #[derive(Debug, serde::Deserialize, serde::Serialize)] pub enum DaemonCoordinatorEvent { + Build(BuildDataflowNodes), Spawn(SpawnDataflowNodes), AllNodesReady { dataflow_id: DataflowId, @@ -55,13 +58,23 @@ pub enum DaemonCoordinatorEvent { Heartbeat, } +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct BuildDataflowNodes { + pub build_id: Uuid, + pub working_dir: PathBuf, + pub nodes: BTreeMap, + pub dataflow_descriptor: Descriptor, + pub nodes_on_machine: BTreeSet, + pub uv: bool, +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct SpawnDataflowNodes { + pub build_id: Option, pub dataflow_id: DataflowId, pub working_dir: PathBuf, pub nodes: BTreeMap, pub dataflow_descriptor: Descriptor, pub spawn_nodes: BTreeSet, pub uv: bool, - pub build_only: bool, } diff --git a/libraries/message/src/daemon_to_coordinator.rs b/libraries/message/src/daemon_to_coordinator.rs index 309697be..6e97e3ae 100644 --- a/libraries/message/src/daemon_to_coordinator.rs +++ b/libraries/message/src/daemon_to_coordinator.rs @@ -77,6 +77,7 @@ impl DataflowDaemonResult { #[derive(Debug, serde::Deserialize, serde::Serialize)] pub enum DaemonCoordinatorReply { + TriggerBuildResult(Result<(), String>), TriggerSpawnResult(Result<(), String>), ReloadResult(Result<(), String>), StopResult(Result<(), String>), diff --git a/libraries/message/src/descriptor.rs b/libraries/message/src/descriptor.rs index 02f660d4..ca583c0b 100644 --- a/libraries/message/src/descriptor.rs +++ b/libraries/message/src/descriptor.rs @@ -253,6 +253,12 @@ pub enum NodeSource { }, } +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub enum ResolvedNodeSource { + Local, + GitCommit { repo: String, commit_hash: String }, +} + #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] pub enum GitRepoRev { Branch(String), diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs index 9d1870e0..3cca90ad 100644 --- a/libraries/message/src/lib.rs +++ b/libraries/message/src/lib.rs @@ -26,6 +26,7 @@ pub use arrow_data; pub use arrow_schema; pub type DataflowId = uuid::Uuid; +pub type BuildId = uuid::Uuid; fn current_crate_version() -> semver::Version { let crate_version_raw = env!("CARGO_PKG_VERSION");
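Since `LogMessage` now carries both an optional `build_id` and an optional `dataflow_id`, the coordinator's `Event::Log` arm can route one message to dataflow log subscribers, to build log subscribers, or to both. A stand-in sketch of that routing decision:

```rust
// Stand-in sketch of the coordinator's log routing now that a LogMessage
// can carry a build id, a dataflow id, neither, or both. Ids are u64s
// here; the real types are Uuids.
struct LogMessage {
    build_id: Option<u64>,
    dataflow_id: Option<u64>,
    message: String,
}

fn route(msg: &LogMessage) {
    if let Some(dataflow_id) = msg.dataflow_id {
        // forwarded to the log subscribers of the running dataflow
        println!("dataflow {dataflow_id}: {}", msg.message);
    }
    if let Some(build_id) = msg.build_id {
        // forwarded to the CLI session that triggered this build
        println!("build {build_id}: {}", msg.message);
    }
}

fn main() {
    route(&LogMessage {
        build_id: Some(7),
        dataflow_id: None,
        message: "running build command: `cargo build`".into(),
    });
}
```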