Philipp Oppermann 7 months ago
commit 83baae6ccb
5 changed files with 66 additions and 63 deletions
  1. binaries/cli/src/attach.rs (+12, -2)
  2. binaries/cli/src/lib.rs (+16, -19)
  3. binaries/cli/src/session.rs (+4, -5)
  4. binaries/coordinator/src/lib.rs (+33, -35)
  5. libraries/message/src/coordinator_to_cli.rs (+1, -2)

binaries/cli/src/attach.rs (+12, -2)

@@ -185,6 +185,7 @@ pub fn attach_dataflow(
 
 pub fn print_log_message(log_message: LogMessage) {
     let LogMessage {
+        build_id,
         dataflow_id,
         node_id,
         daemon_id,
@@ -201,7 +202,16 @@ pub fn print_log_message(log_message: LogMessage) {
         log::Level::Info => "INFO ".green(),
         other => format!("{other:5}").normal(),
     };
-    let dataflow = format!(" dataflow `{dataflow_id}`").cyan();
+    let dataflow = if let Some(dataflow_id) = dataflow_id {
+        format!(" dataflow `{dataflow_id}`").cyan()
+    } else {
+        String::new().cyan()
+    };
+    let build = if let Some(build_id) = build_id {
+        format!(" build `{build_id}`").cyan()
+    } else {
+        String::new().cyan()
+    };
     let daemon = match daemon_id {
         Some(id) => format!(" on daemon `{id}`"),
         None => " on default daemon".to_string(),
@@ -216,7 +226,7 @@ pub fn print_log_message(log_message: LogMessage) {
         None => "".normal(),
     };
 
-    println!("{level}{dataflow}{daemon}{node}{target}: {message}");
+    println!("{level}{build}{dataflow}{daemon}{node}{target}: {message}");
 }
 
 enum AttachEvent {
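
Note on the hunk above: both branches of each `if let` must produce the same type, which is why the absent case yields an empty `ColoredString` via `String::new().cyan()` rather than a bare `String`. A minimal standalone sketch of this optional-prefix pattern, assuming the `colored` crate (the source of the `.cyan()`, `.green()`, and `.normal()` calls in this file); `optional_prefix` is a hypothetical helper, not part of the commit:

    use colored::{ColoredString, Colorize};

    // Hypothetical helper: renders an optional ID as a colored prefix.
    // Returning an empty ColoredString in the None case keeps both branches
    // at the same type, mirroring the `String::new().cyan()` above.
    fn optional_prefix(label: &str, id: Option<u64>) -> ColoredString {
        match id {
            Some(id) => format!(" {label} `{id}`").cyan(),
            None => String::new().cyan(),
        }
    }

    fn main() {
        // Prints "INFO build `42`: hello"; the absent dataflow ID adds nothing.
        println!(
            "INFO{}{}: hello",
            optional_prefix("build", Some(42)),
            optional_prefix("dataflow", None)
        );
    }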


binaries/cli/src/lib.rs (+16, -19)

@@ -17,19 +17,18 @@ use dora_message::{
     cli_to_coordinator::ControlRequest,
     common::LogMessage,
     coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus},
-    BuildId, SessionId,
+    BuildId,
 };
 #[cfg(feature = "tracing")]
 use dora_tracing::set_up_tracing;
 use dora_tracing::{set_up_tracing_opts, FileLogging};
 use duration_str::parse;
-use eyre::{bail, Context, ContextCompat};
+use eyre::{bail, Context};
 use formatting::FormatDataflowError;
 use std::{
     env::current_dir,
     io::Write,
     net::{SocketAddr, TcpStream},
-    path::Path,
 };
 use std::{
     net::{IpAddr, Ipv4Addr},
@@ -403,7 +402,7 @@ fn run(args: Args) -> eyre::Result<()> {
         } => template::create(args, internal_create_with_path_dependencies)?,
         Command::Run { dataflow, uv } => {
             let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
-            let dataflow_session = DataflowSession::read_session(&dataflow)
+            let dataflow_session = DataflowSession::read_session(&dataflow_path)
                 .context("failed to read DataflowSession")?;
 
             let rt = Builder::new_multi_thread()
@@ -647,7 +646,7 @@ fn build_dataflow(
     dataflow: String,
     coordinator_socket: SocketAddr,
     uv: bool,
-) -> eyre::Result<(Box<TcpRequestReplyConnection>, Uuid)> {
+) -> eyre::Result<(Box<TcpRequestReplyConnection>, BuildId)> {
     let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
     let dataflow_descriptor =
         Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
@@ -669,13 +668,13 @@
     };
     let mut session = connect_to_coordinator(coordinator_socket)
         .wrap_err("failed to connect to dora coordinator")?;
-    let dataflow_id = {
+    let build_id = {
         let dataflow = dataflow_descriptor.clone();
         let session: &mut TcpRequestReplyConnection = &mut *session;
         let reply_raw = session
             .request(
                 &serde_json::to_vec(&ControlRequest::Build {
-                    session_id: dataflow_session.build_id,
+                    session_id: dataflow_session.session_id,
                     dataflow,
                     git_sources,
                     prev_git_sources: dataflow_session.git_sources.clone(),
@@ -697,15 +696,15 @@
             other => bail!("unexpected start dataflow reply: {other:?}"),
         }
     };
-    Ok((session, dataflow_id))
+    Ok((session, build_id))
 }
 
 fn wait_until_dataflow_built(
-    build_id: Uuid,
+    build_id: BuildId,
     session: &mut Box<TcpRequestReplyConnection>,
     coordinator_addr: SocketAddr,
     log_level: log::LevelFilter,
-) -> eyre::Result<()> {
+) -> eyre::Result<BuildId> {
     // subscribe to log messages
     let mut log_session = TcpConnection {
         stream: TcpStream::connect(coordinator_addr)
@@ -742,18 +741,16 @@
     let result: ControlRequestReply =
         serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
     match result {
-        ControlRequestReply::DataflowBuildFinished {
-            build_id,
-            session_id,
-            result,
-        } => match result {
-            Ok(()) => eprintln!("dataflow build finished successfully"),
+        ControlRequestReply::DataflowBuildFinished { build_id, result } => match result {
+            Ok(()) => {
+                eprintln!("dataflow build finished successfully");
+                Ok(build_id)
+            }
             Err(err) => bail!("{err}"),
         },
         ControlRequestReply::Error(err) => bail!("{err}"),
         other => bail!("unexpected start dataflow reply: {other:?}"),
     }
-    Ok(())
 }
 
 fn start_dataflow(
@@ -789,8 +786,8 @@ fn start_dataflow(
     let reply_raw = session
         .request(
             &serde_json::to_vec(&ControlRequest::Start {
-                build_id,
-                session_id,
+                build_id: dataflow_session.build_id,
+                session_id: dataflow_session.session_id,
                 dataflow,
                 name,
                 local_working_dir,
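
Two related changes in this file: `build_dataflow` now returns the coordinator-assigned `BuildId` instead of a bare `Uuid`, and `wait_until_dataflow_built` returns that ID instead of `()` by making the `match` the function's tail expression. A reduced, self-contained sketch of that control-flow shape, assuming only the `eyre` crate; the function and types here are illustrative, not from the commit:

    use eyre::bail;

    // Illustrative stand-in for the reply handling above: the match is the
    // tail expression, so the success arm returns the ID directly instead of
    // falling through to a trailing Ok(()).
    fn handle_reply(reply: Result<u64, String>) -> eyre::Result<u64> {
        match reply {
            Ok(build_id) => {
                eprintln!("dataflow build finished successfully");
                Ok(build_id)
            }
            Err(err) => bail!("{err}"),
        }
    }

    fn main() -> eyre::Result<()> {
        assert_eq!(handle_reply(Ok(42))?, 42);
        Ok(())
    }

Together with the `ControlRequest::Start` change at the end of the hunk, which reads `build_id` and `session_id` from the persisted `DataflowSession`, this lets a later start command reference the build produced by an earlier `dora build`.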


binaries/cli/src/session.rs (+4, -5)

@@ -3,14 +3,13 @@ use std::{
     path::{Path, PathBuf},
 };
 
-use dora_message::{common::GitSource, id::NodeId, BuildId};
+use dora_message::{common::GitSource, id::NodeId, BuildId, SessionId};
 use eyre::{Context, ContextCompat};
-use uuid::Uuid;
 
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 pub struct DataflowSession {
     pub build_id: Option<BuildId>,
-    pub session_id: Uuid,
+    pub session_id: SessionId,
     pub git_sources: BTreeMap<NodeId, GitSource>,
 }
 
@@ -18,7 +17,7 @@ impl Default for DataflowSession {
     fn default() -> Self {
         Self {
             build_id: None,
-            session_id: Uuid::new_v4(),
+            session_id: SessionId::generate(),
             git_sources: Default::default(),
         }
     }
@@ -29,7 +28,7 @@ impl DataflowSession {
         let session_file = session_file_path(dataflow_path)?;
         if session_file.exists() {
             if let Ok(parsed) = deserialize(&session_file) {
-                return Ok((parsed));
+                return Ok(parsed);
             } else {
                 tracing::warn!("failed to read dataflow session file, regenerating (you might need to run `dora build` again)");
             }
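
`read_session` follows a read-or-regenerate pattern: a parseable session file is returned as-is, while a missing or corrupt one falls back to `Default`, which after this commit mints a fresh `SessionId` via `SessionId::generate()`. A reduced sketch of the pattern, using `serde_json` for concreteness (the on-disk format of the real session file is not shown in this diff) and an illustrative `read_or_default` helper:

    use std::path::Path;

    // Illustrative helper: return the parsed file if possible; otherwise warn
    // and fall back to the type's Default (which, for DataflowSession,
    // generates a fresh session ID).
    fn read_or_default<T>(path: &Path) -> T
    where
        T: serde::de::DeserializeOwned + Default,
    {
        if path.exists() {
            if let Some(parsed) = std::fs::read(path)
                .ok()
                .and_then(|bytes| serde_json::from_slice(&bytes).ok())
            {
                return parsed;
            }
            eprintln!("failed to read session file, regenerating");
        }
        T::default()
    }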


binaries/coordinator/src/lib.rs (+33, -35)

@@ -200,7 +200,6 @@ async fn start_inner(
     let mut events = (abortable_events, daemon_events).merge();
 
     let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();
-    let mut build_log_subscribers: BTreeMap<BuildId, Vec<LogSubscriber>> = Default::default();
 
     let mut running_dataflows: HashMap<DataflowId, RunningDataflow> = HashMap::new();
     let mut dataflow_results: HashMap<DataflowId, BTreeMap<DaemonId, DataflowDaemonResult>> =
@@ -389,7 +388,7 @@ async fn start_inner(
                 }
                 if !matches!(
                     finished_dataflow.spawn_result,
-                    SpawnResult::Spawned { .. }
+                    CachedResult::Cached { .. }
                 ) {
                     log::error!("pending spawn result on dataflow finish");
                 }
@@ -432,7 +431,8 @@
                         )
                         .await;
                         match result {
-                            Ok(()) => {
+                            Ok(build) => {
+                                running_builds.insert(build_id, build);
                                 let _ = reply_sender.send(Ok(
                                     ControlRequestReply::DataflowBuildTriggered { build_id },
                                 ));
@@ -444,7 +444,7 @@
                     }
                     ControlRequest::WaitForBuild { build_id } => {
                         if let Some(build) = running_builds.get_mut(&build_id) {
-                            build.spawn_result.register(reply_sender);
+                            build.build_result.register(reply_sender);
                         } else {
                             let _ =
                                 reply_sender.send(Err(eyre!("unknown build id {build_id}")));
@@ -729,10 +729,11 @@
                         level,
                         connection,
                     } => {
-                        build_log_subscribers
-                            .entry(build_id)
-                            .or_default()
-                            .push(LogSubscriber::new(level, connection));
+                        if let Some(build) = running_builds.get_mut(&build_id) {
+                            build
+                                .log_subscribers
+                                .push(LogSubscriber::new(level, connection));
+                        }
                     }
                 },
                 Event::DaemonHeartbeatInterval => {
@@ -795,8 +796,8 @@
                     }
                 }
                 if let Some(build_id) = message.build_id {
-                    if let Some(subscribers) = build_log_subscribers.get_mut(&build_id) {
-                        send_log_message(subscribers, &message).await;
+                    if let Some(build) = running_builds.get_mut(&build_id) {
+                        send_log_message(&mut build.log_subscribers, &message).await;
                     }
                 }
             }
@@ -820,19 +821,15 @@
                     };
                     if build.pending_build_results.is_empty() {
                         tracing::info!("dataflow build finished: `{build_id}`");
-                        let build = running_builds.remove(&build_id).unwrap();
+                        let mut build = running_builds.remove(&build_id).unwrap();
                         let result = if build.errors.is_empty() {
                             Ok(())
                         } else {
                             Err(format!("build failed: {}", build.errors.join("\n\n")))
                         };
 
-                        build.build_result_sender.send(Ok(
-                            ControlRequestReply::DataflowBuildFinished {
-                                build_id,
-                                session_id,
-                                result,
-                            },
+                        build.build_result.set_result(Ok(
+                            ControlRequestReply::DataflowBuildFinished { build_id, result },
                         ));
                     }
                 }
@@ -960,12 +957,8 @@ async fn send_heartbeat_message(
 }
 
 struct RunningBuild {
-    build_id: BuildId,
-    /// The IDs of the daemons that the build is running on.
-    daemons: BTreeSet<DaemonId>,
-
     errors: Vec<String>,
-    build_result_sender: tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>,
+    build_result: CachedResult,
 
     log_subscribers: Vec<LogSubscriber>,
 
@@ -982,7 +975,7 @@ struct RunningDataflow {
     exited_before_subscribe: Vec<NodeId>,
     nodes: BTreeMap<NodeId, ResolvedNode>,
 
-    spawn_result: SpawnResult,
+    spawn_result: CachedResult,
     stop_reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
 
     log_subscribers: Vec<LogSubscriber>,
@@ -990,16 +983,16 @@
     pending_spawn_results: BTreeSet<DaemonId>,
 }
 
-pub enum SpawnResult {
+pub enum CachedResult {
     Pending {
         result_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
     },
-    Spawned {
+    Cached {
         result: eyre::Result<ControlRequestReply>,
     },
 }
 
-impl Default for SpawnResult {
+impl Default for CachedResult {
     fn default() -> Self {
         Self::Pending {
            result_senders: Vec::new(),
@@ -1007,14 +1000,14 @@ impl Default for SpawnResult {
    }
 }
 
-impl SpawnResult {
+impl CachedResult {
     fn register(
         &mut self,
         reply_sender: tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>,
     ) {
         match self {
-            SpawnResult::Pending { result_senders } => result_senders.push(reply_sender),
-            SpawnResult::Spawned { result } => {
+            CachedResult::Pending { result_senders } => result_senders.push(reply_sender),
+            CachedResult::Cached { result } => {
                 Self::send_result_to(result, reply_sender);
             }
         }
@@ -1022,13 +1015,13 @@
 
     fn set_result(&mut self, result: eyre::Result<ControlRequestReply>) {
         match self {
-            SpawnResult::Pending { result_senders } => {
+            CachedResult::Pending { result_senders } => {
                 for sender in result_senders.drain(..) {
                     Self::send_result_to(&result, sender);
                 }
-                *self = SpawnResult::Spawned { result };
+                *self = CachedResult::Cached { result };
             }
-            SpawnResult::Spawned { .. } => {}
+            CachedResult::Cached { .. } => {}
         }
     }
 
@@ -1245,7 +1238,7 @@ async fn build_dataflow(
     clock: &HLC,
     uv: bool,
     daemon_connections: &mut DaemonConnections,
-) -> eyre::Result<()> {
+) -> eyre::Result<RunningBuild> {
     let nodes = dataflow.resolve_aliases_and_set_defaults()?;
 
     let mut git_sources_by_daemon = git_sources
@@ -1294,7 +1287,12 @@
 
     tracing::info!("successfully triggered dataflow build `{session_id}`",);
 
-    Ok(())
+    Ok(RunningBuild {
+        errors: Vec::new(),
+        build_result: CachedResult::default(),
+        log_subscribers: Vec::new(),
+        pending_build_results: daemons,
+    })
 }
 
 async fn build_dataflow_on_machine(
@@ -1370,7 +1368,7 @@ async fn start_dataflow(
         exited_before_subscribe: Default::default(),
         daemons: daemons.clone(),
         nodes,
-        spawn_result: SpawnResult::default(),
+        spawn_result: CachedResult::default(),
         stop_reply_senders: Vec::new(),
         log_subscribers: Vec::new(),
         pending_spawn_results: daemons,
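
The `SpawnResult` → `CachedResult` rename reflects that the same state machine now serves both dataflow spawns and builds: a waiter that calls `register` before the result exists is queued and answered by `set_result`, while a waiter that arrives afterwards is answered immediately from the cached value (this is what lets `ControlRequest::WaitForBuild` work regardless of timing). A simplified generic sketch of the idea, with std mpsc senders standing in for tokio oneshot senders and `T: Clone` standing in for `eyre::Result<ControlRequestReply>`; all names here are illustrative:

    use std::sync::mpsc::{channel, Sender};

    // Generic stand-in for CachedResult: Pending collects waiters, Ready
    // caches the value for anyone who asks later.
    enum Cached<T: Clone> {
        Pending { waiters: Vec<Sender<T>> },
        Ready { value: T },
    }

    impl<T: Clone> Cached<T> {
        // Like CachedResult::register: reply immediately if the result is
        // already cached, otherwise queue the sender.
        fn register(&mut self, waiter: Sender<T>) {
            match self {
                Cached::Pending { waiters } => waiters.push(waiter),
                Cached::Ready { value } => {
                    let _ = waiter.send(value.clone());
                }
            }
        }

        // Like CachedResult::set_result: fan out to all queued waiters, then
        // cache the value; a second result is ignored (the first one wins).
        fn set(&mut self, value: T) {
            match self {
                Cached::Pending { waiters } => {
                    for w in waiters.drain(..) {
                        let _ = w.send(value.clone());
                    }
                    *self = Cached::Ready { value };
                }
                Cached::Ready { .. } => {}
            }
        }
    }

    fn main() {
        let (tx_early, rx_early) = channel();
        let (tx_late, rx_late) = channel();

        let mut result = Cached::Pending { waiters: Vec::new() };
        result.register(tx_early); // registered before the result exists
        result.set("build finished".to_string());
        result.register(tx_late); // registered afterwards; served from cache

        assert_eq!(rx_early.recv().unwrap(), "build finished");
        assert_eq!(rx_late.recv().unwrap(), "build finished");
    }

The real type cannot require `Clone` because `eyre::Result` is not `Clone`, which is presumably why the coordinator routes replies through its `send_result_to` helper instead of cloning the cached result.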


libraries/message/src/coordinator_to_cli.rs (+1, -2)

@@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BTreeSet};
 use uuid::Uuid;
 
 pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
-use crate::{common::DaemonId, id::NodeId, BuildId, SessionId};
+use crate::{common::DaemonId, id::NodeId, BuildId};
 
 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
 pub enum ControlRequestReply {
@@ -14,7 +14,6 @@ pub enum ControlRequestReply {
     },
     DataflowBuildFinished {
         build_id: BuildId,
-        session_id: SessionId,
         result: Result<(), String>,
     },
     DataflowStartTriggered {

