Browse Source

Implement an interactive `dora stop` command

Queries the list of running dataflows from the coordinator and prompts the user to choose one.
tags/v0.1.0^2
Philipp Oppermann 3 years ago
parent
commit
94ecacdfee
Failed to extract signature
4 changed files with 117 additions and 16 deletions
  1. +70
    -2
      Cargo.lock
  2. +1
    -0
      binaries/cli/Cargo.toml
  3. +44
    -7
      binaries/cli/src/main.rs
  4. +2
    -7
      binaries/coordinator/src/lib.rs

+ 70
- 2
Cargo.lock View File

@@ -682,6 +682,31 @@ dependencies = [
"lazy_static",
]

[[package]]
name = "crossterm"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67"
dependencies = [
"bitflags",
"crossterm_winapi",
"libc",
"mio",
"parking_lot",
"signal-hook",
"signal-hook-mio",
"winapi",
]

[[package]]
name = "crossterm_winapi"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ae1b35a484aa10e07fe0638d02301c5ad24de82d310ccbd2f3693da5f09bf1c"
dependencies = [
"winapi",
]

[[package]]
name = "crypto-bigint"
version = "0.2.11"
@@ -865,6 +890,7 @@ dependencies = [
"clap 4.0.3",
"dora-core",
"eyre",
"inquire",
"serde",
"serde_json",
"serde_yaml 0.9.11",
@@ -1111,6 +1137,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "453440c271cf5577fd2a40e4942540cb7d0d2f85e27c8d07dd0023c925a67541"

[[package]]
name = "dyn-clone"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f94fa09c2aeea5b8839e414b7b841bf429fd25b9c522116ac97ee87856d88b2"

[[package]]
name = "either"
version = "1.6.1"
@@ -1706,6 +1738,22 @@ dependencies = [
"unindent",
]

[[package]]
name = "inquire"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6055ce38cac9b10ac819ed4a509d92ccbc60808152c19ff9121c98198964272"
dependencies = [
"bitflags",
"crossterm",
"dyn-clone",
"lazy_static",
"newline-converter",
"thiserror",
"unicode-segmentation",
"unicode-width",
]

[[package]]
name = "instant"
version = "0.1.12"
@@ -2166,6 +2214,15 @@ dependencies = [
"jni-sys",
]

[[package]]
name = "newline-converter"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f71d09d5c87634207f894c6b31b6a2b2c64ea3bdcf71bd5599fdbbe1600c00f"
dependencies = [
"unicode-segmentation",
]

[[package]]
name = "nix"
version = "0.22.3"
@@ -3526,6 +3583,17 @@ dependencies = [
"signal-hook-registry",
]

[[package]]
name = "signal-hook-mio"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af"
dependencies = [
"libc",
"mio",
"signal-hook",
]

[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@@ -4096,9 +4164,9 @@ dependencies = [

[[package]]
name = "unicode-segmentation"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99"
checksum = "0fdbf052a0783de01e944a6ce7a8cb939e295b1e7be835a1112c3b9a7f047a5a"

[[package]]
name = "unicode-width"


+ 1
- 0
binaries/cli/Cargo.toml View File

@@ -23,3 +23,4 @@ termcolor = "1.1.3"
atty = "0.2.14"
uuid = { version = "1.2.1", features = ["v4"] }
sysinfo = "0.26.6"
inquire = "0.5.2"

+ 44
- 7
binaries/cli/src/main.rs View File

@@ -5,6 +5,7 @@ use dora_core::topics::{
};
use eyre::{bail, eyre, Context};
use std::{ops::Deref, path::PathBuf, sync::Arc};
use uuid::Uuid;
use zenoh::{
prelude::{Receiver, Selector, SplitBuffer},
sync::ZFuture,
@@ -62,7 +63,7 @@ enum Command {
dataflow: PathBuf,
},
Stop {
uuid: String,
uuid: Option<String>,
},
Logs,
Metrics,
@@ -133,7 +134,10 @@ fn main() -> eyre::Result<()> {
)?,
Command::Start { dataflow } => start_dataflow(dataflow, &mut session)?,
Command::List => list(&mut session)?,
Command::Stop { uuid } => stop_dataflow(uuid, &mut session)?,
Command::Stop { uuid } => match uuid {
Some(uuid) => stop_dataflow(uuid.parse().wrap_err("not a valid UUID")?, &mut session)?,
None => stop_dataflow_interactive(&mut session)?,
},
Command::Destroy { config } => up::destroy(config.as_deref(), &mut session)?,
Command::Logs => todo!(),
Command::Metrics => todo!(),
@@ -178,14 +182,26 @@ fn start_dataflow(
}
}

fn stop_dataflow_interactive(session: &mut Option<Arc<zenoh::Session>>) -> eyre::Result<()> {
let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
if uuids.is_empty() {
eprintln!("No dataflows are running");
} else {
let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?;
stop_dataflow(selection, session)?;
}

Ok(())
}

fn stop_dataflow(
uuid: String,
uuid: Uuid,
session: &mut Option<Arc<zenoh::Session>>,
) -> Result<(), eyre::ErrReport> {
let reply_receiver = zenoh_control_session(session)?
.get(Selector {
key_selector: ZENOH_CONTROL_STOP.into(),
value_selector: uuid.as_str().into(),
value_selector: uuid.to_string().into(),
})
.wait()
.map_err(|err| eyre!(err))
@@ -202,7 +218,24 @@ fn stop_dataflow(
}
}

fn list(session: &mut Option<Arc<zenoh::Session>>) -> Result<(), eyre::ErrReport> {
fn list(session: &mut Option<Arc<zenoh::Session>>) -> eyre::Result<()> {
let uuids = query_running_dataflows(session)?;

if uuids.is_empty() {
eprintln!("No dataflows are running");
} else {
println!("Running dataflows:");
for uuid in uuids {
println!("- {uuid}");
}
}

Ok(())
}

fn query_running_dataflows(
session: &mut Option<Arc<zenoh::Session>>,
) -> Result<Vec<Uuid>, eyre::ErrReport> {
let reply_receiver = zenoh_control_session(session)?
.get(ZENOH_CONTROL_LIST)
.wait()
@@ -213,9 +246,13 @@ fn list(session: &mut Option<Arc<zenoh::Session>>) -> Result<(), eyre::ErrReport
.wrap_err("failed to receive reply from coordinator")?;
let raw = reply.sample.value.payload.contiguous();
let reply_string = std::str::from_utf8(raw.deref()).wrap_err("reply is not valid UTF8")?;
println!("{reply_string}");
let uuids = reply_string
.lines()
.map(Uuid::try_from)
.collect::<Result<_, _>>()
.wrap_err("failed to parse UUIDs returned by coordinator")?;

Ok(())
Ok(uuids)
}

fn zenoh_control_session(


+ 2
- 7
binaries/coordinator/src/lib.rs View File

@@ -151,13 +151,8 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> {
ZENOH_CONTROL_LIST => {
let mut output = String::new();

if running_dataflows.is_empty() {
writeln!(output, "No running dataflows")?;
} else {
writeln!(output, "Running dataflows:")?;
for uuid in running_dataflows.keys() {
writeln!(output, "- {uuid}")?;
}
for uuid in running_dataflows.keys() {
writeln!(output, "{uuid}")?;
}

query.reply_async(Sample::new("", output)).await;


Loading…
Cancel
Save