From 83baae6ccb595e72bedeccb5fcc32071e0c911a1 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 4 Jun 2025 19:52:53 +0200 Subject: [PATCH] Wip --- binaries/cli/src/attach.rs | 14 ++++- binaries/cli/src/lib.rs | 35 +++++------ binaries/cli/src/session.rs | 9 ++- binaries/coordinator/src/lib.rs | 68 ++++++++++----------- libraries/message/src/coordinator_to_cli.rs | 3 +- 5 files changed, 66 insertions(+), 63 deletions(-) diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index e40be1d8..8e9c3851 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -185,6 +185,7 @@ pub fn attach_dataflow( pub fn print_log_message(log_message: LogMessage) { let LogMessage { + build_id, dataflow_id, node_id, daemon_id, @@ -201,7 +202,16 @@ pub fn print_log_message(log_message: LogMessage) { log::Level::Info => "INFO ".green(), other => format!("{other:5}").normal(), }; - let dataflow = format!(" dataflow `{dataflow_id}`").cyan(); + let dataflow = if let Some(dataflow_id) = dataflow_id { + format!(" dataflow `{dataflow_id}`").cyan() + } else { + String::new().cyan() + }; + let build = if let Some(build_id) = build_id { + format!(" build `{build_id}`").cyan() + } else { + String::new().cyan() + }; let daemon = match daemon_id { Some(id) => format!(" on daemon `{id}`"), None => " on default daemon".to_string(), @@ -216,7 +226,7 @@ pub fn print_log_message(log_message: LogMessage) { None => "".normal(), }; - println!("{level}{dataflow}{daemon}{node}{target}: {message}"); + println!("{level}{build}{dataflow}{daemon}{node}{target}: {message}"); } enum AttachEvent { diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index b27dcb99..158eadb2 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -17,19 +17,18 @@ use dora_message::{ cli_to_coordinator::ControlRequest, common::LogMessage, coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus}, - BuildId, SessionId, + BuildId, }; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; use dora_tracing::{set_up_tracing_opts, FileLogging}; use duration_str::parse; -use eyre::{bail, Context, ContextCompat}; +use eyre::{bail, Context}; use formatting::FormatDataflowError; use std::{ env::current_dir, io::Write, net::{SocketAddr, TcpStream}, - path::Path, }; use std::{ net::{IpAddr, Ipv4Addr}, @@ -403,7 +402,7 @@ fn run(args: Args) -> eyre::Result<()> { } => template::create(args, internal_create_with_path_dependencies)?, Command::Run { dataflow, uv } => { let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?; - let dataflow_session = DataflowSession::read_session(&dataflow) + let dataflow_session = DataflowSession::read_session(&dataflow_path) .context("failed to read DataflowSession")?; let rt = Builder::new_multi_thread() @@ -647,7 +646,7 @@ fn build_dataflow( dataflow: String, coordinator_socket: SocketAddr, uv: bool, -) -> eyre::Result<(Box, Uuid)> { +) -> eyre::Result<(Box, BuildId)> { let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; let dataflow_descriptor = Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; @@ -669,13 +668,13 @@ fn build_dataflow( }; let mut session = connect_to_coordinator(coordinator_socket) .wrap_err("failed to connect to dora coordinator")?; - let dataflow_id = { + let build_id = { let dataflow = dataflow_descriptor.clone(); let session: &mut TcpRequestReplyConnection = &mut *session; let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Build { - session_id: dataflow_session.build_id, + session_id: dataflow_session.session_id, dataflow, git_sources, prev_git_sources: dataflow_session.git_sources.clone(), @@ -697,15 +696,15 @@ fn build_dataflow( other => bail!("unexpected start dataflow reply: {other:?}"), } }; - Ok((session, dataflow_id)) + Ok((session, build_id)) } fn wait_until_dataflow_built( - build_id: Uuid, + build_id: BuildId, session: &mut Box, coordinator_addr: SocketAddr, log_level: log::LevelFilter, -) -> eyre::Result<()> { +) -> eyre::Result { // subscribe to log messages let mut log_session = TcpConnection { stream: TcpStream::connect(coordinator_addr) @@ -742,18 +741,16 @@ fn wait_until_dataflow_built( let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { - ControlRequestReply::DataflowBuildFinished { - build_id, - session_id, - result, - } => match result { - Ok(()) => eprintln!("dataflow build finished successfully"), + ControlRequestReply::DataflowBuildFinished { build_id, result } => match result { + Ok(()) => { + eprintln!("dataflow build finished successfully"); + Ok(build_id) + } Err(err) => bail!("{err}"), }, ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected start dataflow reply: {other:?}"), } - Ok(()) } fn start_dataflow( @@ -789,8 +786,8 @@ fn start_dataflow( let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Start { - build_id, - session_id, + build_id: dataflow_session.build_id, + session_id: dataflow_session.session_id, dataflow, name, local_working_dir, diff --git a/binaries/cli/src/session.rs b/binaries/cli/src/session.rs index bc5d464a..c0069359 100644 --- a/binaries/cli/src/session.rs +++ b/binaries/cli/src/session.rs @@ -3,14 +3,13 @@ use std::{ path::{Path, PathBuf}, }; -use dora_message::{common::GitSource, id::NodeId, BuildId}; +use dora_message::{common::GitSource, id::NodeId, BuildId, SessionId}; use eyre::{Context, ContextCompat}; -use uuid::Uuid; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DataflowSession { pub build_id: Option, - pub session_id: Uuid, + pub session_id: SessionId, pub git_sources: BTreeMap, } @@ -18,7 +17,7 @@ impl Default for DataflowSession { fn default() -> Self { Self { build_id: None, - session_id: Uuid::new_v4(), + session_id: SessionId::generate(), git_sources: Default::default(), } } @@ -29,7 +28,7 @@ impl DataflowSession { let session_file = session_file_path(dataflow_path)?; if session_file.exists() { if let Ok(parsed) = deserialize(&session_file) { - return Ok((parsed)); + return Ok(parsed); } else { tracing::warn!("failed to read dataflow session file, regenerating (you might need to run `dora build` again)"); } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 68beba37..bfaa9799 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -200,7 +200,6 @@ async fn start_inner( let mut events = (abortable_events, daemon_events).merge(); let mut running_builds: HashMap = HashMap::new(); - let mut build_log_subscribers: BTreeMap> = Default::default(); let mut running_dataflows: HashMap = HashMap::new(); let mut dataflow_results: HashMap> = @@ -389,7 +388,7 @@ async fn start_inner( } if !matches!( finished_dataflow.spawn_result, - SpawnResult::Spawned { .. } + CachedResult::Cached { .. } ) { log::error!("pending spawn result on dataflow finish"); } @@ -432,7 +431,8 @@ async fn start_inner( ) .await; match result { - Ok(()) => { + Ok(build) => { + running_builds.insert(build_id, build); let _ = reply_sender.send(Ok( ControlRequestReply::DataflowBuildTriggered { build_id }, )); @@ -444,7 +444,7 @@ async fn start_inner( } ControlRequest::WaitForBuild { build_id } => { if let Some(build) = running_builds.get_mut(&build_id) { - build.spawn_result.register(reply_sender); + build.build_result.register(reply_sender); } else { let _ = reply_sender.send(Err(eyre!("unknown build id {build_id}"))); @@ -729,10 +729,11 @@ async fn start_inner( level, connection, } => { - build_log_subscribers - .entry(build_id) - .or_default() - .push(LogSubscriber::new(level, connection)); + if let Some(build) = running_builds.get_mut(&build_id) { + build + .log_subscribers + .push(LogSubscriber::new(level, connection)); + } } }, Event::DaemonHeartbeatInterval => { @@ -795,8 +796,8 @@ async fn start_inner( } } if let Some(build_id) = message.build_id { - if let Some(subscribers) = build_log_subscribers.get_mut(&build_id) { - send_log_message(subscribers, &message).await; + if let Some(build) = running_builds.get_mut(&build_id) { + send_log_message(&mut build.log_subscribers, &message).await; } } } @@ -820,19 +821,15 @@ async fn start_inner( }; if build.pending_build_results.is_empty() { tracing::info!("dataflow build finished: `{build_id}`"); - let build = running_builds.remove(&build_id).unwrap(); + let mut build = running_builds.remove(&build_id).unwrap(); let result = if build.errors.is_empty() { Ok(()) } else { Err(format!("build failed: {}", build.errors.join("\n\n"))) }; - build.build_result_sender.send(Ok( - ControlRequestReply::DataflowBuildFinished { - build_id, - session_id, - result, - }, + build.build_result.set_result(Ok( + ControlRequestReply::DataflowBuildFinished { build_id, result }, )); } } @@ -960,12 +957,8 @@ async fn send_heartbeat_message( } struct RunningBuild { - build_id: BuildId, - /// The IDs of the daemons that the build is running on. - daemons: BTreeSet, - errors: Vec, - build_result_sender: tokio::sync::oneshot::Sender>, + build_result: CachedResult, log_subscribers: Vec, @@ -982,7 +975,7 @@ struct RunningDataflow { exited_before_subscribe: Vec, nodes: BTreeMap, - spawn_result: SpawnResult, + spawn_result: CachedResult, stop_reply_senders: Vec>>, log_subscribers: Vec, @@ -990,16 +983,16 @@ struct RunningDataflow { pending_spawn_results: BTreeSet, } -pub enum SpawnResult { +pub enum CachedResult { Pending { result_senders: Vec>>, }, - Spawned { + Cached { result: eyre::Result, }, } -impl Default for SpawnResult { +impl Default for CachedResult { fn default() -> Self { Self::Pending { result_senders: Vec::new(), @@ -1007,14 +1000,14 @@ impl Default for SpawnResult { } } -impl SpawnResult { +impl CachedResult { fn register( &mut self, reply_sender: tokio::sync::oneshot::Sender>, ) { match self { - SpawnResult::Pending { result_senders } => result_senders.push(reply_sender), - SpawnResult::Spawned { result } => { + CachedResult::Pending { result_senders } => result_senders.push(reply_sender), + CachedResult::Cached { result } => { Self::send_result_to(result, reply_sender); } } @@ -1022,13 +1015,13 @@ impl SpawnResult { fn set_result(&mut self, result: eyre::Result) { match self { - SpawnResult::Pending { result_senders } => { + CachedResult::Pending { result_senders } => { for sender in result_senders.drain(..) { Self::send_result_to(&result, sender); } - *self = SpawnResult::Spawned { result }; + *self = CachedResult::Cached { result }; } - SpawnResult::Spawned { .. } => {} + CachedResult::Cached { .. } => {} } } @@ -1245,7 +1238,7 @@ async fn build_dataflow( clock: &HLC, uv: bool, daemon_connections: &mut DaemonConnections, -) -> eyre::Result<()> { +) -> eyre::Result { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut git_sources_by_daemon = git_sources @@ -1294,7 +1287,12 @@ async fn build_dataflow( tracing::info!("successfully triggered dataflow build `{session_id}`",); - Ok(()) + Ok(RunningBuild { + errors: Vec::new(), + build_result: CachedResult::default(), + log_subscribers: Vec::new(), + pending_build_results: daemons, + }) } async fn build_dataflow_on_machine( @@ -1370,7 +1368,7 @@ async fn start_dataflow( exited_before_subscribe: Default::default(), daemons: daemons.clone(), nodes, - spawn_result: SpawnResult::default(), + spawn_result: CachedResult::default(), stop_reply_senders: Vec::new(), log_subscribers: Vec::new(), pending_spawn_results: daemons, diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs index 087f9506..79dc85ae 100644 --- a/libraries/message/src/coordinator_to_cli.rs +++ b/libraries/message/src/coordinator_to_cli.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BTreeSet}; use uuid::Uuid; pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus}; -use crate::{common::DaemonId, id::NodeId, BuildId, SessionId}; +use crate::{common::DaemonId, id::NodeId, BuildId}; #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum ControlRequestReply { @@ -14,7 +14,6 @@ pub enum ControlRequestReply { }, DataflowBuildFinished { build_id: BuildId, - session_id: SessionId, result: Result<(), String>, }, DataflowStartTriggered {