From b22e8a141113ea2e248de30377cb26b103615e4d Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Wed, 4 Jun 2025 19:39:20 +0200
Subject: [PATCH] Wip

---
 binaries/coordinator/src/control.rs          | 20 +++++-
 binaries/coordinator/src/lib.rs              | 70 +++++++++----------
 binaries/coordinator/src/run/mod.rs          |  4 +-
 binaries/daemon/src/build/git.rs             |  3 +-
 binaries/daemon/src/build/mod.rs             |  2 +-
 binaries/daemon/src/lib.rs                   | 13 ++--
 binaries/daemon/src/log.rs                   | 26 +++----
 binaries/daemon/src/spawn/mod.rs             |  2 +-
 libraries/message/src/cli_to_coordinator.rs  | 10 +--
 libraries/message/src/common.rs              |  4 +-
 libraries/message/src/coordinator_to_cli.rs  |  8 +--
 .../message/src/coordinator_to_daemon.rs     |  4 +-
 libraries/message/src/lib.rs                 | 37 +++++++++-
 13 files changed, 125 insertions(+), 78 deletions(-)

diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs
index c0e92417..ace212e2 100644
--- a/binaries/coordinator/src/control.rs
+++ b/binaries/coordinator/src/control.rs
@@ -2,7 +2,9 @@ use crate::{
     tcp_utils::{tcp_receive, tcp_send},
     Event,
 };
-use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
+use dora_message::{
+    cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply, BuildId,
+};
 use eyre::{eyre, Context};
 use futures::{
     future::{self, Either},
@@ -114,6 +116,17 @@ async fn handle_requests(
             break;
         }
 
+        if let Ok(ControlRequest::BuildLogSubscribe { build_id, level }) = request {
+            let _ = tx
+                .send(ControlEvent::BuildLogSubscribe {
+                    build_id,
+                    level,
+                    connection,
+                })
+                .await;
+            break;
+        }
+
         let result = match request {
             Ok(request) => handle_request(request, &tx).await,
             Err(err) => Err(err),
@@ -179,6 +192,11 @@ pub enum ControlEvent {
         level: log::LevelFilter,
         connection: TcpStream,
     },
+    BuildLogSubscribe {
+        build_id: BuildId,
+        level: log::LevelFilter,
+        connection: TcpStream,
+    },
     Error(eyre::Report),
 }
 
diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs
index 6d553f0c..68beba37 100644
--- a/binaries/coordinator/src/lib.rs
+++ b/binaries/coordinator/src/lib.rs
@@ -29,11 +29,7 @@ use itertools::Itertools;
 use log_subscriber::LogSubscriber;
 use run::SpawnedDataflow;
 use std::{
-    collections::{
-        btree_map::{Entry, OccupiedEntry},
-        BTreeMap, BTreeSet, HashMap,
-    },
-    env::current_dir,
+    collections::{BTreeMap, BTreeSet, HashMap},
     net::SocketAddr,
     path::PathBuf,
     sync::Arc,
@@ -204,6 +200,7 @@ async fn start_inner(
     let mut events = (abortable_events, daemon_events).merge();
 
     let mut running_builds: HashMap = HashMap::new();
+    let mut build_log_subscribers: BTreeMap<BuildId, Vec<LogSubscriber>> = Default::default();
 
     let mut running_dataflows: HashMap = HashMap::new();
     let mut dataflow_results: HashMap> =
@@ -211,8 +208,6 @@
         HashMap::new();
     let mut archived_dataflows: HashMap = HashMap::new();
     let mut daemon_connections = DaemonConnections::default();
-    let mut build_log_subscribers: BTreeMap = Default::default();
-
     while let Some(event) = events.next().await {
         // used below for measuring the event handling duration
         let start = Instant::now();
@@ -364,9 +359,9 @@
                     let mut finished_dataflow = entry.remove();
                     let dataflow_id = finished_dataflow.uuid;
                     send_log_message(
-                        &mut finished_dataflow,
+                        &mut finished_dataflow.log_subscribers,
                         &LogMessage {
-                            session_id: None,
+                            build_id: None,
                             dataflow_id: Some(dataflow_id),
                             node_id: None,
                             daemon_id: None,
@@ -422,7 +417,7 @@
                         uv,
                     } => {
                         // assign a random build id
-                        let build_id = SessionId::new_v4();
+                        let build_id = BuildId::generate();
 
                         let result = build_dataflow(
                            build_id,
@@ -447,6 +442,14 @@
                                }
                            }
                        }
+                        ControlRequest::WaitForBuild { build_id } => {
+                            if let Some(build) = running_builds.get_mut(&build_id) {
+                                build.spawn_result.register(reply_sender);
+                            } else {
+                                let _ =
+                                    reply_sender.send(Err(eyre!("unknown build id {build_id}")));
+                            }
+                        }
                         ControlRequest::Start {
                             build_id,
                             session_id,
@@ -702,6 +705,11 @@
                                 "LogSubscribe request should be handled separately"
                             )));
                         }
+                        ControlRequest::BuildLogSubscribe { .. } => {
+                            let _ = reply_sender.send(Err(eyre::eyre!(
+                                "BuildLogSubscribe request should be handled separately"
+                            )));
+                        }
                     }
                 }
                 ControlEvent::Error(err) => tracing::error!("{err:?}"),
