diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e5a4017c..8708ba96 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -286,12 +286,12 @@ jobs: cargo build --all dora up dora list - dora start dataflow.yml --name ci-rust-test + dora start dataflow.yml --name ci-rust-test --detach sleep 10 dora stop --name ci-rust-test --grace-duration 5s cd .. dora build examples/rust-dataflow/dataflow_dynamic.yml - dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic + dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic --detach cargo run -p rust-dataflow-example-sink-dynamic sleep 5 dora stop --name ci-rust-dynamic --grace-duration 5s @@ -315,11 +315,11 @@ jobs: cd test_python_project dora up dora list - dora start dataflow.yml --name ci-python-test + dora start dataflow.yml --name ci-python-test --detach sleep 10 dora stop --name ci-python-test --grace-duration 5s pip install "numpy<2.0.0" opencv-python - dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic + dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic --detach python ../examples/python-dataflow/plot_dynamic.py sleep 5 dora stop --name ci-python-test --grace-duration 5s @@ -339,7 +339,7 @@ jobs: cmake -B build cmake --build build cmake --install build - dora start dataflow.yml --name ci-c-test + dora start dataflow.yml --name ci-c-test --detach sleep 10 dora stop --name ci-c-test --grace-duration 5s dora destroy @@ -358,7 +358,7 @@ jobs: cmake -B build cmake --build build cmake --install build - dora start dataflow.yml --name ci-cxx-test + dora start dataflow.yml --name ci-cxx-test --detach sleep 10 dora stop --name ci-cxx-test --grace-duration 5s dora destroy diff --git a/Cargo.lock b/Cargo.lock index e48a9c86..ba1261ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -885,9 +885,12 @@ dependencies = [ [[package]] name = "atomic" -version = "0.5.3" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" +checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" +dependencies = [ + "bytemuck", +] [[package]] name = "atomic-waker" @@ -982,7 +985,7 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", - "sync_wrapper", + "sync_wrapper 0.1.2", "tower", "tower-layer", "tower-service", @@ -1935,6 +1938,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -2021,9 +2034,9 @@ checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991" [[package]] name = "cxx" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8194f089b6da4751d6c1da1ef37c17255df51f9346cdb160f8b096562ae4a85c" +checksum = "273dcfd3acd4e1e276af13ed2a43eea7001318823e7a726a6b3ed39b4acc0b82" dependencies = [ "cc", "cxxbridge-flags", @@ -2033,9 +2046,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e8df9a089caae66634d754672d5f909395f30f38af6ff19366980d8a8b57501" +checksum = "d8b2766fbd92be34e9ed143898fce6c572dc009de39506ed6903e5a05b68914e" dependencies = [ "cc", "codespan-reporting", @@ -2048,15 +2061,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25290be4751803672a70b98c68b51c1e7d0a640ab5a4377f240f9d2e70054cd1" +checksum = "839fcd5e43464614ffaa989eaf1c139ef1f0c51672a1ed08023307fa1b909ccd" [[package]] name = "cxxbridge-macro" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8cb317cb13604b4752416783bb25070381c36e844743e4146b7f8e55de7d140" +checksum = "4b2c1c1776b986979be68bb2285da855f8d8a35851a769fca8740df7c3d07877" dependencies = [ "proc-macro2", "quote", @@ -2281,6 +2294,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml 0.9.34+deprecated", + "tabwriter", "termcolor", "tokio", "tokio-stream", @@ -2336,6 +2350,7 @@ dependencies = [ "async-trait", "bincode", "crossbeam", + "crossbeam-skiplist", "ctrlc", "dora-arrow-convert", "dora-core", @@ -3998,7 +4013,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.7", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -4026,19 +4041,20 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.26.0" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" +checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" dependencies = [ "futures-util", "http 1.1.0", "hyper 1.3.1", "hyper-util", - "rustls 0.22.4", + "rustls 0.23.10", "rustls-pki-types", "tokio", "tokio-rustls", "tower-service", + "webpki-roots 0.26.1", ] [[package]] @@ -4618,7 +4634,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if 1.0.0", - "windows-targets 0.52.5", + "windows-targets 0.48.5", ] [[package]] @@ -6427,9 +6443,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.85" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ "unicode-ident", ] @@ -6620,8 +6636,8 @@ checksum = "2e8b432585672228923edbbf64b8b12c14e1112f62e88737655b4a083dbcd78e" dependencies = [ "bytes", "pin-project-lite", - "quinn-proto", - "quinn-udp", + "quinn-proto 0.9.6", + "quinn-udp 0.3.2", "rustc-hash", "rustls 0.20.9", "thiserror", @@ -6630,6 +6646,23 @@ dependencies = [ "webpki", ] +[[package]] +name = "quinn" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4ceeeeabace7857413798eb1ffa1e9c905a9946a57d81fb69b4b71c4d8eb3ad" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto 0.11.3", + "quinn-udp 0.5.2", + "rustc-hash", + "rustls 0.23.10", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "quinn-proto" version = "0.9.6" @@ -6649,6 +6682,23 @@ dependencies = [ "webpki", ] +[[package]] +name = "quinn-proto" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe" +dependencies = [ + "bytes", + "rand", + "ring 0.17.8", + "rustc-hash", + "rustls 0.23.10", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + [[package]] name = "quinn-udp" version = "0.3.2" @@ -6656,12 +6706,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "641538578b21f5e5c8ea733b736895576d0fe329bb883b937db6f4d163dbaaf4" dependencies = [ "libc", - "quinn-proto", + "quinn-proto 0.9.6", "socket2 0.4.10", "tracing", "windows-sys 0.42.0", ] +[[package]] +name = "quinn-udp" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46" +dependencies = [ + "libc", + "once_cell", + "socket2 0.5.7", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.36" @@ -7869,9 +7932,9 @@ checksum = "19b30a45b0cd0bcca8037f3d0dc3421eaf95327a17cad11964fb8179b4fc4832" [[package]] name = "reqwest" -version = "0.12.4" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" +checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37" dependencies = [ "base64 0.22.1", "bytes", @@ -7890,13 +7953,14 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.22.4", + "quinn 0.11.2", + "rustls 0.23.10", "rustls-pemfile 2.1.2", "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.1", "tokio", "tokio-rustls", "tower-service", @@ -8274,6 +8338,20 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls" +version = "0.23.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" +dependencies = [ + "once_cell", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -9066,6 +9144,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "syntect" version = "5.2.0" @@ -9128,6 +9212,15 @@ dependencies = [ "windows 0.52.0", ] +[[package]] +name = "tabwriter" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a327282c4f64f6dc37e3bba4c2b6842cc3a992f204fa58d917696a89f691e5f6" +dependencies = [ + "unicode-width", +] + [[package]] name = "tap" version = "1.0.1" @@ -9369,11 +9462,11 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.22.4", + "rustls 0.23.10", "rustls-pki-types", "tokio", ] @@ -9842,9 +9935,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +checksum = "3ea73390fe27785838dcbf75b91b1d84799e28f1ce71e6f372a5dc2200c80de5" dependencies = [ "atomic", "getrandom", @@ -9856,9 +9949,9 @@ dependencies = [ [[package]] name = "uuid-macro-internal" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2" +checksum = "cdb394c528d1cb434c2f0522027e50c0705305caf6d20405c07a6f7e4cf9543c" dependencies = [ "proc-macro2", "quote", @@ -11168,7 +11261,7 @@ dependencies = [ "async-trait", "futures", "log", - "quinn", + "quinn 0.9.4", "rustls 0.20.9", "rustls-native-certs", "rustls-pemfile 1.0.4", diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index a56c6ac5..80b05987 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -42,6 +42,7 @@ tokio = { version = "1.20.1", features = ["full"] } tokio-stream = { version = "0.1.8", features = ["io-util", "net"] } futures = "0.3.21" duration-str = "0.5" +tabwriter = "1.4.0" log = { version = "0.4.21", features = ["serde"] } colored = "2.1.0" env_logger = "0.11.3" diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 71c59c4d..8dcb6590 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -6,7 +6,7 @@ use dora_coordinator::Event; use dora_core::{ descriptor::Descriptor, topics::{ - ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, }, }; @@ -17,12 +17,13 @@ use dora_tracing::set_up_tracing_opts; use duration_str::parse; use eyre::{bail, Context}; use formatting::FormatDataflowError; -use std::net::SocketAddr; +use std::{io::Write, net::SocketAddr}; use std::{ net::{IpAddr, Ipv4Addr}, path::PathBuf, time::Duration, }; +use tabwriter::TabWriter; use tokio::runtime::Builder; use uuid::Uuid; @@ -120,6 +121,9 @@ enum Command { /// Attach to the dataflow and wait for its completion #[clap(long, action)] attach: bool, + /// Run the dataflow in background + #[clap(long, action)] + detach: bool, /// Enable hot reloading (Python only) #[clap(long, action)] hot_reload: bool, @@ -330,17 +334,18 @@ fn run() -> eyre::Result<()> { } => { let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; - let uuids = query_running_dataflows(&mut *session) + let list = query_running_dataflows(&mut *session) .wrap_err("failed to query running dataflows")?; if let Some(dataflow) = dataflow { let uuid = Uuid::parse_str(&dataflow).ok(); let name = if uuid.is_some() { None } else { Some(dataflow) }; logs::logs(&mut *session, uuid, name, node)? } else { - let uuid = match &uuids[..] { + let active = list.get_active(); + let uuid = match &active[..] { [] => bail!("No dataflows are running"), [uuid] => uuid.clone(), - _ => inquire::Select::new("Choose dataflow to show logs:", uuids).prompt()?, + _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, }; logs::logs(&mut *session, Some(uuid.uuid), None, node)? } @@ -351,6 +356,7 @@ fn run() -> eyre::Result<()> { coordinator_addr, coordinator_port, attach, + detach, hot_reload, } => { let dataflow_descriptor = @@ -379,6 +385,16 @@ fn run() -> eyre::Result<()> { &mut *session, )?; + let attach = match (attach, detach) { + (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), + (true, false) => true, + (false, true) => false, + (false, false) => { + println!("attaching to dataflow (use `--detach` to run in background)"); + true + } + }; + if attach { attach_dataflow( dataflow_descriptor, @@ -522,11 +538,12 @@ fn stop_dataflow_interactive( grace_duration: Option, session: &mut TcpRequestReplyConnection, ) -> eyre::Result<()> { - let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; - if uuids.is_empty() { + let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; + let active = list.get_active(); + if active.is_empty() { eprintln!("No dataflows are running"); } else { - let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?; + let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?; stop_dataflow(selection.uuid, grace_duration, session)?; } @@ -602,30 +619,36 @@ fn stop_dataflow_by_name( } fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { - let ids = query_running_dataflows(session)?; + let list = query_running_dataflows(session)?; - if ids.is_empty() { - eprintln!("No dataflows are running"); - } else { - println!("Running dataflows:"); - for id in ids { - println!("- {id}"); - } + let mut tw = TabWriter::new(vec![]); + tw.write_all(b"UUID\tName\tStatus\n")?; + for entry in list.0 { + let uuid = entry.id.uuid; + let name = entry.id.name.unwrap_or_default(); + let status = match entry.status { + dora_core::topics::DataflowStatus::Running => "Running", + dora_core::topics::DataflowStatus::Finished => "Succeeded", + dora_core::topics::DataflowStatus::Failed => "Failed", + }; + tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?; } + tw.flush()?; + let formatted = String::from_utf8(tw.into_inner()?)?; + + println!("{formatted}"); Ok(()) } -fn query_running_dataflows( - session: &mut TcpRequestReplyConnection, -) -> Result, eyre::ErrReport> { +fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result { let reply_raw = session .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) .wrap_err("failed to send list message")?; let reply: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; let ids = match reply { - ControlRequestReply::DataflowList { dataflows } => dataflows, + ControlRequestReply::DataflowList(list) => list, ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected list dataflow reply: {other:?}"), }; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 504c6136..5cc02e33 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -10,7 +10,8 @@ use dora_core::{ descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, topics::{ - ControlRequest, ControlRequestReply, DataflowDaemonResult, DataflowId, DataflowResult, + ControlRequest, ControlRequestReply, DataflowDaemonResult, DataflowId, DataflowListEntry, + DataflowResult, }, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; @@ -468,15 +469,31 @@ async fn start_inner( let mut dataflows: Vec<_> = running_dataflows.values().collect(); dataflows.sort_by_key(|d| (&d.name, d.uuid)); - let reply = Ok(ControlRequestReply::DataflowList { - dataflows: dataflows - .into_iter() - .map(|d| DataflowId { - uuid: d.uuid, - name: d.name.clone(), - }) - .collect(), + let running = dataflows.into_iter().map(|d| DataflowListEntry { + id: DataflowId { + uuid: d.uuid, + name: d.name.clone(), + }, + status: dora_core::topics::DataflowStatus::Running, }); + let finished_failed = + dataflow_results.iter().map(|(&uuid, results)| { + let name = + archived_dataflows.get(&uuid).and_then(|d| d.name.clone()); + let id = DataflowId { uuid, name }; + let status = if results.values().all(|r| r.is_ok()) { + dora_core::topics::DataflowStatus::Finished + } else { + dora_core::topics::DataflowStatus::Failed + }; + DataflowListEntry { id, status } + }); + + let reply = Ok(ControlRequestReply::DataflowList( + dora_core::topics::DataflowList( + running.chain(finished_failed).collect(), + ), + )); let _ = reply_sender.send(reply); } ControlRequest::DaemonConnected => { diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 274c68bd..2f1be890 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -40,3 +40,4 @@ ctrlc = "3.2.5" which = "5.0.0" sysinfo = "0.30.11" crossbeam = "0.8.4" +crossbeam-skiplist = "0.1.3" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 35f73696..29d6a0c3 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1209,17 +1209,25 @@ impl Daemon { dataflow.cascading_error_causes.error_caused_by(&node_id) }) .cloned(); + let grace_duration_kill = dataflow + .map(|d| d.grace_duration_kills.contains(&node_id)) + .unwrap_or_default(); let cause = match caused_by_node { Some(caused_by_node) => { tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"); NodeErrorCause::Cascading { caused_by_node } } + None if grace_duration_kill => NodeErrorCause::GraceDuration, None => NodeErrorCause::Other { stderr: dataflow .and_then(|d| d.node_stderr_most_recent.get(&node_id)) .map(|queue| { - let mut s = String::new(); + let mut s = if queue.is_full() { + "[...]".into() + } else { + String::new() + }; while let Some(line) = queue.pop() { s += &line; } @@ -1469,6 +1477,7 @@ pub struct RunningDataflow { /// Contains the node that caused the error for nodes that experienced a cascading error. cascading_error_causes: CascadingErrorCauses, + grace_duration_kills: Arc>, node_stderr_most_recent: BTreeMap>>, } @@ -1490,6 +1499,7 @@ impl RunningDataflow { stop_sent: false, empty_set: BTreeSet::new(), cascading_error_causes: Default::default(), + grace_duration_kills: Default::default(), node_stderr_most_recent: BTreeMap::new(), } } @@ -1553,6 +1563,7 @@ impl RunningDataflow { } let running_nodes = self.running_nodes.clone(); + let grace_duration_kills = self.grace_duration_kills.clone(); tokio::spawn(async move { let duration = grace_duration.unwrap_or(Duration::from_millis(500)); tokio::time::sleep(duration).await; @@ -1562,6 +1573,7 @@ impl RunningDataflow { for (node, node_details) in running_nodes.iter() { if let Some(pid) = node_details.pid { if let Some(process) = system.process(Pid::from(pid as usize)) { + grace_duration_kills.insert(node.clone()); process.kill(); warn!( "{node} was killed due to not stopping within the {:#?} grace period", diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 049dc7d8..7dcb1ab8 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -176,7 +176,7 @@ async fn running_dataflows(coordinator_events_tx: &Sender) -> eyre::Resul .await?; let result = reply.await??; let dataflows = match result { - ControlRequestReply::DataflowList { dataflows } => dataflows, + ControlRequestReply::DataflowList(list) => list.get_active(), ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected start dataflow reply: {other:?}"), }; diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 13829ed8..6ad37261 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -61,6 +61,32 @@ pub enum ControlRequest { }, } +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowList(pub Vec); + +impl DataflowList { + pub fn get_active(&self) -> Vec { + self.0 + .iter() + .filter(|d| d.status == DataflowStatus::Running) + .map(|d| d.id.clone()) + .collect() + } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowListEntry { + pub id: DataflowId, + pub status: DataflowStatus, +} + +#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +pub enum DataflowStatus { + Running, + Finished, + Failed, +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum ControlRequestReply { Error(String), @@ -68,7 +94,7 @@ pub enum ControlRequestReply { DataflowStarted { uuid: Uuid }, DataflowReloaded { uuid: Uuid }, DataflowStopped { uuid: Uuid, result: DataflowResult }, - DataflowList { dataflows: Vec }, + DataflowList(DataflowList), DestroyOk, DaemonConnected(bool), ConnectedMachines(BTreeSet), @@ -118,6 +144,12 @@ pub struct DataflowDaemonResult { pub node_results: BTreeMap>, } +impl DataflowDaemonResult { + pub fn is_ok(&self) -> bool { + self.node_results.values().all(|r| r.is_ok()) + } +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct NodeError { pub timestamp: uhlc::Timestamp, @@ -148,12 +180,17 @@ impl std::fmt::Display for NodeError { 23 => "NSIG".into(), other => other.to_string().into(), }; - write!(f, "exited because of signal {signal_str}") + if matches!(self.cause, NodeErrorCause::GraceDuration) { + write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})") + } else { + write!(f, "exited because of signal {signal_str}") + } } NodeExitStatus::Unknown => write!(f, "unknown exit status"), }?; match &self.cause { + NodeErrorCause::GraceDuration => {}, // handled above NodeErrorCause::Cascading { caused_by_node } => write!( f, "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." @@ -161,7 +198,7 @@ impl std::fmt::Display for NodeError { NodeErrorCause::Other { stderr } if stderr.is_empty() => {} NodeErrorCause::Other { stderr } => { let line: &str = "---------------------------------------------------------------------------------\n"; - write!(f, "with stderr output:\n{line}[...]\n{stderr}{line}")? + write!(f, " with stderr output:\n{line}{stderr}{line}")? }, } @@ -171,6 +208,8 @@ impl std::fmt::Display for NodeError { #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum NodeErrorCause { + /// Node was killed because it didn't react to a stop message in time. + GraceDuration, /// Node failed because another node failed before, Cascading { caused_by_node: NodeId,