diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e5a4017c..8708ba96 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -286,12 +286,12 @@ jobs: cargo build --all dora up dora list - dora start dataflow.yml --name ci-rust-test + dora start dataflow.yml --name ci-rust-test --detach sleep 10 dora stop --name ci-rust-test --grace-duration 5s cd .. dora build examples/rust-dataflow/dataflow_dynamic.yml - dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic + dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic --detach cargo run -p rust-dataflow-example-sink-dynamic sleep 5 dora stop --name ci-rust-dynamic --grace-duration 5s @@ -315,11 +315,11 @@ jobs: cd test_python_project dora up dora list - dora start dataflow.yml --name ci-python-test + dora start dataflow.yml --name ci-python-test --detach sleep 10 dora stop --name ci-python-test --grace-duration 5s pip install "numpy<2.0.0" opencv-python - dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic + dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic --detach python ../examples/python-dataflow/plot_dynamic.py sleep 5 dora stop --name ci-python-test --grace-duration 5s @@ -339,7 +339,7 @@ jobs: cmake -B build cmake --build build cmake --install build - dora start dataflow.yml --name ci-c-test + dora start dataflow.yml --name ci-c-test --detach sleep 10 dora stop --name ci-c-test --grace-duration 5s dora destroy @@ -358,7 +358,7 @@ jobs: cmake -B build cmake --build build cmake --install build - dora start dataflow.yml --name ci-cxx-test + dora start dataflow.yml --name ci-cxx-test --detach sleep 10 dora stop --name ci-cxx-test --grace-duration 5s dora destroy diff --git a/Cargo.lock b/Cargo.lock index 09e062a8..02bbd0b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -885,9 +885,12 @@ dependencies = [ [[package]] name = "atomic" -version = "0.5.3" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" +checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" +dependencies = [ + "bytemuck", +] [[package]] name = "atomic-waker" @@ -1925,6 +1928,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -2011,9 +2024,9 @@ checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991" [[package]] name = "cxx" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8194f089b6da4751d6c1da1ef37c17255df51f9346cdb160f8b096562ae4a85c" +checksum = "273dcfd3acd4e1e276af13ed2a43eea7001318823e7a726a6b3ed39b4acc0b82" dependencies = [ "cc", "cxxbridge-flags", @@ -2023,9 +2036,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e8df9a089caae66634d754672d5f909395f30f38af6ff19366980d8a8b57501" +checksum = "d8b2766fbd92be34e9ed143898fce6c572dc009de39506ed6903e5a05b68914e" dependencies = [ "cc", "codespan-reporting", @@ -2038,15 +2051,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25290be4751803672a70b98c68b51c1e7d0a640ab5a4377f240f9d2e70054cd1" +checksum = "839fcd5e43464614ffaa989eaf1c139ef1f0c51672a1ed08023307fa1b909ccd" [[package]] name = "cxxbridge-macro" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8cb317cb13604b4752416783bb25070381c36e844743e4146b7f8e55de7d140" +checksum = "4b2c1c1776b986979be68bb2285da855f8d8a35851a769fca8740df7c3d07877" dependencies = [ "proc-macro2", "quote", @@ -2268,6 +2281,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml 0.9.34+deprecated", + "tabwriter", "termcolor", "tokio", "tokio-stream", @@ -2320,6 +2334,8 @@ dependencies = [ "aligned-vec", "async-trait", "bincode", + "crossbeam", + "crossbeam-skiplist", "ctrlc", "dora-arrow-convert", "dora-core", @@ -4582,7 +4598,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if 1.0.0", - "windows-targets 0.52.5", + "windows-targets 0.48.5", ] [[package]] @@ -6390,9 +6406,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.85" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ "unicode-ident", ] @@ -9195,6 +9211,15 @@ dependencies = [ "windows 0.52.0", ] +[[package]] +name = "tabwriter" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a327282c4f64f6dc37e3bba4c2b6842cc3a992f204fa58d917696a89f691e5f6" +dependencies = [ + "unicode-width", +] + [[package]] name = "tap" version = "1.0.1" @@ -9909,9 +9934,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +checksum = "3ea73390fe27785838dcbf75b91b1d84799e28f1ce71e6f372a5dc2200c80de5" dependencies = [ "atomic", "getrandom", @@ -9923,9 +9948,9 @@ dependencies = [ [[package]] name = "uuid-macro-internal" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2" +checksum = "cdb394c528d1cb434c2f0522027e50c0705305caf6d20405c07a6f7e4cf9543c" dependencies = [ "proc-macro2", "quote", diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 15ee0424..cc910b39 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -42,3 +42,4 @@ tokio = { version = "1.20.1", features = ["full"] } tokio-stream = { version = "0.1.8", features = ["io-util", "net"] } futures = "0.3.21" duration-str = "0.5" +tabwriter = "1.4.0" diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index 62745e14..1d5f8275 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -11,6 +11,8 @@ use std::{path::PathBuf, sync::mpsc, time::Duration}; use tracing::{error, info}; use uuid::Uuid; +use crate::handle_dataflow_result; + pub fn attach_dataflow( dataflow: Descriptor, dataflow_path: PathBuf, @@ -133,9 +135,7 @@ pub fn attach_dataflow( ControlRequestReply::DataflowStarted { uuid: _ } => (), ControlRequestReply::DataflowStopped { uuid, result } => { info!("dataflow {uuid} stopped"); - break result - .map_err(|err| eyre::eyre!(err)) - .wrap_err("dataflow failed"); + break handle_dataflow_result(result, Some(uuid)); } ControlRequestReply::DataflowReloaded { uuid } => { info!("dataflow {uuid} reloaded") diff --git a/binaries/cli/src/formatting.rs b/binaries/cli/src/formatting.rs new file mode 100644 index 00000000..f19e1599 --- /dev/null +++ b/binaries/cli/src/formatting.rs @@ -0,0 +1,50 @@ +use dora_core::topics::{DataflowResult, NodeErrorCause}; + +pub struct FormatDataflowError<'a>(pub &'a DataflowResult); + +impl std::fmt::Display for FormatDataflowError<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f)?; + let failed = self + .0 + .node_results + .iter() + .filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e))); + let total_failed = failed.clone().count(); + + let mut non_cascading: Vec<_> = failed + .clone() + .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. })) + .collect(); + non_cascading.sort_by_key(|(_, e)| e.timestamp); + // try to print earliest non-cascading error + let hidden = if !non_cascading.is_empty() { + let printed = non_cascading.len(); + for (id, err) in non_cascading { + writeln!(f, "Node `{id}` failed: {err}")?; + } + total_failed - printed + } else { + // no non-cascading errors -> print earliest cascading + let mut all: Vec<_> = failed.collect(); + all.sort_by_key(|(_, e)| e.timestamp); + if let Some((id, err)) = all.first() { + write!(f, "Node `{id}` failed: {err}")?; + total_failed - 1 + } else { + write!(f, "unknown error")?; + 0 + } + }; + + if hidden > 1 { + write!( + f, + "\n\nThere are {hidden} consequential errors. Check the `out/{}` folder for full details.", + self.0.uuid + )?; + } + + Ok(()) + } +} diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index a746d0b8..8fdd6a2d 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -5,7 +5,7 @@ use dora_coordinator::Event; use dora_core::{ descriptor::Descriptor, topics::{ - ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, }, }; @@ -15,18 +15,21 @@ use dora_tracing::set_up_tracing; use dora_tracing::set_up_tracing_opts; use duration_str::parse; use eyre::{bail, Context}; -use std::net::SocketAddr; +use formatting::FormatDataflowError; +use std::{io::Write, net::SocketAddr}; use std::{ net::{IpAddr, Ipv4Addr}, path::PathBuf, time::Duration, }; +use tabwriter::TabWriter; use tokio::runtime::Builder; use uuid::Uuid; mod attach; mod build; mod check; +mod formatting; mod graph; mod logs; mod template; @@ -117,6 +120,9 @@ enum Command { /// Attach to the dataflow and wait for its completion #[clap(long, action)] attach: bool, + /// Run the dataflow in background + #[clap(long, action)] + detach: bool, /// Enable hot reloading (Python only) #[clap(long, action)] hot_reload: bool, @@ -320,17 +326,18 @@ fn run() -> eyre::Result<()> { } => { let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; - let uuids = query_running_dataflows(&mut *session) + let list = query_running_dataflows(&mut *session) .wrap_err("failed to query running dataflows")?; if let Some(dataflow) = dataflow { let uuid = Uuid::parse_str(&dataflow).ok(); let name = if uuid.is_some() { None } else { Some(dataflow) }; logs::logs(&mut *session, uuid, name, node)? } else { - let uuid = match &uuids[..] { + let active = list.get_active(); + let uuid = match &active[..] { [] => bail!("No dataflows are running"), [uuid] => uuid.clone(), - _ => inquire::Select::new("Choose dataflow to show logs:", uuids).prompt()?, + _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, }; logs::logs(&mut *session, Some(uuid.uuid), None, node)? } @@ -341,6 +348,7 @@ fn run() -> eyre::Result<()> { coordinator_addr, coordinator_port, attach, + detach, hot_reload, } => { let dataflow_descriptor = @@ -368,6 +376,16 @@ fn run() -> eyre::Result<()> { &mut *session, )?; + let attach = match (attach, detach) { + (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), + (true, false) => true, + (false, true) => false, + (false, false) => { + println!("attaching to dataflow (use `--detach` to run in background)"); + true + } + }; + if attach { attach_dataflow( dataflow_descriptor, @@ -457,7 +475,8 @@ fn run() -> eyre::Result<()> { ); } - Daemon::run_dataflow(&dataflow_path).await + let result = Daemon::run_dataflow(&dataflow_path).await?; + handle_dataflow_result(result, None) } None => { if coordinator_addr.ip() == LOCALHOST { @@ -508,11 +527,12 @@ fn stop_dataflow_interactive( grace_duration: Option, session: &mut TcpRequestReplyConnection, ) -> eyre::Result<()> { - let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; - if uuids.is_empty() { + let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; + let active = list.get_active(); + if active.is_empty() { eprintln!("No dataflows are running"); } else { - let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?; + let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?; stop_dataflow(selection.uuid, grace_duration, session)?; } @@ -536,14 +556,32 @@ fn stop_dataflow( let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { - ControlRequestReply::DataflowStopped { uuid: _, result } => result - .map_err(|err| eyre::eyre!(err)) - .wrap_err("dataflow failed"), + ControlRequestReply::DataflowStopped { uuid, result } => { + handle_dataflow_result(result, Some(uuid)) + } ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected stop dataflow reply: {other:?}"), } } +fn handle_dataflow_result( + result: dora_core::topics::DataflowResult, + uuid: Option, +) -> Result<(), eyre::Error> { + if result.is_ok() { + Ok(()) + } else { + Err(match uuid { + Some(uuid) => { + eyre::eyre!("Dataflow {uuid} failed:\n{}", FormatDataflowError(&result)) + } + None => { + eyre::eyre!("Dataflow failed:\n{}", FormatDataflowError(&result)) + } + }) + } +} + fn stop_dataflow_by_name( name: String, grace_duration: Option, @@ -561,39 +599,45 @@ fn stop_dataflow_by_name( let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { - ControlRequestReply::DataflowStopped { uuid: _, result } => result - .map_err(|err| eyre::eyre!(err)) - .wrap_err("dataflow failed"), + ControlRequestReply::DataflowStopped { uuid, result } => { + handle_dataflow_result(result, Some(uuid)) + } ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected stop dataflow reply: {other:?}"), } } fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { - let ids = query_running_dataflows(session)?; + let list = query_running_dataflows(session)?; - if ids.is_empty() { - eprintln!("No dataflows are running"); - } else { - println!("Running dataflows:"); - for id in ids { - println!("- {id}"); - } + let mut tw = TabWriter::new(vec![]); + tw.write_all(b"UUID\tName\tStatus\n")?; + for entry in list.0 { + let uuid = entry.id.uuid; + let name = entry.id.name.unwrap_or_default(); + let status = match entry.status { + dora_core::topics::DataflowStatus::Running => "Running", + dora_core::topics::DataflowStatus::Finished => "Succeeded", + dora_core::topics::DataflowStatus::Failed => "Failed", + }; + tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?; } + tw.flush()?; + let formatted = String::from_utf8(tw.into_inner()?)?; + + println!("{formatted}"); Ok(()) } -fn query_running_dataflows( - session: &mut TcpRequestReplyConnection, -) -> Result, eyre::ErrReport> { +fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result { let reply_raw = session .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) .wrap_err("failed to send list message")?; let reply: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; let ids = match reply { - ControlRequestReply::DataflowList { dataflows } => dataflows, + ControlRequestReply::DataflowList(list) => list, ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected list dataflow reply: {other:?}"), }; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 3b0c6459..67a830b3 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,7 +9,10 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{ControlRequest, ControlRequestReply, DataflowId}, + topics::{ + ControlRequest, ControlRequestReply, DataflowDaemonResult, DataflowId, DataflowListEntry, + DataflowResult, + }, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -134,7 +137,8 @@ async fn start_inner( let mut events = (abortable_events, daemon_events).merge(); let mut running_dataflows: HashMap = HashMap::new(); - let mut dataflow_results: HashMap>> = HashMap::new(); + let mut dataflow_results: HashMap> = + HashMap::new(); let mut archived_dataflows: HashMap = HashMap::new(); let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new(); @@ -220,18 +224,22 @@ async fn start_inner( Event::Dataflow { uuid, event } => match event { DataflowEvent::ReadyOnMachine { machine_id, - success, + exited_before_subscribe, } => { match running_dataflows.entry(uuid) { std::collections::hash_map::Entry::Occupied(mut entry) => { let dataflow = entry.get_mut(); dataflow.pending_machines.remove(&machine_id); - dataflow.init_success &= success; + dataflow + .exited_before_subscribe + .extend(exited_before_subscribe); if dataflow.pending_machines.is_empty() { let message = serde_json::to_vec(&Timestamped { inner: DaemonCoordinatorEvent::AllNodesReady { dataflow_id: uuid, - success: dataflow.init_success, + exited_before_subscribe: dataflow + .exited_before_subscribe + .clone(), }, timestamp: clock.new_timestamp(), }) @@ -271,26 +279,20 @@ async fn start_inner( .insert(uuid, ArchivedDataflow::from(entry.get())); } entry.get_mut().machines.remove(&machine_id); - match &result { - Ok(()) => { - tracing::info!("dataflow `{uuid}` finished successfully on machine `{machine_id}`"); - } - Err(err) => { - tracing::error!("{err:?}"); - } - } dataflow_results .entry(uuid) .or_default() - .insert(machine_id, result.map_err(|err| format!("{err:?}"))); + .insert(machine_id, result); if entry.get_mut().machines.is_empty() { let finished_dataflow = entry.remove(); let reply = ControlRequestReply::DataflowStopped { uuid, result: dataflow_results .get(&uuid) - .map(|r| dataflow_result(r, uuid)) - .unwrap_or(Ok(())), + .map(|r| dataflow_result(r, uuid, &clock)) + .unwrap_or_else(|| { + DataflowResult::ok_empty(uuid, clock.new_timestamp()) + }), }; for sender in finished_dataflow.reply_senders { let _ = sender.send(Ok(reply.clone())); @@ -353,8 +355,13 @@ async fn start_inner( uuid: dataflow_uuid, result: dataflow_results .get(&dataflow_uuid) - .map(|r| dataflow_result(r, dataflow_uuid)) - .unwrap_or(Ok(())), + .map(|r| dataflow_result(r, dataflow_uuid, &clock)) + .unwrap_or_else(|| { + DataflowResult::ok_empty( + dataflow_uuid, + clock.new_timestamp(), + ) + }), }, }; let _ = reply_sender.send(Ok(status)); @@ -396,6 +403,7 @@ async fn start_inner( reply_sender, clock.new_timestamp(), grace_duration, + &clock, ) .await?; } @@ -412,6 +420,7 @@ async fn start_inner( reply_sender, clock.new_timestamp(), grace_duration, + &clock, ) .await? } @@ -458,15 +467,31 @@ async fn start_inner( let mut dataflows: Vec<_> = running_dataflows.values().collect(); dataflows.sort_by_key(|d| (&d.name, d.uuid)); - let reply = Ok(ControlRequestReply::DataflowList { - dataflows: dataflows - .into_iter() - .map(|d| DataflowId { - uuid: d.uuid, - name: d.name.clone(), - }) - .collect(), + let running = dataflows.into_iter().map(|d| DataflowListEntry { + id: DataflowId { + uuid: d.uuid, + name: d.name.clone(), + }, + status: dora_core::topics::DataflowStatus::Running, }); + let finished_failed = + dataflow_results.iter().map(|(&uuid, results)| { + let name = + archived_dataflows.get(&uuid).and_then(|d| d.name.clone()); + let id = DataflowId { uuid, name }; + let status = if results.values().all(|r| r.is_ok()) { + dora_core::topics::DataflowStatus::Finished + } else { + dora_core::topics::DataflowStatus::Failed + }; + DataflowListEntry { id, status } + }); + + let reply = Ok(ControlRequestReply::DataflowList( + dora_core::topics::DataflowList( + running.chain(finished_failed).collect(), + ), + )); let _ = reply_sender.send(reply); } ControlRequest::DaemonConnected => { @@ -545,18 +570,19 @@ async fn start_inner( async fn stop_dataflow_by_uuid( running_dataflows: &mut HashMap, - dataflow_results: &HashMap>>, + dataflow_results: &HashMap>, dataflow_uuid: Uuid, daemon_connections: &mut HashMap, reply_sender: tokio::sync::oneshot::Sender>, timestamp: uhlc::Timestamp, grace_duration: Option, + clock: &uhlc::HLC, ) -> Result<(), eyre::ErrReport> { let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else { if let Some(result) = dataflow_results.get(&dataflow_uuid) { let reply = ControlRequestReply::DataflowStopped { uuid: dataflow_uuid, - result: dataflow_result(result, dataflow_uuid), + result: dataflow_result(result, dataflow_uuid, clock), }; let _ = reply_sender.send(Ok(reply)); return Ok(()); @@ -585,36 +611,23 @@ async fn stop_dataflow_by_uuid( Ok(()) } -fn format_error(machine: &str, err: &str) -> String { - let mut error = err - .lines() - .fold(format!("- machine `{machine}`:\n"), |mut output, line| { - output.push_str(" "); - output.push_str(line); - output.push('\n'); - output - }); - error.push('\n'); - error -} - fn dataflow_result( - results: &BTreeMap>, + results: &BTreeMap, dataflow_uuid: Uuid, -) -> Result<(), String> { - let mut errors = Vec::new(); - for (machine, result) in results { - if let Err(err) = result { - errors.push(format_error(machine, err)); + clock: &uhlc::HLC, +) -> DataflowResult { + let mut node_results = BTreeMap::new(); + for (_machine, result) in results { + node_results.extend(result.node_results.clone()); + if let Err(err) = clock.update_with_timestamp(&result.timestamp) { + tracing::warn!("failed to update HLC: {err}"); } } - if errors.is_empty() { - Ok(()) - } else { - let mut formatted = format!("errors occurred in dataflow {dataflow_uuid}:\n"); - formatted.push_str(&errors.join("\n")); - Err(formatted) + DataflowResult { + uuid: dataflow_uuid, + timestamp: clock.new_timestamp(), + node_results, } } @@ -669,7 +682,7 @@ struct RunningDataflow { machines: BTreeSet, /// IDs of machines that are waiting until all nodes are started. pending_machines: BTreeSet, - init_success: bool, + exited_before_subscribe: Vec, nodes: Vec, reply_senders: Vec>>, @@ -868,7 +881,7 @@ async fn start_dataflow( } else { BTreeSet::new() }, - init_success: true, + exited_before_subscribe: Default::default(), machines, nodes, reply_senders: Vec::new(), @@ -935,11 +948,11 @@ impl Event { pub enum DataflowEvent { DataflowFinishedOnMachine { machine_id: String, - result: eyre::Result<()>, + result: DataflowDaemonResult, }, ReadyOnMachine { machine_id: String, - success: bool, + exited_before_subscribe: Vec, }, } @@ -974,21 +987,3 @@ fn set_up_ctrlc_handler() -> Result, eyre::ErrReport> Ok(ReceiverStream::new(ctrlc_rx)) } - -#[cfg(test)] -mod test { - #[test] - fn test_format_error() { - let machine = "machine A"; - let err = "foo\nbar\nbuzz"; - - // old method - let old_error = { - #[allow(clippy::format_collect)] - let err: String = err.lines().map(|line| format!(" {line}\n")).collect(); - format!("- machine `{machine}`:\n{err}\n") - }; - let new_error = super::format_error(machine, err); - assert_eq!(old_error, new_error) - } -} diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 86600a4b..f6f9b56c 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -1,6 +1,6 @@ use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event}; use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC}; -use eyre::{eyre, Context}; +use eyre::Context; use std::{io::ErrorKind, net::SocketAddr, sync::Arc}; use tokio::{ net::{TcpListener, TcpStream}, @@ -66,13 +66,13 @@ pub async fn handle_connection( coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event { coordinator_messages::DaemonEvent::AllNodesReady { dataflow_id, - success, + exited_before_subscribe, } => { let event = Event::Dataflow { uuid: dataflow_id, event: DataflowEvent::ReadyOnMachine { machine_id, - success, + exited_before_subscribe, }, }; if events_tx.send(event).await.is_err() { @@ -85,10 +85,7 @@ pub async fn handle_connection( } => { let event = Event::Dataflow { uuid: dataflow_id, - event: DataflowEvent::DataflowFinishedOnMachine { - machine_id, - result: result.map_err(|e| eyre!(e)), - }, + event: DataflowEvent::DataflowFinishedOnMachine { machine_id, result }, }; if events_tx.send(event).await.is_err() { break; diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index a53607f9..2f1be890 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -39,3 +39,5 @@ aligned-vec = "0.5.0" ctrlc = "3.2.5" which = "5.0.0" sysinfo = "0.30.11" +crossbeam = "0.8.4" +crossbeam-skiplist = "0.1.3" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 521b5bd8..2f030a61 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1,5 +1,6 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; +use crossbeam::queue::ArrayQueue; use dora_core::config::{Input, OperatorId}; use dora_core::coordinator_messages::CoordinatorRequest; use dora_core::daemon_messages::{ @@ -9,6 +10,9 @@ use dora_core::descriptor::runtime_node_inputs; use dora_core::message::uhlc::{self, HLC}; use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters}; use dora_core::topics::LOCALHOST; +use dora_core::topics::{ + DataflowDaemonResult, DataflowResult, NodeError, NodeErrorCause, NodeExitStatus, +}; use dora_core::{ config::{DataId, InputMapping, NodeId}, coordinator_messages::DaemonEvent, @@ -29,9 +33,7 @@ use shared_memory_server::ShmemConf; use std::sync::Arc; use std::time::Instant; use std::{ - borrow::Cow, collections::{BTreeMap, BTreeSet, HashMap}, - io, net::SocketAddr, path::{Path, PathBuf}, time::Duration, @@ -64,6 +66,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::pending::DataflowStatus; +const STDERR_LOG_LINES: usize = 10; + pub struct Daemon { running: HashMap, working_dir: HashMap, @@ -78,7 +82,7 @@ pub struct Daemon { /// used for testing and examples exit_when_done: Option>, /// used to record dataflow results when `exit_when_done` is used - dataflow_errors: BTreeMap>, + dataflow_node_results: BTreeMap>>, clock: Arc, } @@ -148,7 +152,7 @@ impl Daemon { .map(|_| ()) } - pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<()> { + pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result { let working_dir = dataflow_path .canonicalize() .context("failed to canoncialize dataflow path")? @@ -160,8 +164,9 @@ impl Daemon { descriptor.check(&working_dir)?; let nodes = descriptor.resolve_aliases_and_set_defaults()?; + let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext)); let spawn_command = SpawnDataflowNodes { - dataflow_id: Uuid::new_v7(Timestamp::now(NoContext)), + dataflow_id, working_dir, nodes, machine_listen_ports: BTreeMap::new(), @@ -191,7 +196,7 @@ impl Daemon { None, "".to_string(), Some(exit_when_done), - clock, + clock.clone(), ); let spawn_result = reply_rx @@ -205,20 +210,15 @@ impl Daemon { } }); - let (dataflow_errors, ()) = future::try_join(run_result, spawn_result).await?; + let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?; - if dataflow_errors.is_empty() { - Ok(()) - } else { - let mut output = "some nodes failed:".to_owned(); - for (dataflow, node_errors) in dataflow_errors { - for (node, error) in node_errors { - use std::fmt::Write; - write!(&mut output, "\n - {dataflow}/{node}: {error}").unwrap(); - } - } - bail!("{output}"); - } + Ok(DataflowResult { + uuid: dataflow_id, + timestamp: clock.new_timestamp(), + node_results: dataflow_results + .remove(&dataflow_id) + .context("no node results for dataflow_id")?, + }) } async fn run_general( @@ -227,7 +227,7 @@ impl Daemon { machine_id: String, exit_when_done: Option>, clock: Arc, - ) -> eyre::Result>> { + ) -> eyre::Result>>> { let coordinator_connection = match coordinator_addr { Some(addr) => { let stream = TcpStream::connect(addr) @@ -251,7 +251,7 @@ impl Daemon { inter_daemon_connections: BTreeMap::new(), machine_id, exit_when_done, - dataflow_errors: BTreeMap::new(), + dataflow_node_results: BTreeMap::new(), clock, }; @@ -272,7 +272,7 @@ impl Daemon { async fn run_inner( mut self, incoming_events: impl Stream> + Unpin, - ) -> eyre::Result>> { + ) -> eyre::Result>>> { let mut events = incoming_events; while let Some(event) = events.next().await { @@ -329,7 +329,7 @@ impl Daemon { } } - Ok(self.dataflow_errors) + Ok(self.dataflow_node_results) } async fn handle_coordinator_event( @@ -376,15 +376,19 @@ impl Daemon { } DaemonCoordinatorEvent::AllNodesReady { dataflow_id, - success, + exited_before_subscribe, } => { match self.running.get_mut(&dataflow_id) { Some(dataflow) => { + let ready = exited_before_subscribe.is_empty(); dataflow .pending_nodes - .handle_external_all_nodes_ready(success) + .handle_external_all_nodes_ready( + exited_before_subscribe, + &mut dataflow.cascading_error_causes, + ) .await?; - if success { + if ready { tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); dataflow.start(&self.events_tx, &self.clock).await?; } @@ -614,6 +618,11 @@ impl Daemon { dataflow.pending_nodes.insert(node.id.clone()); let node_id = node.id.clone(); + let node_stderr_most_recent = dataflow + .node_stderr_most_recent + .entry(node.id.clone()) + .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES))) + .clone(); match spawn::spawn_node( dataflow_id, &working_dir, @@ -621,6 +630,7 @@ impl Daemon { self.events_tx.clone(), dataflow_descriptor.clone(), self.clock.clone(), + node_stderr_most_recent, ) .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) @@ -636,6 +646,7 @@ impl Daemon { &node_id, &mut self.coordinator_connection, &self.clock, + &mut dataflow.cascading_error_causes, ) .await?; } @@ -742,6 +753,7 @@ impl Daemon { reply_sender, &mut self.coordinator_connection, &self.clock, + &mut dataflow.cascading_error_causes, ) .await?; match status { @@ -996,7 +1008,12 @@ impl Daemon { dataflow .pending_nodes - .handle_node_stop(node_id, &mut self.coordinator_connection, &self.clock) + .handle_node_stop( + node_id, + &mut self.coordinator_connection, + &self.clock, + &mut dataflow.cascading_error_causes, + ) .await?; Self::handle_outputs_done( @@ -1013,17 +1030,15 @@ impl Daemon { .iter() .all(|(_id, n)| n.node_config.dynamic) { - let result = match self.dataflow_errors.get(&dataflow.id) { - None => Ok(()), - Some(errors) => { - let mut output = "some nodes failed:".to_owned(); - for (node, error) in errors { - use std::fmt::Write; - write!(&mut output, "\n - {node}: {error}").unwrap(); - } - Err(output) - } + let result = DataflowDaemonResult { + timestamp: self.clock.new_timestamp(), + node_results: self + .dataflow_node_results + .get(&dataflow.id) + .context("failed to get dataflow node results")? + .clone(), }; + tracing::info!( "Dataflow `{dataflow_id}` finished on machine `{}`", self.machine_id @@ -1142,80 +1157,57 @@ impl Daemon { node_id, exit_status, } => { - let node_error = match exit_status { + let node_result = match exit_status { NodeExitStatus::Success => { tracing::info!("node {dataflow_id}/{node_id} finished successfully"); - None - } - NodeExitStatus::IoError(err) => { - let err = eyre!(err).wrap_err(format!( - " - I/O error while waiting for node `{dataflow_id}/{node_id}. - - Check logs using: dora logs {dataflow_id} {node_id} - " - )); - tracing::error!("{err:?}"); - Some(err) + Ok(()) } - NodeExitStatus::ExitCode(code) => { - let err = eyre!( - " - {dataflow_id}/{node_id} failed with exit code {code}. - - Check logs using: dora logs {dataflow_id} {node_id} - " - ); - tracing::error!("{err}"); - Some(err) - } - NodeExitStatus::Signal(signal) => { - let signal: Cow<_> = match signal { - 1 => "SIGHUP".into(), - 2 => "SIGINT".into(), - 3 => "SIGQUIT".into(), - 4 => "SIGILL".into(), - 6 => "SIGABRT".into(), - 8 => "SIGFPE".into(), - 9 => "SIGKILL".into(), - 11 => "SIGSEGV".into(), - 13 => "SIGPIPE".into(), - 14 => "SIGALRM".into(), - 15 => "SIGTERM".into(), - 22 => "SIGABRT".into(), - 23 => "NSIG".into(), - - other => other.to_string().into(), + exit_status => { + let dataflow = self.running.get(&dataflow_id); + let caused_by_node = dataflow + .and_then(|dataflow| { + dataflow.cascading_error_causes.error_caused_by(&node_id) + }) + .cloned(); + let grace_duration_kill = dataflow + .map(|d| d.grace_duration_kills.contains(&node_id)) + .unwrap_or_default(); + + let cause = match caused_by_node { + Some(caused_by_node) => { + tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"); + NodeErrorCause::Cascading { caused_by_node } + } + None if grace_duration_kill => NodeErrorCause::GraceDuration, + None => NodeErrorCause::Other { + stderr: dataflow + .and_then(|d| d.node_stderr_most_recent.get(&node_id)) + .map(|queue| { + let mut s = if queue.is_full() { + "[...]".into() + } else { + String::new() + }; + while let Some(line) = queue.pop() { + s += &line; + } + s + }) + .unwrap_or_default(), + }, }; - let err = eyre!( - " - {dataflow_id}/{node_id} failed with signal `{signal}` - - Check logs using: dora logs {dataflow_id} {node_id} - " - ); - tracing::error!("{err}"); - Some(err) - } - NodeExitStatus::Unknown => { - let err = eyre!( - " - {dataflow_id}/{node_id} failed with unknown exit code - - Check logs using: dora logs {dataflow_id} {node_id} - " - ); - tracing::error!("{err}"); - Some(err) + Err(NodeError { + timestamp: self.clock.new_timestamp(), + cause, + exit_status, + }) } }; - if let Some(err) = node_error { - self.dataflow_errors - .entry(dataflow_id) - .or_default() - .insert(node_id.clone(), err); - } + self.dataflow_node_results + .entry(dataflow_id) + .or_default() + .insert(node_id.clone(), node_result); self.handle_node_stop(dataflow_id, &node_id).await?; @@ -1423,6 +1415,12 @@ pub struct RunningDataflow { /// /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. empty_set: BTreeSet, + + /// Contains the node that caused the error for nodes that experienced a cascading error. + cascading_error_causes: CascadingErrorCauses, + grace_duration_kills: Arc>, + + node_stderr_most_recent: BTreeMap>>, } impl RunningDataflow { @@ -1441,6 +1439,9 @@ impl RunningDataflow { _timer_handles: Vec::new(), stop_sent: false, empty_set: BTreeSet::new(), + cascading_error_causes: Default::default(), + grace_duration_kills: Default::default(), + node_stderr_most_recent: BTreeMap::new(), } } @@ -1503,6 +1504,7 @@ impl RunningDataflow { } let running_nodes = self.running_nodes.clone(); + let grace_duration_kills = self.grace_duration_kills.clone(); tokio::spawn(async move { let duration = grace_duration.unwrap_or(Duration::from_millis(500)); tokio::time::sleep(duration).await; @@ -1512,6 +1514,7 @@ impl RunningDataflow { for (node, node_details) in running_nodes.iter() { if let Some(pid) = node_details.pid { if let Some(process) = system.process(Pid::from(pid as usize)) { + grace_duration_kills.insert(node.clone()); process.kill(); warn!( "{node} was killed due to not stopping within the {:#?} grace period", @@ -1644,39 +1647,6 @@ pub enum DoraEvent { }, } -#[derive(Debug)] -pub enum NodeExitStatus { - Success, - IoError(io::Error), - ExitCode(i32), - Signal(i32), - Unknown, -} - -impl From> for NodeExitStatus { - fn from(result: Result) -> Self { - match result { - Ok(status) => { - if status.success() { - NodeExitStatus::Success - } else if let Some(code) = status.code() { - Self::ExitCode(code) - } else { - #[cfg(unix)] - { - use std::os::unix::process::ExitStatusExt; - if let Some(signal) = status.signal() { - return Self::Signal(signal); - } - } - Self::Unknown - } - } - Err(err) => Self::IoError(err), - } - } -} - #[must_use] enum RunStatus { Continue, @@ -1723,3 +1693,23 @@ fn set_up_ctrlc_handler( Ok(ReceiverStream::new(ctrlc_rx)) } + +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct CascadingErrorCauses { + caused_by: BTreeMap, +} + +impl CascadingErrorCauses { + pub fn experienced_cascading_error(&self, node: &NodeId) -> bool { + self.caused_by.contains_key(node) + } + + /// Return the ID of the node that caused a cascading error for the given node, if any. + pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> { + self.caused_by.get(node) + } + + pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) { + self.caused_by.entry(affected_node).or_insert(causing_node); + } +} diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index ccba1a56..d1fb1b30 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -9,7 +9,7 @@ use dora_core::{ use eyre::{bail, Context}; use tokio::{net::TcpStream, sync::oneshot}; -use crate::tcp_utils::tcp_send; +use crate::{tcp_utils::tcp_send, CascadingErrorCauses}; pub struct PendingNodes { dataflow_id: DataflowId, @@ -28,7 +28,7 @@ pub struct PendingNodes { /// /// If this list is non-empty, we should not start the dataflow at all. Instead, /// we report an error to the other nodes. - exited_before_subscribe: HashSet, + exited_before_subscribe: Vec, /// Whether the local init result was already reported to the coordinator. reported_init_to_coordinator: bool, @@ -42,7 +42,7 @@ impl PendingNodes { local_nodes: HashSet::new(), external_nodes: false, waiting_subscribers: HashMap::new(), - exited_before_subscribe: HashSet::new(), + exited_before_subscribe: Default::default(), reported_init_to_coordinator: false, } } @@ -61,12 +61,13 @@ impl PendingNodes { reply_sender: oneshot::Sender, coordinator_connection: &mut Option, clock: &HLC, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result { self.waiting_subscribers .insert(node_id.clone(), reply_sender); self.local_nodes.remove(&node_id); - self.update_dataflow_status(coordinator_connection, clock) + self.update_dataflow_status(coordinator_connection, clock, cascading_errors) .await } @@ -75,26 +76,28 @@ impl PendingNodes { node_id: &NodeId, coordinator_connection: &mut Option, clock: &HLC, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result<()> { if self.local_nodes.remove(node_id) { tracing::warn!("node `{node_id}` exited before initializing dora connection"); - self.exited_before_subscribe.insert(node_id.clone()); - self.update_dataflow_status(coordinator_connection, clock) + self.exited_before_subscribe.push(node_id.clone()); + self.update_dataflow_status(coordinator_connection, clock, cascading_errors) .await?; } Ok(()) } - pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> { + pub async fn handle_external_all_nodes_ready( + &mut self, + exited_before_subscribe: Vec, + cascading_errors: &mut CascadingErrorCauses, + ) -> eyre::Result<()> { if !self.local_nodes.is_empty() { bail!("received external `all_nodes_ready` event before local nodes were ready"); } - let external_error = if success { - None - } else { - Some("some nodes failed to initialize on remote machines".to_string()) - }; - self.answer_subscribe_requests(external_error).await; + + self.answer_subscribe_requests(exited_before_subscribe, cascading_errors) + .await; Ok(()) } @@ -103,6 +106,7 @@ impl PendingNodes { &mut self, coordinator_connection: &mut Option, clock: &HLC, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result { if self.local_nodes.is_empty() { if self.external_nodes { @@ -113,7 +117,8 @@ impl PendingNodes { } Ok(DataflowStatus::Pending) } else { - self.answer_subscribe_requests(None).await; + self.answer_subscribe_requests(Vec::new(), cascading_errors) + .await; Ok(DataflowStatus::AllNodesReady) } } else { @@ -121,33 +126,34 @@ impl PendingNodes { } } - async fn answer_subscribe_requests(&mut self, external_error: Option) { - let result = if self.exited_before_subscribe.is_empty() { - match external_error { - Some(err) => Err(err), - None => Ok(()), - } - } else { - let node_id_message = if self.exited_before_subscribe.len() == 1 { - self.exited_before_subscribe - .iter() - .next() - .map(|node_id| node_id.to_string()) - .unwrap_or("".to_string()) - } else { - "".to_string() - }; - Err(format!( - "Some nodes exited before subscribing to dora: {:?}\n\n\ - This is typically happens when an initialization error occurs - in the node or operator. To check the output of the failed - nodes, run `dora logs {} {node_id_message}`.", - self.exited_before_subscribe, self.dataflow_id - )) + async fn answer_subscribe_requests( + &mut self, + exited_before_subscribe_external: Vec, + cascading_errors: &mut CascadingErrorCauses, + ) { + let node_exited_before_subscribe = match self.exited_before_subscribe.as_slice() { + [first, ..] => Some(first), + [] => match exited_before_subscribe_external.as_slice() { + [first, ..] => Some(first), + [] => None, + }, }; + + let result = match &node_exited_before_subscribe { + Some(causing_node) => Err(format!( + "Node {causing_node} exited before initializing dora. For \ + more information, run `dora logs {} {causing_node}`.", + self.dataflow_id + )), + None => Ok(()), + }; + // answer all subscribe requests let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); - for reply_sender in subscribe_replies.into_values() { + for (node_id, reply_sender) in subscribe_replies.into_iter() { + if let Some(causing_node) = node_exited_before_subscribe { + cascading_errors.report_cascading_error(causing_node.clone(), node_id.clone()); + } let _ = reply_sender.send(DaemonReply::Result(result.clone())); } } @@ -161,15 +167,17 @@ impl PendingNodes { bail!("no coordinator connection to send AllNodesReady"); }; - let success = self.exited_before_subscribe.is_empty(); - tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes"); + tracing::info!( + "all local nodes are ready (exit before subscribe: {:?}), waiting for remote nodes", + self.exited_before_subscribe + ); let msg = serde_json::to_vec(&Timestamped { inner: CoordinatorRequest::Event { machine_id: self.machine_id.clone(), event: DaemonEvent::AllNodesReady { dataflow_id: self.dataflow_id, - success, + exited_before_subscribe: self.exited_before_subscribe.clone(), }, }, timestamp, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index e670d6f6..b0af2e31 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -3,6 +3,7 @@ use crate::{ OutputId, RunningNode, }; use aligned_vec::{AVec, ConstAlign}; +use crossbeam::queue::ArrayQueue; use dora_arrow_convert::IntoArrow; use dora_core::{ config::DataId, @@ -42,6 +43,7 @@ pub async fn spawn_node( daemon_tx: mpsc::Sender>, dataflow_descriptor: Descriptor, clock: Arc, + node_stderr_most_recent: Arc>, ) -> eyre::Result { let node_id = node.id.clone(); tracing::debug!("Spawning node `{dataflow_id}/{node_id}`"); @@ -358,18 +360,22 @@ pub async fn spawn_node( } }; - match String::from_utf8(raw) { - Ok(s) => buffer.push_str(&s), + let new = match String::from_utf8(raw) { + Ok(s) => s, Err(err) => { let lossy = String::from_utf8_lossy(err.as_bytes()); tracing::warn!( "stderr not valid UTF-8 string (node {node_id}): {}: {lossy}", err.utf8_error() ); - buffer.push_str(&lossy) + lossy.into_owned() } }; + buffer.push_str(&new); + + node_stderr_most_recent.force_push(new); + if buffer.starts_with("Traceback (most recent call last):") { if !finished { continue; diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 049dc7d8..7dcb1ab8 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -176,7 +176,7 @@ async fn running_dataflows(coordinator_events_tx: &Sender) -> eyre::Resul .await?; let result = reply.await??; let dataflows = match result { - ControlRequestReply::DataflowList { dataflows } => dataflows, + ControlRequestReply::DataflowList(list) => list.get_active(), ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected start dataflow reply: {other:?}"), }; diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index 38e9eae2..5a4a1db9 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -1,4 +1,4 @@ -use crate::daemon_messages::DataflowId; +use crate::{config::NodeId, daemon_messages::DataflowId, topics::DataflowDaemonResult}; use eyre::eyre; #[derive(Debug, serde::Serialize, serde::Deserialize)] @@ -18,11 +18,11 @@ pub enum CoordinatorRequest { pub enum DaemonEvent { AllNodesReady { dataflow_id: DataflowId, - success: bool, + exited_before_subscribe: Vec, }, AllNodesFinished { dataflow_id: DataflowId, - result: Result<(), String>, + result: DataflowDaemonResult, }, Heartbeat, } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index ae6fc262..ce6c459a 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -234,7 +234,7 @@ pub enum DaemonCoordinatorEvent { Spawn(SpawnDataflowNodes), AllNodesReady { dataflow_id: DataflowId, - success: bool, + exited_before_subscribe: Vec, }, StopDataflow { dataflow_id: DataflowId, diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 8e90e48f..7933c875 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,5 +1,7 @@ +use dora_message::uhlc; use std::{ - collections::BTreeSet, + borrow::Cow, + collections::{BTreeMap, BTreeSet}, fmt::Display, net::{IpAddr, Ipv4Addr}, path::PathBuf, @@ -55,24 +57,40 @@ pub enum ControlRequest { ConnectedMachines, } +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowList(pub Vec); + +impl DataflowList { + pub fn get_active(&self) -> Vec { + self.0 + .iter() + .filter(|d| d.status == DataflowStatus::Running) + .map(|d| d.id.clone()) + .collect() + } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowListEntry { + pub id: DataflowId, + pub status: DataflowStatus, +} + +#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +pub enum DataflowStatus { + Running, + Finished, + Failed, +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum ControlRequestReply { Error(String), CoordinatorStopped, - DataflowStarted { - uuid: Uuid, - }, - DataflowReloaded { - uuid: Uuid, - }, - DataflowStopped { - uuid: Uuid, - result: Result<(), String>, - }, - - DataflowList { - dataflows: Vec, - }, + DataflowStarted { uuid: Uuid }, + DataflowReloaded { uuid: Uuid }, + DataflowStopped { uuid: Uuid, result: DataflowResult }, + DataflowList(DataflowList), DestroyOk, DaemonConnected(bool), ConnectedMachines(BTreeSet), @@ -94,3 +112,138 @@ impl Display for DataflowId { } } } + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowResult { + pub uuid: Uuid, + pub timestamp: uhlc::Timestamp, + pub node_results: BTreeMap>, +} + +impl DataflowResult { + pub fn ok_empty(uuid: Uuid, timestamp: uhlc::Timestamp) -> Self { + Self { + uuid, + timestamp, + node_results: Default::default(), + } + } + + pub fn is_ok(&self) -> bool { + self.node_results.values().all(|r| r.is_ok()) + } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowDaemonResult { + pub timestamp: uhlc::Timestamp, + pub node_results: BTreeMap>, +} + +impl DataflowDaemonResult { + pub fn is_ok(&self) -> bool { + self.node_results.values().all(|r| r.is_ok()) + } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct NodeError { + pub timestamp: uhlc::Timestamp, + pub cause: NodeErrorCause, + pub exit_status: NodeExitStatus, +} + +impl std::fmt::Display for NodeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self.exit_status { + NodeExitStatus::Success => write!(f, ""), + NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"), + NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"), + NodeExitStatus::Signal(signal) => { + let signal_str: Cow<_> = match signal { + 1 => "SIGHUP".into(), + 2 => "SIGINT".into(), + 3 => "SIGQUIT".into(), + 4 => "SIGILL".into(), + 6 => "SIGABRT".into(), + 8 => "SIGFPE".into(), + 9 => "SIGKILL".into(), + 11 => "SIGSEGV".into(), + 13 => "SIGPIPE".into(), + 14 => "SIGALRM".into(), + 15 => "SIGTERM".into(), + 22 => "SIGABRT".into(), + 23 => "NSIG".into(), + other => other.to_string().into(), + }; + if matches!(self.cause, NodeErrorCause::GraceDuration) { + write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})") + } else { + write!(f, "exited because of signal {signal_str}") + } + } + NodeExitStatus::Unknown => write!(f, "unknown exit status"), + }?; + + match &self.cause { + NodeErrorCause::GraceDuration => {}, // handled above + NodeErrorCause::Cascading { caused_by_node } => write!( + f, + "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." + )?, + NodeErrorCause::Other { stderr } if stderr.is_empty() => {} + NodeErrorCause::Other { stderr } => { + let line: &str = "---------------------------------------------------------------------------------\n"; + write!(f, " with stderr output:\n{line}{stderr}{line}")? + }, + } + + Ok(()) + } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub enum NodeErrorCause { + /// Node was killed because it didn't react to a stop message in time. + GraceDuration, + /// Node failed because another node failed before, + Cascading { + caused_by_node: NodeId, + }, + Other { + stderr: String, + }, +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub enum NodeExitStatus { + Success, + IoError(String), + ExitCode(i32), + Signal(i32), + Unknown, +} + +impl From> for NodeExitStatus { + fn from(result: Result) -> Self { + match result { + Ok(status) => { + if status.success() { + NodeExitStatus::Success + } else if let Some(code) = status.code() { + Self::ExitCode(code) + } else { + #[cfg(unix)] + { + use std::os::unix::process::ExitStatusExt; + if let Some(signal) = status.signal() { + return Self::Signal(signal); + } + } + Self::Unknown + } + } + Err(err) => Self::IoError(err.to_string()), + } + } +}