@@ -716,6 +724,16 @@
                             .push(LogSubscriber::new(level, connection));
                     }
                 }
+                ControlEvent::BuildLogSubscribe {
+                    build_id,
+                    level,
+                    connection,
+                } => {
+                    build_log_subscribers
+                        .entry(build_id)
+                        .or_default()
+                        .push(LogSubscriber::new(level, connection));
+                }
             },
             Event::DaemonHeartbeatInterval => {
                 let mut disconnected = BTreeSet::new();
@@ -773,12 +791,12 @@
             Event::Log(message) => {
                 if let Some(dataflow_id) = &message.dataflow_id {
                     if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
-                        send_log_message(dataflow, &message).await;
+                        send_log_message(&mut dataflow.log_subscribers, &message).await;
                     }
                 }
-                if let Some(session_id) = message.session_id {
-                    if let Entry::Occupied(subscriber) = build_log_subscribers.entry(session_id) {
-                        send_log_message_to_subscriber(&message, subscriber).await;
+                if let Some(build_id) = message.build_id {
+                    if let Some(subscribers) = build_log_subscribers.get_mut(&build_id) {
+                        send_log_message(subscribers, &message).await;
                     }
                 }
             }
@@ -865,8 +883,8 @@
     Ok(())
 }
 
-async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage) {
-    for subscriber in &mut dataflow.log_subscribers {
+async fn send_log_message(log_subscribers: &mut Vec<LogSubscriber>, message: &LogMessage) {
+    for subscriber in log_subscribers.iter_mut() {
         let send_result =
             tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));
 
@@ -874,25 +892,7 @@
             subscriber.close();
         }
     }
