From 94ecacdfee11ac4fe1a7baf41f0a00df128856e0 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 11 Nov 2022 10:40:28 +0100 Subject: [PATCH] Implement an interactive `dora stop` command Queries the list of running dataflows from the coordinator and prompts the user to choose one. --- Cargo.lock | 72 ++++++++++++++++++++++++++++++++- binaries/cli/Cargo.toml | 1 + binaries/cli/src/main.rs | 51 +++++++++++++++++++---- binaries/coordinator/src/lib.rs | 9 +---- 4 files changed, 117 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e8c49ca5..18797814 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -682,6 +682,31 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "crossterm" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67" +dependencies = [ + "bitflags", + "crossterm_winapi", + "libc", + "mio", + "parking_lot", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ae1b35a484aa10e07fe0638d02301c5ad24de82d310ccbd2f3693da5f09bf1c" +dependencies = [ + "winapi", +] + [[package]] name = "crypto-bigint" version = "0.2.11" @@ -865,6 +890,7 @@ dependencies = [ "clap 4.0.3", "dora-core", "eyre", + "inquire", "serde", "serde_json", "serde_yaml 0.9.11", @@ -1111,6 +1137,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "453440c271cf5577fd2a40e4942540cb7d0d2f85e27c8d07dd0023c925a67541" +[[package]] +name = "dyn-clone" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f94fa09c2aeea5b8839e414b7b841bf429fd25b9c522116ac97ee87856d88b2" + [[package]] name = "either" version = "1.6.1" @@ -1706,6 +1738,22 @@ dependencies = [ "unindent", ] +[[package]] +name = "inquire" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6055ce38cac9b10ac819ed4a509d92ccbc60808152c19ff9121c98198964272" +dependencies = [ + "bitflags", + "crossterm", + "dyn-clone", + "lazy_static", + "newline-converter", + "thiserror", + "unicode-segmentation", + "unicode-width", +] + [[package]] name = "instant" version = "0.1.12" @@ -2166,6 +2214,15 @@ dependencies = [ "jni-sys", ] +[[package]] +name = "newline-converter" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f71d09d5c87634207f894c6b31b6a2b2c64ea3bdcf71bd5599fdbbe1600c00f" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "nix" version = "0.22.3" @@ -3526,6 +3583,17 @@ dependencies = [ "signal-hook-registry", ] +[[package]] +name = "signal-hook-mio" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -4096,9 +4164,9 @@ dependencies = [ [[package]] name = "unicode-segmentation" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99" +checksum = "0fdbf052a0783de01e944a6ce7a8cb939e295b1e7be835a1112c3b9a7f047a5a" [[package]] name = "unicode-width" diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 1b9c79ca..e5757088 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -23,3 +23,4 @@ termcolor = "1.1.3" atty = "0.2.14" uuid = { version = "1.2.1", features = ["v4"] } sysinfo = "0.26.6" +inquire = "0.5.2" diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 429ed78c..deae4f74 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -5,6 +5,7 @@ use dora_core::topics::{ }; use eyre::{bail, eyre, Context}; use std::{ops::Deref, path::PathBuf, sync::Arc}; +use uuid::Uuid; use zenoh::{ prelude::{Receiver, Selector, SplitBuffer}, sync::ZFuture, @@ -62,7 +63,7 @@ enum Command { dataflow: PathBuf, }, Stop { - uuid: String, + uuid: Option, }, Logs, Metrics, @@ -133,7 +134,10 @@ fn main() -> eyre::Result<()> { )?, Command::Start { dataflow } => start_dataflow(dataflow, &mut session)?, Command::List => list(&mut session)?, - Command::Stop { uuid } => stop_dataflow(uuid, &mut session)?, + Command::Stop { uuid } => match uuid { + Some(uuid) => stop_dataflow(uuid.parse().wrap_err("not a valid UUID")?, &mut session)?, + None => stop_dataflow_interactive(&mut session)?, + }, Command::Destroy { config } => up::destroy(config.as_deref(), &mut session)?, Command::Logs => todo!(), Command::Metrics => todo!(), @@ -178,14 +182,26 @@ fn start_dataflow( } } +fn stop_dataflow_interactive(session: &mut Option>) -> eyre::Result<()> { + let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; + if uuids.is_empty() { + eprintln!("No dataflows are running"); + } else { + let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?; + stop_dataflow(selection, session)?; + } + + Ok(()) +} + fn stop_dataflow( - uuid: String, + uuid: Uuid, session: &mut Option>, ) -> Result<(), eyre::ErrReport> { let reply_receiver = zenoh_control_session(session)? .get(Selector { key_selector: ZENOH_CONTROL_STOP.into(), - value_selector: uuid.as_str().into(), + value_selector: uuid.to_string().into(), }) .wait() .map_err(|err| eyre!(err)) @@ -202,7 +218,24 @@ fn stop_dataflow( } } -fn list(session: &mut Option>) -> Result<(), eyre::ErrReport> { +fn list(session: &mut Option>) -> eyre::Result<()> { + let uuids = query_running_dataflows(session)?; + + if uuids.is_empty() { + eprintln!("No dataflows are running"); + } else { + println!("Running dataflows:"); + for uuid in uuids { + println!("- {uuid}"); + } + } + + Ok(()) +} + +fn query_running_dataflows( + session: &mut Option>, +) -> Result, eyre::ErrReport> { let reply_receiver = zenoh_control_session(session)? .get(ZENOH_CONTROL_LIST) .wait() @@ -213,9 +246,13 @@ fn list(session: &mut Option>) -> Result<(), eyre::ErrReport .wrap_err("failed to receive reply from coordinator")?; let raw = reply.sample.value.payload.contiguous(); let reply_string = std::str::from_utf8(raw.deref()).wrap_err("reply is not valid UTF8")?; - println!("{reply_string}"); + let uuids = reply_string + .lines() + .map(Uuid::try_from) + .collect::>() + .wrap_err("failed to parse UUIDs returned by coordinator")?; - Ok(()) + Ok(uuids) } fn zenoh_control_session( diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 445c50dd..d2ee667f 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -151,13 +151,8 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { ZENOH_CONTROL_LIST => { let mut output = String::new(); - if running_dataflows.is_empty() { - writeln!(output, "No running dataflows")?; - } else { - writeln!(output, "Running dataflows:")?; - for uuid in running_dataflows.keys() { - writeln!(output, "- {uuid}")?; - } + for uuid in running_dataflows.keys() { + writeln!(output, "{uuid}")?; } query.reply_async(Sample::new("", output)).await;