Philipp Oppermann, 7 months ago
parent commit b22e8a1411
13 changed files with 125 additions and 78 deletions
1. +19 -1   binaries/coordinator/src/control.rs
2. +35 -35  binaries/coordinator/src/lib.rs
3. +2 -2    binaries/coordinator/src/run/mod.rs
4. +1 -2    binaries/daemon/src/build/git.rs
5. +1 -1    binaries/daemon/src/build/mod.rs
6. +5 -8    binaries/daemon/src/lib.rs
7. +13 -13  binaries/daemon/src/log.rs
8. +1 -1    binaries/daemon/src/spawn/mod.rs
9. +5 -5    libraries/message/src/cli_to_coordinator.rs
10. +2 -2   libraries/message/src/common.rs
11. +4 -4   libraries/message/src/coordinator_to_cli.rs
12. +2 -2   libraries/message/src/coordinator_to_daemon.rs
13. +35 -2  libraries/message/src/lib.rs

binaries/coordinator/src/control.rs (+19, -1)

@@ -2,7 +2,9 @@ use crate::{
     tcp_utils::{tcp_receive, tcp_send},
     Event,
 };
-use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
+use dora_message::{
+    cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply, BuildId,
+};
 use eyre::{eyre, Context};
 use futures::{
     future::{self, Either},
@@ -114,6 +116,17 @@ async fn handle_requests(
             break;
         }

+        if let Ok(ControlRequest::BuildLogSubscribe { build_id, level }) = request {
+            let _ = tx
+                .send(ControlEvent::BuildLogSubscribe {
+                    build_id,
+                    level,
+                    connection,
+                })
+                .await;
+            break;
+        }
+
         let result = match request {
             Ok(request) => handle_request(request, &tx).await,
             Err(err) => Err(err),
@@ -179,6 +192,11 @@ pub enum ControlEvent {
         level: log::LevelFilter,
         connection: TcpStream,
     },
+    BuildLogSubscribe {
+        build_id: BuildId,
+        level: log::LevelFilter,
+        connection: TcpStream,
+    },
     Error(eyre::Report),
 }

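Note: like the existing LogSubscribe path, the new BuildLogSubscribe request is intercepted before the generic `match request` because the handler must move the TCP connection into the ControlEvent (the coordinator keeps the socket and streams build log messages to it) and then stop serving further requests on that connection. A minimal sketch of that hand-off pattern, using a hypothetical Connection placeholder and std channels instead of dora's TcpStream and async channel:

use std::sync::mpsc;

// Hypothetical stand-ins for dora's TcpStream and ControlEvent; the names are illustrative only.
struct Connection(u32);

enum Event {
    // The subscription event owns the connection so that the receiver side can keep
    // the socket open and stream log messages to it later.
    BuildLogSubscribe { build_id: u64, connection: Connection },
    Other(String),
}

fn handle_request(request: &str, connection: Connection, tx: &mpsc::Sender<Event>) {
    if request == "build-log-subscribe" {
        // Move the connection into the event and stop serving this client here,
        // mirroring the early `break` in handle_requests above.
        let _ = tx.send(Event::BuildLogSubscribe { build_id: 1, connection });
        return;
    }
    let _ = tx.send(Event::Other(request.to_string()));
}

fn main() {
    let (tx, rx) = mpsc::channel();
    handle_request("build-log-subscribe", Connection(7), &tx);
    if let Ok(Event::BuildLogSubscribe { build_id, connection }) = rx.recv() {
        println!("connection {} subscribed to build {build_id}", connection.0);
    }
}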


binaries/coordinator/src/lib.rs (+35, -35)

@@ -29,11 +29,7 @@ use itertools::Itertools;
 use log_subscriber::LogSubscriber;
 use run::SpawnedDataflow;
 use std::{
-    collections::{
-        btree_map::{Entry, OccupiedEntry},
-        BTreeMap, BTreeSet, HashMap,
-    },
-    env::current_dir,
+    collections::{BTreeMap, BTreeSet, HashMap},
     net::SocketAddr,
     path::PathBuf,
     sync::Arc,
@@ -204,6 +200,7 @@ async fn start_inner(
     let mut events = (abortable_events, daemon_events).merge();

     let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();
+    let mut build_log_subscribers: BTreeMap<BuildId, Vec<LogSubscriber>> = Default::default();

     let mut running_dataflows: HashMap<DataflowId, RunningDataflow> = HashMap::new();
     let mut dataflow_results: HashMap<DataflowId, BTreeMap<DaemonId, DataflowDaemonResult>> =
@@ -211,8 +208,6 @@ async fn start_inner(
     let mut archived_dataflows: HashMap<DataflowId, ArchivedDataflow> = HashMap::new();
     let mut daemon_connections = DaemonConnections::default();

-    let mut build_log_subscribers: BTreeMap<SessionId, LogSubscriber> = Default::default();
-
     while let Some(event) = events.next().await {
         // used below for measuring the event handling duration
         let start = Instant::now();
@@ -364,9 +359,9 @@ async fn start_inner(
                         let mut finished_dataflow = entry.remove();
                         let dataflow_id = finished_dataflow.uuid;
                         send_log_message(
-                            &mut finished_dataflow,
+                            &mut finished_dataflow.log_subscribers,
                             &LogMessage {
-                                session_id: None,
+                                build_id: None,
                                 dataflow_id: Some(dataflow_id),
                                 node_id: None,
                                 daemon_id: None,
@@ -422,7 +417,7 @@ async fn start_inner(
                         uv,
                     } => {
                         // assign a random build id
-                        let build_id = SessionId::new_v4();
+                        let build_id = BuildId::generate();

                         let result = build_dataflow(
                             build_id,
@@ -447,6 +442,14 @@ async fn start_inner(
                             }
                         }
                     }
+                    ControlRequest::WaitForBuild { build_id } => {
+                        if let Some(build) = running_builds.get_mut(&build_id) {
+                            build.spawn_result.register(reply_sender);
+                        } else {
+                            let _ =
+                                reply_sender.send(Err(eyre!("unknown build id {build_id}")));
+                        }
+                    }
                     ControlRequest::Start {
                         build_id,
                         session_id,
@@ -702,6 +705,11 @@ async fn start_inner(
                             "LogSubscribe request should be handled separately"
                         )));
                     }
+                    ControlRequest::BuildLogSubscribe { .. } => {
+                        let _ = reply_sender.send(Err(eyre::eyre!(
+                            "BuildLogSubscribe request should be handled separately"
+                        )));
+                    }
                 }
             }
             ControlEvent::Error(err) => tracing::error!("{err:?}"),
@@ -716,6 +724,16 @@ async fn start_inner(
                         .push(LogSubscriber::new(level, connection));
                 }
             }
+            ControlEvent::BuildLogSubscribe {
+                build_id,
+                level,
+                connection,
+            } => {
+                build_log_subscribers
+                    .entry(build_id)
+                    .or_default()
+                    .push(LogSubscriber::new(level, connection));
+            }
         },
         Event::DaemonHeartbeatInterval => {
             let mut disconnected = BTreeSet::new();
@@ -773,12 +791,12 @@ async fn start_inner(
         Event::Log(message) => {
             if let Some(dataflow_id) = &message.dataflow_id {
                 if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
-                    send_log_message(dataflow, &message).await;
+                    send_log_message(&mut dataflow.log_subscribers, &message).await;
                 }
             }
-            if let Some(session_id) = message.session_id {
-                if let Entry::Occupied(subscriber) = build_log_subscribers.entry(session_id) {
-                    send_log_message_to_subscriber(&message, subscriber).await;
+            if let Some(build_id) = message.build_id {
+                if let Some(subscribers) = build_log_subscribers.get_mut(&build_id) {
+                    send_log_message(subscribers, &message).await;
                 }
             }
         }
@@ -865,8 +883,8 @@ async fn start_inner(
     Ok(())
 }

-async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage) {
-    for subscriber in &mut dataflow.log_subscribers {
+async fn send_log_message(log_subscribers: &mut Vec<LogSubscriber>, message: &LogMessage) {
+    for subscriber in log_subscribers.iter_mut() {
         let send_result =
             tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));

@@ -874,25 +892,7 @@ async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage)
             subscriber.close();
         }
     }
-    dataflow.log_subscribers.retain(|s| !s.is_closed());
-}
-
-async fn send_log_message_to_subscriber(
-    message: &LogMessage,
-    mut subscriber: OccupiedEntry<'_, SessionId, LogSubscriber>,
-) {
-    let send_result = tokio::time::timeout(
-        Duration::from_millis(100),
-        subscriber.get_mut().send_message(message),
-    );
-
-    if send_result.await.is_err() {
-        subscriber.get_mut().close();
-    }
-
-    if subscriber.get_mut().is_closed() {
-        subscriber.remove();
-    }
+    log_subscribers.retain(|s| !s.is_closed());
 }

 fn dataflow_result(

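With the subscriber list factored out of RunningDataflow, the same send_log_message helper now serves both the per-dataflow and the new per-build subscribers, and closed connections are pruned with retain. A self-contained sketch of that bookkeeping, with a simplified Subscriber standing in for dora's LogSubscriber (the names and fields here are illustrative, not the real API):

use std::collections::BTreeMap;

// Simplified stand-ins: the real code keys the map by BuildId and stores LogSubscriber values.
type BuildId = u64;

struct Subscriber {
    closed: bool,
}

impl Subscriber {
    fn send_message(&mut self, message: &str) -> Result<(), ()> {
        if self.closed {
            Err(())
        } else {
            println!("log: {message}");
            Ok(())
        }
    }

    fn is_closed(&self) -> bool {
        self.closed
    }
}

// Same shape as send_log_message above: try every subscriber, mark failures as closed,
// then drop the closed ones from the list.
fn send_log_message(subscribers: &mut Vec<Subscriber>, message: &str) {
    for subscriber in subscribers.iter_mut() {
        if subscriber.send_message(message).is_err() {
            subscriber.closed = true;
        }
    }
    subscribers.retain(|s| !s.is_closed());
}

fn main() {
    let mut build_log_subscribers: BTreeMap<BuildId, Vec<Subscriber>> = BTreeMap::new();

    // Registration, as in the ControlEvent::BuildLogSubscribe arm.
    build_log_subscribers
        .entry(42)
        .or_default()
        .push(Subscriber { closed: false });

    // Delivery, as in the Event::Log arm: look up the subscribers for the message's build id.
    if let Some(subscribers) = build_log_subscribers.get_mut(&42) {
        send_log_message(subscribers, "building node `camera`");
    }
}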

binaries/coordinator/src/run/mod.rs (+2, -2)

@@ -10,7 +10,7 @@ use dora_message::{
     daemon_to_coordinator::DaemonCoordinatorReply,
     descriptor::{Descriptor, ResolvedNode},
     id::NodeId,
-    SessionId,
+    BuildId, SessionId,
 };
 use eyre::{bail, eyre, ContextCompat, WrapErr};
 use itertools::Itertools;
@@ -22,7 +22,7 @@ use uuid::{NoContext, Timestamp, Uuid};

 #[tracing::instrument(skip(daemon_connections, clock))]
 pub(super) async fn spawn_dataflow(
-    build_id: Option<SessionId>,
+    build_id: Option<BuildId>,
     session_id: SessionId,
     dataflow: Descriptor,
     local_working_dir: Option<PathBuf>,


binaries/daemon/src/build/git.rs (+1, -2)

@@ -8,7 +8,6 @@ use std::{
     path::{Path, PathBuf},
 };
 use url::Url;
-use uuid::Uuid;

 #[derive(Default)]
 pub struct GitManager {
@@ -30,7 +29,7 @@ struct PreparedBuild {
 impl GitManager {
     pub fn choose_clone_dir(
         &mut self,
-        session_id: uuid::Uuid,
+        session_id: SessionId,
         repo_url: Url,
         commit_hash: String,
         prev_commit_hash: Option<String>,


binaries/daemon/src/build/mod.rs (+1, -1)

@@ -12,7 +12,7 @@ use dora_message::{
     common::{GitSource, LogLevel, Timestamped},
     descriptor::EnvValue,
     id::NodeId,
-    BuildId, SessionId,
+    SessionId,
 };
 use eyre::Context;
 use tokio::sync::mpsc;


binaries/daemon/src/lib.rs (+5, -8)

@@ -539,6 +539,7 @@ impl Daemon {

         let result = self
             .build_dataflow(
+                build_id,
                 session_id,
                 base_working_dir,
                 nodes,
@@ -874,6 +875,7 @@ impl Daemon {

     async fn build_dataflow(
         &mut self,
+        build_id: BuildId,
         session_id: SessionId,
         base_working_dir: PathBuf,
         nodes: BTreeMap<NodeId, ResolvedNode>,
@@ -899,7 +901,7 @@ impl Daemon {
             let dynamic_node = node.kind.dynamic();

             let node_id = node.id.clone();
-            let mut logger = self.logger.for_node_build(session_id, node_id.clone());
+            let mut logger = self.logger.for_node_build(build_id, node_id.clone());
             logger.log(LogLevel::Info, "building").await;
             let git_source = git_sources.get(&node_id).cloned();
             let prev_git_source = prev_git_sources.get(&node_id).cloned();
@@ -925,12 +927,7 @@ impl Daemon {
                 }
                 Err(err) => {
                     self.logger
-                        .log_build(
-                            session_id,
-                            LogLevel::Error,
-                            Some(node_id),
-                            format!("{err:?}"),
-                        )
+                        .log_build(build_id, LogLevel::Error, Some(node_id), format!("{err:?}"))
                         .await;
                     return Err(err);
                 }
@@ -2011,7 +2008,7 @@ impl Daemon {
     fn base_working_dir(
         &self,
         local_working_dir: Option<PathBuf>,
-        session_id: Uuid,
+        session_id: SessionId,
     ) -> eyre::Result<PathBuf> {
         match local_working_dir {
             Some(working_dir) => {


binaries/daemon/src/log.rs (+13, -13)

@@ -8,7 +8,7 @@ use dora_core::{config::NodeId, uhlc};
 use dora_message::{
     common::{DaemonId, LogLevel, LogMessage, Timestamped},
     daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
-    SessionId,
+    BuildId,
 };
 use eyre::Context;
 use tokio::net::TcpStream;
@@ -95,7 +95,7 @@ impl<'a> DataflowLogger<'a> {
 }

 pub struct NodeBuildLogger<'a> {
-    session_id: SessionId,
+    build_id: BuildId,
     node_id: NodeId,
     logger: CowMut<'a, DaemonLogger>,
 }
@@ -107,13 +107,13 @@ impl NodeBuildLogger<'_> {

     pub async fn log(&mut self, level: LogLevel, message: impl Into<String>) {
         self.logger
-            .log_build(self.session_id, level, Some(self.node_id.clone()), message)
+            .log_build(self.build_id, level, Some(self.node_id.clone()), message)
             .await
     }

     pub async fn try_clone(&self) -> eyre::Result<NodeBuildLogger<'static>> {
         Ok(NodeBuildLogger {
-            session_id: self.session_id,
+            build_id: self.build_id,
             node_id: self.node_id.clone(),
             logger: CowMut::Owned(self.logger.try_clone().await?),
         })
@@ -133,9 +133,9 @@ impl DaemonLogger {
         }
     }

-    pub fn for_node_build(&mut self, session_id: SessionId, node_id: NodeId) -> NodeBuildLogger {
+    pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger {
         NodeBuildLogger {
-            session_id,
+            build_id,
             node_id,
             logger: CowMut::Borrowed(self),
         }
@@ -154,7 +154,7 @@ impl DaemonLogger {
         message: impl Into<String>,
     ) {
         let message = LogMessage {
-            session_id: None,
+            build_id: None,
             daemon_id: Some(self.daemon_id.clone()),
             dataflow_id,
             node_id,
@@ -170,13 +170,13 @@ impl DaemonLogger {

     pub async fn log_build(
         &mut self,
-        session_id: SessionId,
+        build_id: BuildId,
         level: LogLevel,
         node_id: Option<NodeId>,
         message: impl Into<String>,
     ) {
         let message = LogMessage {
-            session_id: Some(session_id),
+            build_id: Some(build_id),
             daemon_id: Some(self.daemon_id.clone()),
             dataflow_id: None,
             node_id,
@@ -239,7 +239,7 @@ impl Logger {
         match message.level {
             LogLevel::Error => {
                 tracing::error!(
-                    session_id = ?message.session_id.map(|id| id.to_string()),
+                    build_id = ?message.build_id.map(|id| id.to_string()),
                     dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
                     node_id = ?message.node_id.map(|id| id.to_string()),
                     target = message.target,
@@ -252,7 +252,7 @@ impl Logger {
             }
             LogLevel::Warn => {
                 tracing::warn!(
-                    session_id = ?message.session_id.map(|id| id.to_string()),
+                    build_id = ?message.build_id.map(|id| id.to_string()),
                     dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
                     node_id = ?message.node_id.map(|id| id.to_string()),
                     target = message.target,
@@ -265,7 +265,7 @@ impl Logger {
             }
             LogLevel::Info => {
                 tracing::info!(
-                    session_id = ?message.session_id.map(|id| id.to_string()),
+                    build_id = ?message.build_id.map(|id| id.to_string()),
                     dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
                     node_id = ?message.node_id.map(|id| id.to_string()),
                     target = message.target,
@@ -278,7 +278,7 @@ impl Logger {
             }
             LogLevel::Debug => {
                 tracing::debug!(
-                    session_id = ?message.session_id.map(|id| id.to_string()),
+                    build_id = ?message.build_id.map(|id| id.to_string()),
                     dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
                     node_id = ?message.node_id.map(|id| id.to_string()),
                     target = message.target,

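In the tracing output, the optional build id is recorded as a structured field exactly like the other ids. A minimal sketch of the pattern (assumes the tracing and tracing-subscriber crates as dependencies; the local BuildId here just mimics the newtype added in libraries/message/src/lib.rs below):

// Assumes `tracing` and `tracing-subscriber` as dependencies; BuildId is a local stand-in.
use std::fmt;

#[derive(Debug, Clone, Copy)]
struct BuildId(u64);

impl fmt::Display for BuildId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "BuildId({})", self.0)
    }
}

fn main() {
    tracing_subscriber::fmt::init();

    let build_id: Option<BuildId> = Some(BuildId(7));
    // Same pattern as Logger::log above: map the Option to a String and record it
    // as a structured field next to the message text.
    tracing::info!(
        build_id = ?build_id.map(|id| id.to_string()),
        "building node"
    );
}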

binaries/daemon/src/spawn/mod.rs (+1, -1)

@@ -560,7 +560,7 @@ impl PreparedNode {
             .log(LogMessage {
                 daemon_id: Some(daemon_id.clone()),
                 dataflow_id: Some(dataflow_id),
-                session_id: None,
+                build_id: None,
                 level: LogLevel::Info,
                 node_id: Some(node_id.clone()),
                 target: Some("stdout".into()),


libraries/message/src/cli_to_coordinator.rs (+5, -5)

@@ -6,7 +6,7 @@ use crate::{
     common::GitSource,
     descriptor::Descriptor,
     id::{NodeId, OperatorId},
-    SessionId,
+    BuildId, SessionId,
 };

 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
@@ -27,11 +27,11 @@ pub enum ControlRequest {
         uv: bool,
     },
     WaitForBuild {
-        build_id: Uuid,
+        build_id: BuildId,
     },
     Start {
-        build_id: Option<Uuid>,
-        session_id: Uuid,
+        build_id: Option<BuildId>,
+        session_id: SessionId,
         dataflow: Descriptor,
         name: Option<String>,
         /// Allows overwriting the base working dir when CLI and daemon are
@@ -77,7 +77,7 @@ pub enum ControlRequest {
         level: log::LevelFilter,
     },
     BuildLogSubscribe {
-        build_id: Uuid,
+        build_id: BuildId,
         level: log::LevelFilter,
     },
 }

libraries/message/src/common.rs (+2, -2)

@@ -5,14 +5,14 @@ use aligned_vec::{AVec, ConstAlign};
 use eyre::Context as _;
 use uuid::Uuid;

-use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, SessionId, DataflowId};
+use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId, SessionId};

 pub use log::Level as LogLevel;

 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 #[must_use]
 pub struct LogMessage {
-    pub session_id: Option<SessionId>,
+    pub build_id: Option<BuildId>,
     pub dataflow_id: Option<DataflowId>,
     pub node_id: Option<NodeId>,
     pub daemon_id: Option<DaemonId>,


libraries/message/src/coordinator_to_cli.rs (+4, -4)

@@ -3,18 +3,18 @@ use std::collections::{BTreeMap, BTreeSet};
 use uuid::Uuid;

 pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
-use crate::{common::DaemonId, id::NodeId};
+use crate::{common::DaemonId, id::NodeId, BuildId, SessionId};

 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
 pub enum ControlRequestReply {
     Error(String),
     CoordinatorStopped,
     DataflowBuildTriggered {
-        build_id: Uuid,
+        build_id: BuildId,
     },
     DataflowBuildFinished {
-        build_id: Uuid,
-        session_id: Uuid,
+        build_id: BuildId,
+        session_id: SessionId,
         result: Result<(), String>,
     },
     DataflowStartTriggered {


libraries/message/src/coordinator_to_daemon.rs (+2, -2)

@@ -60,8 +60,8 @@ pub enum DaemonCoordinatorEvent {

 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 pub struct BuildDataflowNodes {
-    pub build_id: Uuid,
-    pub session_id: Uuid,
+    pub build_id: BuildId,
+    pub session_id: SessionId,
     /// Allows overwriting the base working dir when CLI and daemon are
     /// running on the same machine.
     ///


libraries/message/src/lib.rs (+35, -2)

@@ -24,10 +24,43 @@ pub mod coordinator_to_cli;

 pub use arrow_data;
 pub use arrow_schema;
+use uuid::Uuid;

 pub type DataflowId = uuid::Uuid;
-pub type SessionId = uuid::Uuid;
-pub type BuildId = uuid::Uuid;
+
+#[derive(
+    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
+)]
+pub struct SessionId(uuid::Uuid);
+
+impl SessionId {
+    pub fn generate() -> Self {
+        Self(Uuid::new_v4())
+    }
+}
+
+impl std::fmt::Display for SessionId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "SessionId({})", self.0)
+    }
+}
+
+#[derive(
+    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
+)]
+pub struct BuildId(uuid::Uuid);
+
+impl BuildId {
+    pub fn generate() -> Self {
+        Self(Uuid::new_v4())
+    }
+}
+
+impl std::fmt::Display for BuildId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "BuildId({})", self.0)
+    }
+}

 fn current_crate_version() -> semver::Version {
     let crate_version_raw = env!("CARGO_PKG_VERSION");

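Replacing the plain `type SessionId = Uuid` / `type BuildId = Uuid` aliases with tuple-struct newtypes makes the two ids distinct types, so a build id can no longer be passed where a session id is expected; with the aliases, both were just Uuids and interchangeable. A short sketch of the difference, shaped like the new types (the uuid crate with the v4 feature is assumed):

use uuid::Uuid; // the `uuid` crate with the `v4` feature is assumed

// Before: both ids were the same type and could be swapped silently.
//     pub type SessionId = uuid::Uuid;
//     pub type BuildId = uuid::Uuid;

// After: distinct newtypes, so mixing them up is a compile error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SessionId(Uuid);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BuildId(Uuid);

impl SessionId {
    pub fn generate() -> Self {
        Self(Uuid::new_v4())
    }
}

impl BuildId {
    pub fn generate() -> Self {
        Self(Uuid::new_v4())
    }
}

fn wait_for_build(build_id: BuildId) {
    println!("waiting for build {build_id:?}");
}

fn main() {
    let build_id = BuildId::generate();
    let _session_id = SessionId::generate();
    wait_for_build(build_id);

    // wait_for_build(_session_id); // error[E0308]: expected `BuildId`, found `SessionId`
}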
