From 8056c23db7b38beadf2d8456a94c317bb8a75848 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 13 Jun 2024 11:38:58 +0200 Subject: [PATCH 01/15] List failed and finished dataflows in `dora list` --- binaries/cli/src/main.rs | 39 ++++++++++++++++++++++---------- binaries/coordinator/src/lib.rs | 23 +++++++++++++++---- examples/multiple-daemons/run.rs | 6 ++--- libraries/core/src/topics.rs | 11 ++++++--- 4 files changed, 57 insertions(+), 22 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 97a12400..290a0cc9 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -5,7 +5,7 @@ use dora_coordinator::Event; use dora_core::{ descriptor::Descriptor, topics::{ - ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, }, }; @@ -321,17 +321,18 @@ fn run() -> eyre::Result<()> { } => { let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; - let uuids = query_running_dataflows(&mut *session) + let list = query_running_dataflows(&mut *session) .wrap_err("failed to query running dataflows")?; if let Some(dataflow) = dataflow { let uuid = Uuid::parse_str(&dataflow).ok(); let name = if uuid.is_some() { None } else { Some(dataflow) }; logs::logs(&mut *session, uuid, name, node)? } else { - let uuid = match &uuids[..] { + let uuid = match &list.active[..] { [] => bail!("No dataflows are running"), [uuid] => uuid.clone(), - _ => inquire::Select::new("Choose dataflow to show logs:", uuids).prompt()?, + _ => inquire::Select::new("Choose dataflow to show logs:", list.active) + .prompt()?, }; logs::logs(&mut *session, Some(uuid.uuid), None, node)? 
} @@ -509,11 +510,11 @@ fn stop_dataflow_interactive( grace_duration: Option, session: &mut TcpRequestReplyConnection, ) -> eyre::Result<()> { - let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; - if uuids.is_empty() { + let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; + if list.active.is_empty() { eprintln!("No dataflows are running"); } else { - let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?; + let selection = inquire::Select::new("Choose dataflow to stop:", list.active).prompt()?; stop_dataflow(selection.uuid, grace_duration, session)?; } @@ -571,13 +572,27 @@ fn stop_dataflow_by_name( } fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { - let ids = query_running_dataflows(session)?; + let list = query_running_dataflows(session)?; - if ids.is_empty() { + if list.active.is_empty() { eprintln!("No dataflows are running"); } else { println!("Running dataflows:"); - for id in ids { + for id in list.active { + println!("- {id}"); + } + } + + if !list.failed.is_empty() { + println!("Failed dataflows:"); + for id in list.failed { + println!("- {id}"); + } + } + + if !list.finished.is_empty() { + println!("Finished dataflows:"); + for id in list.finished { println!("- {id}"); } } @@ -587,14 +602,14 @@ fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> fn query_running_dataflows( session: &mut TcpRequestReplyConnection, -) -> Result, eyre::ErrReport> { +) -> Result { let reply_raw = session .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) .wrap_err("failed to send list message")?; let reply: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; let ids = match reply { - ControlRequestReply::DataflowList { dataflows } => dataflows, + ControlRequestReply::DataflowList(list) => list, ControlRequestReply::Error(err) => bail!("{err}"), other => 
bail!("unexpected list dataflow reply: {other:?}"), }; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 3b0c6459..5ab19416 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,7 +9,7 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{ControlRequest, ControlRequestReply, DataflowId}, + topics::{ControlRequest, ControlRequestReply, DataflowId, DataflowList}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -458,15 +458,30 @@ async fn start_inner( let mut dataflows: Vec<_> = running_dataflows.values().collect(); dataflows.sort_by_key(|d| (&d.name, d.uuid)); - let reply = Ok(ControlRequestReply::DataflowList { - dataflows: dataflows + let mut finished = Vec::new(); + let mut failed = Vec::new(); + for (&uuid, results) in &dataflow_results { + let name = + archived_dataflows.get(&uuid).and_then(|d| d.name.clone()); + let id = DataflowId { uuid, name }; + if results.values().all(|r| r.is_ok()) { + finished.push(id); + } else { + failed.push(id); + } + } + + let reply = Ok(ControlRequestReply::DataflowList(DataflowList { + active: dataflows .into_iter() .map(|d| DataflowId { uuid: d.uuid, name: d.name.clone(), }) .collect(), - }); + finished, + failed, + })); let _ = reply_sender.send(reply); } ControlRequest::DaemonConnected => { diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 049dc7d8..c86ae7ed 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -2,8 +2,8 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::Descriptor, topics::{ - ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - DORA_COORDINATOR_PORT_DEFAULT, + ControlRequest, ControlRequestReply, DataflowId, 
DataflowList, + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, }, }; use dora_tracing::set_up_tracing; @@ -176,7 +176,7 @@ async fn running_dataflows(coordinator_events_tx: &Sender) -> eyre::Resul .await?; let result = reply.await??; let dataflows = match result { - ControlRequestReply::DataflowList { dataflows } => dataflows, + ControlRequestReply::DataflowList(DataflowList { active, .. }) => active, ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected start dataflow reply: {other:?}"), }; diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 8e90e48f..b94c7d61 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -55,6 +55,13 @@ pub enum ControlRequest { ConnectedMachines, } +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowList { + pub active: Vec, + pub finished: Vec, + pub failed: Vec, +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum ControlRequestReply { Error(String), @@ -70,9 +77,7 @@ pub enum ControlRequestReply { result: Result<(), String>, }, - DataflowList { - dataflows: Vec, - }, + DataflowList(DataflowList), DestroyOk, DaemonConnected(bool), ConnectedMachines(BTreeSet), From fb2b0e6c4064a7ab5bf59a06309eb3ef26cbf628 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:44:55 +0000 Subject: [PATCH 02/15] Update dependencies --- Cargo.lock | 101 ++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 85 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 74ec1d4f..45237130 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -982,7 +982,7 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", - "sync_wrapper", + "sync_wrapper 0.1.2", "tower", "tower-layer", "tower-service", @@ -3959,7 +3959,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.7", + "socket2 0.4.10", 
"tokio", "tower-service", "tracing", @@ -3987,19 +3987,20 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.26.0" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" +checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" dependencies = [ "futures-util", "http 1.1.0", "hyper 1.3.1", "hyper-util", - "rustls 0.22.4", + "rustls 0.23.10", "rustls-pki-types", "tokio", "tokio-rustls", "tower-service", + "webpki-roots 0.26.1", ] [[package]] @@ -6580,8 +6581,8 @@ checksum = "2e8b432585672228923edbbf64b8b12c14e1112f62e88737655b4a083dbcd78e" dependencies = [ "bytes", "pin-project-lite", - "quinn-proto", - "quinn-udp", + "quinn-proto 0.9.6", + "quinn-udp 0.3.2", "rustc-hash", "rustls 0.20.9", "thiserror", @@ -6590,6 +6591,23 @@ dependencies = [ "webpki", ] +[[package]] +name = "quinn" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4ceeeeabace7857413798eb1ffa1e9c905a9946a57d81fb69b4b71c4d8eb3ad" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto 0.11.3", + "quinn-udp 0.5.2", + "rustc-hash", + "rustls 0.23.10", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "quinn-proto" version = "0.9.6" @@ -6609,6 +6627,23 @@ dependencies = [ "webpki", ] +[[package]] +name = "quinn-proto" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe" +dependencies = [ + "bytes", + "rand", + "ring 0.17.8", + "rustc-hash", + "rustls 0.23.10", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + [[package]] name = "quinn-udp" version = "0.3.2" @@ -6616,12 +6651,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "641538578b21f5e5c8ea733b736895576d0fe329bb883b937db6f4d163dbaaf4" dependencies = [ "libc", - "quinn-proto", + 
"quinn-proto 0.9.6", "socket2 0.4.10", "tracing", "windows-sys 0.42.0", ] +[[package]] +name = "quinn-udp" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46" +dependencies = [ + "libc", + "once_cell", + "socket2 0.5.7", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.36" @@ -7829,9 +7877,9 @@ checksum = "19b30a45b0cd0bcca8037f3d0dc3421eaf95327a17cad11964fb8179b4fc4832" [[package]] name = "reqwest" -version = "0.12.4" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" +checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37" dependencies = [ "base64 0.22.1", "bytes", @@ -7850,13 +7898,14 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.22.4", + "quinn 0.11.2", + "rustls 0.23.10", "rustls-pemfile 2.1.2", "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.1", "tokio", "tokio-rustls", "tower-service", @@ -8234,6 +8283,20 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls" +version = "0.23.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" +dependencies = [ + "once_cell", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -9026,6 +9089,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + 
[[package]] name = "syntect" version = "5.2.0" @@ -9329,11 +9398,11 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.22.4", + "rustls 0.23.10", "rustls-pki-types", "tokio", ] @@ -11128,7 +11197,7 @@ dependencies = [ "async-trait", "futures", "log", - "quinn", + "quinn 0.9.4", "rustls 0.20.9", "rustls-native-certs", "rustls-pemfile 1.0.4", From cdb3123fa85b07d79d156aa35d1568d0470167c1 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 20 Jun 2024 16:38:58 +0200 Subject: [PATCH 03/15] Format dora list output as table --- Cargo.lock | 12 ++++++++- binaries/cli/Cargo.toml | 1 + binaries/cli/src/main.rs | 54 ++++++++++++++++++++++++++-------------- 3 files changed, 48 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 74ec1d4f..8c26f680 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2268,6 +2268,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml 0.9.34+deprecated", + "tabwriter", "termcolor", "tokio", "tokio-stream", @@ -9088,6 +9089,15 @@ dependencies = [ "windows 0.52.0", ] +[[package]] +name = "tabwriter" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a327282c4f64f6dc37e3bba4c2b6842cc3a992f204fa58d917696a89f691e5f6" +dependencies = [ + "unicode-width", +] + [[package]] name = "tap" version = "1.0.1" @@ -9604,7 +9614,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "static_assertions", ] diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 15ee0424..cc910b39 100644 --- a/binaries/cli/Cargo.toml +++ 
b/binaries/cli/Cargo.toml @@ -42,3 +42,4 @@ tokio = { version = "1.20.1", features = ["full"] } tokio-stream = { version = "0.1.8", features = ["io-util", "net"] } futures = "0.3.21" duration-str = "0.5" +tabwriter = "1.4.0" diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 290a0cc9..1dcc33f9 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -15,12 +15,13 @@ use dora_tracing::set_up_tracing; use dora_tracing::set_up_tracing_opts; use duration_str::parse; use eyre::{bail, Context}; -use std::net::SocketAddr; +use std::{io::Write, net::SocketAddr}; use std::{ net::{IpAddr, Ipv4Addr}, path::PathBuf, time::Duration, }; +use tabwriter::TabWriter; use tokio::runtime::Builder; use uuid::Uuid; @@ -574,29 +575,46 @@ fn stop_dataflow_by_name( fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { let list = query_running_dataflows(session)?; - if list.active.is_empty() { - eprintln!("No dataflows are running"); - } else { - println!("Running dataflows:"); - for id in list.active { - println!("- {id}"); - } + let mut tw = TabWriter::new(vec![]); + tw.write_all(b"UUID\tName\tStatus\n")?; + + for id in list.active { + tw.write_all( + format!( + "{}\t{}\trunning\n", + id.uuid, + id.name.as_deref().unwrap_or_default() + ) + .as_bytes(), + )?; } - if !list.failed.is_empty() { - println!("Failed dataflows:"); - for id in list.failed { - println!("- {id}"); - } + for id in list.failed { + tw.write_all( + format!( + "{}\t{}\tFAILED\n", + id.uuid, + id.name.as_deref().unwrap_or_default() + ) + .as_bytes(), + )?; } - if !list.finished.is_empty() { - println!("Finished dataflows:"); - for id in list.finished { - println!("- {id}"); - } + for id in list.finished { + tw.write_all( + format!( + "{}\t{}\tfinished\n", + id.uuid, + id.name.as_deref().unwrap_or_default() + ) + .as_bytes(), + )?; } + tw.flush()?; + let formatted = String::from_utf8(tw.into_inner()?)?; + println!("{formatted}"); + Ok(()) } From 
262f39fb98349ef07dfb80b9d0bededb5bba9ab9 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 20 Jun 2024 16:56:25 +0200 Subject: [PATCH 04/15] Refactor: Report dataflow list as one `Vec` --- binaries/cli/src/main.rs | 58 ++++++++++----------------------- binaries/coordinator/src/lib.rs | 49 ++++++++++++++-------------- libraries/core/src/topics.rs | 27 ++++++++++++--- 3 files changed, 65 insertions(+), 69 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 1dcc33f9..1643d39d 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -329,11 +329,11 @@ fn run() -> eyre::Result<()> { let name = if uuid.is_some() { None } else { Some(dataflow) }; logs::logs(&mut *session, uuid, name, node)? } else { - let uuid = match &list.active[..] { + let active = list.get_active(); + let uuid = match &active[..] { [] => bail!("No dataflows are running"), [uuid] => uuid.clone(), - _ => inquire::Select::new("Choose dataflow to show logs:", list.active) - .prompt()?, + _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, }; logs::logs(&mut *session, Some(uuid.uuid), None, node)? 
} @@ -512,10 +512,11 @@ fn stop_dataflow_interactive( session: &mut TcpRequestReplyConnection, ) -> eyre::Result<()> { let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; - if list.active.is_empty() { + let active = list.get_active(); + if active.is_empty() { eprintln!("No dataflows are running"); } else { - let selection = inquire::Select::new("Choose dataflow to stop:", list.active).prompt()?; + let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?; stop_dataflow(selection.uuid, grace_duration, session)?; } @@ -577,50 +578,25 @@ fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> let mut tw = TabWriter::new(vec![]); tw.write_all(b"UUID\tName\tStatus\n")?; - - for id in list.active { - tw.write_all( - format!( - "{}\t{}\trunning\n", - id.uuid, - id.name.as_deref().unwrap_or_default() - ) - .as_bytes(), - )?; - } - - for id in list.failed { - tw.write_all( - format!( - "{}\t{}\tFAILED\n", - id.uuid, - id.name.as_deref().unwrap_or_default() - ) - .as_bytes(), - )?; - } - - for id in list.finished { - tw.write_all( - format!( - "{}\t{}\tfinished\n", - id.uuid, - id.name.as_deref().unwrap_or_default() - ) - .as_bytes(), - )?; + for entry in list.0 { + let uuid = entry.id.uuid; + let name = entry.id.name.unwrap_or_default(); + let status = match entry.status { + dora_core::topics::DataflowStatus::Running => "running", + dora_core::topics::DataflowStatus::Finished => "finished", + dora_core::topics::DataflowStatus::Failed => "FAILED", + }; + tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?; } - tw.flush()?; let formatted = String::from_utf8(tw.into_inner()?)?; + println!("{formatted}"); Ok(()) } -fn query_running_dataflows( - session: &mut TcpRequestReplyConnection, -) -> Result { +fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result { let reply_raw = session .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) 
.wrap_err("failed to send list message")?; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 5ab19416..d00dfe22 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,7 +9,7 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{ControlRequest, ControlRequestReply, DataflowId, DataflowList}, + topics::{ControlRequest, ControlRequestReply, DataflowId, DataflowListEntry}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -458,30 +458,31 @@ async fn start_inner( let mut dataflows: Vec<_> = running_dataflows.values().collect(); dataflows.sort_by_key(|d| (&d.name, d.uuid)); - let mut finished = Vec::new(); - let mut failed = Vec::new(); - for (&uuid, results) in &dataflow_results { - let name = - archived_dataflows.get(&uuid).and_then(|d| d.name.clone()); - let id = DataflowId { uuid, name }; - if results.values().all(|r| r.is_ok()) { - finished.push(id); - } else { - failed.push(id); - } - } + let running = dataflows.into_iter().map(|d| DataflowListEntry { + id: DataflowId { + uuid: d.uuid, + name: d.name.clone(), + }, + status: dora_core::topics::DataflowStatus::Running, + }); + let finished_failed = + dataflow_results.iter().map(|(&uuid, results)| { + let name = + archived_dataflows.get(&uuid).and_then(|d| d.name.clone()); + let id = DataflowId { uuid, name }; + let status = if results.values().all(|r| r.is_ok()) { + dora_core::topics::DataflowStatus::Finished + } else { + dora_core::topics::DataflowStatus::Failed + }; + DataflowListEntry { id, status } + }); - let reply = Ok(ControlRequestReply::DataflowList(DataflowList { - active: dataflows - .into_iter() - .map(|d| DataflowId { - uuid: d.uuid, - name: d.name.clone(), - }) - .collect(), - finished, - failed, - })); + let reply = 
Ok(ControlRequestReply::DataflowList( + dora_core::topics::DataflowList( + running.chain(finished_failed).collect(), + ), + )); let _ = reply_sender.send(reply); } ControlRequest::DaemonConnected => { diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index b94c7d61..9677ec6b 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -56,10 +56,29 @@ pub enum ControlRequest { } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct DataflowList { - pub active: Vec<DataflowId>, - pub finished: Vec<DataflowId>, - pub failed: Vec<DataflowId>, +pub struct DataflowList(pub Vec<DataflowListEntry>); + +impl DataflowList { + pub fn get_active(&self) -> Vec<DataflowId> { + self.0 + .iter() + .filter(|d| d.status == DataflowStatus::Running) + .map(|d| d.id.clone()) + .collect() + } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowListEntry { + pub id: DataflowId, + pub status: DataflowStatus, +} + +#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +pub enum DataflowStatus { + Running, + Finished, + Failed, } #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] From d2e2eef99e8e79275bc8f9203ef1d93e79f95f3d Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 20 Jun 2024 16:57:19 +0200 Subject: [PATCH 05/15] Slightly adjust how status is printed --- binaries/cli/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 1643d39d..630f3f19 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -582,9 +582,9 @@ fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> let uuid = entry.id.uuid; let name = entry.id.name.unwrap_or_default(); let status = match entry.status { - dora_core::topics::DataflowStatus::Running => "running", - dora_core::topics::DataflowStatus::Finished => "finished", - dora_core::topics::DataflowStatus::Failed => "FAILED", + 
dora_core::topics::DataflowStatus::Running => "Running", + dora_core::topics::DataflowStatus::Finished => "Succeeded", + dora_core::topics::DataflowStatus::Failed => "Failed", }; tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?; } From 24e6c9c30aec456049eb0b83b9a257d39ec59e14 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 21 Jun 2024 17:43:09 +0200 Subject: [PATCH 06/15] Add space --- libraries/core/src/topics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index cb4061e5..dd708292 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -157,7 +157,7 @@ impl std::fmt::Display for NodeError { NodeErrorCause::Other { stderr } if stderr.is_empty() => {} NodeErrorCause::Other { stderr } => { let line: &str = "---------------------------------------------------------------------------------\n"; - write!(f, "with stderr output:\n{line}[...]\n{stderr}{line}")? + write!(f, " with stderr output:\n{line}[...]\n{stderr}{line}")? 
}, } From 6b91c25d819343c7e23f3597a5e08769734d8a0e Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 24 Jun 2024 05:26:31 +0000 Subject: [PATCH 07/15] Update dependencies --- Cargo.lock | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 45237130..c08cddc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -885,9 +885,12 @@ dependencies = [ [[package]] name = "atomic" -version = "0.5.3" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" +checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" +dependencies = [ + "bytemuck", +] [[package]] name = "atomic-waker" @@ -9871,9 +9874,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +checksum = "3ea73390fe27785838dcbf75b91b1d84799e28f1ce71e6f372a5dc2200c80de5" dependencies = [ "atomic", "getrandom", @@ -9885,9 +9888,9 @@ dependencies = [ [[package]] name = "uuid-macro-internal" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2" +checksum = "cdb394c528d1cb434c2f0522027e50c0705305caf6d20405c07a6f7e4cf9543c" dependencies = [ "proc-macro2", "quote", From ee394361e9f1cbc3b2bed3c1f613ac8482e8ec41 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 11:08:43 +0200 Subject: [PATCH 08/15] Add error cause for grace duration kills --- Cargo.lock | 13 ++++++++++++- binaries/daemon/Cargo.toml | 1 + binaries/daemon/src/lib.rs | 8 ++++++++ libraries/core/src/topics.rs | 6 ++++++ 4 files changed, 27 insertions(+), 
1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index ad71dee9..0043ef5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1925,6 +1925,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -2321,6 +2331,7 @@ dependencies = [ "async-trait", "bincode", "crossbeam", + "crossbeam-skiplist", "ctrlc", "dora-arrow-convert", "dora-core", @@ -9605,7 +9616,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "static_assertions", ] diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 274c68bd..2f1be890 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -40,3 +40,4 @@ ctrlc = "3.2.5" which = "5.0.0" sysinfo = "0.30.11" crossbeam = "0.8.4" +crossbeam-skiplist = "0.1.3" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index f31e284f..d06453b5 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1169,12 +1169,16 @@ impl Daemon { dataflow.cascading_error_causes.error_caused_by(&node_id) }) .cloned(); + let grace_duration_kill = dataflow + .map(|d| d.grace_duration_kills.contains(&node_id)) + .unwrap_or_default(); let cause = match caused_by_node { Some(caused_by_node) => { tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"); NodeErrorCause::Cascading { caused_by_node } } + None if grace_duration_kill => NodeErrorCause::GraceDuration, None => NodeErrorCause::Other { stderr: dataflow .and_then(|d| d.node_stderr_most_recent.get(&node_id)) @@ -1410,6 +1414,7 @@ pub struct 
RunningDataflow { /// Contains the node that caused the error for nodes that experienced a cascading error. cascading_error_causes: CascadingErrorCauses, + grace_duration_kills: Arc>, node_stderr_most_recent: BTreeMap>>, } @@ -1431,6 +1436,7 @@ impl RunningDataflow { stop_sent: false, empty_set: BTreeSet::new(), cascading_error_causes: Default::default(), + grace_duration_kills: Default::default(), node_stderr_most_recent: BTreeMap::new(), } } @@ -1494,6 +1500,7 @@ impl RunningDataflow { } let running_nodes = self.running_nodes.clone(); + let grace_duration_kills = self.grace_duration_kills.clone(); tokio::spawn(async move { let duration = grace_duration.unwrap_or(Duration::from_millis(500)); tokio::time::sleep(duration).await; @@ -1503,6 +1510,7 @@ impl RunningDataflow { for (node, node_details) in running_nodes.iter() { if let Some(pid) = node_details.pid { if let Some(process) = system.process(Pid::from(pid as usize)) { + grace_duration_kills.insert(node.clone()); process.kill(); warn!( "{node} was killed due to not stopping within the {:#?} grace period", diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index dd708292..8b587427 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -150,6 +150,10 @@ impl std::fmt::Display for NodeError { }?; match &self.cause { + NodeErrorCause::GraceDuration => write!( + f, + "\n\nThe node was killed by dora because it didn't react to a stop message in time." + )?, NodeErrorCause::Cascading { caused_by_node } => write!( f, "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." @@ -167,6 +171,8 @@ impl std::fmt::Display for NodeError { #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum NodeErrorCause { + /// Node was killed because it didn't react to a stop message in time. 
+ GraceDuration, /// Node failed because another node failed before, Cascading { caused_by_node: NodeId, From ece3f72a39694227d75289c30daf716d2317cdd9 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 11:14:56 +0200 Subject: [PATCH 09/15] Shorten grace duration error message to one line --- libraries/core/src/topics.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 8b587427..25f16c48 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -144,16 +144,17 @@ impl std::fmt::Display for NodeError { 23 => "NSIG".into(), other => other.to_string().into(), }; - write!(f, "exited because of signal {signal_str}") + if matches!(self.cause, NodeErrorCause::GraceDuration) { + write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})") + } else { + write!(f, "exited because of signal {signal_str}") + } } NodeExitStatus::Unknown => write!(f, "unknown exit status"), }?; match &self.cause { - NodeErrorCause::GraceDuration => write!( - f, - "\n\nThe node was killed by dora because it didn't react to a stop message in time." - )?, + NodeErrorCause::GraceDuration => {}, // handled above NodeErrorCause::Cascading { caused_by_node } => write!( f, "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." 
From f91c9e44b2fb06e88c532984d39352597be5a658 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 11:21:31 +0200 Subject: [PATCH 10/15] Make `dora start` attach by default, add `--detach` to opt-out --- binaries/cli/src/main.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index a746d0b8..25eeab23 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -117,6 +117,9 @@ enum Command { /// Attach to the dataflow and wait for its completion #[clap(long, action)] attach: bool, + /// Run the dataflow in background + #[clap(long, action)] + detach: bool, /// Enable hot reloading (Python only) #[clap(long, action)] hot_reload: bool, @@ -341,6 +344,7 @@ fn run() -> eyre::Result<()> { coordinator_addr, coordinator_port, attach, + detach, hot_reload, } => { let dataflow_descriptor = @@ -368,6 +372,16 @@ fn run() -> eyre::Result<()> { &mut *session, )?; + let attach = match (attach, detach) { + (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), + (true, false) => true, + (false, true) => false, + (false, false) => { + println!("attaching to dataflow (use `--detach` to run in background)"); + true + } + }; + if attach { attach_dataflow( dataflow_descriptor, From 6736436e150ff4a2424c27b80abc8e7bea3c354d Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 24 Jun 2024 09:32:44 +0000 Subject: [PATCH 11/15] Update dependencies --- Cargo.lock | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c08cddc0..533bf1a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2014,9 +2014,9 @@ checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991" [[package]] name = "cxx" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"8194f089b6da4751d6c1da1ef37c17255df51f9346cdb160f8b096562ae4a85c" +checksum = "273dcfd3acd4e1e276af13ed2a43eea7001318823e7a726a6b3ed39b4acc0b82" dependencies = [ "cc", "cxxbridge-flags", @@ -2026,9 +2026,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e8df9a089caae66634d754672d5f909395f30f38af6ff19366980d8a8b57501" +checksum = "d8b2766fbd92be34e9ed143898fce6c572dc009de39506ed6903e5a05b68914e" dependencies = [ "cc", "codespan-reporting", @@ -2041,15 +2041,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25290be4751803672a70b98c68b51c1e7d0a640ab5a4377f240f9d2e70054cd1" +checksum = "839fcd5e43464614ffaa989eaf1c139ef1f0c51672a1ed08023307fa1b909ccd" [[package]] name = "cxxbridge-macro" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8cb317cb13604b4752416783bb25070381c36e844743e4146b7f8e55de7d140" +checksum = "4b2c1c1776b986979be68bb2285da855f8d8a35851a769fca8740df7c3d07877" dependencies = [ "proc-macro2", "quote", @@ -4583,7 +4583,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if 1.0.0", - "windows-targets 0.52.5", + "windows-targets 0.48.5", ] [[package]] @@ -6391,9 +6391,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.85" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ "unicode-ident", ] From 5bda3338afca32736fe0768922df51cb5049abb4 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 
24 Jun 2024 12:59:28 +0200 Subject: [PATCH 12/15] Only add `[...]` omission marker if stderr buffer is full --- binaries/daemon/src/lib.rs | 6 +++++- libraries/core/src/topics.rs | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index d06453b5..2f030a61 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1183,7 +1183,11 @@ impl Daemon { stderr: dataflow .and_then(|d| d.node_stderr_most_recent.get(&node_id)) .map(|queue| { - let mut s = String::new(); + let mut s = if queue.is_full() { + "[...]".into() + } else { + String::new() + }; while let Some(line) = queue.pop() { s += &line; } diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 25f16c48..faebe9d0 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -162,7 +162,7 @@ impl std::fmt::Display for NodeError { NodeErrorCause::Other { stderr } if stderr.is_empty() => {} NodeErrorCause::Other { stderr } => { let line: &str = "---------------------------------------------------------------------------------\n"; - write!(f, " with stderr output:\n{line}[...]\n{stderr}{line}")? + write!(f, " with stderr output:\n{line}{stderr}{line}")? }, } From 3bf521211d40f036bbbf02fd6c02ee1ae17e0522 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 18:14:00 +0200 Subject: [PATCH 13/15] Fix CI: Use `--detach` to start dataflow in background --- .github/workflows/ci.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e5a4017c..8708ba96 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -286,12 +286,12 @@ jobs: cargo build --all dora up dora list - dora start dataflow.yml --name ci-rust-test + dora start dataflow.yml --name ci-rust-test --detach sleep 10 dora stop --name ci-rust-test --grace-duration 5s cd .. 
dora build examples/rust-dataflow/dataflow_dynamic.yml - dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic + dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic --detach cargo run -p rust-dataflow-example-sink-dynamic sleep 5 dora stop --name ci-rust-dynamic --grace-duration 5s @@ -315,11 +315,11 @@ jobs: cd test_python_project dora up dora list - dora start dataflow.yml --name ci-python-test + dora start dataflow.yml --name ci-python-test --detach sleep 10 dora stop --name ci-python-test --grace-duration 5s pip install "numpy<2.0.0" opencv-python - dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic + dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic --detach python ../examples/python-dataflow/plot_dynamic.py sleep 5 dora stop --name ci-python-test --grace-duration 5s @@ -339,7 +339,7 @@ jobs: cmake -B build cmake --build build cmake --install build - dora start dataflow.yml --name ci-c-test + dora start dataflow.yml --name ci-c-test --detach sleep 10 dora stop --name ci-c-test --grace-duration 5s dora destroy @@ -358,7 +358,7 @@ jobs: cmake -B build cmake --build build cmake --install build - dora start dataflow.yml --name ci-cxx-test + dora start dataflow.yml --name ci-cxx-test --detach sleep 10 dora stop --name ci-cxx-test --grace-duration 5s dora destroy From 12049c4039822a47ac6777d1930f5fc0aa8be484 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 18:56:55 +0200 Subject: [PATCH 14/15] Fix: Add `DataflowDaemonResult::is_ok` method --- libraries/core/src/topics.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 18a48008..7933c875 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -140,6 +140,12 @@ pub struct DataflowDaemonResult { pub node_results: BTreeMap>, } +impl DataflowDaemonResult { + pub fn is_ok(&self) -> bool 
{ + self.node_results.values().all(|r| r.is_ok()) + } +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct NodeError { pub timestamp: uhlc::Timestamp, From cf45f11257f1b699d7de052a8927d0c1cefb8cd2 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 24 Jun 2024 19:00:07 +0200 Subject: [PATCH 15/15] Fix example --- examples/multiple-daemons/run.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index c86ae7ed..7dcb1ab8 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -2,8 +2,8 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::Descriptor, topics::{ - ControlRequest, ControlRequestReply, DataflowId, DataflowList, - DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, + ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, }, }; use dora_tracing::set_up_tracing; @@ -176,7 +176,7 @@ async fn running_dataflows(coordinator_events_tx: &Sender) -> eyre::Resul .await?; let result = reply.await??; let dataflows = match result { - ControlRequestReply::DataflowList(DataflowList { active, .. }) => active, + ControlRequestReply::DataflowList(list) => list.get_active(), ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected start dataflow reply: {other:?}"), };