Browse Source

Implement `stop` subcommand to send `stop` message to dataflow

The message is not recognized by the nodes/operators yet. This will be done in a follow-up commit.
tags/v0.0.0-test-pr-120
Philipp Oppermann 3 years ago
parent
commit
f063af1df6
Failed to extract signature
3 changed files with 84 additions and 5 deletions
  1. +36
    -4
      binaries/cli/src/main.rs
  2. +41
    -1
      binaries/coordinator/src/lib.rs
  3. +7
    -0
      libraries/core/src/topics.rs

+ 36
- 4
binaries/cli/src/main.rs View File

@@ -1,5 +1,8 @@
use clap::Parser;
use dora_core::topics::{StartDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START};
use dora_core::topics::{
StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START,
ZENOH_CONTROL_STOP,
};
use eyre::{bail, eyre, Context};
use std::{io::Write, path::PathBuf, sync::Arc};
use tempfile::NamedTempFile;
@@ -46,7 +49,9 @@ enum Command {
Start {
dataflow: PathBuf,
},
Stop,
Stop {
uuid: String,
},
Logs,
Metrics,
Stats,
@@ -126,7 +131,7 @@ fn main() -> eyre::Result<()> {
Command::Dashboard => todo!(),
Command::Up => todo!(),
Command::Start { dataflow } => start_dataflow(dataflow, &mut session)?,
Command::Stop => todo!(),
Command::Stop { uuid } => stop_dataflow(uuid, &mut session)?,
Command::Destroy => destroy(&mut session)?,
Command::Logs => todo!(),
Command::Metrics => todo!(),
@@ -165,13 +170,40 @@ fn start_dataflow(
serde_json::from_slice(&raw).wrap_err("failed to parse reply")?;
match result {
StartDataflowResult::Ok { uuid } => {
println!("Started dataflow under ID `{uuid}`");
println!("Started dataflow with UUID `{uuid}`");
Ok(())
}
StartDataflowResult::Error(err) => bail!(err),
}
}

fn stop_dataflow(
uuid: String,
session: &mut Option<Arc<zenoh::Session>>,
) -> Result<(), eyre::ErrReport> {
let reply_receiver = zenoh_control_session(session)?
.get(Selector {
key_selector: ZENOH_CONTROL_STOP.into(),
value_selector: uuid.as_str().into(),
})
.wait()
.map_err(|err| eyre!(err))
.wrap_err("failed to create publisher for start dataflow message")?;
let reply = reply_receiver
.recv()
.wrap_err("failed to receive reply from coordinator")?;
let raw = reply.sample.value.payload.contiguous();
let result: StopDataflowResult =
serde_json::from_slice(&raw).wrap_err("failed to parse reply")?;
match result {
StopDataflowResult::Ok => {
println!("Stopped dataflow with UUID `{uuid}`");
Ok(())
}
StopDataflowResult::Error(err) => bail!(err),
}
}

fn destroy(session: &mut Option<Arc<zenoh::Session>>) -> Result<(), eyre::ErrReport> {
let reply_receiver = zenoh_control_session(session)?
.get(ZENOH_CONTROL_DESTROY)


+ 41
- 1
binaries/coordinator/src/lib.rs View File

@@ -108,7 +108,47 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> {
StartDataflowResult::Error(format!("{err:?}"))
}
};
let _ = query
query
.reply_async(Sample::new("", serde_json::to_string(&reply).unwrap()))
.await;
}
ZENOH_CONTROL_STOP => {
let stop = async {
let uuid =
Uuid::parse_str(query.value_selector()).wrap_err("not a valid UUID")?;
let communication_config = match running_dataflows.get(&uuid) {
Some(config) => config.clone(),
None => bail!("No running dataflow found with UUID `{uuid}`"),
};

let mut communication = tokio::task::spawn_blocking(move || {
communication::init(&communication_config)
})
.await
.wrap_err("failed to join communication layer init task")?
.wrap_err("failed to init communication layer")?;

tracing::info!("sending stop message to dataflow `{uuid}`");

tokio::task::spawn_blocking(move || {
let topic = format!("dora/stoop");
let metadata = dora_message::Metadata::default();
let data = metadata.serialize().unwrap();
communication.publisher(&topic)?.publish(&data)
})
.await
.wrap_err("failed to join stop publish task")?
.map_err(|err| eyre!(err))
.wrap_err("failed to send stop message")?;

Result::<_, eyre::Report>::Ok(())
};
let reply = match stop.await {
Ok(()) => StopDataflowResult::Ok,
Err(err) => StopDataflowResult::Error(format!("{err:?}")),
};

query
.reply_async(Sample::new("", serde_json::to_string(&reply).unwrap()))
.await;
}


+ 7
- 0
libraries/core/src/topics.rs View File

@@ -1,5 +1,6 @@
pub const ZENOH_CONTROL_QUERYABLE: &str = "dora_control/*";
pub const ZENOH_CONTROL_START: &str = "dora_control/start";
pub const ZENOH_CONTROL_STOP: &str = "dora_control/stop";
pub const ZENOH_CONTROL_DESTROY: &str = "dora_control/destroy";

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -7,3 +8,9 @@ pub enum StartDataflowResult {
Ok { uuid: String },
Error(String),
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum StopDataflowResult {
Ok,
Error(String),
}

Loading…
Cancel
Save