From 5d89f81d86c5d4a6527173a2b2121718b4aca9da Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 14 Oct 2022 17:19:38 +0200 Subject: [PATCH 1/8] Refactor control API as zenoh queries Allows replying to requests. We use this to send the dataflow UUID back to the CLI after starting it. --- Cargo.lock | 10 ++- binaries/cli/Cargo.toml | 5 +- binaries/cli/src/main.rs | 98 ++++++++++++++--------- binaries/coordinator/Cargo.toml | 5 +- binaries/coordinator/src/control.rs | 120 ++++------------------------ binaries/coordinator/src/lib.rs | 116 ++++++++++++++++----------- binaries/coordinator/src/run/mod.rs | 46 ++++++----- libraries/core/src/topics.rs | 12 ++- 8 files changed, 191 insertions(+), 221 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d2628b5e..86d21a2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -886,12 +886,13 @@ name = "dora-cli" version = "0.1.0" dependencies = [ "clap 4.0.3", - "communication-layer-pub-sub", "dora-core", "eyre", + "serde_json", "serde_yaml 0.9.11", "tempfile", "webbrowser", + "zenoh", ] [[package]] @@ -900,7 +901,6 @@ version = "0.1.0" dependencies = [ "bincode", "clap 3.2.20", - "communication-layer-pub-sub", "dora-core", "dora-message", "dora-node-api", @@ -909,6 +909,7 @@ dependencies = [ "futures-concurrency 5.0.1", "rand", "serde", + "serde_json", "serde_yaml 0.8.23", "time", "tokio", @@ -917,6 +918,7 @@ dependencies = [ "tracing", "tracing-subscriber", "uuid 0.8.2", + "zenoh", ] [[package]] @@ -3234,9 +3236,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.79" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" +checksum = "41feea4228a6f1cd09ec7a3593a682276702cd67b5273544757dae23c096f074" dependencies = [ "itoa", "ryu", diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index befc7c7b..abd7ca4b 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -16,6 +16,5 
@@ dora-core = { path = "../../libraries/core" } serde_yaml = "0.9.11" tempfile = "3.3.0" webbrowser = "0.8.0" -communication-layer-pub-sub = { path = "../../libraries/communication-layer", default-features = false, features = [ - "zenoh", -] } +zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" } +serde_json = "1.0.86" diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index d1049f63..1063a5e8 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,11 +1,12 @@ use clap::Parser; -use communication_layer_pub_sub::{zenoh::ZenohCommunicationLayer, CommunicationLayer}; -use dora_core::topics::{ - ZENOH_CONTROL_PREFIX, ZENOH_CONTROL_START_DATAFLOW, ZENOH_CONTROL_STOP_ALL, -}; -use eyre::{eyre, Context}; -use std::{io::Write, path::PathBuf}; +use dora_core::topics::{StartDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START}; +use eyre::{bail, eyre, Context}; +use std::{io::Write, path::PathBuf, sync::Arc}; use tempfile::NamedTempFile; +use zenoh::{ + prelude::{Receiver, Selector, SplitBuffer}, + sync::ZFuture, +}; mod build; mod check; @@ -124,34 +125,9 @@ fn main() -> eyre::Result<()> { Command::New { args } => template::create(args)?, Command::Dashboard => todo!(), Command::Up => todo!(), - Command::Start { dataflow } => { - let canonicalized = dataflow - .canonicalize() - .wrap_err("given dataflow file does not exist")?; - let path = &canonicalized - .to_str() - .ok_or_else(|| eyre!("dataflow path must be valid UTF-8"))?; - - let publisher = zenoh_control_session(&mut session)? 
- .publisher(ZENOH_CONTROL_START_DATAFLOW) - .map_err(|err| eyre!(err)) - .wrap_err("failed to create publisher for start dataflow message")?; - publisher - .publish(path.as_bytes()) - .map_err(|err| eyre!(err)) - .wrap_err("failed to publish start dataflow message")?; - } + Command::Start { dataflow } => start_dataflow(dataflow, &mut session)?, Command::Stop => todo!(), - Command::Destroy => { - let publisher = zenoh_control_session(&mut session)? - .publisher(ZENOH_CONTROL_STOP_ALL) - .map_err(|err| eyre!(err)) - .wrap_err("failed to create publisher for stop message")?; - publisher - .publish(&[]) - .map_err(|err| eyre!(err)) - .wrap_err("failed to publish stop message")?; - } + Command::Destroy => destroy(&mut session)?, Command::Logs => todo!(), Command::Metrics => todo!(), Command::Stats => todo!(), @@ -163,15 +139,61 @@ fn main() -> eyre::Result<()> { Ok(()) } +fn start_dataflow( + dataflow: PathBuf, + session: &mut Option>, +) -> Result<(), eyre::ErrReport> { + let canonicalized = dataflow + .canonicalize() + .wrap_err("given dataflow file does not exist")?; + let path = canonicalized + .to_str() + .ok_or_else(|| eyre!("dataflow path must be valid UTF-8"))?; + let reply_receiver = zenoh_control_session(session)? 
+ .get(Selector { + key_selector: ZENOH_CONTROL_START.into(), + value_selector: path.into(), + }) + .wait() + .map_err(|err| eyre!(err)) + .wrap_err("failed to create publisher for start dataflow message")?; + let reply = reply_receiver + .recv() + .wrap_err("failed to receive reply from coordinator")?; + let raw = reply.sample.value.payload.contiguous(); + let result: StartDataflowResult = + serde_json::from_slice(&raw).wrap_err("failed to parse reply")?; + match result { + StartDataflowResult::Ok { uuid } => { + println!("Started dataflow under ID `{uuid}`"); + Ok(()) + } + StartDataflowResult::Error(err) => bail!(err), + } +} + +fn destroy(session: &mut Option>) -> Result<(), eyre::ErrReport> { + let reply_receiver = zenoh_control_session(session)? + .get(ZENOH_CONTROL_DESTROY) + .wait() + .map_err(|err| eyre!(err)) + .wrap_err("failed to create publisher for start dataflow message")?; + reply_receiver + .recv() + .wrap_err("failed to receive reply from coordinator")?; + Ok(()) +} + fn zenoh_control_session( - session: &mut Option, -) -> eyre::Result<&mut ZenohCommunicationLayer> { + session: &mut Option>, +) -> eyre::Result<&Arc> { Ok(match session { Some(session) => session, None => session.insert( - ZenohCommunicationLayer::init(Default::default(), ZENOH_CONTROL_PREFIX.into()) - .map_err(|err| eyre!(err)) - .wrap_err("failed to open zenoh control session")?, + zenoh::open(zenoh::config::Config::default()) + .wait() + .map_err(|err| eyre!(err))? 
+ .into_arc(), ), }) } diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index 04731d68..a8eba7db 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -25,6 +25,5 @@ dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" futures-concurrency = "5.0.1" -communication-layer-pub-sub = { path = "../../libraries/communication-layer", default-features = false, features = [ - "zenoh", -] } +zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" } +serde_json = "1.0.86" diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs index 9dd28029..db530a40 100644 --- a/binaries/coordinator/src/control.rs +++ b/binaries/coordinator/src/control.rs @@ -1,112 +1,20 @@ use crate::Event; -use communication_layer_pub_sub::{CommunicationLayer, Subscriber}; -use dora_core::topics::{ - ZENOH_CONTROL_PREFIX, ZENOH_CONTROL_START_DATAFLOW, ZENOH_CONTROL_STOP_ALL, -}; -use eyre::{eyre, WrapErr}; +use dora_core::topics::ZENOH_CONTROL_QUERYABLE; +use eyre::eyre; use futures::{Stream, StreamExt}; -use std::path::Path; -use tokio_stream::wrappers::ReceiverStream; +use futures_concurrency::stream::IntoStream; +use zenoh::{prelude::EntityFactory, sync::ZFuture}; -pub(crate) fn control_events() -> impl Stream { - let (tx, rx) = tokio::sync::mpsc::channel(2); +pub(crate) async fn control_events() -> eyre::Result> { + let zenoh = zenoh::open(zenoh::config::Config::default()) + .wait() + .map_err(|err| eyre!(err))? 
+ .into_arc(); - tokio::task::spawn_blocking(move || { - let result = subscribe_control_sync(tx.clone()); - match result { - Ok(()) => {} - Err(error) => { - let _ = tx.blocking_send(Event::ControlChannelError(error)); - } - } - }); + let queryable = zenoh + .queryable(ZENOH_CONTROL_QUERYABLE) + .wait() + .map_err(|err| eyre!(err))?; - ReceiverStream::new(rx).chain(futures::stream::iter(std::iter::from_fn(|| { - tracing::info!("control channel closed"); - None - }))) -} - -fn subscribe_control_sync(tx: tokio::sync::mpsc::Sender) -> eyre::Result<()> { - let mut zenoh_control_session = - communication_layer_pub_sub::zenoh::ZenohCommunicationLayer::init( - Default::default(), - ZENOH_CONTROL_PREFIX.into(), - ) - .map_err(|err| eyre!(err)) - .wrap_err("failed to open zenoh control session")?; - - let start_dataflow = zenoh_control_session - .subscribe(ZENOH_CONTROL_START_DATAFLOW) - .map_err(|err| eyre!(err)) - .wrap_err("failed to subscribe to start dataflow topic")?; - let start_tx = tx.downgrade(); - let _start_dataflow_thread = - std::thread::spawn(move || start_dataflow_handler(start_dataflow, start_tx)); - - let stop_tx = tx; - let stop = zenoh_control_session - .subscribe(ZENOH_CONTROL_STOP_ALL) - .map_err(|err| eyre!(err)) - .wrap_err("failed to subscribe to stop all topic")?; - let _stop_thread = tokio::task::spawn_blocking(move || { - stop_handler(stop, stop_tx); - }); - - Ok(()) -} - -fn stop_handler(mut stop: Box, stop_tx: tokio::sync::mpsc::Sender) { - let send_result = match stop.recv().map_err(|err| eyre!(err)) { - Ok(_) => stop_tx.blocking_send(Event::Stop), - Err(err) => stop_tx.blocking_send(Event::ControlChannelError( - err.wrap_err("failed to receive on control channel"), - )), - }; - let _ = send_result; -} - -fn start_dataflow_handler( - mut start_dataflow: Box, - start_tx: tokio::sync::mpsc::WeakSender, -) { - loop { - let recv_result = start_dataflow.recv(); - let start_tx = match start_tx.upgrade() { - Some(tx) => tx, - None => { - // control 
channel was closed after receiving stop message - break; - } - }; - let message = match recv_result { - Ok(Some(message)) => message, - Ok(None) => break, - Err(err) => { - let send_result = start_tx.blocking_send(Event::ControlChannelError( - eyre!(err).wrap_err("failed to receive on start_dataflow topic"), - )); - match send_result { - Ok(()) => continue, - Err(_) => break, - } - } - }; - let data = message.get(); - let path = match std::str::from_utf8(&data) { - Ok(path) => Path::new(path), - Err(err) => { - let send_result = start_tx.blocking_send(Event::ParseError( - eyre!(err).wrap_err("failed to parse start_dataflow message"), - )); - match send_result { - Ok(()) => continue, - Err(_) => break, - } - } - }; - let _ = start_tx.blocking_send(Event::StartDataflow { - path: path.to_owned(), - }); - } + Ok(queryable.into_stream().map(Event::Control)) } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index ed366765..855ded1b 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -1,8 +1,12 @@ -use eyre::WrapErr; +use crate::run::spawn_dataflow; +use dora_core::topics::{StartDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START}; +use eyre::{bail, WrapErr}; use futures::StreamExt; use futures_concurrency::stream::Merge; use std::path::{Path, PathBuf}; use tokio_stream::wrappers::ReceiverStream; +use uuid::Uuid; +use zenoh::prelude::Sample; mod control; mod run; @@ -33,7 +37,7 @@ pub async fn run(args: Args) -> eyre::Result<()> { match run_dataflow { Some(path) => { // start the given dataflow directly - run::run_dataflow(path.clone(), &runtime_path) + run::run_dataflow(&path, &runtime_path) .await .wrap_err_with(|| format!("failed to run dataflow at {}", path.display()))?; } @@ -51,58 +55,52 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { let mut dataflow_errors_tx = Some(dataflow_errors_tx); let dataflow_error_events = ReceiverStream::new(dataflow_errors).map(Event::DataflowError); - 
let control_events = control::control_events(); + let (control_events, control_events_abort) = futures::stream::abortable( + control::control_events() + .await + .wrap_err("failed to create control events")?, + ); let mut events = (dataflow_error_events, control_events).merge(); while let Some(event) = events.next().await { - let mut stop = false; match event { Event::DataflowError(err) => { tracing::error!("{err:?}"); } - Event::ParseError(err) => { - let err = err.wrap_err("failed to parse message"); - tracing::error!("{err:?}"); - } - Event::ControlChannelError(err) => { - let err = err.wrap_err("Stopping because of fatal control channel error"); - tracing::error!("{err:?}"); - stop = true; - } - Event::StartDataflow { path } => { - let runtime_path = runtime_path.to_owned(); - let dataflow_errors_tx = match &dataflow_errors_tx { - Some(channel) => channel.clone(), - None => { - tracing::error!("cannot start new dataflow after receiving stop command"); - continue; - } - }; - let task = async move { - let result = run::run_dataflow(path.clone(), &runtime_path) - .await - .wrap_err_with(|| format!("failed to run dataflow at {}", path.display())); - match result { - Ok(()) => {} + Event::Control(query) => match query.key_selector().as_str() { + ZENOH_CONTROL_START => { + let dataflow_path = query.value_selector(); + let result = + start_dataflow(Path::new(dataflow_path), runtime_path, &dataflow_errors_tx) + .await; + let reply = match result { + Ok(uuid) => StartDataflowResult::Ok { + uuid: uuid.to_string(), + }, Err(err) => { - let _ = dataflow_errors_tx.send(err).await; + tracing::error!("{err:?}"); + StartDataflowResult::Error(format!("{err:?}")) } - } - }; - tokio::spawn(task); - } - Event::Stop => { - tracing::info!("Received stop command"); - stop = true; - } - } + }; + let _ = query + .reply_async(Sample::new("", serde_json::to_string(&reply).unwrap())) + .await; + } + ZENOH_CONTROL_DESTROY => { + tracing::info!("Received stop command"); + + 
control_events_abort.abort(); - if stop { - tracing::info!("stopping..."); + // ensure that no new dataflows can be started + dataflow_errors_tx = None; - // ensure that no new dataflows can be started - dataflow_errors_tx = None; + query.reply_async(Sample::new("", "")).await; + } + _ => { + query.reply_async(Sample::new("error", "invalid")).await; + } + }, } } @@ -111,10 +109,36 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { Ok(()) } +async fn start_dataflow( + path: &Path, + runtime_path: &Path, + dataflow_errors_tx: &Option>, +) -> eyre::Result { + let runtime_path = runtime_path.to_owned(); + let dataflow_errors_tx = match dataflow_errors_tx { + Some(channel) => channel.clone(), + None => bail!("cannot start new dataflow after receiving stop command"), + }; + let dataflow = spawn_dataflow(&runtime_path, &path).await?; + let uuid = dataflow.uuid; + let path = path.to_owned(); + let task = async move { + let result = dataflow + .await_tasks() + .await + .wrap_err_with(|| format!("failed to run dataflow at {}", path.display())); + match result { + Ok(()) => {} + Err(err) => { + let _ = dataflow_errors_tx.send(err).await; + } + } + }; + tokio::spawn(task); + Ok(uuid) +} + enum Event { DataflowError(eyre::Report), - ControlChannelError(eyre::Report), - StartDataflow { path: PathBuf }, - ParseError(eyre::Report), - Stop, + Control(zenoh::queryable::Query), } diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 7b20f303..a9d68610 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -3,16 +3,21 @@ use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor, use dora_node_api::{communication, config::format_duration}; use eyre::{bail, eyre, WrapErr}; use futures::{stream::FuturesUnordered, StreamExt}; -use std::{ - env::consts::EXE_EXTENSION, - path::{Path, PathBuf}, -}; +use std::{env::consts::EXE_EXTENSION, path::Path}; use 
tokio_stream::wrappers::IntervalStream; +use uuid::Uuid; mod custom; mod runtime; -pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> { +pub async fn run_dataflow(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { + spawn_dataflow(runtime, dataflow_path) + .await? + .await_tasks() + .await +} + +pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Result { let mut runtime = runtime.with_extension(EXE_EXTENSION); let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| { format!( @@ -20,23 +25,21 @@ pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Resul dataflow_path.display() ) })?; - let working_dir = dataflow_path .canonicalize() .context("failed to canoncialize dataflow path")? .parent() .ok_or_else(|| eyre!("canonicalized dataflow path has no parent"))? .to_owned(); - let nodes = descriptor.resolve_aliases(); let dora_timers = collect_dora_timers(&nodes); + let uuid = Uuid::new_v4(); let communication_config = { let mut config = descriptor.communication; // add uuid as prefix to ensure isolation - config.add_topic_prefix(&uuid::Uuid::new_v4().to_string()); + config.add_topic_prefix(&uuid.to_string()); config }; - if nodes .iter() .any(|n| matches!(n.kind, CoreNodeKind::Runtime(_))) @@ -54,8 +57,7 @@ pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Resul } } } - - let mut tasks = FuturesUnordered::new(); + let tasks = FuturesUnordered::new(); for node in nodes { let node_id = node.id.clone(); @@ -81,7 +83,6 @@ pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Resul } } } - for interval in dora_timers { let communication_config = communication_config.clone(); let mut communication = @@ -106,14 +107,23 @@ pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Resul } }); } + Ok(SpawnedDataflow { tasks, uuid }) +} - while let Some(task_result) = tasks.next().await { - task_result - 
.wrap_err("failed to join async task")? - .wrap_err("custom node failed")?; - } +pub struct SpawnedDataflow { + pub uuid: Uuid, + pub tasks: FuturesUnordered>>, +} - Ok(()) +impl SpawnedDataflow { + pub async fn await_tasks(mut self) -> eyre::Result<()> { + while let Some(task_result) = self.tasks.next().await { + task_result + .wrap_err("failed to join async task")? + .wrap_err("custom node failed")?; + } + Ok(()) + } } async fn read_descriptor(file: &Path) -> Result { diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index cc4a382e..587bcb3e 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,3 +1,9 @@ -pub const ZENOH_CONTROL_PREFIX: &str = "dora_control"; -pub const ZENOH_CONTROL_STOP_ALL: &str = "stop_all"; -pub const ZENOH_CONTROL_START_DATAFLOW: &str = "start_dataflow"; +pub const ZENOH_CONTROL_QUERYABLE: &str = "dora_control/*"; +pub const ZENOH_CONTROL_START: &str = "dora_control/start"; +pub const ZENOH_CONTROL_DESTROY: &str = "dora_control/destroy"; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum StartDataflowResult { + Ok { uuid: String }, + Error(String), +} From d0db3e596c92a64cd4ee77795b12f8664f845d21 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 14 Oct 2022 20:06:32 +0200 Subject: [PATCH 2/8] Keep track of UUIDs of all running dataflows --- binaries/coordinator/src/lib.rs | 90 +++++++++++++++++++---------- binaries/coordinator/src/run/mod.rs | 34 ++++++----- 2 files changed, 81 insertions(+), 43 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 855ded1b..c7f4d080 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -1,9 +1,17 @@ use crate::run::spawn_dataflow; -use dora_core::topics::{StartDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START}; -use eyre::{bail, WrapErr}; +use dora_core::topics::{ + StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, 
ZENOH_CONTROL_START, + ZENOH_CONTROL_STOP, +}; +use dora_node_api::{communication, config::CommunicationConfig}; +use eyre::{bail, eyre, WrapErr}; use futures::StreamExt; use futures_concurrency::stream::Merge; -use std::path::{Path, PathBuf}; +use run::{await_tasks, SpawnedDataflow}; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, +}; use tokio_stream::wrappers::ReceiverStream; use uuid::Uuid; use zenoh::prelude::Sample; @@ -51,9 +59,9 @@ pub async fn run(args: Args) -> eyre::Result<()> { } async fn start(runtime_path: &Path) -> eyre::Result<()> { - let (dataflow_errors_tx, dataflow_errors) = tokio::sync::mpsc::channel(2); - let mut dataflow_errors_tx = Some(dataflow_errors_tx); - let dataflow_error_events = ReceiverStream::new(dataflow_errors).map(Event::DataflowError); + let (dataflow_events_tx, dataflow_events) = tokio::sync::mpsc::channel(2); + let mut dataflow_events_tx = Some(dataflow_events_tx); + let dataflow_error_events = ReceiverStream::new(dataflow_events); let (control_events, control_events_abort) = futures::stream::abortable( control::control_events() @@ -63,21 +71,38 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { let mut events = (dataflow_error_events, control_events).merge(); + let mut running_dataflows = HashMap::new(); + while let Some(event) = events.next().await { match event { - Event::DataflowError(err) => { - tracing::error!("{err:?}"); - } + Event::Dataflow { uuid, event } => match event { + DataflowEvent::Finished { result } => { + running_dataflows.remove(&uuid); + match result { + Ok(()) => { + tracing::info!("dataflow `{uuid}` finished successully"); + } + Err(err) => { + let err = err.wrap_err(format!("error occured in dataflow `{uuid}`")); + tracing::error!("{err:?}"); + } + } + } + }, + Event::Control(query) => match query.key_selector().as_str() { ZENOH_CONTROL_START => { let dataflow_path = query.value_selector(); let result = - start_dataflow(Path::new(dataflow_path), runtime_path, &dataflow_errors_tx) 
+ start_dataflow(Path::new(dataflow_path), runtime_path, &dataflow_events_tx) .await; let reply = match result { - Ok(uuid) => StartDataflowResult::Ok { - uuid: uuid.to_string(), - }, + Ok((uuid, communication_config)) => { + running_dataflows.insert(uuid, communication_config); + StartDataflowResult::Ok { + uuid: uuid.to_string(), + } + } Err(err) => { tracing::error!("{err:?}"); StartDataflowResult::Error(format!("{err:?}")) @@ -93,7 +118,7 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { control_events_abort.abort(); // ensure that no new dataflows can be started - dataflow_errors_tx = None; + dataflow_events_tx = None; query.reply_async(Sample::new("", "")).await; } @@ -112,33 +137,40 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { async fn start_dataflow( path: &Path, runtime_path: &Path, - dataflow_errors_tx: &Option>, -) -> eyre::Result { + dataflow_events_tx: &Option>, +) -> eyre::Result<(Uuid, CommunicationConfig)> { let runtime_path = runtime_path.to_owned(); - let dataflow_errors_tx = match dataflow_errors_tx { + let dataflow_events_tx = match dataflow_events_tx { Some(channel) => channel.clone(), None => bail!("cannot start new dataflow after receiving stop command"), }; - let dataflow = spawn_dataflow(&runtime_path, &path).await?; - let uuid = dataflow.uuid; + let SpawnedDataflow { + uuid, + communication_config, + tasks, + } = spawn_dataflow(&runtime_path, &path).await?; let path = path.to_owned(); let task = async move { - let result = dataflow - .await_tasks() + let result = await_tasks(tasks) .await .wrap_err_with(|| format!("failed to run dataflow at {}", path.display())); - match result { - Ok(()) => {} - Err(err) => { - let _ = dataflow_errors_tx.send(err).await; - } - } + + let _ = dataflow_events_tx + .send(Event::Dataflow { + uuid, + event: DataflowEvent::Finished { result }, + }) + .await; }; tokio::spawn(task); - Ok(uuid) + Ok((uuid, communication_config)) } enum Event { - DataflowError(eyre::Report), + Dataflow { 
uuid: Uuid, event: DataflowEvent }, Control(zenoh::queryable::Query), } + +enum DataflowEvent { + Finished { result: eyre::Result<()> }, +} diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index a9d68610..824a0415 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -1,6 +1,9 @@ use self::{custom::spawn_custom_node, runtime::spawn_runtime_node}; use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor, NodeId}; -use dora_node_api::{communication, config::format_duration}; +use dora_node_api::{ + communication, + config::{format_duration, CommunicationConfig}, +}; use eyre::{bail, eyre, WrapErr}; use futures::{stream::FuturesUnordered, StreamExt}; use std::{env::consts::EXE_EXTENSION, path::Path}; @@ -11,10 +14,8 @@ mod custom; mod runtime; pub async fn run_dataflow(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { - spawn_dataflow(runtime, dataflow_path) - .await? - .await_tasks() - .await + let tasks = spawn_dataflow(runtime, dataflow_path).await?.tasks; + await_tasks(tasks).await } pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Result { @@ -107,23 +108,28 @@ pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Resul } }); } - Ok(SpawnedDataflow { tasks, uuid }) + Ok(SpawnedDataflow { + tasks, + communication_config, + uuid, + }) } pub struct SpawnedDataflow { pub uuid: Uuid, + pub communication_config: CommunicationConfig, pub tasks: FuturesUnordered>>, } -impl SpawnedDataflow { - pub async fn await_tasks(mut self) -> eyre::Result<()> { - while let Some(task_result) = self.tasks.next().await { - task_result - .wrap_err("failed to join async task")? - .wrap_err("custom node failed")?; - } - Ok(()) +pub async fn await_tasks( + mut tasks: FuturesUnordered>>, +) -> eyre::Result<()> { + while let Some(task_result) = tasks.next().await { + task_result + .wrap_err("failed to join async task")? 
+ .wrap_err("custom node failed")?; } + Ok(()) } async fn read_descriptor(file: &Path) -> Result { From f063af1df6e0abfc2bae7b00c8fcd01dfc17d0a5 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 14 Oct 2022 20:10:53 +0200 Subject: [PATCH 3/8] Implement `stop` subcommand to send `stop` message to dataflow The message is not recognized by the nodes/operators yet. This will be done in a follow-up commit. --- binaries/cli/src/main.rs | 40 +++++++++++++++++++++++++++---- binaries/coordinator/src/lib.rs | 42 ++++++++++++++++++++++++++++++++- libraries/core/src/topics.rs | 7 ++++++ 3 files changed, 84 insertions(+), 5 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 1063a5e8..d687d3cd 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,5 +1,8 @@ use clap::Parser; -use dora_core::topics::{StartDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START}; +use dora_core::topics::{ + StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START, + ZENOH_CONTROL_STOP, +}; use eyre::{bail, eyre, Context}; use std::{io::Write, path::PathBuf, sync::Arc}; use tempfile::NamedTempFile; @@ -46,7 +49,9 @@ enum Command { Start { dataflow: PathBuf, }, - Stop, + Stop { + uuid: String, + }, Logs, Metrics, Stats, @@ -126,7 +131,7 @@ fn main() -> eyre::Result<()> { Command::Dashboard => todo!(), Command::Up => todo!(), Command::Start { dataflow } => start_dataflow(dataflow, &mut session)?, - Command::Stop => todo!(), + Command::Stop { uuid } => stop_dataflow(uuid, &mut session)?, Command::Destroy => destroy(&mut session)?, Command::Logs => todo!(), Command::Metrics => todo!(), @@ -165,13 +170,40 @@ fn start_dataflow( serde_json::from_slice(&raw).wrap_err("failed to parse reply")?; match result { StartDataflowResult::Ok { uuid } => { - println!("Started dataflow under ID `{uuid}`"); + println!("Started dataflow with UUID `{uuid}`"); Ok(()) } StartDataflowResult::Error(err) => bail!(err), } } +fn 
stop_dataflow( + uuid: String, + session: &mut Option>, +) -> Result<(), eyre::ErrReport> { + let reply_receiver = zenoh_control_session(session)? + .get(Selector { + key_selector: ZENOH_CONTROL_STOP.into(), + value_selector: uuid.as_str().into(), + }) + .wait() + .map_err(|err| eyre!(err)) + .wrap_err("failed to create publisher for start dataflow message")?; + let reply = reply_receiver + .recv() + .wrap_err("failed to receive reply from coordinator")?; + let raw = reply.sample.value.payload.contiguous(); + let result: StopDataflowResult = + serde_json::from_slice(&raw).wrap_err("failed to parse reply")?; + match result { + StopDataflowResult::Ok => { + println!("Stopped dataflow with UUID `{uuid}`"); + Ok(()) + } + StopDataflowResult::Error(err) => bail!(err), + } +} + fn destroy(session: &mut Option>) -> Result<(), eyre::ErrReport> { let reply_receiver = zenoh_control_session(session)? .get(ZENOH_CONTROL_DESTROY) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index c7f4d080..8e59fe72 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -108,7 +108,47 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { StartDataflowResult::Error(format!("{err:?}")) } }; - let _ = query + query + .reply_async(Sample::new("", serde_json::to_string(&reply).unwrap())) + .await; + } + ZENOH_CONTROL_STOP => { + let stop = async { + let uuid = + Uuid::parse_str(query.value_selector()).wrap_err("not a valid UUID")?; + let communication_config = match running_dataflows.get(&uuid) { + Some(config) => config.clone(), + None => bail!("No running dataflow found with UUID `{uuid}`"), + }; + + let mut communication = tokio::task::spawn_blocking(move || { + communication::init(&communication_config) + }) + .await + .wrap_err("failed to join communication layer init task")? 
+ .wrap_err("failed to init communication layer")?; + + tracing::info!("sending stop message to dataflow `{uuid}`"); + + tokio::task::spawn_blocking(move || { + let topic = format!("dora/stoop"); + let metadata = dora_message::Metadata::default(); + let data = metadata.serialize().unwrap(); + communication.publisher(&topic)?.publish(&data) + }) + .await + .wrap_err("failed to join stop publish task")? + .map_err(|err| eyre!(err)) + .wrap_err("failed to send stop message")?; + + Result::<_, eyre::Report>::Ok(()) + }; + let reply = match stop.await { + Ok(()) => StopDataflowResult::Ok, + Err(err) => StopDataflowResult::Error(format!("{err:?}")), + }; + + query .reply_async(Sample::new("", serde_json::to_string(&reply).unwrap())) .await; } diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 587bcb3e..66d4d449 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,5 +1,6 @@ pub const ZENOH_CONTROL_QUERYABLE: &str = "dora_control/*"; pub const ZENOH_CONTROL_START: &str = "dora_control/start"; +pub const ZENOH_CONTROL_STOP: &str = "dora_control/stop"; pub const ZENOH_CONTROL_DESTROY: &str = "dora_control/destroy"; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -7,3 +8,9 @@ pub enum StartDataflowResult { Ok { uuid: String }, Error(String), } + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum StopDataflowResult { + Ok, + Error(String), +} From c4da2cbdad1e12575782365e8c6b7bdea01f53c6 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 18 Oct 2022 17:49:24 +0200 Subject: [PATCH 4/8] Refactor: Move configuration to `core` crate --- Cargo.lock | 4 +- apis/python/node/src/lib.rs | 2 +- apis/rust/node/Cargo.toml | 1 + apis/rust/node/src/communication.rs | 7 +-- apis/rust/node/src/lib.rs | 11 ++-- binaries/cli/src/build.rs | 2 +- binaries/cli/src/check.rs | 3 +- binaries/coordinator/src/lib.rs | 13 +++-- binaries/coordinator/src/run/custom.rs | 5 +- 
binaries/coordinator/src/run/mod.rs | 12 ++--- binaries/coordinator/src/run/runtime.rs | 5 +- binaries/runtime/src/main.rs | 6 ++- binaries/runtime/src/operator/mod.rs | 6 +-- binaries/runtime/src/operator/python.rs | 3 +- binaries/runtime/src/operator/shared_lib.rs | 4 +- examples/iceoryx/node/src/main.rs | 2 +- examples/rust-dataflow/node/src/main.rs | 2 +- libraries/core/Cargo.toml | 3 +- .../node => libraries/core}/src/config.rs | 51 +++++++++---------- libraries/core/src/descriptor/mod.rs | 4 +- libraries/core/src/descriptor/visualize.rs | 2 +- libraries/core/src/lib.rs | 1 + 22 files changed, 78 insertions(+), 71 deletions(-) rename {apis/rust/node => libraries/core}/src/config.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index 86d21a2d..e3a37e2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -925,10 +925,11 @@ dependencies = [ name = "dora-core" version = "0.1.0" dependencies = [ - "dora-node-api", "eyre", + "once_cell", "serde", "serde_yaml 0.9.11", + "zenoh-config", ] [[package]] @@ -966,6 +967,7 @@ version = "0.1.0" dependencies = [ "capnp", "communication-layer-pub-sub", + "dora-core", "dora-message", "eyre", "flume", diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 7fec3e78..0c5cc7b7 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,6 +1,6 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] -use dora_node_api::{config::NodeId, DoraNode, Input}; +use dora_node_api::{core::config::NodeId, DoraNode, Input}; use dora_operator_api_python::{metadata_to_pydict, pydict_to_metadata}; use eyre::{Context, Result}; use flume::Receiver; diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index bffebcd0..bc7f2f34 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -23,6 +23,7 @@ communication-layer-pub-sub = { path = "../../../libraries/communication-layer", uuid = { version = "1.1.2", features = ["v4"] } capnp = "0.14.9" dora-message 
= { path = "../../../libraries/message" } +dora-core = { path = "../../../libraries/core" } [dev-dependencies] tokio = { version = "1.17.0", features = ["rt"] } diff --git a/apis/rust/node/src/communication.rs b/apis/rust/node/src/communication.rs index 49022022..c9245b7f 100644 --- a/apis/rust/node/src/communication.rs +++ b/apis/rust/node/src/communication.rs @@ -1,11 +1,8 @@ +use crate::BoxError; use communication_layer_pub_sub::ReceivedSample; pub use communication_layer_pub_sub::{CommunicationLayer, Publisher, Subscriber}; +use dora_core::config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}; use dora_message::Metadata; - -use crate::{ - config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}, - BoxError, -}; use eyre::Context; use std::{ borrow::Cow, diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index b75e7306..8fa5350e 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -1,14 +1,13 @@ pub use communication::Input; -pub use dora_message::Metadata; -pub use flume::Receiver; - use communication::STOP_TOPIC; use communication_layer_pub_sub::CommunicationLayer; -use config::{CommunicationConfig, DataId, NodeId, NodeRunConfig}; +pub use dora_core as core; +use dora_core::config::{CommunicationConfig, DataId, NodeId, NodeRunConfig}; +pub use dora_message::Metadata; use eyre::WrapErr; +pub use flume::Receiver; pub mod communication; -pub mod config; pub struct DoraNode { id: NodeId, @@ -144,6 +143,8 @@ fn set_up_tracing() -> eyre::Result<()> { #[cfg(test)] mod tests { + use dora_core::config; + use super::*; #[test] diff --git a/binaries/cli/src/build.rs b/binaries/cli/src/build.rs index 48867dee..4b308ba6 100644 --- a/binaries/cli/src/build.rs +++ b/binaries/cli/src/build.rs @@ -1,5 +1,5 @@ use crate::graph; -use dora_core::descriptor::{OperatorId, SINGLE_OPERATOR_DEFAULT_ID}; +use dora_core::{config::OperatorId, descriptor::SINGLE_OPERATOR_DEFAULT_ID}; use eyre::{eyre, Context}; use 
std::{path::Path, process::Command}; diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 6643d483..01ce395c 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -1,7 +1,8 @@ use crate::graph::read_descriptor; use dora_core::{ adjust_shared_library_path, - descriptor::{self, CoreNodeKind, InputMapping, OperatorSource, UserInputMapping}, + config::{InputMapping, UserInputMapping}, + descriptor::{self, CoreNodeKind, OperatorSource}, }; use eyre::{bail, eyre, Context}; use std::{env::consts::EXE_EXTENSION, path::Path}; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 8e59fe72..84242119 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -1,9 +1,12 @@ use crate::run::spawn_dataflow; -use dora_core::topics::{ - StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START, - ZENOH_CONTROL_STOP, +use dora_core::{ + config::CommunicationConfig, + topics::{ + StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START, + ZENOH_CONTROL_STOP, + }, }; -use dora_node_api::{communication, config::CommunicationConfig}; +use dora_node_api::communication; use eyre::{bail, eyre, WrapErr}; use futures::StreamExt; use futures_concurrency::stream::Merge; @@ -188,7 +191,7 @@ async fn start_dataflow( uuid, communication_config, tasks, - } = spawn_dataflow(&runtime_path, &path).await?; + } = spawn_dataflow(&runtime_path, path).await?; let path = path.to_owned(); let task = async move { let result = await_tasks(tasks) diff --git a/binaries/coordinator/src/run/custom.rs b/binaries/coordinator/src/run/custom.rs index 64c11a59..e3411353 100644 --- a/binaries/coordinator/src/run/custom.rs +++ b/binaries/coordinator/src/run/custom.rs @@ -1,6 +1,5 @@ use super::command_init_common_env; -use dora_core::descriptor; -use dora_node_api::config::NodeId; +use dora_core::{config::NodeId, descriptor}; use eyre::{eyre, WrapErr}; use 
std::{env::consts::EXE_EXTENSION, path::Path}; @@ -8,7 +7,7 @@ use std::{env::consts::EXE_EXTENSION, path::Path}; pub(super) fn spawn_custom_node( node_id: NodeId, node: &descriptor::CustomNode, - communication: &dora_node_api::config::CommunicationConfig, + communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { let mut args = node.run.split_ascii_whitespace(); diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 824a0415..b8e34b83 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -1,9 +1,9 @@ use self::{custom::spawn_custom_node, runtime::spawn_runtime_node}; -use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor, NodeId}; -use dora_node_api::{ - communication, - config::{format_duration, CommunicationConfig}, +use dora_core::{ + config::{format_duration, CommunicationConfig, NodeId}, + descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor}, }; +use dora_node_api::communication; use eyre::{bail, eyre, WrapErr}; use futures::{stream::FuturesUnordered, StreamExt}; use std::{env::consts::EXE_EXTENSION, path::Path}; @@ -20,7 +20,7 @@ pub async fn run_dataflow(dataflow_path: &Path, runtime: &Path) -> eyre::Result< pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Result { let mut runtime = runtime.with_extension(EXE_EXTENSION); - let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| { + let descriptor = read_descriptor(dataflow_path).await.wrap_err_with(|| { format!( "failed to read dataflow descriptor at {}", dataflow_path.display() @@ -144,7 +144,7 @@ async fn read_descriptor(file: &Path) -> Result { fn command_init_common_env( command: &mut tokio::process::Command, node_id: &NodeId, - communication: &dora_node_api::config::CommunicationConfig, + communication: &dora_core::config::CommunicationConfig, ) -> Result<(), eyre::Error> { command.env( "DORA_NODE_ID", 
diff --git a/binaries/coordinator/src/run/runtime.rs b/binaries/coordinator/src/run/runtime.rs index 4f7d7082..a0bf2797 100644 --- a/binaries/coordinator/src/run/runtime.rs +++ b/binaries/coordinator/src/run/runtime.rs @@ -1,6 +1,5 @@ use super::command_init_common_env; -use dora_core::descriptor; -use dora_node_api::config::NodeId; +use dora_core::{config::NodeId, descriptor}; use eyre::{eyre, WrapErr}; use std::path::Path; @@ -9,7 +8,7 @@ pub fn spawn_runtime_node( runtime: &Path, node_id: NodeId, node: &descriptor::RuntimeNode, - communication: &dora_node_api::config::CommunicationConfig, + communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { let mut command = tokio::process::Command::new(runtime); diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index fda84724..9253e798 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -1,10 +1,12 @@ #![warn(unsafe_op_in_unsafe_fn)] -use dora_core::descriptor::OperatorDefinition; +use dora_core::{ + config::{CommunicationConfig, DataId, NodeId, OperatorId}, + descriptor::OperatorDefinition, +}; use dora_node_api::{ self, communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, - config::{CommunicationConfig, DataId, NodeId, OperatorId}, }; use eyre::{bail, Context}; use futures::{Stream, StreamExt}; diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index 036ef926..f7801df6 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -1,8 +1,8 @@ -use dora_core::descriptor::{OperatorDefinition, OperatorSource}; -use dora_node_api::{ - communication::{self, CommunicationLayer}, +use dora_core::{ config::NodeId, + descriptor::{OperatorDefinition, OperatorSource}, }; +use dora_node_api::communication::{self, CommunicationLayer}; use eyre::Context; #[cfg(feature = "tracing")] use opentelemetry::sdk::trace::Tracer; diff --git 
a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index fb90ec86..7c60dd97 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -1,7 +1,8 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] use super::{OperatorEvent, Tracer}; -use dora_node_api::{communication::Publisher, config::DataId}; +use dora_core::config::DataId; +use dora_node_api::communication::Publisher; use dora_operator_api_python::metadata_to_pydict; use eyre::{bail, eyre, Context}; use pyo3::{ diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index dd524734..7da7670d 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,6 +1,6 @@ use super::{OperatorEvent, Tracer}; -use dora_core::adjust_shared_library_path; -use dora_node_api::{communication::Publisher, config::DataId}; +use dora_core::{adjust_shared_library_path, config::DataId}; +use dora_node_api::communication::Publisher; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput, DoraResult, DoraStatus, Metadata, OnInputResult, Output, SendOutput, diff --git a/examples/iceoryx/node/src/main.rs b/examples/iceoryx/node/src/main.rs index 06437786..7ca69c08 100644 --- a/examples/iceoryx/node/src/main.rs +++ b/examples/iceoryx/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, config::DataId, DoraNode}; +use dora_node_api::{self, core::config::DataId, DoraNode}; fn main() -> eyre::Result<()> { let output = DataId::from("random".to_owned()); diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index 06437786..7ca69c08 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, config::DataId, DoraNode}; +use 
dora_node_api::{self, core::config::DataId, DoraNode}; fn main() -> eyre::Result<()> { let output = DataId::from("random".to_owned()); diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 793bc8ef..af7fe3fe 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -7,7 +7,8 @@ license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../apis/rust/node" } eyre = "0.6.8" serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.9.11" +once_cell = "1.13.0" +zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" } diff --git a/apis/rust/node/src/config.rs b/libraries/core/src/config.rs similarity index 99% rename from apis/rust/node/src/config.rs rename to libraries/core/src/config.rs index 201d6341..2a880802 100644 --- a/apis/rust/node/src/config.rs +++ b/libraries/core/src/config.rs @@ -1,4 +1,3 @@ -use communication_layer_pub_sub::zenoh::zenoh_config; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use std::{ @@ -10,15 +9,6 @@ use std::{ time::Duration, }; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] -pub struct NodeRunConfig { - #[serde(default)] - pub inputs: BTreeMap, - #[serde(default)] - pub outputs: BTreeSet, -} - #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] pub struct NodeId(String); @@ -160,22 +150,6 @@ impl fmt::Display for InputMapping { } } -pub struct FormattedDuration(pub Duration); - -impl fmt::Display for FormattedDuration { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if self.0.subsec_millis() == 0 { - write!(f, "secs/{}", self.0.as_secs()) - } else { - write!(f, "millis/{}", self.0.as_millis()) - } - } -} - -pub fn format_duration(interval: Duration) -> FormattedDuration { - FormattedDuration(interval) -} - impl Serialize for InputMapping 
{ fn serialize(&self, serializer: S) -> Result where @@ -262,6 +236,31 @@ pub struct UserInputMapping { pub output: DataId, } +pub struct FormattedDuration(pub Duration); + +impl fmt::Display for FormattedDuration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.0.subsec_millis() == 0 { + write!(f, "secs/{}", self.0.as_secs()) + } else { + write!(f, "millis/{}", self.0.as_millis()) + } + } +} + +pub fn format_duration(interval: Duration) -> FormattedDuration { + FormattedDuration(interval) +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct NodeRunConfig { + #[serde(default)] + pub inputs: BTreeMap, + #[serde(default)] + pub outputs: BTreeSet, +} + #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(deny_unknown_fields, rename_all = "lowercase")] pub enum CommunicationConfig { diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 4d8ffb77..664184db 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,5 +1,3 @@ -use dora_node_api::config::{CommunicationConfig, NodeRunConfig}; -pub use dora_node_api::config::{DataId, InputMapping, NodeId, OperatorId, UserInputMapping}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; @@ -9,6 +7,8 @@ use std::{ }; pub use visualize::collect_dora_timers; +use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; + mod visualize; #[derive(Debug, Serialize, Deserialize)] diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index a3e25643..fac88464 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -1,5 +1,5 @@ use super::{CoreNodeKind, CustomNode, OperatorDefinition, ResolvedNode, RuntimeNode}; -use dora_node_api::config::{format_duration, DataId, InputMapping, NodeId, UserInputMapping}; 
+use crate::config::{format_duration, DataId, InputMapping, NodeId, UserInputMapping}; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, fmt::Write as _, diff --git a/libraries/core/src/lib.rs b/libraries/core/src/lib.rs index 74220733..997a3204 100644 --- a/libraries/core/src/lib.rs +++ b/libraries/core/src/lib.rs @@ -4,6 +4,7 @@ use std::{ path::Path, }; +pub mod config; pub mod descriptor; pub mod topics; From 35c64293dbf95246ecc0bac30fa489759365636f Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 18 Oct 2022 18:03:54 +0200 Subject: [PATCH 5/8] Listen for manual stop messages in custom nodes --- apis/rust/node/src/communication.rs | 34 ++++++++++++++++++++++++++--- binaries/coordinator/src/lib.rs | 5 ++--- libraries/core/src/topics.rs | 2 ++ 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/apis/rust/node/src/communication.rs b/apis/rust/node/src/communication.rs index c9245b7f..d03dda15 100644 --- a/apis/rust/node/src/communication.rs +++ b/apis/rust/node/src/communication.rs @@ -1,13 +1,15 @@ use crate::BoxError; use communication_layer_pub_sub::ReceivedSample; pub use communication_layer_pub_sub::{CommunicationLayer, Publisher, Subscriber}; -use dora_core::config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}; +use dora_core::{ + config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}, + topics, +}; use dora_message::Metadata; use eyre::Context; use std::{ borrow::Cow, collections::{BTreeMap, HashSet}, - mem, ops::Deref, thread, }; @@ -141,7 +143,28 @@ pub fn subscribe_all( } }); } - mem::drop(inputs_tx); + + // subscribe to topic for manual stops + { + let topic = topics::MANUAL_STOP; + let mut sub = communication + .subscribe(topic) + .map_err(|err| eyre::eyre!(err)) + .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; + + let sender = inputs_tx; + thread::spawn(move || loop { + let event = match sub.recv().transpose() { + None => break, + Some(Ok(_)) => InputEvent::ManualStop, + 
Some(Err(err)) => InputEvent::Error(err), + }; + match sender.send(event) { + Ok(()) => {} + Err(flume::SendError(_)) => break, + } + }); + } let (combined_tx, combined) = flume::bounded(1); thread::spawn(move || loop { @@ -156,6 +179,10 @@ pub fn subscribe_all( break; } } + Ok(InputEvent::ManualStop) => { + tracing::info!("received manual stop message"); + break; + } Ok(InputEvent::ParseMessageError(err)) => { tracing::warn!("{err:?}"); } @@ -173,6 +200,7 @@ enum InputEvent { source: NodeId, operator: Option, }, + ManualStop, Error(BoxError), ParseMessageError(eyre::Report), } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 84242119..ae1bd765 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -2,7 +2,7 @@ use crate::run::spawn_dataflow; use dora_core::{ config::CommunicationConfig, topics::{ - StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START, + self, StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START, ZENOH_CONTROL_STOP, }, }; @@ -134,10 +134,9 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { tracing::info!("sending stop message to dataflow `{uuid}`"); tokio::task::spawn_blocking(move || { - let topic = format!("dora/stoop"); let metadata = dora_message::Metadata::default(); let data = metadata.serialize().unwrap(); - communication.publisher(&topic)?.publish(&data) + communication.publisher(topics::MANUAL_STOP)?.publish(&data) }) .await .wrap_err("failed to join stop publish task")? 
diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 66d4d449..8266f267 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,3 +1,5 @@ +pub const MANUAL_STOP: &str = "dora/stop"; + pub const ZENOH_CONTROL_QUERYABLE: &str = "dora_control/*"; pub const ZENOH_CONTROL_START: &str = "dora_control/start"; pub const ZENOH_CONTROL_STOP: &str = "dora_control/stop"; From bc54d1d6707679b710376faa8c34509976667cd8 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 18 Oct 2022 18:11:54 +0200 Subject: [PATCH 6/8] Stop all running dataflows on `destroy` command --- binaries/coordinator/src/lib.rs | 55 +++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index ae1bd765..6da09456 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -119,29 +119,7 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { let stop = async { let uuid = Uuid::parse_str(query.value_selector()).wrap_err("not a valid UUID")?; - let communication_config = match running_dataflows.get(&uuid) { - Some(config) => config.clone(), - None => bail!("No running dataflow found with UUID `{uuid}`"), - }; - - let mut communication = tokio::task::spawn_blocking(move || { - communication::init(&communication_config) - }) - .await - .wrap_err("failed to join communication layer init task")? - .wrap_err("failed to init communication layer")?; - - tracing::info!("sending stop message to dataflow `{uuid}`"); - - tokio::task::spawn_blocking(move || { - let metadata = dora_message::Metadata::default(); - let data = metadata.serialize().unwrap(); - communication.publisher(topics::MANUAL_STOP)?.publish(&data) - }) - .await - .wrap_err("failed to join stop publish task")? 
- .map_err(|err| eyre!(err)) - .wrap_err("failed to send stop message")?; + stop_dataflow(&running_dataflows, uuid).await?; Result::<_, eyre::Report>::Ok(()) }; @@ -162,6 +140,11 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { // ensure that no new dataflows can be started dataflow_events_tx = None; + // stop all running dataflows + for &uuid in running_dataflows.keys() { + stop_dataflow(&running_dataflows, uuid).await?; + } + query.reply_async(Sample::new("", "")).await; } _ => { @@ -176,6 +159,32 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { Ok(()) } +async fn stop_dataflow( + running_dataflows: &HashMap, + uuid: Uuid, +) -> eyre::Result<()> { + let communication_config = match running_dataflows.get(&uuid) { + Some(config) => config.clone(), + None => bail!("No running dataflow found with UUID `{uuid}`"), + }; + let mut communication = + tokio::task::spawn_blocking(move || communication::init(&communication_config)) + .await + .wrap_err("failed to join communication layer init task")? + .wrap_err("failed to init communication layer")?; + tracing::info!("sending stop message to dataflow `{uuid}`"); + tokio::task::spawn_blocking(move || { + let metadata = dora_message::Metadata::default(); + let data = metadata.serialize().unwrap(); + communication.publisher(topics::MANUAL_STOP)?.publish(&data) + }) + .await + .wrap_err("failed to join stop publish task")? + .map_err(|err| eyre!(err)) + .wrap_err("failed to send stop message")?; + Ok(()) +} + async fn start_dataflow( path: &Path, runtime_path: &Path, From c2f3587de0797a16b5f29ef0c9f9257460d29580 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 19 Oct 2022 10:14:04 +0200 Subject: [PATCH 7/8] Use a weak sender for manual stop subscription Don't keep the inputs channel open if only the manual stop subscription remains. For example, this happens when an operator has no inputs at all.
--- apis/rust/node/src/communication.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/apis/rust/node/src/communication.rs b/apis/rust/node/src/communication.rs index d03dda15..ffd908b5 100644 --- a/apis/rust/node/src/communication.rs +++ b/apis/rust/node/src/communication.rs @@ -11,6 +11,7 @@ use std::{ borrow::Cow, collections::{BTreeMap, HashSet}, ops::Deref, + sync::Arc, thread, }; @@ -73,6 +74,7 @@ pub fn subscribe_all( inputs: &BTreeMap, ) -> eyre::Result> { let (inputs_tx, inputs_rx) = flume::bounded(10); + let inputs_tx = Arc::new(inputs_tx); for (input, mapping) in inputs { let topic = mapping.to_string(); let mut sub = communication @@ -152,16 +154,23 @@ pub fn subscribe_all( .map_err(|err| eyre::eyre!(err)) .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; - let sender = inputs_tx; + // only keep a weak reference to the sender because we don't want to + // prevent it from being closed (e.g. when all sources are closed) + let sender = Arc::downgrade(&inputs_tx); + std::mem::drop(inputs_tx); + thread::spawn(move || loop { let event = match sub.recv().transpose() { None => break, Some(Ok(_)) => InputEvent::ManualStop, Some(Err(err)) => InputEvent::Error(err), }; - match sender.send(event) { - Ok(()) => {} - Err(flume::SendError(_)) => break, + match sender.upgrade() { + Some(sender) => match sender.send(event) { + Ok(()) => {} + Err(flume::SendError(_)) => break, + }, + None => break, } }); } From b83de6c41bc890236b5d782f74f62464a5e939ab Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 19 Oct 2022 10:17:35 +0200 Subject: [PATCH 8/8] Add timeout for CI test job --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cacbe4ba..85d114f1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,6 +13,7 @@ jobs: platform: [ubuntu-latest, macos-latest, windows-latest] fail-fast: false runs-on: ${{ 
matrix.platform }} + timeout-minutes: 30 steps: - uses: actions/checkout@v2 - name: Install Cap'n Proto and libacl-dev (Linux)