-    dataflow.log_subscribers.retain(|s| !s.is_closed());
-}
-
-async fn send_log_message_to_subscriber(
-    message: &LogMessage,
-    mut subscriber: OccupiedEntry<'_, SessionId, LogSubscriber>,
-) {
-    let send_result = tokio::time::timeout(
-        Duration::from_millis(100),
-        subscriber.get_mut().send_message(message),
-    );
-
-    if send_result.await.is_err() {
-        subscriber.get_mut().close();
-    }
-
-    if subscriber.get_mut().is_closed() {
-        subscriber.remove();
-    }
+    log_subscribers.retain(|s| !s.is_closed());
 }
 
 fn dataflow_result(
diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs
index b0ecda1f..ca89fb87 100644
--- a/binaries/coordinator/src/run/mod.rs
+++ b/binaries/coordinator/src/run/mod.rs
@@ -10,7 +10,7 @@ use dora_message::{
     daemon_to_coordinator::DaemonCoordinatorReply,
     descriptor::{Descriptor, ResolvedNode},
     id::NodeId,
-    SessionId,
+    BuildId, SessionId,
 };
 use eyre::{bail, eyre, ContextCompat, WrapErr};
 use itertools::Itertools;
@@ -22,7 +22,7 @@ use uuid::{NoContext, Timestamp, Uuid};
 
 #[tracing::instrument(skip(daemon_connections, clock))]
 pub(super) async fn spawn_dataflow(
-    build_id: Option<SessionId>,
+    build_id: Option<BuildId>,
     session_id: SessionId,
     dataflow: Descriptor,
     local_working_dir: Option<PathBuf>,
diff --git a/binaries/daemon/src/build/git.rs b/binaries/daemon/src/build/git.rs
index c8092edf..ca92373e 100644
--- a/binaries/daemon/src/build/git.rs
+++ b/binaries/daemon/src/build/git.rs
@@ -8,7 +8,6 @@ use std::{
     path::{Path, PathBuf},
 };
 use url::Url;
-use uuid::Uuid;
 
 #[derive(Default)]
 pub struct GitManager {
@@ -30,7 +29,7 @@ struct PreparedBuild {
 impl GitManager {
     pub fn choose_clone_dir(
         &mut self,
-        session_id: uuid::Uuid,
+        session_id: SessionId,
         repo_url: Url,
         commit_hash: String,
         prev_commit_hash: Option<String>,
diff --git a/binaries/daemon/src/build/mod.rs b/binaries/daemon/src/build/mod.rs
index 8c41bddf..2bff3bea 100644
--- a/binaries/daemon/src/build/mod.rs
+++ b/binaries/daemon/src/build/mod.rs
@@ -12,7 +12,7 @@ use dora_message::{
     common::{GitSource, LogLevel, Timestamped},
     descriptor::EnvValue,
     id::NodeId,
-    BuildId, SessionId,
+    SessionId,
 };
 use eyre::Context;
 use tokio::sync::mpsc;
diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index dcfd3462..32a278e9 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -539,6 +539,7 @@ impl Daemon {
 
         let result = self
             .build_dataflow(
+                build_id,
                 session_id,
                 base_working_dir,
                 nodes,
@@ -874,6 +875,7 @@ impl Daemon {
 
     async fn build_dataflow(
         &mut self,
+        build_id: BuildId,
         session_id: SessionId,
         base_working_dir: PathBuf,
         nodes: BTreeMap,
@@ -899,7 +901,7 @@ impl Daemon {
 
             let dynamic_node = node.kind.dynamic();
             let node_id = node.id.clone();
-            let mut logger = self.logger.for_node_build(session_id, node_id.clone());
+            let mut logger = self.logger.for_node_build(build_id, node_id.clone());
             logger.log(LogLevel::Info, "building").await;
             let git_source = git_sources.get(&node_id).cloned();
             let prev_git_source = prev_git_sources.get(&node_id).cloned();
@@ -925,12 +927,7 @@ impl Daemon {
                 }
                 Err(err) => {
                     self.logger
-                        .log_build(
-                            session_id,
-                            LogLevel::Error,
-                            Some(node_id),
-                            format!("{err:?}"),
-                        )
+                        .log_build(build_id, LogLevel::Error, Some(node_id), format!("{err:?}"))
                         .await;
                     return Err(err);
                 }
@@ -2011,7 +2008,7 @@ impl Daemon {
     fn base_working_dir(
         &self,
         local_working_dir: Option<PathBuf>,
-        session_id: Uuid,
+        session_id: SessionId,
     ) -> eyre::Result<PathBuf> {
         match local_working_dir {
             Some(working_dir) => {
diff --git a/binaries/daemon/src/log.rs b/binaries/daemon/src/log.rs
index 6b7add12..176857d0 100644
--- a/binaries/daemon/src/log.rs
+++ b/binaries/daemon/src/log.rs
@@ -8,7 +8,7 @@ use dora_core::{config::NodeId, uhlc};
 use dora_message::{
     common::{DaemonId, LogLevel, LogMessage, Timestamped},
     daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
-    SessionId,
+    BuildId,
 };
 use eyre::Context;
 use tokio::net::TcpStream;
@@ -95,7 +95,7 @@ impl<'a> DataflowLogger<'a> {
 }
 
 pub struct NodeBuildLogger<'a> {
-    session_id: SessionId,
+    build_id: BuildId,
     node_id: NodeId,
     logger: CowMut<'a, DaemonLogger>,
 }
@@ -107,13 +107,13 @@ impl NodeBuildLogger<'_> {
 
     pub async fn log(&mut self, level: LogLevel, message: impl Into<String>) {
         self.logger
-            .log_build(self.session_id, level, Some(self.node_id.clone()), message)
+            .log_build(self.build_id, level, Some(self.node_id.clone()), message)
             .await
     }
 
     pub async fn try_clone(&self) -> eyre::Result<NodeBuildLogger<'static>> {
         Ok(NodeBuildLogger {
-            session_id: self.session_id,
+            build_id: self.build_id,
             node_id: self.node_id.clone(),
             logger: CowMut::Owned(self.logger.try_clone().await?),
         })
@@ -133,9 +133,9 @@ impl DaemonLogger {
         }
     }
 
-    pub fn for_node_build(&mut self, session_id: SessionId, node_id: NodeId) -> NodeBuildLogger {
+    pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger {
         NodeBuildLogger {
-            session_id,
+            build_id,
             node_id,
             logger: CowMut::Borrowed(self),
         }
@@ -154,7 +154,7 @@ impl DaemonLogger {
         message: impl Into<String>,
     ) {
         let message = LogMessage {
-            session_id: None,
+            build_id: None,
             daemon_id: Some(self.daemon_id.clone()),
             dataflow_id,
             node_id,
@@ -170,13 +170,13 @@ impl DaemonLogger {
 
     pub async fn log_build(
         &mut self,
-        session_id: SessionId,
+        build_id: BuildId,
         level: LogLevel,
         node_id: Option<NodeId>,
         message: impl Into<String>,
     ) {
         let message = LogMessage {
-            session_id: Some(session_id),
+            build_id: Some(build_id),
             daemon_id: Some(self.daemon_id.clone()),
             dataflow_id: None,
             node_id,
@@ -239,7 +239,7 @@
         match message.level {
             LogLevel::Error => {
                 tracing::error!(
-                    session_id = ?message.session_id.map(|id| id.to_string()),
+                    build_id = ?message.build_id.map(|id| id.to_string()),
                     dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
                     node_id = ?message.node_id.map(|id| id.to_string()),
                     target = message.target,
@@ -252,7 +252,7 @@
             }
             LogLevel::Warn => {
                 tracing::warn!(
-                    session_id = ?message.session_id.map(|id| id.to_string()),
+                    build_id = ?message.build_id.map(|id| id.to_string()),
                     dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
                     node_id = ?message.node_id.map(|id| id.to_string()),
                     target = message.target,
@@ -265,7 +265,7 @@
             }
             LogLevel::Info => {
                 tracing::info!(
-                    session_id = ?message.session_id.map(|id| id.to_string()),
+                    build_id = ?message.build_id.map(|id| id.to_string()),
                     dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
                     node_id = ?message.node_id.map(|id| id.to_string()),
                     target = message.target,
@@ -278,7 +278,7 @@
             }
             LogLevel::Debug => {
                 tracing::debug!(
-                    session_id = ?message.session_id.map(|id| id.to_string()),
+                    build_id = ?message.build_id.map(|id| id.to_string()),
                     dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
                     node_id = ?message.node_id.map(|id| id.to_string()),
                     target = message.target,
diff --git a/binaries/daemon/src/spawn/mod.rs b/binaries/daemon/src/spawn/mod.rs
index afe0e361..4b9d3d8f 100644
--- a/binaries/daemon/src/spawn/mod.rs
+++ b/binaries/daemon/src/spawn/mod.rs
@@ -560,7 +560,7 @@ impl PreparedNode {
                 .log(LogMessage {
                     daemon_id: Some(daemon_id.clone()),
                     dataflow_id: Some(dataflow_id),
-                    session_id: None,
+                    build_id: None,
                     level: LogLevel::Info,
                     node_id: Some(node_id.clone()),
                     target: Some("stdout".into()),
diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs
index 03c8dac3..6bbdcde3 100644
--- a/libraries/message/src/cli_to_coordinator.rs
+++ b/libraries/message/src/cli_to_coordinator.rs
@@ -6,7 +6,7 @@ use crate::{
     common::GitSource,
     descriptor::Descriptor,
     id::{NodeId, OperatorId},
-    SessionId,
+    BuildId, SessionId,
 };
 
 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
@@ -27,11 +27,11 @@ pub enum ControlRequest {
         uv: bool,
     },
     WaitForBuild {
-        build_id: Uuid,
+        build_id: BuildId,
     },
     Start {
-        build_id: Option<Uuid>,
-        session_id: Uuid,
+        build_id: Option<BuildId>,
+        session_id: SessionId,
         dataflow: Descriptor,
         name: Option<String>,
         /// Allows overwriting the base working dir when CLI and daemon are
@@ -77,7 +77,7 @@
         level: log::LevelFilter,
     },
     BuildLogSubscribe {
-        build_id: Uuid,
+        build_id: BuildId,
         level: log::LevelFilter,
     },
 }
diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs
index 5308fba9..70850212 100644
--- a/libraries/message/src/common.rs
+++ b/libraries/message/src/common.rs
@@ -5,14 +5,14 @@ use aligned_vec::{AVec, ConstAlign};
 use eyre::Context as _;
 use uuid::Uuid;
 
-use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, SessionId, DataflowId};
+use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId, SessionId};
 
 pub use log::Level as LogLevel;
 
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 #[must_use]
 pub struct LogMessage {
-    pub session_id: Option<SessionId>,
+    pub build_id: Option<BuildId>,
     pub dataflow_id: Option<DataflowId>,
     pub node_id: Option<NodeId>,
     pub daemon_id: Option<DaemonId>,
diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs
index 5321536e..087f9506 100644
--- a/libraries/message/src/coordinator_to_cli.rs
+++ b/libraries/message/src/coordinator_to_cli.rs
@@ -3,18 +3,18 @@
 use std::collections::{BTreeMap, BTreeSet};
 use uuid::Uuid;
 pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
-use crate::{common::DaemonId, id::NodeId};
+use crate::{common::DaemonId, id::NodeId, BuildId, SessionId};
 
 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
 pub enum ControlRequestReply {
     Error(String),
     CoordinatorStopped,
     DataflowBuildTriggered {
-        build_id: Uuid,
+        build_id: BuildId,
     },
     DataflowBuildFinished {
-        build_id: Uuid,
-        session_id: Uuid,
+        build_id: BuildId,
+        session_id: SessionId,
         result: Result<(), String>,
     },
     DataflowStartTriggered {
diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs
index 95638084..7a2b0e8d 100644
--- a/libraries/message/src/coordinator_to_daemon.rs
+++ b/libraries/message/src/coordinator_to_daemon.rs
@@ -60,8 +60,8 @@ pub enum DaemonCoordinatorEvent {
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 pub struct BuildDataflowNodes {
-    pub build_id: Uuid,
-    pub session_id: Uuid,
+    pub build_id: BuildId,
+    pub session_id: SessionId,
     /// Allows overwriting the base working dir when CLI and daemon are
     /// running on the same machine.
     ///
diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs
index 429b8217..365eab9f 100644
--- a/libraries/message/src/lib.rs
+++ b/libraries/message/src/lib.rs
@@ -24,10 +24,43 @@ pub mod coordinator_to_cli;
 pub use arrow_data;
 pub use arrow_schema;
+use uuid::Uuid;
 
 pub type DataflowId = uuid::Uuid;
-pub type SessionId = uuid::Uuid;
-pub type BuildId = uuid::Uuid;
+
+#[derive(
+    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
+)]
+pub struct SessionId(uuid::Uuid);
+
+impl SessionId {
+    pub fn generate() -> Self {
+        Self(Uuid::new_v4())
+    }
+}
+
+impl std::fmt::Display for SessionId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "SessionId({})", self.0)
+    }
+}
+
+#[derive(
+    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
+)]
+pub struct BuildId(uuid::Uuid);
+
+impl BuildId {
+    pub fn generate() -> Self {
+        Self(Uuid::new_v4())
+    }
+}
+
+impl std::fmt::Display for BuildId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "BuildId({})", self.0)
+    }
+}
 
 fn current_crate_version() -> semver::Version {
     let crate_version_raw = env!("CARGO_PKG_VERSION");