@@ -745,6 +745,10 @@ async fn start_inner(
dataflow
dataflow
.log_subscribers
.log_subscribers
.push(LogSubscriber::new(level, connection));
.push(LogSubscriber::new(level, connection));
let buffered = std::mem::take(&mut dataflow.buffered_log_messages);
for message in buffered {
send_log_message(&mut dataflow.log_subscribers, &message).await;
}
}
}
}
}
ControlEvent::BuildLogSubscribe {
ControlEvent::BuildLogSubscribe {
@@ -756,6 +760,10 @@ async fn start_inner(
build
build
.log_subscribers
.log_subscribers
.push(LogSubscriber::new(level, connection));
.push(LogSubscriber::new(level, connection));
let buffered = std::mem::take(&mut build.buffered_log_messages);
for message in buffered {
send_log_message(&mut build.log_subscribers, &message).await;
}
}
}
}
}
},
},
@@ -815,12 +823,21 @@ async fn start_inner(
Event::Log(message) => {
Event::Log(message) => {
if let Some(dataflow_id) = &message.dataflow_id {
if let Some(dataflow_id) = &message.dataflow_id {
if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
send_log_message(&mut dataflow.log_subscribers, &message).await;
if dataflow.log_subscribers.is_empty() {
// buffer log message until there are subscribers
dataflow.buffered_log_messages.push(message);
} else {
send_log_message(&mut dataflow.log_subscribers, &message).await;
}
}
}
}
if let Some(build_id) = message.build_id {
if let Some(build) = running_builds.get_mut(&build_id) {
send_log_message(&mut build.log_subscribers, &message).await;
} else if let Some(build_id) = &message.build_id {
if let Some(build) = running_builds.get_mut(build_id) {
if build.log_subscribers.is_empty() {
// buffer log message until there are subscribers
build.buffered_log_messages.push(message);
} else {
send_log_message(&mut build.log_subscribers, &message).await;
}
}
}
}
}
}
}
@@ -984,6 +1001,8 @@ struct RunningBuild {
errors: Vec<String>,
errors: Vec<String>,
build_result: CachedResult,
build_result: CachedResult,
/// Buffer for log messages that were sent before there were any subscribers.
buffered_log_messages: Vec<LogMessage>,
log_subscribers: Vec<LogSubscriber>,
log_subscribers: Vec<LogSubscriber>,
pending_build_results: BTreeSet<DaemonId>,
pending_build_results: BTreeSet<DaemonId>,
@@ -1002,6 +1021,8 @@ struct RunningDataflow {
spawn_result: CachedResult,
spawn_result: CachedResult,
stop_reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
stop_reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
/// Buffer for log messages that were sent before there were any subscribers.
buffered_log_messages: Vec<LogMessage>,
log_subscribers: Vec<LogSubscriber>,
log_subscribers: Vec<LogSubscriber>,
pending_spawn_results: BTreeSet<DaemonId>,
pending_spawn_results: BTreeSet<DaemonId>,
@@ -1323,6 +1344,7 @@ async fn build_dataflow(
Ok(RunningBuild {
Ok(RunningBuild {
errors: Vec::new(),
errors: Vec::new(),
build_result: CachedResult::default(),
build_result: CachedResult::default(),
buffered_log_messages: Vec::new(),
log_subscribers: Vec::new(),
log_subscribers: Vec::new(),
pending_build_results: daemons,
pending_build_results: daemons,
})
})
@@ -1404,6 +1426,7 @@ async fn start_dataflow(
nodes,
nodes,
spawn_result: CachedResult::default(),
spawn_result: CachedResult::default(),
stop_reply_senders: Vec::new(),
stop_reply_senders: Vec::new(),
buffered_log_messages: Vec::new(),
log_subscribers: Vec::new(),
log_subscribers: Vec::new(),
pending_spawn_results: daemons,
pending_spawn_results: daemons,
})
})