diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index e3a8d767..67ee69a4 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -745,6 +745,10 @@ async fn start_inner( dataflow .log_subscribers .push(LogSubscriber::new(level, connection)); + let buffered = std::mem::take(&mut dataflow.buffered_log_messages); + for message in buffered { + send_log_message(&mut dataflow.log_subscribers, &message).await; + } } } ControlEvent::BuildLogSubscribe { @@ -756,6 +760,10 @@ async fn start_inner( build .log_subscribers .push(LogSubscriber::new(level, connection)); + let buffered = std::mem::take(&mut build.buffered_log_messages); + for message in buffered { + send_log_message(&mut build.log_subscribers, &message).await; + } } } }, @@ -815,12 +823,21 @@ async fn start_inner( Event::Log(message) => { if let Some(dataflow_id) = &message.dataflow_id { if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) { - send_log_message(&mut dataflow.log_subscribers, &message).await; + if dataflow.log_subscribers.is_empty() { + // buffer log message until there are subscribers + dataflow.buffered_log_messages.push(message); + } else { + send_log_message(&mut dataflow.log_subscribers, &message).await; + } } - } - if let Some(build_id) = message.build_id { - if let Some(build) = running_builds.get_mut(&build_id) { - send_log_message(&mut build.log_subscribers, &message).await; + } else if let Some(build_id) = &message.build_id { + if let Some(build) = running_builds.get_mut(build_id) { + if build.log_subscribers.is_empty() { + // buffer log message until there are subscribers + build.buffered_log_messages.push(message); + } else { + send_log_message(&mut build.log_subscribers, &message).await; + } } } } @@ -984,6 +1001,8 @@ struct RunningBuild { errors: Vec, build_result: CachedResult, + /// Buffer for log messages that were sent before there were any subscribers. + buffered_log_messages: Vec, log_subscribers: Vec, pending_build_results: BTreeSet, @@ -1002,6 +1021,8 @@ struct RunningDataflow { spawn_result: CachedResult, stop_reply_senders: Vec>>, + /// Buffer for log messages that were sent before there were any subscribers. + buffered_log_messages: Vec, log_subscribers: Vec, pending_spawn_results: BTreeSet, @@ -1323,6 +1344,7 @@ async fn build_dataflow( Ok(RunningBuild { errors: Vec::new(), build_result: CachedResult::default(), + buffered_log_messages: Vec::new(), log_subscribers: Vec::new(), pending_build_results: daemons, }) @@ -1404,6 +1426,7 @@ async fn start_dataflow( nodes, spawn_result: CachedResult::default(), stop_reply_senders: Vec::new(), + buffered_log_messages: Vec::new(), log_subscribers: Vec::new(), pending_spawn_results: daemons, }) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index cc33b50f..86d05a62 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1018,7 +1018,7 @@ impl Daemon { let node_id = node.id.clone(); let mut logger = self.logger.for_node_build(build_id, node_id.clone()); - logger.log(LogLevel::Info, "building").await; + logger.log(LogLevel::Debug, "building").await; let git_source = git_sources.get(&node_id).cloned(); let prev_git_source = prev_git_sources.get(&node_id).cloned(); let prev_git = prev_git_source.map(|prev_source| PrevGitSource { diff --git a/libraries/core/src/build/mod.rs b/libraries/core/src/build/mod.rs index 2eb122a1..31c58ee8 100644 --- a/libraries/core/src/build/mod.rs +++ b/libraries/core/src/build/mod.rs @@ -109,7 +109,13 @@ async fn build_node( uv: bool, ) -> eyre::Result<()> { logger - .log_message(LogLevel::Info, format!("running build command: `{build}")) + .log_message( + LogLevel::Info, + format!( + "running build command: `{build}` in {}", + working_dir.display() + ), + ) .await; let build = build.to_owned(); let node_env = node_env.clone();