From ed224d62e7da128c0ec4fbf21174fcaf9ff5e41a Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 4 Jun 2025 16:14:59 +0200 Subject: [PATCH] Wip --- binaries/cli/src/lib.rs | 4 +- binaries/coordinator/src/lib.rs | 28 +++++------ binaries/coordinator/src/run/mod.rs | 4 +- binaries/daemon/src/build/git.rs | 22 ++++---- binaries/daemon/src/build/mod.rs | 6 +-- binaries/daemon/src/lib.rs | 50 +++++++++++++------ binaries/daemon/src/log.rs | 24 ++++----- binaries/daemon/src/spawn/mod.rs | 14 ++---- libraries/message/src/cli_to_coordinator.rs | 2 +- libraries/message/src/common.rs | 2 +- libraries/message/src/coordinator_to_cli.rs | 4 +- .../message/src/coordinator_to_daemon.rs | 4 +- 12 files changed, 89 insertions(+), 75 deletions(-) diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index c1004489..af54a977 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -633,7 +633,7 @@ fn run(args: Args) -> eyre::Result<()> { } fn start_dataflow( - build_id: Option, + session_id: Option, dataflow: String, name: Option, coordinator_socket: SocketAddr, @@ -656,7 +656,7 @@ fn start_dataflow( let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Start { - build_id, + session_id, dataflow, name, local_working_dir: working_dir, diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index da431dd5..9b9ec085 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -363,7 +363,7 @@ async fn start_inner( send_log_message( &mut finished_dataflow, &LogMessage { - build_id: None, + session_id: None, dataflow_id: Some(dataflow_id), node_id: None, daemon_id: None, @@ -418,10 +418,10 @@ async fn start_inner( uv, } => { // assign a random build id - let build_id = BuildId::new_v4(); + let session_id = BuildId::new_v4(); let result = build_dataflow( - build_id, + session_id, dataflow, git_sources, prev_git_sources, @@ -434,7 +434,7 @@ async fn start_inner( match result { Ok(()) => { let _ = reply_sender.send(Ok( - ControlRequestReply::DataflowBuildTriggered { build_id }, + ControlRequestReply::DataflowBuildTriggered { session_id }, )); } Err(err) => { @@ -443,7 +443,7 @@ async fn start_inner( } } ControlRequest::Start { - build_id, + session_id, dataflow, name, local_working_dir, @@ -462,7 +462,7 @@ async fn start_inner( } } let dataflow = start_dataflow( - build_id, + session_id, dataflow, local_working_dir, name, @@ -769,8 +769,8 @@ async fn start_inner( send_log_message(dataflow, &message).await; } } - if let Some(build_id) = message.build_id { - if let Entry::Occupied(subscriber) = build_log_subscribers.entry(build_id) { + if let Some(session_id) = message.session_id { + if let Entry::Occupied(subscriber) = build_log_subscribers.entry(session_id) { send_log_message_to_subscriber(&message, subscriber).await; } } @@ -1180,7 +1180,7 @@ async fn retrieve_logs( #[tracing::instrument(skip(daemon_connections, clock))] async fn build_dataflow( - build_id: BuildId, + session_id: BuildId, dataflow: Descriptor, git_sources: BTreeMap, prev_git_sources: BTreeMap, @@ -1206,11 +1206,11 @@ async fn build_dataflow( for (machine, nodes_on_machine) in &nodes_by_daemon { let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect(); tracing::debug!( - "Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})" + "Running dataflow build `{session_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})" ); let build_command = BuildDataflowNodes { - build_id, + session_id, working_dir: working_dir.clone(), nodes: nodes.clone(), git_sources: git_sources_by_daemon @@ -1234,7 +1234,7 @@ async fn build_dataflow( daemons.insert(daemon_id); } - tracing::info!("successfully triggered dataflow build `{build_id}`",); + tracing::info!("successfully triggered dataflow build `{session_id}`",); Ok(()) } @@ -1278,7 +1278,7 @@ async fn build_dataflow_on_machine( } async fn start_dataflow( - build_id: Option, + session_id: Option, dataflow: Descriptor, working_dir: PathBuf, name: Option, @@ -1291,7 +1291,7 @@ async fn start_dataflow( daemons, nodes, } = spawn_dataflow( - build_id, + session_id, dataflow, working_dir, daemon_connections, diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index d77d39e2..7de7ea91 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -22,7 +22,7 @@ use uuid::{NoContext, Timestamp, Uuid}; #[tracing::instrument(skip(daemon_connections, clock))] pub(super) async fn spawn_dataflow( - build_id: Option, + session_id: Option, dataflow: Descriptor, working_dir: PathBuf, daemon_connections: &mut DaemonConnections, @@ -42,7 +42,7 @@ pub(super) async fn spawn_dataflow( ); let spawn_command = SpawnDataflowNodes { - build_id, + session_id, dataflow_id: uuid, working_dir: working_dir.clone(), nodes: nodes.clone(), diff --git a/binaries/daemon/src/build/git.rs b/binaries/daemon/src/build/git.rs index 0e6dbbe7..bfe0dcb5 100644 --- a/binaries/daemon/src/build/git.rs +++ b/binaries/daemon/src/build/git.rs @@ -30,13 +30,13 @@ struct PreparedBuild { impl GitManager { pub fn choose_clone_dir( &mut self, - build_id: uuid::Uuid, + session_id: uuid::Uuid, repo_url: Url, commit_hash: String, prev_commit_hash: Option, target_dir: &Path, ) -> eyre::Result { - let clone_dir = Self::clone_dir_path(&target_dir, build_id, &repo_url, &commit_hash)?; + let clone_dir = Self::clone_dir_path(&target_dir, session_id, &repo_url, &commit_hash)?; if let Some(using) = self.clones_in_use.get(&clone_dir) { if !using.is_empty() { @@ -50,7 +50,7 @@ impl GitManager { } } - let reuse = if self.clone_dir_ready(build_id, &clone_dir) { + let reuse = if self.clone_dir_ready(session_id, &clone_dir) { // The directory already contains a checkout of the commit we're interested in. // So we can simply reuse the directory without doing any additional git // operations. @@ -60,7 +60,7 @@ impl GitManager { } else if let Some(previous_commit_hash) = prev_commit_hash { // we might be able to update a previous clone let prev_clone_dir = - Self::clone_dir_path(&target_dir, build_id, &repo_url, &previous_commit_hash)?; + Self::clone_dir_path(&target_dir, session_id, &repo_url, &previous_commit_hash)?; if self .clones_in_use @@ -97,7 +97,7 @@ impl GitManager { commit_hash, } }; - self.register_ready_clone_dir(build_id, clone_dir); + self.register_ready_clone_dir(session_id, clone_dir); Ok(GitFolder { reuse }) } @@ -109,17 +109,17 @@ impl GitManager { .unwrap_or(false) } - pub fn clone_dir_ready(&self, build_id: BuildId, dir: &Path) -> bool { + pub fn clone_dir_ready(&self, session_id: BuildId, dir: &Path) -> bool { self.prepared_builds - .get(&build_id) + .get(&session_id) .map(|p| p.planned_clone_dirs.contains(dir)) .unwrap_or(false) || dir.exists() } - pub fn register_ready_clone_dir(&mut self, build_id: BuildId, dir: PathBuf) -> bool { + pub fn register_ready_clone_dir(&mut self, session_id: BuildId, dir: PathBuf) -> bool { self.prepared_builds - .entry(build_id) + .entry(session_id) .or_default() .planned_clone_dirs .insert(dir) @@ -127,12 +127,12 @@ impl GitManager { fn clone_dir_path( base_dir: &Path, - build_id: BuildId, + session_id: BuildId, repo_url: &Url, commit_hash: &String, ) -> eyre::Result { let mut path = base_dir - .join(&build_id.to_string()) + .join(&session_id.to_string()) .join(repo_url.host_str().context("git URL has no hostname")?); path.extend(repo_url.path_segments().context("no path in git URL")?); let path = path.join(commit_hash); diff --git a/binaries/daemon/src/build/mod.rs b/binaries/daemon/src/build/mod.rs index 0df5d977..a96d4129 100644 --- a/binaries/daemon/src/build/mod.rs +++ b/binaries/daemon/src/build/mod.rs @@ -22,7 +22,7 @@ mod git; #[derive(Clone)] pub struct Builder { - pub build_id: BuildId, + pub session_id: BuildId, pub working_dir: PathBuf, pub daemon_tx: mpsc::Sender>, pub dataflow_descriptor: Descriptor, @@ -47,7 +47,7 @@ impl Builder { let target_dir = self.working_dir.join("build"); let prev_hash = prev_git.filter(|p| p.repo == repo).map(|p| p.commit_hash); let git_folder = git_manager.choose_clone_dir( - self.build_id, + self.session_id, repo_url, commit_hash, prev_hash, @@ -79,7 +79,7 @@ impl Builder { dora_core::descriptor::CoreNodeKind::Custom(n) => { let node_working_dir = match git_folder { Some(git_folder) => git_folder.prepare(logger).await?, - None => self.working_dir.join(self.build_id.to_string()), + None => self.working_dir.join(self.session_id.to_string()), }; if let Some(build) = &n.build { diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 1f7fe82c..75496a63 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -15,9 +15,7 @@ use dora_message::{ DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus, }, coordinator_to_cli::DataflowResult, - coordinator_to_daemon::{ - BuildDataflowNodes, DaemonCoordinatorEvent, GitSource, SpawnDataflowNodes, - }, + coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes}, daemon_to_coordinator::{ CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult, }, @@ -170,7 +168,7 @@ impl Daemon { let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext)); let spawn_command = SpawnDataflowNodes { - build_id, + session_id, dataflow_id, working_dir, spawn_nodes: nodes.keys().cloned().collect(), @@ -421,6 +419,23 @@ impl Daemon { .await?; } }, + Event::BuildDataflowResult { session_id, result } => { + if let Some(connection) = &mut self.coordinator_connection { + let msg = serde_json::to_vec(&Timestamped { + inner: CoordinatorRequest::Event { + daemon_id: self.daemon_id.clone(), + event: DaemonEvent::SpawnResult { + dataflow_id, + result: result.map_err(|err| format!("{err:?}")), + }, + }, + timestamp: self.clock.new_timestamp(), + })?; + socket_stream_send(connection, &msg) + .await + .wrap_err("failed to send Exit message to dora-coordinator")?; + } + } Event::SpawnDataflowResult { dataflow_id, result, @@ -476,7 +491,7 @@ impl Daemon { ) -> eyre::Result { let status = match event { DaemonCoordinatorEvent::Build(BuildDataflowNodes { - build_id, + session_id, working_dir, nodes, git_sources, @@ -498,7 +513,7 @@ impl Daemon { let result = self .build_dataflow( - build_id, + session_id, working_dir, nodes, git_sources, @@ -523,7 +538,7 @@ impl Daemon { tokio::spawn(async move { let message = Timestamped { inner: Event::BuildDataflowResult { - build_id, + session_id, result: result_task.await, }, timestamp: clock.new_timestamp(), @@ -542,7 +557,7 @@ impl Daemon { RunStatus::Continue } DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes { - build_id, + session_id, dataflow_id, working_dir, nodes, @@ -563,7 +578,7 @@ impl Daemon { let result = self .spawn_dataflow( - build_id, + session_id, dataflow_id, working_dir, nodes, @@ -836,7 +851,7 @@ impl Daemon { async fn build_dataflow( &mut self, - build_id: uuid::Uuid, + session_id: uuid::Uuid, working_dir: PathBuf, nodes: BTreeMap, git_sources: BTreeMap, @@ -846,7 +861,7 @@ impl Daemon { uv: bool, ) -> eyre::Result>> { let builder = build::Builder { - build_id, + session_id, working_dir, daemon_tx: self.events_tx.clone(), dataflow_descriptor, @@ -861,7 +876,7 @@ impl Daemon { let dynamic_node = node.kind.dynamic(); let node_id = node.id.clone(); - let mut logger = self.logger.for_node_build(build_id, node_id.clone()); + let mut logger = self.logger.for_node_build(session_id, node_id.clone()); logger.log(LogLevel::Info, "building").await; let git_source = git_sources.get(&node_id).cloned(); let prev_git_source = prev_git_sources.get(&node_id).cloned(); @@ -887,7 +902,12 @@ impl Daemon { } Err(err) => { self.logger - .log_build(build_id, LogLevel::Error, Some(node_id), format!("{err:?}")) + .log_build( + session_id, + LogLevel::Error, + Some(node_id), + format!("{err:?}"), + ) .await; return Err(err); } @@ -925,7 +945,7 @@ impl Daemon { async fn spawn_dataflow( &mut self, - build_id: Option, + session_id: Option, dataflow_id: uuid::Uuid, working_dir: PathBuf, nodes: BTreeMap, @@ -2425,7 +2445,7 @@ pub enum Event { result: Result, }, BuildDataflowResult { - build_id: Uuid, + session_id: Uuid, result: eyre::Result<()>, }, SpawnDataflowResult { diff --git a/binaries/daemon/src/log.rs b/binaries/daemon/src/log.rs index 176857d0..cdcacd31 100644 --- a/binaries/daemon/src/log.rs +++ b/binaries/daemon/src/log.rs @@ -95,7 +95,7 @@ impl<'a> DataflowLogger<'a> { } pub struct NodeBuildLogger<'a> { - build_id: BuildId, + session_id: BuildId, node_id: NodeId, logger: CowMut<'a, DaemonLogger>, } @@ -107,13 +107,13 @@ impl NodeBuildLogger<'_> { pub async fn log(&mut self, level: LogLevel, message: impl Into) { self.logger - .log_build(self.build_id, level, Some(self.node_id.clone()), message) + .log_build(self.session_id, level, Some(self.node_id.clone()), message) .await } pub async fn try_clone(&self) -> eyre::Result> { Ok(NodeBuildLogger { - build_id: self.build_id, + session_id: self.session_id, node_id: self.node_id.clone(), logger: CowMut::Owned(self.logger.try_clone().await?), }) @@ -133,9 +133,9 @@ impl DaemonLogger { } } - pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger { + pub fn for_node_build(&mut self, session_id: BuildId, node_id: NodeId) -> NodeBuildLogger { NodeBuildLogger { - build_id, + session_id, node_id, logger: CowMut::Borrowed(self), } @@ -154,7 +154,7 @@ impl DaemonLogger { message: impl Into, ) { let message = LogMessage { - build_id: None, + session_id: None, daemon_id: Some(self.daemon_id.clone()), dataflow_id, node_id, @@ -170,13 +170,13 @@ impl DaemonLogger { pub async fn log_build( &mut self, - build_id: BuildId, + session_id: BuildId, level: LogLevel, node_id: Option, message: impl Into, ) { let message = LogMessage { - build_id: Some(build_id), + session_id: Some(session_id), daemon_id: Some(self.daemon_id.clone()), dataflow_id: None, node_id, @@ -239,7 +239,7 @@ impl Logger { match message.level { LogLevel::Error => { tracing::error!( - build_id = ?message.build_id.map(|id| id.to_string()), + session_id = ?message.session_id.map(|id| id.to_string()), dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), node_id = ?message.node_id.map(|id| id.to_string()), target = message.target, @@ -252,7 +252,7 @@ impl Logger { } LogLevel::Warn => { tracing::warn!( - build_id = ?message.build_id.map(|id| id.to_string()), + session_id = ?message.session_id.map(|id| id.to_string()), dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), node_id = ?message.node_id.map(|id| id.to_string()), target = message.target, @@ -265,7 +265,7 @@ impl Logger { } LogLevel::Info => { tracing::info!( - build_id = ?message.build_id.map(|id| id.to_string()), + session_id = ?message.session_id.map(|id| id.to_string()), dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), node_id = ?message.node_id.map(|id| id.to_string()), target = message.target, @@ -278,7 +278,7 @@ impl Logger { } LogLevel::Debug => { tracing::debug!( - build_id = ?message.build_id.map(|id| id.to_string()), + session_id = ?message.session_id.map(|id| id.to_string()), dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), node_id = ?message.node_id.map(|id| id.to_string()), target = message.target, diff --git a/binaries/daemon/src/spawn/mod.rs b/binaries/daemon/src/spawn/mod.rs index 5fc41a9d..a3e23362 100644 --- a/binaries/daemon/src/spawn/mod.rs +++ b/binaries/daemon/src/spawn/mod.rs @@ -1,17 +1,15 @@ use crate::{ log::{self, NodeLogger}, - node_communication::spawn_listener_loop, - node_inputs, CoreNodeKindExt, DoraEvent, Event, OutputId, RunningNode, + CoreNodeKindExt, DoraEvent, Event, OutputId, RunningNode, }; use aligned_vec::{AVec, ConstAlign}; use crossbeam::queue::ArrayQueue; use dora_arrow_convert::IntoArrow; use dora_core::{ - build::run_build_command, config::DataId, descriptor::{ - resolve_path, source_is_url, CustomNode, Descriptor, OperatorDefinition, OperatorSource, - PythonSource, ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, SHELL_SOURCE, + resolve_path, source_is_url, Descriptor, ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, + SHELL_SOURCE, }, get_python_path, uhlc::HLC, @@ -31,12 +29,8 @@ use dora_node_api::{ Metadata, }; use eyre::{ContextCompat, WrapErr}; -use git::GitFolder; use std::{ - collections::{BTreeMap, BTreeSet}, - future::Future, path::{Path, PathBuf}, - process::Stdio, sync::Arc, }; use tokio::{ @@ -314,7 +308,7 @@ impl PreparedNode { .log(LogMessage { daemon_id: Some(daemon_id.clone()), dataflow_id: Some(dataflow_id), - build_id: None, + session_id: None, level: LogLevel::Info, node_id: Some(node_id.clone()), target: Some("stdout".into()), diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs index c77cc6ee..50e7258c 100644 --- a/libraries/message/src/cli_to_coordinator.rs +++ b/libraries/message/src/cli_to_coordinator.rs @@ -20,7 +20,7 @@ pub enum ControlRequest { uv: bool, }, Start { - build_id: Option, + session_id: Option, dataflow: Descriptor, name: Option, // TODO: remove this once we figure out deploying of node/operator diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index 83591811..cf8976fe 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -12,7 +12,7 @@ pub use log::Level as LogLevel; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[must_use] pub struct LogMessage { - pub build_id: Option, + pub session_id: Option, pub dataflow_id: Option, pub node_id: Option, pub daemon_id: Option, diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs index f26e0b24..fd0e733a 100644 --- a/libraries/message/src/coordinator_to_cli.rs +++ b/libraries/message/src/coordinator_to_cli.rs @@ -10,10 +10,10 @@ pub enum ControlRequestReply { Error(String), CoordinatorStopped, DataflowBuildTriggered { - build_id: Uuid, + session_id: Uuid, }, DataflowBuildFinished { - build_id: Uuid, + session_id: Uuid, result: Result<(), String>, }, DataflowStartTriggered { diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs index 0becd47e..3f2e2500 100644 --- a/libraries/message/src/coordinator_to_daemon.rs +++ b/libraries/message/src/coordinator_to_daemon.rs @@ -60,7 +60,7 @@ pub enum DaemonCoordinatorEvent { #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct BuildDataflowNodes { - pub build_id: Uuid, + pub session_id: Uuid, pub working_dir: PathBuf, pub nodes: BTreeMap, pub git_sources: BTreeMap, @@ -72,7 +72,7 @@ pub struct BuildDataflowNodes { #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct SpawnDataflowNodes { - pub build_id: Option, + pub session_id: Option, pub dataflow_id: DataflowId, pub working_dir: PathBuf, pub nodes: BTreeMap,