diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 1063a5e8..d687d3cd 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,5 +1,8 @@ use clap::Parser; -use dora_core::topics::{StartDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START}; +use dora_core::topics::{ + StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START, + ZENOH_CONTROL_STOP, +}; use eyre::{bail, eyre, Context}; use std::{io::Write, path::PathBuf, sync::Arc}; use tempfile::NamedTempFile; @@ -46,7 +49,9 @@ enum Command { Start { dataflow: PathBuf, }, - Stop, + Stop { + uuid: String, + }, Logs, Metrics, Stats, @@ -126,7 +131,7 @@ fn main() -> eyre::Result<()> { Command::Dashboard => todo!(), Command::Up => todo!(), Command::Start { dataflow } => start_dataflow(dataflow, &mut session)?, - Command::Stop => todo!(), + Command::Stop { uuid } => stop_dataflow(uuid, &mut session)?, Command::Destroy => destroy(&mut session)?, Command::Logs => todo!(), Command::Metrics => todo!(), @@ -165,13 +170,40 @@ fn start_dataflow( serde_json::from_slice(&raw).wrap_err("failed to parse reply")?; match result { StartDataflowResult::Ok { uuid } => { - println!("Started dataflow under ID `{uuid}`"); + println!("Started dataflow with UUID `{uuid}`"); Ok(()) } StartDataflowResult::Error(err) => bail!(err), } } +fn stop_dataflow( + uuid: String, + session: &mut Option>, +) -> Result<(), eyre::ErrReport> { + let reply_receiver = zenoh_control_session(session)? + .get(Selector { + key_selector: ZENOH_CONTROL_STOP.into(), + value_selector: uuid.as_str().into(), + }) + .wait() + .map_err(|err| eyre!(err)) + .wrap_err("failed to create publisher for start dataflow message")?; + let reply = reply_receiver + .recv() + .wrap_err("failed to receive reply from coordinator")?; + let raw = reply.sample.value.payload.contiguous(); + let result: StopDataflowResult = + serde_json::from_slice(&raw).wrap_err("failed to parse reply")?; + match result { + StopDataflowResult::Ok => { + println!("Stopped dataflow with UUID `{uuid}`"); + Ok(()) + } + StopDataflowResult::Error(err) => bail!(err), + } +} + fn destroy(session: &mut Option>) -> Result<(), eyre::ErrReport> { let reply_receiver = zenoh_control_session(session)? .get(ZENOH_CONTROL_DESTROY) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index c7f4d080..8e59fe72 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -108,7 +108,47 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { StartDataflowResult::Error(format!("{err:?}")) } }; - let _ = query + query + .reply_async(Sample::new("", serde_json::to_string(&reply).unwrap())) + .await; + } + ZENOH_CONTROL_STOP => { + let stop = async { + let uuid = + Uuid::parse_str(query.value_selector()).wrap_err("not a valid UUID")?; + let communication_config = match running_dataflows.get(&uuid) { + Some(config) => config.clone(), + None => bail!("No running dataflow found with UUID `{uuid}`"), + }; + + let mut communication = tokio::task::spawn_blocking(move || { + communication::init(&communication_config) + }) + .await + .wrap_err("failed to join communication layer init task")? + .wrap_err("failed to init communication layer")?; + + tracing::info!("sending stop message to dataflow `{uuid}`"); + + tokio::task::spawn_blocking(move || { + let topic = format!("dora/stoop"); + let metadata = dora_message::Metadata::default(); + let data = metadata.serialize().unwrap(); + communication.publisher(&topic)?.publish(&data) + }) + .await + .wrap_err("failed to join stop publish task")? + .map_err(|err| eyre!(err)) + .wrap_err("failed to send stop message")?; + + Result::<_, eyre::Report>::Ok(()) + }; + let reply = match stop.await { + Ok(()) => StopDataflowResult::Ok, + Err(err) => StopDataflowResult::Error(format!("{err:?}")), + }; + + query .reply_async(Sample::new("", serde_json::to_string(&reply).unwrap())) .await; } diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 587bcb3e..66d4d449 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,5 +1,6 @@ pub const ZENOH_CONTROL_QUERYABLE: &str = "dora_control/*"; pub const ZENOH_CONTROL_START: &str = "dora_control/start"; +pub const ZENOH_CONTROL_STOP: &str = "dora_control/stop"; pub const ZENOH_CONTROL_DESTROY: &str = "dora_control/destroy"; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -7,3 +8,9 @@ pub enum StartDataflowResult { Ok { uuid: String }, Error(String), } + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum StopDataflowResult { + Ok, + Error(String), +}