From 3019eba371f28f2a6cbb2ce57675434677b78547 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 12 Jun 2024 19:03:01 +0200 Subject: [PATCH 01/30] Print only first node error and report more metadata in dataflow results Allows us to mark certain node errors as cascading, which will deprioritize them when printed. Also updates the error printing code to only print the error that happened first. --- binaries/cli/src/attach.rs | 8 +- binaries/cli/src/main.rs | 27 +++- binaries/coordinator/src/lib.rs | 65 ++++---- binaries/coordinator/src/listener.rs | 7 +- binaries/daemon/src/lib.rs | 177 +++++---------------- libraries/core/src/coordinator_messages.rs | 4 +- libraries/core/src/topics.rs | 151 ++++++++++++++++-- 7 files changed, 242 insertions(+), 197 deletions(-) diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index 62745e14..72470b36 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -133,9 +133,11 @@ pub fn attach_dataflow( ControlRequestReply::DataflowStarted { uuid: _ } => (), ControlRequestReply::DataflowStopped { uuid, result } => { info!("dataflow {uuid} stopped"); - break result - .map_err(|err| eyre::eyre!(err)) - .wrap_err("dataflow failed"); + break if result.is_ok() { + Ok(()) + } else { + Err(eyre::eyre!("dataflow failed: {}", result.root_error())) + }; } ControlRequestReply::DataflowReloaded { uuid } => { info!("dataflow {uuid} reloaded") diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 413fc1d0..5ffd5d6a 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -433,7 +433,12 @@ fn run() -> eyre::Result<()> { ); } - Daemon::run_dataflow(&dataflow_path).await + let result = Daemon::run_dataflow(&dataflow_path).await?; + if result.is_ok() { + Ok(()) + } else { + eyre::bail!("dataflow failed: {}", result.root_error()) + } } None => { if coordinator_addr.ip() == LOCALHOST { @@ -512,9 +517,13 @@ fn stop_dataflow( let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { - ControlRequestReply::DataflowStopped { uuid: _, result } => result - .map_err(|err| eyre::eyre!(err)) - .wrap_err("dataflow failed"), + ControlRequestReply::DataflowStopped { uuid: _, result } => { + if result.is_ok() { + Ok(()) + } else { + Err(eyre::eyre!("dataflow failed: {}", result.root_error())) + } + } ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected stop dataflow reply: {other:?}"), } @@ -537,9 +546,13 @@ fn stop_dataflow_by_name( let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { - ControlRequestReply::DataflowStopped { uuid: _, result } => result - .map_err(|err| eyre::eyre!(err)) - .wrap_err("dataflow failed"), + ControlRequestReply::DataflowStopped { uuid: _, result } => { + if result.is_ok() { + Ok(()) + } else { + Err(eyre::eyre!("dataflow failed: {}", result.root_error())) + } + } ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected stop dataflow reply: {other:?}"), } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index bfbe6f10..566297df 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,7 +9,9 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{ControlRequest, ControlRequestReply, DataflowId}, + topics::{ + ControlRequest, ControlRequestReply, DataflowDaemonResult, DataflowId, DataflowResult, + }, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -134,7 +136,8 @@ async fn start_inner( let mut events = (abortable_events, daemon_events).merge(); let mut running_dataflows: HashMap = HashMap::new(); - let mut dataflow_results: HashMap>> = HashMap::new(); + let mut dataflow_results: HashMap> = + HashMap::new(); let mut archived_dataflows: HashMap = HashMap::new(); let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new(); @@ -271,26 +274,20 @@ async fn start_inner( .insert(uuid, ArchivedDataflow::from(entry.get())); } entry.get_mut().machines.remove(&machine_id); - match &result { - Ok(()) => { - tracing::info!("dataflow `{uuid}` finished successfully on machine `{machine_id}`"); - } - Err(err) => { - tracing::error!("{err:?}"); - } - } dataflow_results .entry(uuid) .or_default() - .insert(machine_id, result.map_err(|err| format!("{err:?}"))); + .insert(machine_id, result); if entry.get_mut().machines.is_empty() { let finished_dataflow = entry.remove(); let reply = ControlRequestReply::DataflowStopped { uuid, result: dataflow_results .get(&uuid) - .map(|r| dataflow_result(r, uuid)) - .unwrap_or(Ok(())), + .map(|r| dataflow_result(r, uuid, &clock)) + .unwrap_or_else(|| { + DataflowResult::ok_empty(uuid, clock.new_timestamp()) + }), }; for sender in finished_dataflow.reply_senders { let _ = sender.send(Ok(reply.clone())); @@ -353,8 +350,13 @@ async fn start_inner( uuid: dataflow_uuid, result: dataflow_results .get(&dataflow_uuid) - .map(|r| dataflow_result(r, dataflow_uuid)) - .unwrap_or(Ok(())), + .map(|r| dataflow_result(r, dataflow_uuid, &clock)) + .unwrap_or_else(|| { + DataflowResult::ok_empty( + dataflow_uuid, + clock.new_timestamp(), + ) + }), }, }; let _ = reply_sender.send(Ok(status)); @@ -396,6 +398,7 @@ async fn start_inner( reply_sender, clock.new_timestamp(), grace_duration, + &clock, ) .await?; } @@ -412,6 +415,7 @@ async fn start_inner( reply_sender, clock.new_timestamp(), grace_duration, + &clock, ) .await? } @@ -545,18 +549,19 @@ async fn start_inner( async fn stop_dataflow_by_uuid( running_dataflows: &mut HashMap, - dataflow_results: &HashMap>>, + dataflow_results: &HashMap>, dataflow_uuid: Uuid, daemon_connections: &mut HashMap, reply_sender: tokio::sync::oneshot::Sender>, timestamp: uhlc::Timestamp, grace_duration: Option, + clock: &uhlc::HLC, ) -> Result<(), eyre::ErrReport> { let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else { if let Some(result) = dataflow_results.get(&dataflow_uuid) { let reply = ControlRequestReply::DataflowStopped { uuid: dataflow_uuid, - result: dataflow_result(result, dataflow_uuid), + result: dataflow_result(result, dataflow_uuid, clock), }; let _ = reply_sender.send(Ok(reply)); return Ok(()); @@ -599,22 +604,22 @@ fn format_error(machine: &str, err: &str) -> String { } fn dataflow_result( - results: &BTreeMap>, + results: &BTreeMap, dataflow_uuid: Uuid, -) -> Result<(), String> { - let mut errors = Vec::new(); - for (machine, result) in results { - if let Err(err) = result { - errors.push(format_error(machine, err)); + clock: &uhlc::HLC, +) -> DataflowResult { + let mut node_results = BTreeMap::new(); + for (_machine, result) in results { + node_results.extend(result.node_results.clone()); + if let Err(err) = clock.update_with_timestamp(&result.timestamp) { + tracing::warn!("failed to update HLC: {err}"); } } - if errors.is_empty() { - Ok(()) - } else { - let mut formatted = format!("errors occurred in dataflow {dataflow_uuid}:\n"); - formatted.push_str(&errors.join("\n")); - Err(formatted) + DataflowResult { + uuid: dataflow_uuid, + timestamp: clock.new_timestamp(), + node_results, } } @@ -935,7 +940,7 @@ impl Event { pub enum DataflowEvent { DataflowFinishedOnMachine { machine_id: String, - result: eyre::Result<()>, + result: DataflowDaemonResult, }, ReadyOnMachine { machine_id: String, diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 86600a4b..df105691 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -1,6 +1,6 @@ use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event}; use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC}; -use eyre::{eyre, Context}; +use eyre::Context; use std::{io::ErrorKind, net::SocketAddr, sync::Arc}; use tokio::{ net::{TcpListener, TcpStream}, @@ -85,10 +85,7 @@ pub async fn handle_connection( } => { let event = Event::Dataflow { uuid: dataflow_id, - event: DataflowEvent::DataflowFinishedOnMachine { - machine_id, - result: result.map_err(|e| eyre!(e)), - }, + event: DataflowEvent::DataflowFinishedOnMachine { machine_id, result }, }; if events_tx.send(event).await.is_err() { break; diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 8b85ee8e..dd38543b 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -5,6 +5,9 @@ use dora_core::coordinator_messages::CoordinatorRequest; use dora_core::daemon_messages::{DataMessage, InterDaemonEvent, Timestamped}; use dora_core::message::uhlc::{self, HLC}; use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters}; +use dora_core::topics::{ + DataflowDaemonResult, DataflowResult, NodeError, NodeErrorCause, NodeExitStatus, +}; use dora_core::{ config::{DataId, InputMapping, NodeId}, coordinator_messages::DaemonEvent, @@ -24,9 +27,7 @@ use shared_memory_server::ShmemConf; use std::sync::Arc; use std::time::Instant; use std::{ - borrow::Cow, collections::{BTreeMap, BTreeSet, HashMap}, - io, net::SocketAddr, path::{Path, PathBuf}, time::Duration, @@ -72,7 +73,7 @@ pub struct Daemon { /// used for testing and examples exit_when_done: Option>, /// used to record dataflow results when `exit_when_done` is used - dataflow_errors: BTreeMap>, + dataflow_node_results: BTreeMap>>, clock: Arc, } @@ -122,7 +123,7 @@ impl Daemon { .map(|_| ()) } - pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<()> { + pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result { let working_dir = dataflow_path .canonicalize() .context("failed to canoncialize dataflow path")? @@ -134,8 +135,9 @@ impl Daemon { descriptor.check(&working_dir)?; let nodes = descriptor.resolve_aliases_and_set_defaults()?; + let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext)); let spawn_command = SpawnDataflowNodes { - dataflow_id: Uuid::new_v7(Timestamp::now(NoContext)), + dataflow_id, working_dir, nodes, machine_listen_ports: BTreeMap::new(), @@ -165,7 +167,7 @@ impl Daemon { None, "".to_string(), Some(exit_when_done), - clock, + clock.clone(), ); let spawn_result = reply_rx @@ -179,20 +181,15 @@ impl Daemon { } }); - let (dataflow_errors, ()) = future::try_join(run_result, spawn_result).await?; + let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?; - if dataflow_errors.is_empty() { - Ok(()) - } else { - let mut output = "some nodes failed:".to_owned(); - for (dataflow, node_errors) in dataflow_errors { - for (node, error) in node_errors { - use std::fmt::Write; - write!(&mut output, "\n - {dataflow}/{node}: {error}").unwrap(); - } - } - bail!("{output}"); - } + Ok(DataflowResult { + uuid: dataflow_id, + timestamp: clock.new_timestamp(), + node_results: dataflow_results + .remove(&dataflow_id) + .context("no node results for dataflow_id")?, + }) } async fn run_general( @@ -201,7 +198,7 @@ impl Daemon { machine_id: String, exit_when_done: Option>, clock: Arc, - ) -> eyre::Result>> { + ) -> eyre::Result>>> { let coordinator_connection = match coordinator_addr { Some(addr) => { let stream = TcpStream::connect(addr) @@ -225,7 +222,7 @@ impl Daemon { inter_daemon_connections: BTreeMap::new(), machine_id, exit_when_done, - dataflow_errors: BTreeMap::new(), + dataflow_node_results: BTreeMap::new(), clock, }; @@ -246,7 +243,7 @@ impl Daemon { async fn run_inner( mut self, incoming_events: impl Stream> + Unpin, - ) -> eyre::Result>> { + ) -> eyre::Result>>> { let mut events = incoming_events; while let Some(event) = events.next().await { @@ -302,7 +299,7 @@ impl Daemon { } } - Ok(self.dataflow_errors) + Ok(self.dataflow_node_results) } async fn handle_coordinator_event( @@ -921,17 +918,15 @@ impl Daemon { dataflow.running_nodes.remove(node_id); if dataflow.running_nodes.is_empty() { - let result = match self.dataflow_errors.get(&dataflow.id) { - None => Ok(()), - Some(errors) => { - let mut output = "some nodes failed:".to_owned(); - for (node, error) in errors { - use std::fmt::Write; - write!(&mut output, "\n - {node}: {error}").unwrap(); - } - Err(output) - } + let result = DataflowDaemonResult { + timestamp: self.clock.new_timestamp(), + node_results: self + .dataflow_node_results + .get(&dataflow.id) + .context("failed to get dataflow node results")? + .clone(), }; + tracing::info!( "Dataflow `{dataflow_id}` finished on machine `{}`", self.machine_id @@ -1050,80 +1045,25 @@ impl Daemon { node_id, exit_status, } => { - let node_error = match exit_status { + let node_result = match exit_status { NodeExitStatus::Success => { tracing::info!("node {dataflow_id}/{node_id} finished successfully"); - None - } - NodeExitStatus::IoError(err) => { - let err = eyre!(err).wrap_err(format!( - " - I/O error while waiting for node `{dataflow_id}/{node_id}. - - Check logs using: dora logs {dataflow_id} {node_id} - " - )); - tracing::error!("{err:?}"); - Some(err) - } - NodeExitStatus::ExitCode(code) => { - let err = eyre!( - " - {dataflow_id}/{node_id} failed with exit code {code}. - - Check logs using: dora logs {dataflow_id} {node_id} - " - ); - tracing::error!("{err}"); - Some(err) - } - NodeExitStatus::Signal(signal) => { - let signal: Cow<_> = match signal { - 1 => "SIGHUP".into(), - 2 => "SIGINT".into(), - 3 => "SIGQUIT".into(), - 4 => "SIGILL".into(), - 6 => "SIGABRT".into(), - 8 => "SIGFPE".into(), - 9 => "SIGKILL".into(), - 11 => "SIGSEGV".into(), - 13 => "SIGPIPE".into(), - 14 => "SIGALRM".into(), - 15 => "SIGTERM".into(), - 22 => "SIGABRT".into(), - 23 => "NSIG".into(), - - other => other.to_string().into(), - }; - let err = eyre!( - " - {dataflow_id}/{node_id} failed with signal `{signal}` - - Check logs using: dora logs {dataflow_id} {node_id} - " - ); - tracing::error!("{err}"); - Some(err) - } - NodeExitStatus::Unknown => { - let err = eyre!( - " - {dataflow_id}/{node_id} failed with unknown exit code - - Check logs using: dora logs {dataflow_id} {node_id} - " - ); - tracing::error!("{err}"); - Some(err) + Ok(()) } + exit_status => Err(NodeError { + timestamp: self.clock.new_timestamp(), + cause: NodeErrorCause::Other { + // TODO: load from file + stderr: String::new(), + }, + exit_status, + }), }; - if let Some(err) = node_error { - self.dataflow_errors - .entry(dataflow_id) - .or_default() - .insert(node_id.clone(), err); - } + self.dataflow_node_results + .entry(dataflow_id) + .or_default() + .insert(node_id.clone(), node_result); self.handle_node_stop(dataflow_id, &node_id).await?; @@ -1575,39 +1515,6 @@ pub enum DoraEvent { }, } -#[derive(Debug)] -pub enum NodeExitStatus { - Success, - IoError(io::Error), - ExitCode(i32), - Signal(i32), - Unknown, -} - -impl From> for NodeExitStatus { - fn from(result: Result) -> Self { - match result { - Ok(status) => { - if status.success() { - NodeExitStatus::Success - } else if let Some(code) = status.code() { - Self::ExitCode(code) - } else { - #[cfg(unix)] - { - use std::os::unix::process::ExitStatusExt; - if let Some(signal) = status.signal() { - return Self::Signal(signal); - } - } - Self::Unknown - } - } - Err(err) => Self::IoError(err), - } - } -} - #[must_use] enum RunStatus { Continue, diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index 38e9eae2..1c5118cc 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -1,4 +1,4 @@ -use crate::daemon_messages::DataflowId; +use crate::{daemon_messages::DataflowId, topics::DataflowDaemonResult}; use eyre::eyre; #[derive(Debug, serde::Serialize, serde::Deserialize)] @@ -22,7 +22,7 @@ pub enum DaemonEvent { }, AllNodesFinished { dataflow_id: DataflowId, - result: Result<(), String>, + result: DataflowDaemonResult, }, Heartbeat, } diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 506c1b42..a46f674c 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,4 +1,10 @@ -use std::{collections::BTreeSet, fmt::Display, path::PathBuf, time::Duration}; +use dora_message::uhlc; +use std::{ + collections::{BTreeMap, BTreeSet}, + fmt::Display, + path::PathBuf, + time::Duration, +}; use uuid::Uuid; use crate::{ @@ -51,20 +57,10 @@ pub enum ControlRequest { pub enum ControlRequestReply { Error(String), CoordinatorStopped, - DataflowStarted { - uuid: Uuid, - }, - DataflowReloaded { - uuid: Uuid, - }, - DataflowStopped { - uuid: Uuid, - result: Result<(), String>, - }, - - DataflowList { - dataflows: Vec, - }, + DataflowStarted { uuid: Uuid }, + DataflowReloaded { uuid: Uuid }, + DataflowStopped { uuid: Uuid, result: DataflowResult }, + DataflowList { dataflows: Vec }, DestroyOk, DaemonConnected(bool), ConnectedMachines(BTreeSet), @@ -86,3 +82,128 @@ impl Display for DataflowId { } } } + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowResult { + pub uuid: Uuid, + pub timestamp: uhlc::Timestamp, + pub node_results: BTreeMap>, +} + +impl DataflowResult { + pub fn ok_empty(uuid: Uuid, timestamp: uhlc::Timestamp) -> Self { + Self { + uuid, + timestamp, + node_results: Default::default(), + } + } + + pub fn is_ok(&self) -> bool { + self.node_results.values().all(|r| r.is_ok()) + } + + pub fn root_error(&self) -> RootError<'_> { + RootError(self) + } +} + +pub struct RootError<'a>(&'a DataflowResult); + +impl std::fmt::Display for RootError<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let failed = self + .0 + .node_results + .iter() + .filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e))); + let total_failed = failed.clone().count(); + + let mut non_cascading: Vec<_> = failed + .clone() + .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading)) + .collect(); + non_cascading.sort_by_key(|(_, e)| e.timestamp); + // try to print earliest non-cascading error + if let Some((id, err)) = non_cascading.first() { + // TODO: better error formatting + write!(f, "Node `{id}` failed: {err:?}")?; + } else { + // no non-cascading errors -> print earliest cascading + let mut all: Vec<_> = failed.collect(); + all.sort_by_key(|(_, e)| e.timestamp); + if let Some((id, err)) = all.first() { + // TODO: better error formatting + write!(f, "Node `{id}` failed: {err:?}")?; + } else { + write!(f, "unknown error")?; + } + } + + if total_failed > 1 { + write!( + f, + "\n\nThere are {} more errors. Check the `out/{}` folder for full details.", + total_failed - 1, + self.0.uuid + )?; + } + + Ok(()) + } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowDaemonResult { + pub timestamp: uhlc::Timestamp, + pub node_results: BTreeMap>, +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct NodeError { + pub timestamp: uhlc::Timestamp, + pub cause: NodeErrorCause, + pub exit_status: NodeExitStatus, +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub enum NodeErrorCause { + /// Node failed because another node failed before, + Cascading, + Other { + stderr: String, + }, +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub enum NodeExitStatus { + Success, + IoError(String), + ExitCode(i32), + Signal(i32), + Unknown, +} + +impl From> for NodeExitStatus { + fn from(result: Result) -> Self { + match result { + Ok(status) => { + if status.success() { + NodeExitStatus::Success + } else if let Some(code) = status.code() { + Self::ExitCode(code) + } else { + #[cfg(unix)] + { + use std::os::unix::process::ExitStatusExt; + if let Some(signal) = status.signal() { + return Self::Signal(signal); + } + } + Self::Unknown + } + } + Err(err) => Self::IoError(err.to_string()), + } + } +} From 8056c23db7b38beadf2d8456a94c317bb8a75848 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 13 Jun 2024 11:38:58 +0200 Subject: [PATCH 02/30] List failed and finished dataflows in `dora list` --- binaries/cli/src/main.rs | 39 ++++++++++++++++++++++---------- binaries/coordinator/src/lib.rs | 23 +++++++++++++++---- examples/multiple-daemons/run.rs | 6 ++--- libraries/core/src/topics.rs | 11 ++++++--- 4 files changed, 57 insertions(+), 22 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 97a12400..290a0cc9 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -5,7 +5,7 @@ use dora_coordinator::Event; use dora_core::{ descriptor::Descriptor, topics::{ - ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, }, }; @@ -321,17 +321,18 @@ fn run() -> eyre::Result<()> { } => { let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; - let uuids = query_running_dataflows(&mut *session) + let list = query_running_dataflows(&mut *session) .wrap_err("failed to query running dataflows")?; if let Some(dataflow) = dataflow { let uuid = Uuid::parse_str(&dataflow).ok(); let name = if uuid.is_some() { None } else { Some(dataflow) }; logs::logs(&mut *session, uuid, name, node)? } else { - let uuid = match &uuids[..] { + let uuid = match &list.active[..] { [] => bail!("No dataflows are running"), [uuid] => uuid.clone(), - _ => inquire::Select::new("Choose dataflow to show logs:", uuids).prompt()?, + _ => inquire::Select::new("Choose dataflow to show logs:", list.active) + .prompt()?, }; logs::logs(&mut *session, Some(uuid.uuid), None, node)? } @@ -509,11 +510,11 @@ fn stop_dataflow_interactive( grace_duration: Option, session: &mut TcpRequestReplyConnection, ) -> eyre::Result<()> { - let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; - if uuids.is_empty() { + let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; + if list.active.is_empty() { eprintln!("No dataflows are running"); } else { - let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?; + let selection = inquire::Select::new("Choose dataflow to stop:", list.active).prompt()?; stop_dataflow(selection.uuid, grace_duration, session)?; } @@ -571,13 +572,27 @@ fn stop_dataflow_by_name( } fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { - let ids = query_running_dataflows(session)?; + let list = query_running_dataflows(session)?; - if ids.is_empty() { + if list.active.is_empty() { eprintln!("No dataflows are running"); } else { println!("Running dataflows:"); - for id in ids { + for id in list.active { + println!("- {id}"); + } + } + + if !list.failed.is_empty() { + println!("Failed dataflows:"); + for id in list.failed { + println!("- {id}"); + } + } + + if !list.finished.is_empty() { + println!("Finished dataflows:"); + for id in list.finished { println!("- {id}"); } } @@ -587,14 +602,14 @@ fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> fn query_running_dataflows( session: &mut TcpRequestReplyConnection, -) -> Result, eyre::ErrReport> { +) -> Result { let reply_raw = session .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) .wrap_err("failed to send list message")?; let reply: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; let ids = match reply { - ControlRequestReply::DataflowList { dataflows } => dataflows, + ControlRequestReply::DataflowList(list) => list, ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected list dataflow reply: {other:?}"), }; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 3b0c6459..5ab19416 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,7 +9,7 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{ControlRequest, ControlRequestReply, DataflowId}, + topics::{ControlRequest, ControlRequestReply, DataflowId, DataflowList}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -458,15 +458,30 @@ async fn start_inner( let mut dataflows: Vec<_> = running_dataflows.values().collect(); dataflows.sort_by_key(|d| (&d.name, d.uuid)); - let reply = Ok(ControlRequestReply::DataflowList { - dataflows: dataflows + let mut finished = Vec::new(); + let mut failed = Vec::new(); + for (&uuid, results) in &dataflow_results { + let name = + archived_dataflows.get(&uuid).and_then(|d| d.name.clone()); + let id = DataflowId { uuid, name }; + if results.values().all(|r| r.is_ok()) { + finished.push(id); + } else { + failed.push(id); + } + } + + let reply = Ok(ControlRequestReply::DataflowList(DataflowList { + active: dataflows .into_iter() .map(|d| DataflowId { uuid: d.uuid, name: d.name.clone(), }) .collect(), - }); + finished, + failed, + })); let _ = reply_sender.send(reply); } ControlRequest::DaemonConnected => { diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 049dc7d8..c86ae7ed 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -2,8 +2,8 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::Descriptor, topics::{ - ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - DORA_COORDINATOR_PORT_DEFAULT, + ControlRequest, ControlRequestReply, DataflowId, DataflowList, + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, }, }; use dora_tracing::set_up_tracing; @@ -176,7 +176,7 @@ async fn running_dataflows(coordinator_events_tx: &Sender) -> eyre::Resul .await?; let result = reply.await??; let dataflows = match result { - ControlRequestReply::DataflowList { dataflows } => dataflows, + ControlRequestReply::DataflowList(DataflowList { active, .. }) => active, ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected start dataflow reply: {other:?}"), }; diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 8e90e48f..b94c7d61 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -55,6 +55,13 @@ pub enum ControlRequest { ConnectedMachines, } +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowList { + pub active: Vec, + pub finished: Vec, + pub failed: Vec, +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum ControlRequestReply { Error(String), @@ -70,9 +77,7 @@ pub enum ControlRequestReply { result: Result<(), String>, }, - DataflowList { - dataflows: Vec, - }, + DataflowList(DataflowList), DestroyOk, DaemonConnected(bool), ConnectedMachines(BTreeSet), From 71ea44ad017b36c8d933a91fde873fdcd881084e Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 13 Jun 2024 16:45:40 +0200 Subject: [PATCH 03/30] Mark node failures as cascading on init errors caused by other nodes The init function returns an error if another node exited before initialization. In this case, we consider the subsequent error as a cascading error and skip it when printing the error message to the user. --- binaries/daemon/src/lib.rs | 30 +++++++++++++++++++++--------- binaries/daemon/src/pending.rs | 20 ++++++++++++++------ 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index dd38543b..05cfee01 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -654,7 +654,8 @@ impl Daemon { ) .await?; match status { - DataflowStatus::AllNodesReady => { + DataflowStatus::AllNodesReady { cascading_errors } => { + dataflow.cascading_errors.extend(cascading_errors); tracing::info!( "all nodes are ready, starting dataflow `{dataflow_id}`" ); @@ -1050,14 +1051,22 @@ impl Daemon { tracing::info!("node {dataflow_id}/{node_id} finished successfully"); Ok(()) } - exit_status => Err(NodeError { - timestamp: self.clock.new_timestamp(), - cause: NodeErrorCause::Other { - // TODO: load from file - stderr: String::new(), - }, - exit_status, - }), + exit_status => { + let cause = match self.running.get_mut(&dataflow_id) { + Some(dataflow) if dataflow.cascading_errors.contains(&node_id) => { + NodeErrorCause::Cascading + } + _ => NodeErrorCause::Other { + // TODO: load from file + stderr: String::new(), + }, + }; + Err(NodeError { + timestamp: self.clock.new_timestamp(), + cause, + exit_status, + }) + } }; self.dataflow_node_results @@ -1297,6 +1306,8 @@ pub struct RunningDataflow { /// /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. empty_set: BTreeSet, + + cascading_errors: BTreeSet, } impl RunningDataflow { @@ -1315,6 +1326,7 @@ impl RunningDataflow { _timer_handles: Vec::new(), stop_sent: false, empty_set: BTreeSet::new(), + cascading_errors: BTreeSet::new(), } } diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index ccba1a56..87bba6e9 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use dora_core::{ config::NodeId, @@ -113,15 +113,18 @@ impl PendingNodes { } Ok(DataflowStatus::Pending) } else { - self.answer_subscribe_requests(None).await; - Ok(DataflowStatus::AllNodesReady) + let cascading_errors = self.answer_subscribe_requests(None).await; + Ok(DataflowStatus::AllNodesReady { cascading_errors }) } } else { Ok(DataflowStatus::Pending) } } - async fn answer_subscribe_requests(&mut self, external_error: Option) { + async fn answer_subscribe_requests( + &mut self, + external_error: Option, + ) -> BTreeSet { let result = if self.exited_before_subscribe.is_empty() { match external_error { Some(err) => Err(err), @@ -147,9 +150,14 @@ impl PendingNodes { }; // answer all subscribe requests let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); - for reply_sender in subscribe_replies.into_values() { + let mut cascading_errors = BTreeSet::new(); + for (node_id, reply_sender) in subscribe_replies.into_iter() { + if result.is_err() { + cascading_errors.insert(node_id); + } let _ = reply_sender.send(DaemonReply::Result(result.clone())); } + cascading_errors } async fn report_nodes_ready( @@ -182,6 +190,6 @@ impl PendingNodes { } pub enum DataflowStatus { - AllNodesReady, + AllNodesReady { cascading_errors: BTreeSet }, Pending, } From 4e69495012dea006108d96d3b32d5f3bdb6edd23 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 13 Jun 2024 17:08:32 +0200 Subject: [PATCH 04/30] Fix: Pass `cascading_errors` as arg to ensure that it is always applied Before this commit, there were some cases where the returned `DataflowStatus` was ignored and the reported `cascading_errors` were never applied. --- binaries/daemon/src/lib.rs | 17 +++++++++++++---- binaries/daemon/src/pending.rs | 27 +++++++++++++++++---------- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 9b9e81ff..bb6a3aa9 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -379,7 +379,10 @@ impl Daemon { Some(dataflow) => { dataflow .pending_nodes - .handle_external_all_nodes_ready(success) + .handle_external_all_nodes_ready( + success, + &mut dataflow.cascading_errors, + ) .await?; if success { tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); @@ -633,6 +636,7 @@ impl Daemon { &node_id, &mut self.coordinator_connection, &self.clock, + &mut dataflow.cascading_errors, ) .await?; } @@ -739,11 +743,11 @@ impl Daemon { reply_sender, &mut self.coordinator_connection, &self.clock, + &mut dataflow.cascading_errors, ) .await?; match status { - DataflowStatus::AllNodesReady { cascading_errors } => { - dataflow.cascading_errors.extend(cascading_errors); + DataflowStatus::AllNodesReady => { tracing::info!( "all nodes are ready, starting dataflow `{dataflow_id}`" ); @@ -994,7 +998,12 @@ impl Daemon { dataflow .pending_nodes - .handle_node_stop(node_id, &mut self.coordinator_connection, &self.clock) + .handle_node_stop( + node_id, + &mut self.coordinator_connection, + &self.clock, + &mut dataflow.cascading_errors, + ) .await?; Self::handle_outputs_done( diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index 87bba6e9..599ff85b 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -61,12 +61,13 @@ impl PendingNodes { reply_sender: oneshot::Sender, coordinator_connection: &mut Option, clock: &HLC, + cascading_errors: &mut BTreeSet, ) -> eyre::Result { self.waiting_subscribers .insert(node_id.clone(), reply_sender); self.local_nodes.remove(&node_id); - self.update_dataflow_status(coordinator_connection, clock) + self.update_dataflow_status(coordinator_connection, clock, cascading_errors) .await } @@ -75,17 +76,22 @@ impl PendingNodes { node_id: &NodeId, coordinator_connection: &mut Option, clock: &HLC, + cascading_errors: &mut BTreeSet, ) -> eyre::Result<()> { if self.local_nodes.remove(node_id) { tracing::warn!("node `{node_id}` exited before initializing dora connection"); self.exited_before_subscribe.insert(node_id.clone()); - self.update_dataflow_status(coordinator_connection, clock) + self.update_dataflow_status(coordinator_connection, clock, cascading_errors) .await?; } Ok(()) } - pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> { + pub async fn handle_external_all_nodes_ready( + &mut self, + success: bool, + cascading_errors: &mut BTreeSet, + ) -> eyre::Result<()> { if !self.local_nodes.is_empty() { bail!("received external `all_nodes_ready` event before local nodes were ready"); } @@ -94,7 +100,8 @@ impl PendingNodes { } else { Some("some nodes failed to initialize on remote machines".to_string()) }; - self.answer_subscribe_requests(external_error).await; + self.answer_subscribe_requests(external_error, cascading_errors) + .await; Ok(()) } @@ -103,6 +110,7 @@ impl PendingNodes { &mut self, coordinator_connection: &mut Option, clock: &HLC, + cascading_errors: &mut BTreeSet, ) -> eyre::Result { if self.local_nodes.is_empty() { if self.external_nodes { @@ -113,8 +121,8 @@ impl PendingNodes { } Ok(DataflowStatus::Pending) } else { - let cascading_errors = self.answer_subscribe_requests(None).await; - Ok(DataflowStatus::AllNodesReady { cascading_errors }) + self.answer_subscribe_requests(None, cascading_errors).await; + Ok(DataflowStatus::AllNodesReady) } } else { Ok(DataflowStatus::Pending) @@ -124,7 +132,8 @@ impl PendingNodes { async fn answer_subscribe_requests( &mut self, external_error: Option, - ) -> BTreeSet { + cascading_errors: &mut BTreeSet, + ) { let result = if self.exited_before_subscribe.is_empty() { match external_error { Some(err) => Err(err), @@ -150,14 +159,12 @@ impl PendingNodes { }; // answer all subscribe requests let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); - let mut cascading_errors = BTreeSet::new(); for (node_id, reply_sender) in subscribe_replies.into_iter() { if result.is_err() { cascading_errors.insert(node_id); } let _ = reply_sender.send(DaemonReply::Result(result.clone())); } - cascading_errors } async fn report_nodes_ready( @@ -190,6 +197,6 @@ impl PendingNodes { } pub enum DataflowStatus { - AllNodesReady { cascading_errors: BTreeSet }, + AllNodesReady, Pending, } From ba865638dd1d8ba36a33c9ccf4bd3bda8438dc53 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Sat, 15 Jun 2024 13:32:34 +0200 Subject: [PATCH 05/30] Print errors as formatted string --- libraries/core/src/topics.rs | 37 ++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 52fc88d1..539a8e97 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,5 +1,6 @@ use dora_message::uhlc; use std::{ + borrow::Cow, collections::{BTreeMap, BTreeSet}, fmt::Display, net::{IpAddr, Ipv4Addr}, @@ -129,15 +130,13 @@ impl std::fmt::Display for RootError<'_> { non_cascading.sort_by_key(|(_, e)| e.timestamp); // try to print earliest non-cascading error if let Some((id, err)) = non_cascading.first() { - // TODO: better error formatting - write!(f, "Node `{id}` failed: {err:?}")?; + write!(f, "Node `{id}` failed: {err}")?; } else { // no non-cascading errors -> print earliest cascading let mut all: Vec<_> = failed.collect(); all.sort_by_key(|(_, e)| e.timestamp); if let Some((id, err)) = all.first() { - // TODO: better error formatting - write!(f, "Node `{id}` failed: {err:?}")?; + write!(f, "Node `{id}` failed: {err}")?; } else { write!(f, "unknown error")?; } @@ -169,6 +168,36 @@ pub struct NodeError { pub exit_status: NodeExitStatus, } +impl std::fmt::Display for NodeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self.exit_status { + NodeExitStatus::Success => write!(f, ""), + NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"), + NodeExitStatus::ExitCode(code) => write!(f, "exited with code: {code}"), + NodeExitStatus::Signal(signal) => { + let signal_str: Cow<_> = match signal { + 1 => "SIGHUP".into(), + 2 => "SIGINT".into(), + 3 => "SIGQUIT".into(), + 4 => "SIGILL".into(), + 6 => "SIGABRT".into(), + 8 => "SIGFPE".into(), + 9 => "SIGKILL".into(), + 11 => "SIGSEGV".into(), + 13 => "SIGPIPE".into(), + 14 => "SIGALRM".into(), + 15 => "SIGTERM".into(), + 22 => "SIGABRT".into(), + 23 => "NSIG".into(), + other => other.to_string().into(), + }; + write!(f, "exited because of signal {signal_str}") + } + NodeExitStatus::Unknown => write!(f, "unknown exit status"), + } + } +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum NodeErrorCause { /// Node failed because another node failed before, From 72bc9cd74eff950261e0d63c60e636363cf78f17 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Sat, 15 Jun 2024 14:11:14 +0200 Subject: [PATCH 06/30] Print error causes and include node that caused error --- binaries/coordinator/src/lib.rs | 16 +++-- binaries/coordinator/src/listener.rs | 4 +- binaries/daemon/src/lib.rs | 57 +++++++++++---- binaries/daemon/src/pending.rs | 81 ++++++++++------------ libraries/core/src/coordinator_messages.rs | 4 +- libraries/core/src/daemon_messages.rs | 2 +- libraries/core/src/topics.rs | 17 ++++- 7 files changed, 112 insertions(+), 69 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index ead7d54f..d299060c 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -223,18 +223,22 @@ async fn start_inner( Event::Dataflow { uuid, event } => match event { DataflowEvent::ReadyOnMachine { machine_id, - success, + exited_before_subscribe, } => { match running_dataflows.entry(uuid) { std::collections::hash_map::Entry::Occupied(mut entry) => { let dataflow = entry.get_mut(); dataflow.pending_machines.remove(&machine_id); - dataflow.init_success &= success; + dataflow + .exited_before_subscribe + .extend(exited_before_subscribe); if dataflow.pending_machines.is_empty() { let message = serde_json::to_vec(&Timestamped { inner: DaemonCoordinatorEvent::AllNodesReady { dataflow_id: uuid, - success: dataflow.init_success, + exited_before_subscribe: dataflow + .exited_before_subscribe + .clone(), }, timestamp: clock.new_timestamp(), }) @@ -674,7 +678,7 @@ struct RunningDataflow { machines: BTreeSet, /// IDs of machines that are waiting until all nodes are started. pending_machines: BTreeSet, - init_success: bool, + exited_before_subscribe: Vec, nodes: Vec, reply_senders: Vec>>, @@ -873,7 +877,7 @@ async fn start_dataflow( } else { BTreeSet::new() }, - init_success: true, + exited_before_subscribe: Default::default(), machines, nodes, reply_senders: Vec::new(), @@ -944,7 +948,7 @@ pub enum DataflowEvent { }, ReadyOnMachine { machine_id: String, - success: bool, + exited_before_subscribe: Vec, }, } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index df105691..f6f9b56c 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -66,13 +66,13 @@ pub async fn handle_connection( coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event { coordinator_messages::DaemonEvent::AllNodesReady { dataflow_id, - success, + exited_before_subscribe, } => { let event = Event::Dataflow { uuid: dataflow_id, event: DataflowEvent::ReadyOnMachine { machine_id, - success, + exited_before_subscribe, }, }; if events_tx.send(event).await.is_err() { diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index bb6a3aa9..d4b4d71f 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -373,18 +373,19 @@ impl Daemon { } DaemonCoordinatorEvent::AllNodesReady { dataflow_id, - success, + exited_before_subscribe, } => { match self.running.get_mut(&dataflow_id) { Some(dataflow) => { + let ready = exited_before_subscribe.is_empty(); dataflow .pending_nodes .handle_external_all_nodes_ready( - success, - &mut dataflow.cascading_errors, + exited_before_subscribe, + &mut dataflow.cascading_error_causes, ) .await?; - if success { + if ready { tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); dataflow.start(&self.events_tx, &self.clock).await?; } @@ -636,7 +637,7 @@ impl Daemon { &node_id, &mut self.coordinator_connection, &self.clock, - &mut dataflow.cascading_errors, + &mut dataflow.cascading_error_causes, ) .await?; } @@ -743,7 +744,7 @@ impl Daemon { reply_sender, &mut self.coordinator_connection, &self.clock, - &mut dataflow.cascading_errors, + &mut dataflow.cascading_error_causes, ) .await?; match status { @@ -1002,7 +1003,7 @@ impl Daemon { node_id, &mut self.coordinator_connection, &self.clock, - &mut dataflow.cascading_errors, + &mut dataflow.cascading_error_causes, ) .await?; @@ -1153,11 +1154,20 @@ impl Daemon { Ok(()) } exit_status => { - let cause = match self.running.get_mut(&dataflow_id) { - Some(dataflow) if dataflow.cascading_errors.contains(&node_id) => { - NodeErrorCause::Cascading + let caused_by_node = self + .running + .get_mut(&dataflow_id) + .and_then(|dataflow| { + dataflow.cascading_error_causes.error_caused_by(&node_id) + }) + .cloned(); + + let cause = match caused_by_node { + Some(caused_by_node) => { + tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"); + NodeErrorCause::Cascading { caused_by_node } } - _ => NodeErrorCause::Other { + None => NodeErrorCause::Other { // TODO: load from file stderr: String::new(), }, @@ -1382,7 +1392,8 @@ pub struct RunningDataflow { /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. empty_set: BTreeSet, - cascading_errors: BTreeSet, + /// Contains the node that caused the error for nodes that experienced a cascading error. + cascading_error_causes: CascadingErrorCauses, } impl RunningDataflow { @@ -1401,7 +1412,7 @@ impl RunningDataflow { _timer_handles: Vec::new(), stop_sent: false, empty_set: BTreeSet::new(), - cascading_errors: BTreeSet::new(), + cascading_error_causes: Default::default(), } } @@ -1651,3 +1662,23 @@ fn set_up_ctrlc_handler( Ok(ReceiverStream::new(ctrlc_rx)) } + +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct CascadingErrorCauses { + caused_by: BTreeMap, +} + +impl CascadingErrorCauses { + pub fn experienced_cascading_error(&self, node: &NodeId) -> bool { + self.caused_by.contains_key(node) + } + + /// Return the ID of the node that caused a cascading error for the given node, if any. + pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> { + self.caused_by.get(node) + } + + pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) { + self.caused_by.entry(affected_node).or_insert(causing_node); + } +} diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index 599ff85b..1feb070b 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use dora_core::{ config::NodeId, @@ -9,7 +9,7 @@ use dora_core::{ use eyre::{bail, Context}; use tokio::{net::TcpStream, sync::oneshot}; -use crate::tcp_utils::tcp_send; +use crate::{tcp_utils::tcp_send, CascadingErrorCauses}; pub struct PendingNodes { dataflow_id: DataflowId, @@ -28,7 +28,7 @@ pub struct PendingNodes { /// /// If this list is non-empty, we should not start the dataflow at all. Instead, /// we report an error to the other nodes. - exited_before_subscribe: HashSet, + exited_before_subscribe: Vec, /// Whether the local init result was already reported to the coordinator. reported_init_to_coordinator: bool, @@ -42,7 +42,7 @@ impl PendingNodes { local_nodes: HashSet::new(), external_nodes: false, waiting_subscribers: HashMap::new(), - exited_before_subscribe: HashSet::new(), + exited_before_subscribe: Default::default(), reported_init_to_coordinator: false, } } @@ -61,7 +61,7 @@ impl PendingNodes { reply_sender: oneshot::Sender, coordinator_connection: &mut Option, clock: &HLC, - cascading_errors: &mut BTreeSet, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result { self.waiting_subscribers .insert(node_id.clone(), reply_sender); @@ -76,11 +76,11 @@ impl PendingNodes { node_id: &NodeId, coordinator_connection: &mut Option, clock: &HLC, - cascading_errors: &mut BTreeSet, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result<()> { if self.local_nodes.remove(node_id) { tracing::warn!("node `{node_id}` exited before initializing dora connection"); - self.exited_before_subscribe.insert(node_id.clone()); + self.exited_before_subscribe.push(node_id.clone()); self.update_dataflow_status(coordinator_connection, clock, cascading_errors) .await?; } @@ -89,18 +89,14 @@ impl PendingNodes { pub async fn handle_external_all_nodes_ready( &mut self, - success: bool, - cascading_errors: &mut BTreeSet, + exited_before_subscribe: Vec, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result<()> { if !self.local_nodes.is_empty() { bail!("received external `all_nodes_ready` event before local nodes were ready"); } - let external_error = if success { - None - } else { - Some("some nodes failed to initialize on remote machines".to_string()) - }; - self.answer_subscribe_requests(external_error, cascading_errors) + + self.answer_subscribe_requests(exited_before_subscribe, cascading_errors) .await; Ok(()) @@ -110,7 +106,7 @@ impl PendingNodes { &mut self, coordinator_connection: &mut Option, clock: &HLC, - cascading_errors: &mut BTreeSet, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result { if self.local_nodes.is_empty() { if self.external_nodes { @@ -121,7 +117,8 @@ impl PendingNodes { } Ok(DataflowStatus::Pending) } else { - self.answer_subscribe_requests(None, cascading_errors).await; + self.answer_subscribe_requests(Vec::new(), cascading_errors) + .await; Ok(DataflowStatus::AllNodesReady) } } else { @@ -131,37 +128,33 @@ impl PendingNodes { async fn answer_subscribe_requests( &mut self, - external_error: Option, - cascading_errors: &mut BTreeSet, + exited_before_subscribe_external: Vec, + cascading_errors: &mut CascadingErrorCauses, ) { - let result = if self.exited_before_subscribe.is_empty() { - match external_error { - Some(err) => Err(err), - None => Ok(()), - } - } else { - let node_id_message = if self.exited_before_subscribe.len() == 1 { - self.exited_before_subscribe - .iter() - .next() - .map(|node_id| node_id.to_string()) - .unwrap_or("".to_string()) - } else { - "".to_string() - }; - Err(format!( + let node_exited_before_subscribe = match self.exited_before_subscribe.as_slice() { + [first, ..] => Some(first), + [] => match exited_before_subscribe_external.as_slice() { + [first, ..] => Some(first), + [] => None, + }, + }; + + let result = match &node_exited_before_subscribe { + Some(causing_node) => Err(format!( "Some nodes exited before subscribing to dora: {:?}\n\n\ This is typically happens when an initialization error occurs - in the node or operator. To check the output of the failed - nodes, run `dora logs {} {node_id_message}`.", + in the node or operator. To check the output of the causing + node, run `dora logs {} {causing_node}`.", self.exited_before_subscribe, self.dataflow_id - )) + )), + None => Ok(()), }; + // answer all subscribe requests let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); for (node_id, reply_sender) in subscribe_replies.into_iter() { - if result.is_err() { - cascading_errors.insert(node_id); + if let Some(causing_node) = node_exited_before_subscribe { + cascading_errors.report_cascading_error(causing_node.clone(), node_id.clone()); } let _ = reply_sender.send(DaemonReply::Result(result.clone())); } @@ -176,15 +169,17 @@ impl PendingNodes { bail!("no coordinator connection to send AllNodesReady"); }; - let success = self.exited_before_subscribe.is_empty(); - tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes"); + tracing::info!( + "all local nodes are ready (exit before subscribe: {:?}), waiting for remote nodes", + self.exited_before_subscribe + ); let msg = serde_json::to_vec(&Timestamped { inner: CoordinatorRequest::Event { machine_id: self.machine_id.clone(), event: DaemonEvent::AllNodesReady { dataflow_id: self.dataflow_id, - success, + exited_before_subscribe: self.exited_before_subscribe.clone(), }, }, timestamp, diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index 1c5118cc..5a4a1db9 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -1,4 +1,4 @@ -use crate::{daemon_messages::DataflowId, topics::DataflowDaemonResult}; +use crate::{config::NodeId, daemon_messages::DataflowId, topics::DataflowDaemonResult}; use eyre::eyre; #[derive(Debug, serde::Serialize, serde::Deserialize)] @@ -18,7 +18,7 @@ pub enum CoordinatorRequest { pub enum DaemonEvent { AllNodesReady { dataflow_id: DataflowId, - success: bool, + exited_before_subscribe: Vec, }, AllNodesFinished { dataflow_id: DataflowId, diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index ae6fc262..ce6c459a 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -234,7 +234,7 @@ pub enum DaemonCoordinatorEvent { Spawn(SpawnDataflowNodes), AllNodesReady { dataflow_id: DataflowId, - success: bool, + exited_before_subscribe: Vec, }, StopDataflow { dataflow_id: DataflowId, diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 539a8e97..a2b21e8a 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -125,7 +125,7 @@ impl std::fmt::Display for RootError<'_> { let mut non_cascading: Vec<_> = failed .clone() - .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading)) + .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. })) .collect(); non_cascading.sort_by_key(|(_, e)| e.timestamp); // try to print earliest non-cascading error @@ -194,14 +194,27 @@ impl std::fmt::Display for NodeError { write!(f, "exited because of signal {signal_str}") } NodeExitStatus::Unknown => write!(f, "unknown exit status"), + }?; + + match &self.cause { + NodeErrorCause::Cascading { caused_by_node } => write!( + f, + "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." + )?, + NodeErrorCause::Other { stderr } if stderr.is_empty() => {} + NodeErrorCause::Other { stderr } => write!(f, "\n\nStderr output:\n{stderr}\n")?, } + + Ok(()) } } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum NodeErrorCause { /// Node failed because another node failed before, - Cascading, + Cascading { + caused_by_node: NodeId, + }, Other { stderr: String, }, From 39164b963421837e05ce042ec3a7a578459cc138 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Sat, 15 Jun 2024 15:59:24 +0200 Subject: [PATCH 07/30] Remove old formatting methods --- binaries/coordinator/src/lib.rs | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index d299060c..1ca1985a 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -594,19 +594,6 @@ async fn stop_dataflow_by_uuid( Ok(()) } -fn format_error(machine: &str, err: &str) -> String { - let mut error = err - .lines() - .fold(format!("- machine `{machine}`:\n"), |mut output, line| { - output.push_str(" "); - output.push_str(line); - output.push('\n'); - output - }); - error.push('\n'); - error -} - fn dataflow_result( results: &BTreeMap, dataflow_uuid: Uuid, @@ -983,21 +970,3 @@ fn set_up_ctrlc_handler() -> Result, eyre::ErrReport> Ok(ReceiverStream::new(ctrlc_rx)) } - -#[cfg(test)] -mod test { - #[test] - fn test_format_error() { - let machine = "machine A"; - let err = "foo\nbar\nbuzz"; - - // old method - let old_error = { - #[allow(clippy::format_collect)] - let err: String = err.lines().map(|line| format!(" {line}\n")).collect(); - format!("- machine `{machine}`:\n{err}\n") - }; - let new_error = super::format_error(machine, err); - assert_eq!(old_error, new_error) - } -} From b231df1b82971d6c812370541a7b314b59e40606 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 17 Jun 2024 11:33:02 +0200 Subject: [PATCH 08/30] Report last 10 stderr lines on node failure --- Cargo.lock | 1 + binaries/daemon/Cargo.toml | 1 + binaries/daemon/src/lib.rs | 29 ++++++++++++++++++++++++----- binaries/daemon/src/spawn.rs | 12 +++++++++--- 4 files changed, 35 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 74ec1d4f..ad71dee9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2320,6 +2320,7 @@ dependencies = [ "aligned-vec", "async-trait", "bincode", + "crossbeam", "ctrlc", "dora-arrow-convert", "dora-core", diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index a53607f9..274c68bd 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -39,3 +39,4 @@ aligned-vec = "0.5.0" ctrlc = "3.2.5" which = "5.0.0" sysinfo = "0.30.11" +crossbeam = "0.8.4" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index d4b4d71f..f31e284f 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1,5 +1,6 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; +use crossbeam::queue::ArrayQueue; use dora_core::config::{Input, OperatorId}; use dora_core::coordinator_messages::CoordinatorRequest; use dora_core::daemon_messages::{ @@ -65,6 +66,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::pending::DataflowStatus; +const STDERR_LOG_LINES: usize = 10; + pub struct Daemon { running: HashMap, working_dir: HashMap, @@ -615,6 +618,11 @@ impl Daemon { dataflow.pending_nodes.insert(node.id.clone()); let node_id = node.id.clone(); + let node_stderr_most_recent = dataflow + .node_stderr_most_recent + .entry(node.id.clone()) + .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES))) + .clone(); match spawn::spawn_node( dataflow_id, &working_dir, @@ -622,6 +630,7 @@ impl Daemon { self.events_tx.clone(), dataflow_descriptor.clone(), self.clock.clone(), + node_stderr_most_recent, ) .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) @@ -1154,9 +1163,8 @@ impl Daemon { Ok(()) } exit_status => { - let caused_by_node = self - .running - .get_mut(&dataflow_id) + let dataflow = self.running.get(&dataflow_id); + let caused_by_node = dataflow .and_then(|dataflow| { dataflow.cascading_error_causes.error_caused_by(&node_id) }) @@ -1168,8 +1176,16 @@ impl Daemon { NodeErrorCause::Cascading { caused_by_node } } None => NodeErrorCause::Other { - // TODO: load from file - stderr: String::new(), + stderr: dataflow + .and_then(|d| d.node_stderr_most_recent.get(&node_id)) + .map(|queue| { + let mut s = String::new(); + while let Some(line) = queue.pop() { + s += &line; + } + s + }) + .unwrap_or_default(), }, }; Err(NodeError { @@ -1394,6 +1410,8 @@ pub struct RunningDataflow { /// Contains the node that caused the error for nodes that experienced a cascading error. cascading_error_causes: CascadingErrorCauses, + + node_stderr_most_recent: BTreeMap>>, } impl RunningDataflow { @@ -1413,6 +1431,7 @@ impl RunningDataflow { stop_sent: false, empty_set: BTreeSet::new(), cascading_error_causes: Default::default(), + node_stderr_most_recent: BTreeMap::new(), } } diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 55a67384..e513500b 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -3,6 +3,7 @@ use crate::{ OutputId, RunningNode, }; use aligned_vec::{AVec, ConstAlign}; +use crossbeam::queue::ArrayQueue; use dora_arrow_convert::IntoArrow; use dora_core::{ config::DataId, @@ -42,6 +43,7 @@ pub async fn spawn_node( daemon_tx: mpsc::Sender>, dataflow_descriptor: Descriptor, clock: Arc, + node_stderr_most_recent: Arc>, ) -> eyre::Result { let node_id = node.id.clone(); tracing::debug!("Spawning node `{dataflow_id}/{node_id}`"); @@ -358,18 +360,22 @@ pub async fn spawn_node( } }; - match String::from_utf8(raw) { - Ok(s) => buffer.push_str(&s), + let new = match String::from_utf8(raw) { + Ok(s) => s, Err(err) => { let lossy = String::from_utf8_lossy(err.as_bytes()); tracing::warn!( "stderr not valid UTF-8 string (node {node_id}): {}: {lossy}", err.utf8_error() ); - buffer.push_str(&lossy) + lossy.into_owned() } }; + buffer.push_str(&new); + + node_stderr_most_recent.force_push(new); + if buffer.starts_with("Traceback (most recent call last):") { if !finished { continue; From 43b128f67aa0b1eb48e329e97397f3d0d9c4bdf5 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 17 Jun 2024 11:46:11 +0200 Subject: [PATCH 09/30] Print lines before and after stderr for easier readability --- libraries/core/src/topics.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index a2b21e8a..a0504966 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -202,7 +202,10 @@ impl std::fmt::Display for NodeError { "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." )?, NodeErrorCause::Other { stderr } if stderr.is_empty() => {} - NodeErrorCause::Other { stderr } => write!(f, "\n\nStderr output:\n{stderr}\n")?, + NodeErrorCause::Other { stderr } => { + let line: &str = "---------------------------------------------------------------------------------"; + write!(f, "\n\nStderr output:\n{line}\n{stderr}\n[...]\n{line}\n")? + }, } Ok(()) From 94fecd61c51588d7c8ef9652b872cc516a53df1e Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 17 Jun 2024 17:25:26 +0200 Subject: [PATCH 10/30] Print all non-cascading errors (instead of only first one) --- binaries/cli/src/attach.rs | 7 ++++- binaries/cli/src/formatting.rs | 46 +++++++++++++++++++++++++++++++++ binaries/cli/src/main.rs | 14 +++++++--- libraries/core/src/topics.rs | 47 ---------------------------------- 4 files changed, 63 insertions(+), 51 deletions(-) create mode 100644 binaries/cli/src/formatting.rs diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index 72470b36..7c1aefbe 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -11,6 +11,8 @@ use std::{path::PathBuf, sync::mpsc, time::Duration}; use tracing::{error, info}; use uuid::Uuid; +use crate::formatting::FormatDataflowError; + pub fn attach_dataflow( dataflow: Descriptor, dataflow_path: PathBuf, @@ -136,7 +138,10 @@ pub fn attach_dataflow( break if result.is_ok() { Ok(()) } else { - Err(eyre::eyre!("dataflow failed: {}", result.root_error())) + Err(eyre::eyre!( + "dataflow failed: {}", + FormatDataflowError(&result) + )) }; } ControlRequestReply::DataflowReloaded { uuid } => { diff --git a/binaries/cli/src/formatting.rs b/binaries/cli/src/formatting.rs new file mode 100644 index 00000000..0a2cb94b --- /dev/null +++ b/binaries/cli/src/formatting.rs @@ -0,0 +1,46 @@ +use dora_core::topics::{DataflowResult, NodeErrorCause}; + +pub struct FormatDataflowError<'a>(pub &'a DataflowResult); + +impl std::fmt::Display for FormatDataflowError<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let failed = self + .0 + .node_results + .iter() + .filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e))); + let total_failed = failed.clone().count(); + + let mut non_cascading: Vec<_> = failed + .clone() + .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. })) + .collect(); + non_cascading.sort_by_key(|(_, e)| e.timestamp); + // try to print earliest non-cascading error + if !non_cascading.is_empty() { + for (id, err) in non_cascading { + writeln!(f, "Node `{id}` failed: {err}")?; + } + } else { + // no non-cascading errors -> print earliest cascading + let mut all: Vec<_> = failed.collect(); + all.sort_by_key(|(_, e)| e.timestamp); + if let Some((id, err)) = all.first() { + write!(f, "Node `{id}` failed: {err}")?; + } else { + write!(f, "unknown error")?; + } + } + + if total_failed > 1 { + write!( + f, + "\n\nThere are {} more errors. Check the `out/{}` folder for full details.", + total_failed - 1, + self.0.uuid + )?; + } + + Ok(()) + } +} diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 29208736..0d5b0e27 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -15,6 +15,7 @@ use dora_tracing::set_up_tracing; use dora_tracing::set_up_tracing_opts; use duration_str::parse; use eyre::{bail, Context}; +use formatting::FormatDataflowError; use std::net::SocketAddr; use std::{ net::{IpAddr, Ipv4Addr}, @@ -27,6 +28,7 @@ use uuid::Uuid; mod attach; mod build; mod check; +mod formatting; mod graph; mod logs; mod template; @@ -462,7 +464,7 @@ fn run() -> eyre::Result<()> { if result.is_ok() { Ok(()) } else { - eyre::bail!("dataflow failed: {}", result.root_error()) + eyre::bail!("dataflow failed: {}", FormatDataflowError(&result)) } } None => { @@ -546,7 +548,10 @@ fn stop_dataflow( if result.is_ok() { Ok(()) } else { - Err(eyre::eyre!("dataflow failed: {}", result.root_error())) + Err(eyre::eyre!( + "dataflow failed: {}", + FormatDataflowError(&result) + )) } } ControlRequestReply::Error(err) => bail!("{err}"), @@ -575,7 +580,10 @@ fn stop_dataflow_by_name( if result.is_ok() { Ok(()) } else { - Err(eyre::eyre!("dataflow failed: {}", result.root_error())) + Err(eyre::eyre!( + "dataflow failed: {}", + FormatDataflowError(&result) + )) } } ControlRequestReply::Error(err) => bail!("{err}"), diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index a0504966..d2eac29e 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -106,53 +106,6 @@ impl DataflowResult { pub fn is_ok(&self) -> bool { self.node_results.values().all(|r| r.is_ok()) } - - pub fn root_error(&self) -> RootError<'_> { - RootError(self) - } -} - -pub struct RootError<'a>(&'a DataflowResult); - -impl std::fmt::Display for RootError<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let failed = self - .0 - .node_results - .iter() - .filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e))); - let total_failed = failed.clone().count(); - - let mut non_cascading: Vec<_> = failed - .clone() - .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. })) - .collect(); - non_cascading.sort_by_key(|(_, e)| e.timestamp); - // try to print earliest non-cascading error - if let Some((id, err)) = non_cascading.first() { - write!(f, "Node `{id}` failed: {err}")?; - } else { - // no non-cascading errors -> print earliest cascading - let mut all: Vec<_> = failed.collect(); - all.sort_by_key(|(_, e)| e.timestamp); - if let Some((id, err)) = all.first() { - write!(f, "Node `{id}` failed: {err}")?; - } else { - write!(f, "unknown error")?; - } - } - - if total_failed > 1 { - write!( - f, - "\n\nThere are {} more errors. Check the `out/{}` folder for full details.", - total_failed - 1, - self.0.uuid - )?; - } - - Ok(()) - } } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] From 450002ed92557efb0b4ea11fdaf67066c97ab603 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 17 Jun 2024 17:32:23 +0200 Subject: [PATCH 11/30] Fix number of consequential errors and improve message --- binaries/cli/src/formatting.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/binaries/cli/src/formatting.rs b/binaries/cli/src/formatting.rs index 0a2cb94b..022458ca 100644 --- a/binaries/cli/src/formatting.rs +++ b/binaries/cli/src/formatting.rs @@ -17,26 +17,29 @@ impl std::fmt::Display for FormatDataflowError<'_> { .collect(); non_cascading.sort_by_key(|(_, e)| e.timestamp); // try to print earliest non-cascading error - if !non_cascading.is_empty() { + let hidden = if !non_cascading.is_empty() { + let printed = non_cascading.len(); for (id, err) in non_cascading { writeln!(f, "Node `{id}` failed: {err}")?; } + total_failed - printed } else { // no non-cascading errors -> print earliest cascading let mut all: Vec<_> = failed.collect(); all.sort_by_key(|(_, e)| e.timestamp); if let Some((id, err)) = all.first() { write!(f, "Node `{id}` failed: {err}")?; + total_failed - 1 } else { write!(f, "unknown error")?; + 0 } - } + }; - if total_failed > 1 { + if hidden > 1 { write!( f, - "\n\nThere are {} more errors. Check the `out/{}` folder for full details.", - total_failed - 1, + "\n\nThere are {hidden} consequential errors. Check the `out/{}` folder for full details.", self.0.uuid )?; } From 912338d806e9c8eda3d377049edaa5994b8d606f Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 17 Jun 2024 17:35:45 +0200 Subject: [PATCH 12/30] Shorten node_exited_before_subscribe error message --- binaries/daemon/src/pending.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index 1feb070b..d1fb1b30 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -141,11 +141,9 @@ impl PendingNodes { let result = match &node_exited_before_subscribe { Some(causing_node) => Err(format!( - "Some nodes exited before subscribing to dora: {:?}\n\n\ - This is typically happens when an initialization error occurs - in the node or operator. To check the output of the causing - node, run `dora logs {} {causing_node}`.", - self.exited_before_subscribe, self.dataflow_id + "Node {causing_node} exited before initializing dora. For \ + more information, run `dora logs {} {causing_node}`.", + self.dataflow_id )), None => Ok(()), }; From cdb3123fa85b07d79d156aa35d1568d0470167c1 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 20 Jun 2024 16:38:58 +0200 Subject: [PATCH 13/30] Format dora list output as table --- Cargo.lock | 12 ++++++++- binaries/cli/Cargo.toml | 1 + binaries/cli/src/main.rs | 54 ++++++++++++++++++++++++++-------------- 3 files changed, 48 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 74ec1d4f..8c26f680 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2268,6 +2268,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml 0.9.34+deprecated", + "tabwriter", "termcolor", "tokio", "tokio-stream", @@ -9088,6 +9089,15 @@ dependencies = [ "windows 0.52.0", ] +[[package]] +name = "tabwriter" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a327282c4f64f6dc37e3bba4c2b6842cc3a992f204fa58d917696a89f691e5f6" +dependencies = [ + "unicode-width", +] + [[package]] name = "tap" version = "1.0.1" @@ -9604,7 +9614,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "static_assertions", ] diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 15ee0424..cc910b39 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -42,3 +42,4 @@ tokio = { version = "1.20.1", features = ["full"] } tokio-stream = { version = "0.1.8", features = ["io-util", "net"] } futures = "0.3.21" duration-str = "0.5" +tabwriter = "1.4.0" diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 290a0cc9..1dcc33f9 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -15,12 +15,13 @@ use dora_tracing::set_up_tracing; use dora_tracing::set_up_tracing_opts; use duration_str::parse; use eyre::{bail, Context}; -use std::net::SocketAddr; +use std::{io::Write, net::SocketAddr}; use std::{ net::{IpAddr, Ipv4Addr}, path::PathBuf, time::Duration, }; +use tabwriter::TabWriter; use tokio::runtime::Builder; use uuid::Uuid; @@ -574,29 +575,46 @@ fn stop_dataflow_by_name( fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { let list = query_running_dataflows(session)?; - if list.active.is_empty() { - eprintln!("No dataflows are running"); - } else { - println!("Running dataflows:"); - for id in list.active { - println!("- {id}"); - } + let mut tw = TabWriter::new(vec![]); + tw.write_all(b"UUID\tName\tStatus\n")?; + + for id in list.active { + tw.write_all( + format!( + "{}\t{}\trunning\n", + id.uuid, + id.name.as_deref().unwrap_or_default() + ) + .as_bytes(), + )?; } - if !list.failed.is_empty() { - println!("Failed dataflows:"); - for id in list.failed { - println!("- {id}"); - } + for id in list.failed { + tw.write_all( + format!( + "{}\t{}\tFAILED\n", + id.uuid, + id.name.as_deref().unwrap_or_default() + ) + .as_bytes(), + )?; } - if !list.finished.is_empty() { - println!("Finished dataflows:"); - for id in list.finished { - println!("- {id}"); - } + for id in list.finished { + tw.write_all( + format!( + "{}\t{}\tfinished\n", + id.uuid, + id.name.as_deref().unwrap_or_default() + ) + .as_bytes(), + )?; } + tw.flush()?; + let formatted = String::from_utf8(tw.into_inner()?)?; + println!("{formatted}"); + Ok(()) } From 262f39fb98349ef07dfb80b9d0bededb5bba9ab9 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 20 Jun 2024 16:56:25 +0200 Subject: [PATCH 14/30] Refactor: Report dataflow list as one `Vec` --- binaries/cli/src/main.rs | 58 ++++++++++----------------------- binaries/coordinator/src/lib.rs | 49 ++++++++++++++-------------- libraries/core/src/topics.rs | 27 ++++++++++++--- 3 files changed, 65 insertions(+), 69 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 1dcc33f9..1643d39d 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -329,11 +329,11 @@ fn run() -> eyre::Result<()> { let name = if uuid.is_some() { None } else { Some(dataflow) }; logs::logs(&mut *session, uuid, name, node)? } else { - let uuid = match &list.active[..] { + let active = list.get_active(); + let uuid = match &active[..] { [] => bail!("No dataflows are running"), [uuid] => uuid.clone(), - _ => inquire::Select::new("Choose dataflow to show logs:", list.active) - .prompt()?, + _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, }; logs::logs(&mut *session, Some(uuid.uuid), None, node)? } @@ -512,10 +512,11 @@ fn stop_dataflow_interactive( session: &mut TcpRequestReplyConnection, ) -> eyre::Result<()> { let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; - if list.active.is_empty() { + let active = list.get_active(); + if active.is_empty() { eprintln!("No dataflows are running"); } else { - let selection = inquire::Select::new("Choose dataflow to stop:", list.active).prompt()?; + let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?; stop_dataflow(selection.uuid, grace_duration, session)?; } @@ -577,50 +578,25 @@ fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> let mut tw = TabWriter::new(vec![]); tw.write_all(b"UUID\tName\tStatus\n")?; - - for id in list.active { - tw.write_all( - format!( - "{}\t{}\trunning\n", - id.uuid, - id.name.as_deref().unwrap_or_default() - ) - .as_bytes(), - )?; - } - - for id in list.failed { - tw.write_all( - format!( - "{}\t{}\tFAILED\n", - id.uuid, - id.name.as_deref().unwrap_or_default() - ) - .as_bytes(), - )?; - } - - for id in list.finished { - tw.write_all( - format!( - "{}\t{}\tfinished\n", - id.uuid, - id.name.as_deref().unwrap_or_default() - ) - .as_bytes(), - )?; + for entry in list.0 { + let uuid = entry.id.uuid; + let name = entry.id.name.unwrap_or_default(); + let status = match entry.status { + dora_core::topics::DataflowStatus::Running => "running", + dora_core::topics::DataflowStatus::Finished => "finished", + dora_core::topics::DataflowStatus::Failed => "FAILED", + }; + tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?; } - tw.flush()?; let formatted = String::from_utf8(tw.into_inner()?)?; + println!("{formatted}"); Ok(()) } -fn query_running_dataflows( - session: &mut TcpRequestReplyConnection, -) -> Result { +fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result { let reply_raw = session .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) .wrap_err("failed to send list message")?; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 5ab19416..d00dfe22 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,7 +9,7 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{ControlRequest, ControlRequestReply, DataflowId, DataflowList}, + topics::{ControlRequest, ControlRequestReply, DataflowId, DataflowListEntry}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -458,30 +458,31 @@ async fn start_inner( let mut dataflows: Vec<_> = running_dataflows.values().collect(); dataflows.sort_by_key(|d| (&d.name, d.uuid)); - let mut finished = Vec::new(); - let mut failed = Vec::new(); - for (&uuid, results) in &dataflow_results { - let name = - archived_dataflows.get(&uuid).and_then(|d| d.name.clone()); - let id = DataflowId { uuid, name }; - if results.values().all(|r| r.is_ok()) { - finished.push(id); - } else { - failed.push(id); - } - } + let running = dataflows.into_iter().map(|d| DataflowListEntry { + id: DataflowId { + uuid: d.uuid, + name: d.name.clone(), + }, + status: dora_core::topics::DataflowStatus::Running, + }); + let finished_failed = + dataflow_results.iter().map(|(&uuid, results)| { + let name = + archived_dataflows.get(&uuid).and_then(|d| d.name.clone()); + let id = DataflowId { uuid, name }; + let status = if results.values().all(|r| r.is_ok()) { + dora_core::topics::DataflowStatus::Finished + } else { + dora_core::topics::DataflowStatus::Failed + }; + DataflowListEntry { id, status } + }); - let reply = Ok(ControlRequestReply::DataflowList(DataflowList { - active: dataflows - .into_iter() - .map(|d| DataflowId { - uuid: d.uuid, - name: d.name.clone(), - }) - .collect(), - finished, - failed, - })); + let reply = Ok(ControlRequestReply::DataflowList( + dora_core::topics::DataflowList( + running.chain(finished_failed).collect(), + ), + )); let _ = reply_sender.send(reply); } ControlRequest::DaemonConnected => { diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index b94c7d61..9677ec6b 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -56,10 +56,29 @@ pub enum ControlRequest { } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct DataflowList { - pub active: Vec, - pub finished: Vec, - pub failed: Vec, +pub struct DataflowList(pub Vec); + +impl DataflowList { + pub fn get_active(&self) -> Vec { + self.0 + .iter() + .filter(|d| d.status == DataflowStatus::Running) + .map(|d| d.id.clone()) + .collect() + } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowListEntry { + pub id: DataflowId, + pub status: DataflowStatus, +} + +#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +pub enum DataflowStatus { + Running, + Finished, + Failed, } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] From d2e2eef99e8e79275bc8f9203ef1d93e79f95f3d Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 20 Jun 2024 16:57:19 +0200 Subject: [PATCH 15/30] Slightly adjust how status is printed --- binaries/cli/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 1643d39d..630f3f19 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -582,9 +582,9 @@ fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> let uuid = entry.id.uuid; let name = entry.id.name.unwrap_or_default(); let status = match entry.status { - dora_core::topics::DataflowStatus::Running => "running", - dora_core::topics::DataflowStatus::Finished => "finished", - dora_core::topics::DataflowStatus::Failed => "FAILED", + dora_core::topics::DataflowStatus::Running => "Running", + dora_core::topics::DataflowStatus::Finished => "Succeeded", + dora_core::topics::DataflowStatus::Failed => "Failed", }; tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?; } From a0e8fff411abebca3b374a8c60d1ac3f5e8133f9 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 21 Jun 2024 12:29:25 +0200 Subject: [PATCH 16/30] Include dataflow UUID in error message Co-authored-by: Haixuan Xavier Tao --- binaries/cli/src/attach.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index 7c1aefbe..690d8194 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -139,7 +139,7 @@ pub fn attach_dataflow( Ok(()) } else { Err(eyre::eyre!( - "dataflow failed: {}", + "Dataflow {uuid} failed:\n {}", FormatDataflowError(&result) )) }; From 0423155b549a05f0ee2d108239c6e204a4c76234 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 21 Jun 2024 12:40:18 +0200 Subject: [PATCH 17/30] Refactor: move dataflow result handling to a separate function --- binaries/cli/src/main.rs | 39 ++++++++++++++------------------------- 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index fba298c5..67c03ae8 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -460,11 +460,7 @@ fn run() -> eyre::Result<()> { } let result = Daemon::run_dataflow(&dataflow_path).await?; - if result.is_ok() { - Ok(()) - } else { - eyre::bail!("dataflow failed: {}", FormatDataflowError(&result)) - } + handle_dataflow_result(result) } None => { if coordinator_addr.ip() == LOCALHOST { @@ -543,21 +539,23 @@ fn stop_dataflow( let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { - ControlRequestReply::DataflowStopped { uuid: _, result } => { - if result.is_ok() { - Ok(()) - } else { - Err(eyre::eyre!( - "dataflow failed: {}", - FormatDataflowError(&result) - )) - } - } + ControlRequestReply::DataflowStopped { uuid: _, result } => handle_dataflow_result(result), ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected stop dataflow reply: {other:?}"), } } +fn handle_dataflow_result(result: dora_core::topics::DataflowResult) -> Result<(), eyre::Error> { + if result.is_ok() { + Ok(()) + } else { + Err(eyre::eyre!( + "dataflow failed: {}", + FormatDataflowError(&result) + )) + } +} + fn stop_dataflow_by_name( name: String, grace_duration: Option, @@ -575,16 +573,7 @@ fn stop_dataflow_by_name( let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { - ControlRequestReply::DataflowStopped { uuid: _, result } => { - if result.is_ok() { - Ok(()) - } else { - Err(eyre::eyre!( - "dataflow failed: {}", - FormatDataflowError(&result) - )) - } - } + ControlRequestReply::DataflowStopped { uuid: _, result } => handle_dataflow_result(result), ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected stop dataflow reply: {other:?}"), } From 252746a203148dd0d9cbbe6014ff11756dbf5ece Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 21 Jun 2024 13:09:01 +0200 Subject: [PATCH 18/30] Fix: Move ellipsis marker to the beginning of the stderr output --- libraries/core/src/topics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index d2eac29e..123edc6b 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -157,7 +157,7 @@ impl std::fmt::Display for NodeError { NodeErrorCause::Other { stderr } if stderr.is_empty() => {} NodeErrorCause::Other { stderr } => { let line: &str = "---------------------------------------------------------------------------------"; - write!(f, "\n\nStderr output:\n{line}\n{stderr}\n[...]\n{line}\n")? + write!(f, "\n\nStderr output:\n{line}\n[...]\n{stderr}\n{line}\n")? }, } From 4fb5a96390ba5064bf6d49512e0e2fc61547962b Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 21 Jun 2024 13:15:45 +0200 Subject: [PATCH 19/30] Use `handle_dataflow_result for attach too --- binaries/cli/src/attach.rs | 11 ++--------- binaries/cli/src/main.rs | 27 +++++++++++++++++++-------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index 690d8194..1d5f8275 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -11,7 +11,7 @@ use std::{path::PathBuf, sync::mpsc, time::Duration}; use tracing::{error, info}; use uuid::Uuid; -use crate::formatting::FormatDataflowError; +use crate::handle_dataflow_result; pub fn attach_dataflow( dataflow: Descriptor, @@ -135,14 +135,7 @@ pub fn attach_dataflow( ControlRequestReply::DataflowStarted { uuid: _ } => (), ControlRequestReply::DataflowStopped { uuid, result } => { info!("dataflow {uuid} stopped"); - break if result.is_ok() { - Ok(()) - } else { - Err(eyre::eyre!( - "Dataflow {uuid} failed:\n {}", - FormatDataflowError(&result) - )) - }; + break handle_dataflow_result(result, Some(uuid)); } ControlRequestReply::DataflowReloaded { uuid } => { info!("dataflow {uuid} reloaded") diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 67c03ae8..070a3640 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -460,7 +460,7 @@ fn run() -> eyre::Result<()> { } let result = Daemon::run_dataflow(&dataflow_path).await?; - handle_dataflow_result(result) + handle_dataflow_result(result, None) } None => { if coordinator_addr.ip() == LOCALHOST { @@ -539,20 +539,29 @@ fn stop_dataflow( let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { - ControlRequestReply::DataflowStopped { uuid: _, result } => handle_dataflow_result(result), + ControlRequestReply::DataflowStopped { uuid, result } => { + handle_dataflow_result(result, Some(uuid)) + } ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected stop dataflow reply: {other:?}"), } } -fn handle_dataflow_result(result: dora_core::topics::DataflowResult) -> Result<(), eyre::Error> { +fn handle_dataflow_result( + result: dora_core::topics::DataflowResult, + uuid: Option, +) -> Result<(), eyre::Error> { if result.is_ok() { Ok(()) } else { - Err(eyre::eyre!( - "dataflow failed: {}", - FormatDataflowError(&result) - )) + Err(match uuid { + Some(uuid) => { + eyre::eyre!("Dataflow {uuid} failed:\n{}", FormatDataflowError(&result)) + } + None => { + eyre::eyre!("Dataflow failed:\n{}", FormatDataflowError(&result)) + } + }) } } @@ -573,7 +582,9 @@ fn stop_dataflow_by_name( let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { - ControlRequestReply::DataflowStopped { uuid: _, result } => handle_dataflow_result(result), + ControlRequestReply::DataflowStopped { uuid, result } => { + handle_dataflow_result(result, Some(uuid)) + } ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected stop dataflow reply: {other:?}"), } From 361ea2748579316e49c2e1bf25543e87197148b8 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 21 Jun 2024 13:54:11 +0200 Subject: [PATCH 20/30] Slightly tweak error printing --- binaries/cli/src/formatting.rs | 1 + libraries/core/src/topics.rs | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/binaries/cli/src/formatting.rs b/binaries/cli/src/formatting.rs index 022458ca..f19e1599 100644 --- a/binaries/cli/src/formatting.rs +++ b/binaries/cli/src/formatting.rs @@ -4,6 +4,7 @@ pub struct FormatDataflowError<'a>(pub &'a DataflowResult); impl std::fmt::Display for FormatDataflowError<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f)?; let failed = self .0 .node_results diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 123edc6b..cb4061e5 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -126,7 +126,7 @@ impl std::fmt::Display for NodeError { match &self.exit_status { NodeExitStatus::Success => write!(f, ""), NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"), - NodeExitStatus::ExitCode(code) => write!(f, "exited with code: {code}"), + NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"), NodeExitStatus::Signal(signal) => { let signal_str: Cow<_> = match signal { 1 => "SIGHUP".into(), @@ -156,8 +156,8 @@ impl std::fmt::Display for NodeError { )?, NodeErrorCause::Other { stderr } if stderr.is_empty() => {} NodeErrorCause::Other { stderr } => { - let line: &str = "---------------------------------------------------------------------------------"; - write!(f, "\n\nStderr output:\n{line}\n[...]\n{stderr}\n{line}\n")? + let line: &str = "---------------------------------------------------------------------------------\n"; + write!(f, "with stderr output:\n{line}[...]\n{stderr}{line}")? }, } From 24e6c9c30aec456049eb0b83b9a257d39ec59e14 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 21 Jun 2024 17:43:09 +0200 Subject: [PATCH 21/30] Add space --- libraries/core/src/topics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index cb4061e5..dd708292 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -157,7 +157,7 @@ impl std::fmt::Display for NodeError { NodeErrorCause::Other { stderr } if stderr.is_empty() => {} NodeErrorCause::Other { stderr } => { let line: &str = "---------------------------------------------------------------------------------\n"; - write!(f, "with stderr output:\n{line}[...]\n{stderr}{line}")? + write!(f, " with stderr output:\n{line}[...]\n{stderr}{line}")? }, } From 6b91c25d819343c7e23f3597a5e08769734d8a0e Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 24 Jun 2024 05:26:31 +0000 Subject: [PATCH 22/30] Update dependencies --- Cargo.lock | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 45237130..c08cddc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -885,9 +885,12 @@ dependencies = [ [[package]] name = "atomic" -version = "0.5.3" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" +checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" +dependencies = [ + "bytemuck", +] [[package]] name = "atomic-waker" @@ -9871,9 +9874,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +checksum = "3ea73390fe27785838dcbf75b91b1d84799e28f1ce71e6f372a5dc2200c80de5" dependencies = [ "atomic", "getrandom", @@ -9885,9 +9888,9 @@ dependencies = [ [[package]] name = "uuid-macro-internal" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2" +checksum = "cdb394c528d1cb434c2f0522027e50c0705305caf6d20405c07a6f7e4cf9543c" dependencies = [ "proc-macro2", "quote", From ee394361e9f1cbc3b2bed3c1f613ac8482e8ec41 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 11:08:43 +0200 Subject: [PATCH 23/30] Add error cause for grace duration kills --- Cargo.lock | 13 ++++++++++++- binaries/daemon/Cargo.toml | 1 + binaries/daemon/src/lib.rs | 8 ++++++++ libraries/core/src/topics.rs | 6 ++++++ 4 files changed, 27 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index ad71dee9..0043ef5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1925,6 +1925,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -2321,6 +2331,7 @@ dependencies = [ "async-trait", "bincode", "crossbeam", + "crossbeam-skiplist", "ctrlc", "dora-arrow-convert", "dora-core", @@ -9605,7 +9616,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "static_assertions", ] diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 274c68bd..2f1be890 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -40,3 +40,4 @@ ctrlc = "3.2.5" which = "5.0.0" sysinfo = "0.30.11" crossbeam = "0.8.4" +crossbeam-skiplist = "0.1.3" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index f31e284f..d06453b5 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1169,12 +1169,16 @@ impl Daemon { dataflow.cascading_error_causes.error_caused_by(&node_id) }) .cloned(); + let grace_duration_kill = dataflow + .map(|d| d.grace_duration_kills.contains(&node_id)) + .unwrap_or_default(); let cause = match caused_by_node { Some(caused_by_node) => { tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"); NodeErrorCause::Cascading { caused_by_node } } + None if grace_duration_kill => NodeErrorCause::GraceDuration, None => NodeErrorCause::Other { stderr: dataflow .and_then(|d| d.node_stderr_most_recent.get(&node_id)) @@ -1410,6 +1414,7 @@ pub struct RunningDataflow { /// Contains the node that caused the error for nodes that experienced a cascading error. cascading_error_causes: CascadingErrorCauses, + grace_duration_kills: Arc>, node_stderr_most_recent: BTreeMap>>, } @@ -1431,6 +1436,7 @@ impl RunningDataflow { stop_sent: false, empty_set: BTreeSet::new(), cascading_error_causes: Default::default(), + grace_duration_kills: Default::default(), node_stderr_most_recent: BTreeMap::new(), } } @@ -1494,6 +1500,7 @@ impl RunningDataflow { } let running_nodes = self.running_nodes.clone(); + let grace_duration_kills = self.grace_duration_kills.clone(); tokio::spawn(async move { let duration = grace_duration.unwrap_or(Duration::from_millis(500)); tokio::time::sleep(duration).await; @@ -1503,6 +1510,7 @@ impl RunningDataflow { for (node, node_details) in running_nodes.iter() { if let Some(pid) = node_details.pid { if let Some(process) = system.process(Pid::from(pid as usize)) { + grace_duration_kills.insert(node.clone()); process.kill(); warn!( "{node} was killed due to not stopping within the {:#?} grace period", diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index dd708292..8b587427 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -150,6 +150,10 @@ impl std::fmt::Display for NodeError { }?; match &self.cause { + NodeErrorCause::GraceDuration => write!( + f, + "\n\nThe node was killed by dora because it didn't react to a stop message in time." + )?, NodeErrorCause::Cascading { caused_by_node } => write!( f, "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." @@ -167,6 +171,8 @@ impl std::fmt::Display for NodeError { #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum NodeErrorCause { + /// Node was killed because it didn't react to a stop message in time. + GraceDuration, /// Node failed because another node failed before, Cascading { caused_by_node: NodeId, From ece3f72a39694227d75289c30daf716d2317cdd9 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 11:14:56 +0200 Subject: [PATCH 24/30] Shorten grace duration error message to one line --- libraries/core/src/topics.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 8b587427..25f16c48 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -144,16 +144,17 @@ impl std::fmt::Display for NodeError { 23 => "NSIG".into(), other => other.to_string().into(), }; - write!(f, "exited because of signal {signal_str}") + if matches!(self.cause, NodeErrorCause::GraceDuration) { + write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})") + } else { + write!(f, "exited because of signal {signal_str}") + } } NodeExitStatus::Unknown => write!(f, "unknown exit status"), }?; match &self.cause { - NodeErrorCause::GraceDuration => write!( - f, - "\n\nThe node was killed by dora because it didn't react to a stop message in time." - )?, + NodeErrorCause::GraceDuration => {}, // handled above NodeErrorCause::Cascading { caused_by_node } => write!( f, "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." From f91c9e44b2fb06e88c532984d39352597be5a658 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 11:21:31 +0200 Subject: [PATCH 25/30] Make `dora start` attach by default, add `--detach` to opt-out --- binaries/cli/src/main.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index a746d0b8..25eeab23 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -117,6 +117,9 @@ enum Command { /// Attach to the dataflow and wait for its completion #[clap(long, action)] attach: bool, + /// Run the dataflow in background + #[clap(long, action)] + detach: bool, /// Enable hot reloading (Python only) #[clap(long, action)] hot_reload: bool, @@ -341,6 +344,7 @@ fn run() -> eyre::Result<()> { coordinator_addr, coordinator_port, attach, + detach, hot_reload, } => { let dataflow_descriptor = @@ -368,6 +372,16 @@ fn run() -> eyre::Result<()> { &mut *session, )?; + let attach = match (attach, detach) { + (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), + (true, false) => true, + (false, true) => false, + (false, false) => { + println!("attaching to dataflow (use `--detach` to run in background)"); + true + } + }; + if attach { attach_dataflow( dataflow_descriptor, From 6736436e150ff4a2424c27b80abc8e7bea3c354d Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 24 Jun 2024 09:32:44 +0000 Subject: [PATCH 26/30] Update dependencies --- Cargo.lock | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c08cddc0..533bf1a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2014,9 +2014,9 @@ checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991" [[package]] name = "cxx" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8194f089b6da4751d6c1da1ef37c17255df51f9346cdb160f8b096562ae4a85c" +checksum = "273dcfd3acd4e1e276af13ed2a43eea7001318823e7a726a6b3ed39b4acc0b82" dependencies = [ "cc", "cxxbridge-flags", @@ -2026,9 +2026,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e8df9a089caae66634d754672d5f909395f30f38af6ff19366980d8a8b57501" +checksum = "d8b2766fbd92be34e9ed143898fce6c572dc009de39506ed6903e5a05b68914e" dependencies = [ "cc", "codespan-reporting", @@ -2041,15 +2041,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25290be4751803672a70b98c68b51c1e7d0a640ab5a4377f240f9d2e70054cd1" +checksum = "839fcd5e43464614ffaa989eaf1c139ef1f0c51672a1ed08023307fa1b909ccd" [[package]] name = "cxxbridge-macro" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8cb317cb13604b4752416783bb25070381c36e844743e4146b7f8e55de7d140" +checksum = "4b2c1c1776b986979be68bb2285da855f8d8a35851a769fca8740df7c3d07877" dependencies = [ "proc-macro2", "quote", @@ -4583,7 +4583,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if 1.0.0", - "windows-targets 0.52.5", + "windows-targets 0.48.5", ] [[package]] @@ -6391,9 +6391,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.85" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ "unicode-ident", ] From 5bda3338afca32736fe0768922df51cb5049abb4 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 12:59:28 +0200 Subject: [PATCH 27/30] Only add `[...]` omission marker if stderr buffer is full --- binaries/daemon/src/lib.rs | 6 +++++- libraries/core/src/topics.rs | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index d06453b5..2f030a61 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1183,7 +1183,11 @@ impl Daemon { stderr: dataflow .and_then(|d| d.node_stderr_most_recent.get(&node_id)) .map(|queue| { - let mut s = String::new(); + let mut s = if queue.is_full() { + "[...]".into() + } else { + String::new() + }; while let Some(line) = queue.pop() { s += &line; } diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 25f16c48..faebe9d0 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -162,7 +162,7 @@ impl std::fmt::Display for NodeError { NodeErrorCause::Other { stderr } if stderr.is_empty() => {} NodeErrorCause::Other { stderr } => { let line: &str = "---------------------------------------------------------------------------------\n"; - write!(f, " with stderr output:\n{line}[...]\n{stderr}{line}")? + write!(f, " with stderr output:\n{line}{stderr}{line}")? }, } From 3bf521211d40f036bbbf02fd6c02ee1ae17e0522 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 18:14:00 +0200 Subject: [PATCH 28/30] Fix CI: Use `--detach` to start dataflow in background --- .github/workflows/ci.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e5a4017c..8708ba96 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -286,12 +286,12 @@ jobs: cargo build --all dora up dora list - dora start dataflow.yml --name ci-rust-test + dora start dataflow.yml --name ci-rust-test --detach sleep 10 dora stop --name ci-rust-test --grace-duration 5s cd .. dora build examples/rust-dataflow/dataflow_dynamic.yml - dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic + dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic --detach cargo run -p rust-dataflow-example-sink-dynamic sleep 5 dora stop --name ci-rust-dynamic --grace-duration 5s @@ -315,11 +315,11 @@ jobs: cd test_python_project dora up dora list - dora start dataflow.yml --name ci-python-test + dora start dataflow.yml --name ci-python-test --detach sleep 10 dora stop --name ci-python-test --grace-duration 5s pip install "numpy<2.0.0" opencv-python - dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic + dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic --detach python ../examples/python-dataflow/plot_dynamic.py sleep 5 dora stop --name ci-python-test --grace-duration 5s @@ -339,7 +339,7 @@ jobs: cmake -B build cmake --build build cmake --install build - dora start dataflow.yml --name ci-c-test + dora start dataflow.yml --name ci-c-test --detach sleep 10 dora stop --name ci-c-test --grace-duration 5s dora destroy @@ -358,7 +358,7 @@ jobs: cmake -B build cmake --build build cmake --install build - dora start dataflow.yml --name ci-cxx-test + dora start dataflow.yml --name ci-cxx-test --detach sleep 10 dora stop --name ci-cxx-test --grace-duration 5s dora destroy From 12049c4039822a47ac6777d1930f5fc0aa8be484 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 18:56:55 +0200 Subject: [PATCH 29/30] Fix: Add `DataflowDaemonResult::is_ok` method --- libraries/core/src/topics.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 18a48008..7933c875 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -140,6 +140,12 @@ pub struct DataflowDaemonResult { pub node_results: BTreeMap>, } +impl DataflowDaemonResult { + pub fn is_ok(&self) -> bool { + self.node_results.values().all(|r| r.is_ok()) + } +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct NodeError { pub timestamp: uhlc::Timestamp, From cf45f11257f1b699d7de052a8927d0c1cefb8cd2 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 19:00:07 +0200 Subject: [PATCH 30/30] Fix example --- examples/multiple-daemons/run.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index c86ae7ed..7dcb1ab8 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -2,8 +2,8 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::Descriptor, topics::{ - ControlRequest, ControlRequestReply, DataflowId, DataflowList, - DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, + ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, }, }; use dora_tracing::set_up_tracing; @@ -176,7 +176,7 @@ async fn running_dataflows(coordinator_events_tx: &Sender) -> eyre::Resul .await?; let result = reply.await??; let dataflows = match result { - ControlRequestReply::DataflowList(DataflowList { active, .. }) => active, + ControlRequestReply::DataflowList(list) => list.get_active(), ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected start dataflow reply: {other:?}"), };