diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cacbe4ba..85d114f1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,6 +13,7 @@ jobs: platform: [ubuntu-latest, macos-latest, windows-latest] fail-fast: false runs-on: ${{ matrix.platform }} + timeout-minutes: 30 steps: - uses: actions/checkout@v2 - name: Install Cap'n Proto and libacl-dev (Linux) diff --git a/Cargo.lock b/Cargo.lock index 746c4abe..ae591b71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -886,12 +886,13 @@ name = "dora-cli" version = "0.1.0" dependencies = [ "clap 4.0.3", - "communication-layer-pub-sub", "dora-core", "eyre", + "serde_json", "serde_yaml 0.9.11", "tempfile", "webbrowser", + "zenoh", ] [[package]] @@ -900,7 +901,6 @@ version = "0.1.0" dependencies = [ "bincode", "clap 3.2.20", - "communication-layer-pub-sub", "dora-core", "dora-message", "dora-node-api", @@ -909,6 +909,7 @@ dependencies = [ "futures-concurrency 5.0.1", "rand", "serde", + "serde_json", "serde_yaml 0.8.23", "time", "tokio", @@ -917,16 +918,18 @@ dependencies = [ "tracing", "tracing-subscriber", "uuid 0.8.2", + "zenoh", ] [[package]] name = "dora-core" version = "0.1.0" dependencies = [ - "dora-node-api", "eyre", + "once_cell", "serde", "serde_yaml 0.9.11", + "zenoh-config", ] [[package]] @@ -965,6 +968,7 @@ version = "0.1.0" dependencies = [ "capnp", "communication-layer-pub-sub", + "dora-core", "dora-message", "eyre", "flume", @@ -3235,9 +3239,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.79" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" +checksum = "41feea4228a6f1cd09ec7a3593a682276702cd67b5273544757dae23c096f074" dependencies = [ "itoa", "ryu", diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 357960e5..9a40e299 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,6 +1,6 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] -use dora_node_api::{config::NodeId, DoraNode, Input}; +use dora_node_api::{core::config::NodeId, DoraNode, Input}; use dora_operator_api_python::{metadata_to_pydict, pydict_to_metadata}; use eyre::{Context, Result}; use flume::Receiver; diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index bffebcd0..bc7f2f34 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -23,6 +23,7 @@ communication-layer-pub-sub = { path = "../../../libraries/communication-layer", uuid = { version = "1.1.2", features = ["v4"] } capnp = "0.14.9" dora-message = { path = "../../../libraries/message" } +dora-core = { path = "../../../libraries/core" } [dev-dependencies] tokio = { version = "1.17.0", features = ["rt"] } diff --git a/apis/rust/node/src/communication.rs b/apis/rust/node/src/communication.rs index 49022022..ffd908b5 100644 --- a/apis/rust/node/src/communication.rs +++ b/apis/rust/node/src/communication.rs @@ -1,17 +1,17 @@ +use crate::BoxError; use communication_layer_pub_sub::ReceivedSample; pub use communication_layer_pub_sub::{CommunicationLayer, Publisher, Subscriber}; -use dora_message::Metadata; - -use crate::{ +use dora_core::{ config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}, - BoxError, + topics, }; +use dora_message::Metadata; use eyre::Context; use std::{ borrow::Cow, collections::{BTreeMap, HashSet}, - mem, ops::Deref, + sync::Arc, thread, }; @@ -74,6 +74,7 @@ pub fn subscribe_all( inputs: &BTreeMap, ) -> eyre::Result> { let (inputs_tx, inputs_rx) = flume::bounded(10); + let inputs_tx = Arc::new(inputs_tx); for (input, mapping) in inputs { let topic = mapping.to_string(); let mut sub = communication @@ -144,7 +145,35 @@ pub fn subscribe_all( } }); } - mem::drop(inputs_tx); + + // subscribe to topic for manual stops + { + let topic = topics::MANUAL_STOP; + let mut sub = communication + .subscribe(topic) + .map_err(|err| eyre::eyre!(err)) + .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; + + // only keep a weak reference to the sender because we don't want to + // prevent it from being closed (e.g. when all sources are closed) + let sender = Arc::downgrade(&inputs_tx); + std::mem::drop(inputs_tx); + + thread::spawn(move || loop { + let event = match sub.recv().transpose() { + None => break, + Some(Ok(_)) => InputEvent::ManualStop, + Some(Err(err)) => InputEvent::Error(err), + }; + match sender.upgrade() { + Some(sender) => match sender.send(event) { + Ok(()) => {} + Err(flume::SendError(_)) => break, + }, + None => break, + } + }); + } let (combined_tx, combined) = flume::bounded(1); thread::spawn(move || loop { @@ -159,6 +188,10 @@ pub fn subscribe_all( break; } } + Ok(InputEvent::ManualStop) => { + tracing::info!("received manual stop message"); + break; + } Ok(InputEvent::ParseMessageError(err)) => { tracing::warn!("{err:?}"); } @@ -176,6 +209,7 @@ enum InputEvent { source: NodeId, operator: Option, }, + ManualStop, Error(BoxError), ParseMessageError(eyre::Report), } diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index ef9fe072..f3b06415 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -1,14 +1,13 @@ pub use communication::Input; -pub use dora_message::{uhlc, Metadata, MetadataParameters}; -pub use flume::Receiver; - use communication::STOP_TOPIC; use communication_layer_pub_sub::CommunicationLayer; -use config::{CommunicationConfig, DataId, NodeId, NodeRunConfig}; +pub use dora_core as core; +use dora_core::config::{CommunicationConfig, DataId, NodeId, NodeRunConfig}; +pub use dora_message::{uhlc, Metadata, MetadataParameters}; use eyre::WrapErr; +pub use flume::Receiver; pub mod communication; -pub mod config; pub struct DoraNode { id: NodeId, @@ -147,6 +146,8 @@ fn set_up_tracing() -> eyre::Result<()> { #[cfg(test)] mod tests { + use dora_core::config; + use super::*; #[test] diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index befc7c7b..abd7ca4b 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -16,6 +16,5 @@ dora-core = { path = "../../libraries/core" } serde_yaml = "0.9.11" tempfile = "3.3.0" webbrowser = "0.8.0" -communication-layer-pub-sub = { path = "../../libraries/communication-layer", default-features = false, features = [ - "zenoh", -] } +zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" } +serde_json = "1.0.86" diff --git a/binaries/cli/src/build.rs b/binaries/cli/src/build.rs index 48867dee..4b308ba6 100644 --- a/binaries/cli/src/build.rs +++ b/binaries/cli/src/build.rs @@ -1,5 +1,5 @@ use crate::graph; -use dora_core::descriptor::{OperatorId, SINGLE_OPERATOR_DEFAULT_ID}; +use dora_core::{config::OperatorId, descriptor::SINGLE_OPERATOR_DEFAULT_ID}; use eyre::{eyre, Context}; use std::{path::Path, process::Command}; diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 6643d483..01ce395c 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -1,7 +1,8 @@ use crate::graph::read_descriptor; use dora_core::{ adjust_shared_library_path, - descriptor::{self, CoreNodeKind, InputMapping, OperatorSource, UserInputMapping}, + config::{InputMapping, UserInputMapping}, + descriptor::{self, CoreNodeKind, OperatorSource}, }; use eyre::{bail, eyre, Context}; use std::{env::consts::EXE_EXTENSION, path::Path}; diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index d1049f63..d687d3cd 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,11 +1,15 @@ use clap::Parser; -use communication_layer_pub_sub::{zenoh::ZenohCommunicationLayer, CommunicationLayer}; use dora_core::topics::{ - ZENOH_CONTROL_PREFIX, ZENOH_CONTROL_START_DATAFLOW, ZENOH_CONTROL_STOP_ALL, + StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START, + ZENOH_CONTROL_STOP, }; -use eyre::{eyre, Context}; -use std::{io::Write, path::PathBuf}; +use eyre::{bail, eyre, Context}; +use std::{io::Write, path::PathBuf, sync::Arc}; use tempfile::NamedTempFile; +use zenoh::{ + prelude::{Receiver, Selector, SplitBuffer}, + sync::ZFuture, +}; mod build; mod check; @@ -45,7 +49,9 @@ enum Command { Start { dataflow: PathBuf, }, - Stop, + Stop { + uuid: String, + }, Logs, Metrics, Stats, @@ -124,34 +130,9 @@ fn main() -> eyre::Result<()> { Command::New { args } => template::create(args)?, Command::Dashboard => todo!(), Command::Up => todo!(), - Command::Start { dataflow } => { - let canonicalized = dataflow - .canonicalize() - .wrap_err("given dataflow file does not exist")?; - let path = &canonicalized - .to_str() - .ok_or_else(|| eyre!("dataflow path must be valid UTF-8"))?; - - let publisher = zenoh_control_session(&mut session)? - .publisher(ZENOH_CONTROL_START_DATAFLOW) - .map_err(|err| eyre!(err)) - .wrap_err("failed to create publisher for start dataflow message")?; - publisher - .publish(path.as_bytes()) - .map_err(|err| eyre!(err)) - .wrap_err("failed to publish start dataflow message")?; - } - Command::Stop => todo!(), - Command::Destroy => { - let publisher = zenoh_control_session(&mut session)? - .publisher(ZENOH_CONTROL_STOP_ALL) - .map_err(|err| eyre!(err)) - .wrap_err("failed to create publisher for stop message")?; - publisher - .publish(&[]) - .map_err(|err| eyre!(err)) - .wrap_err("failed to publish stop message")?; - } + Command::Start { dataflow } => start_dataflow(dataflow, &mut session)?, + Command::Stop { uuid } => stop_dataflow(uuid, &mut session)?, + Command::Destroy => destroy(&mut session)?, Command::Logs => todo!(), Command::Metrics => todo!(), Command::Stats => todo!(), @@ -163,15 +144,88 @@ fn main() -> eyre::Result<()> { Ok(()) } +fn start_dataflow( + dataflow: PathBuf, + session: &mut Option>, +) -> Result<(), eyre::ErrReport> { + let canonicalized = dataflow + .canonicalize() + .wrap_err("given dataflow file does not exist")?; + let path = canonicalized + .to_str() + .ok_or_else(|| eyre!("dataflow path must be valid UTF-8"))?; + let reply_receiver = zenoh_control_session(session)? + .get(Selector { + key_selector: ZENOH_CONTROL_START.into(), + value_selector: path.into(), + }) + .wait() + .map_err(|err| eyre!(err)) + .wrap_err("failed to create publisher for start dataflow message")?; + let reply = reply_receiver + .recv() + .wrap_err("failed to receive reply from coordinator")?; + let raw = reply.sample.value.payload.contiguous(); + let result: StartDataflowResult = + serde_json::from_slice(&raw).wrap_err("failed to parse reply")?; + match result { + StartDataflowResult::Ok { uuid } => { + println!("Started dataflow with UUID `{uuid}`"); + Ok(()) + } + StartDataflowResult::Error(err) => bail!(err), + } +} + +fn stop_dataflow( + uuid: String, + session: &mut Option>, +) -> Result<(), eyre::ErrReport> { + let reply_receiver = zenoh_control_session(session)? + .get(Selector { + key_selector: ZENOH_CONTROL_STOP.into(), + value_selector: uuid.as_str().into(), + }) + .wait() + .map_err(|err| eyre!(err)) + .wrap_err("failed to create publisher for start dataflow message")?; + let reply = reply_receiver + .recv() + .wrap_err("failed to receive reply from coordinator")?; + let raw = reply.sample.value.payload.contiguous(); + let result: StopDataflowResult = + serde_json::from_slice(&raw).wrap_err("failed to parse reply")?; + match result { + StopDataflowResult::Ok => { + println!("Stopped dataflow with UUID `{uuid}`"); + Ok(()) + } + StopDataflowResult::Error(err) => bail!(err), + } +} + +fn destroy(session: &mut Option>) -> Result<(), eyre::ErrReport> { + let reply_receiver = zenoh_control_session(session)? + .get(ZENOH_CONTROL_DESTROY) + .wait() + .map_err(|err| eyre!(err)) + .wrap_err("failed to create publisher for start dataflow message")?; + reply_receiver + .recv() + .wrap_err("failed to receive reply from coordinator")?; + Ok(()) +} + fn zenoh_control_session( - session: &mut Option, -) -> eyre::Result<&mut ZenohCommunicationLayer> { + session: &mut Option>, +) -> eyre::Result<&Arc> { Ok(match session { Some(session) => session, None => session.insert( - ZenohCommunicationLayer::init(Default::default(), ZENOH_CONTROL_PREFIX.into()) - .map_err(|err| eyre!(err)) - .wrap_err("failed to open zenoh control session")?, + zenoh::open(zenoh::config::Config::default()) + .wait() + .map_err(|err| eyre!(err))? + .into_arc(), ), }) } diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index 04731d68..a8eba7db 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -25,6 +25,5 @@ dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" futures-concurrency = "5.0.1" -communication-layer-pub-sub = { path = "../../libraries/communication-layer", default-features = false, features = [ - "zenoh", -] } +zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" } +serde_json = "1.0.86" diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs index 9dd28029..db530a40 100644 --- a/binaries/coordinator/src/control.rs +++ b/binaries/coordinator/src/control.rs @@ -1,112 +1,20 @@ use crate::Event; -use communication_layer_pub_sub::{CommunicationLayer, Subscriber}; -use dora_core::topics::{ - ZENOH_CONTROL_PREFIX, ZENOH_CONTROL_START_DATAFLOW, ZENOH_CONTROL_STOP_ALL, -}; -use eyre::{eyre, WrapErr}; +use dora_core::topics::ZENOH_CONTROL_QUERYABLE; +use eyre::eyre; use futures::{Stream, StreamExt}; -use std::path::Path; -use tokio_stream::wrappers::ReceiverStream; +use futures_concurrency::stream::IntoStream; +use zenoh::{prelude::EntityFactory, sync::ZFuture}; -pub(crate) fn control_events() -> impl Stream { - let (tx, rx) = tokio::sync::mpsc::channel(2); +pub(crate) async fn control_events() -> eyre::Result> { + let zenoh = zenoh::open(zenoh::config::Config::default()) + .wait() + .map_err(|err| eyre!(err))? + .into_arc(); - tokio::task::spawn_blocking(move || { - let result = subscribe_control_sync(tx.clone()); - match result { - Ok(()) => {} - Err(error) => { - let _ = tx.blocking_send(Event::ControlChannelError(error)); - } - } - }); + let queryable = zenoh + .queryable(ZENOH_CONTROL_QUERYABLE) + .wait() + .map_err(|err| eyre!(err))?; - ReceiverStream::new(rx).chain(futures::stream::iter(std::iter::from_fn(|| { - tracing::info!("control channel closed"); - None - }))) -} - -fn subscribe_control_sync(tx: tokio::sync::mpsc::Sender) -> eyre::Result<()> { - let mut zenoh_control_session = - communication_layer_pub_sub::zenoh::ZenohCommunicationLayer::init( - Default::default(), - ZENOH_CONTROL_PREFIX.into(), - ) - .map_err(|err| eyre!(err)) - .wrap_err("failed to open zenoh control session")?; - - let start_dataflow = zenoh_control_session - .subscribe(ZENOH_CONTROL_START_DATAFLOW) - .map_err(|err| eyre!(err)) - .wrap_err("failed to subscribe to start dataflow topic")?; - let start_tx = tx.downgrade(); - let _start_dataflow_thread = - std::thread::spawn(move || start_dataflow_handler(start_dataflow, start_tx)); - - let stop_tx = tx; - let stop = zenoh_control_session - .subscribe(ZENOH_CONTROL_STOP_ALL) - .map_err(|err| eyre!(err)) - .wrap_err("failed to subscribe to stop all topic")?; - let _stop_thread = tokio::task::spawn_blocking(move || { - stop_handler(stop, stop_tx); - }); - - Ok(()) -} - -fn stop_handler(mut stop: Box, stop_tx: tokio::sync::mpsc::Sender) { - let send_result = match stop.recv().map_err(|err| eyre!(err)) { - Ok(_) => stop_tx.blocking_send(Event::Stop), - Err(err) => stop_tx.blocking_send(Event::ControlChannelError( - err.wrap_err("failed to receive on control channel"), - )), - }; - let _ = send_result; -} - -fn start_dataflow_handler( - mut start_dataflow: Box, - start_tx: tokio::sync::mpsc::WeakSender, -) { - loop { - let recv_result = start_dataflow.recv(); - let start_tx = match start_tx.upgrade() { - Some(tx) => tx, - None => { - // control channel was closed after receiving stop message - break; - } - }; - let message = match recv_result { - Ok(Some(message)) => message, - Ok(None) => break, - Err(err) => { - let send_result = start_tx.blocking_send(Event::ControlChannelError( - eyre!(err).wrap_err("failed to receive on start_dataflow topic"), - )); - match send_result { - Ok(()) => continue, - Err(_) => break, - } - } - }; - let data = message.get(); - let path = match std::str::from_utf8(&data) { - Ok(path) => Path::new(path), - Err(err) => { - let send_result = start_tx.blocking_send(Event::ParseError( - eyre!(err).wrap_err("failed to parse start_dataflow message"), - )); - match send_result { - Ok(()) => continue, - Err(_) => break, - } - } - }; - let _ = start_tx.blocking_send(Event::StartDataflow { - path: path.to_owned(), - }); - } + Ok(queryable.into_stream().map(Event::Control)) } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index ed366765..6da09456 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -1,8 +1,23 @@ -use eyre::WrapErr; +use crate::run::spawn_dataflow; +use dora_core::{ + config::CommunicationConfig, + topics::{ + self, StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START, + ZENOH_CONTROL_STOP, + }, +}; +use dora_node_api::communication; +use eyre::{bail, eyre, WrapErr}; use futures::StreamExt; use futures_concurrency::stream::Merge; -use std::path::{Path, PathBuf}; +use run::{await_tasks, SpawnedDataflow}; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, +}; use tokio_stream::wrappers::ReceiverStream; +use uuid::Uuid; +use zenoh::prelude::Sample; mod control; mod run; @@ -33,7 +48,7 @@ pub async fn run(args: Args) -> eyre::Result<()> { match run_dataflow { Some(path) => { // start the given dataflow directly - run::run_dataflow(path.clone(), &runtime_path) + run::run_dataflow(&path, &runtime_path) .await .wrap_err_with(|| format!("failed to run dataflow at {}", path.display()))?; } @@ -47,62 +62,95 @@ pub async fn run(args: Args) -> eyre::Result<()> { } async fn start(runtime_path: &Path) -> eyre::Result<()> { - let (dataflow_errors_tx, dataflow_errors) = tokio::sync::mpsc::channel(2); - let mut dataflow_errors_tx = Some(dataflow_errors_tx); - let dataflow_error_events = ReceiverStream::new(dataflow_errors).map(Event::DataflowError); + let (dataflow_events_tx, dataflow_events) = tokio::sync::mpsc::channel(2); + let mut dataflow_events_tx = Some(dataflow_events_tx); + let dataflow_error_events = ReceiverStream::new(dataflow_events); - let control_events = control::control_events(); + let (control_events, control_events_abort) = futures::stream::abortable( + control::control_events() + .await + .wrap_err("failed to create control events")?, + ); let mut events = (dataflow_error_events, control_events).merge(); + let mut running_dataflows = HashMap::new(); + while let Some(event) = events.next().await { - let mut stop = false; match event { - Event::DataflowError(err) => { - tracing::error!("{err:?}"); - } - Event::ParseError(err) => { - let err = err.wrap_err("failed to parse message"); - tracing::error!("{err:?}"); - } - Event::ControlChannelError(err) => { - let err = err.wrap_err("Stopping because of fatal control channel error"); - tracing::error!("{err:?}"); - stop = true; - } - Event::StartDataflow { path } => { - let runtime_path = runtime_path.to_owned(); - let dataflow_errors_tx = match &dataflow_errors_tx { - Some(channel) => channel.clone(), - None => { - tracing::error!("cannot start new dataflow after receiving stop command"); - continue; - } - }; - let task = async move { - let result = run::run_dataflow(path.clone(), &runtime_path) - .await - .wrap_err_with(|| format!("failed to run dataflow at {}", path.display())); + Event::Dataflow { uuid, event } => match event { + DataflowEvent::Finished { result } => { + running_dataflows.remove(&uuid); match result { - Ok(()) => {} + Ok(()) => { + tracing::info!("dataflow `{uuid}` finished successully"); + } Err(err) => { - let _ = dataflow_errors_tx.send(err).await; + let err = err.wrap_err(format!("error occured in dataflow `{uuid}`")); + tracing::error!("{err:?}"); } } - }; - tokio::spawn(task); - } - Event::Stop => { - tracing::info!("Received stop command"); - stop = true; - } - } + } + }, + + Event::Control(query) => match query.key_selector().as_str() { + ZENOH_CONTROL_START => { + let dataflow_path = query.value_selector(); + let result = + start_dataflow(Path::new(dataflow_path), runtime_path, &dataflow_events_tx) + .await; + let reply = match result { + Ok((uuid, communication_config)) => { + running_dataflows.insert(uuid, communication_config); + StartDataflowResult::Ok { + uuid: uuid.to_string(), + } + } + Err(err) => { + tracing::error!("{err:?}"); + StartDataflowResult::Error(format!("{err:?}")) + } + }; + query + .reply_async(Sample::new("", serde_json::to_string(&reply).unwrap())) + .await; + } + ZENOH_CONTROL_STOP => { + let stop = async { + let uuid = + Uuid::parse_str(query.value_selector()).wrap_err("not a valid UUID")?; + stop_dataflow(&running_dataflows, uuid).await?; + + Result::<_, eyre::Report>::Ok(()) + }; + let reply = match stop.await { + Ok(()) => StopDataflowResult::Ok, + Err(err) => StopDataflowResult::Error(format!("{err:?}")), + }; + + query + .reply_async(Sample::new("", serde_json::to_string(&reply).unwrap())) + .await; + } + ZENOH_CONTROL_DESTROY => { + tracing::info!("Received stop command"); + + control_events_abort.abort(); - if stop { - tracing::info!("stopping..."); + // ensure that no new dataflows can be started + dataflow_events_tx = None; - // ensure that no new dataflows can be started - dataflow_errors_tx = None; + // stop all running dataflows + for &uuid in running_dataflows.keys() { + stop_dataflow(&running_dataflows, uuid).await?; + } + + query.reply_async(Sample::new("", "")).await; + } + _ => { + query.reply_async(Sample::new("error", "invalid")).await; + } + }, } } @@ -111,10 +159,69 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { Ok(()) } +async fn stop_dataflow( + running_dataflows: &HashMap, + uuid: Uuid, +) -> eyre::Result<()> { + let communication_config = match running_dataflows.get(&uuid) { + Some(config) => config.clone(), + None => bail!("No running dataflow found with UUID `{uuid}`"), + }; + let mut communication = + tokio::task::spawn_blocking(move || communication::init(&communication_config)) + .await + .wrap_err("failed to join communication layer init task")? + .wrap_err("failed to init communication layer")?; + tracing::info!("sending stop message to dataflow `{uuid}`"); + tokio::task::spawn_blocking(move || { + let metadata = dora_message::Metadata::default(); + let data = metadata.serialize().unwrap(); + communication.publisher(topics::MANUAL_STOP)?.publish(&data) + }) + .await + .wrap_err("failed to join stop publish task")? + .map_err(|err| eyre!(err)) + .wrap_err("failed to send stop message")?; + Ok(()) +} + +async fn start_dataflow( + path: &Path, + runtime_path: &Path, + dataflow_events_tx: &Option>, +) -> eyre::Result<(Uuid, CommunicationConfig)> { + let runtime_path = runtime_path.to_owned(); + let dataflow_events_tx = match dataflow_events_tx { + Some(channel) => channel.clone(), + None => bail!("cannot start new dataflow after receiving stop command"), + }; + let SpawnedDataflow { + uuid, + communication_config, + tasks, + } = spawn_dataflow(&runtime_path, path).await?; + let path = path.to_owned(); + let task = async move { + let result = await_tasks(tasks) + .await + .wrap_err_with(|| format!("failed to run dataflow at {}", path.display())); + + let _ = dataflow_events_tx + .send(Event::Dataflow { + uuid, + event: DataflowEvent::Finished { result }, + }) + .await; + }; + tokio::spawn(task); + Ok((uuid, communication_config)) +} + enum Event { - DataflowError(eyre::Report), - ControlChannelError(eyre::Report), - StartDataflow { path: PathBuf }, - ParseError(eyre::Report), - Stop, + Dataflow { uuid: Uuid, event: DataflowEvent }, + Control(zenoh::queryable::Query), +} + +enum DataflowEvent { + Finished { result: eyre::Result<()> }, } diff --git a/binaries/coordinator/src/run/custom.rs b/binaries/coordinator/src/run/custom.rs index 64c11a59..e3411353 100644 --- a/binaries/coordinator/src/run/custom.rs +++ b/binaries/coordinator/src/run/custom.rs @@ -1,6 +1,5 @@ use super::command_init_common_env; -use dora_core::descriptor; -use dora_node_api::config::NodeId; +use dora_core::{config::NodeId, descriptor}; use eyre::{eyre, WrapErr}; use std::{env::consts::EXE_EXTENSION, path::Path}; @@ -8,7 +7,7 @@ use std::{env::consts::EXE_EXTENSION, path::Path}; pub(super) fn spawn_custom_node( node_id: NodeId, node: &descriptor::CustomNode, - communication: &dora_node_api::config::CommunicationConfig, + communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { let mut args = node.run.split_ascii_whitespace(); diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 5afea3ca..1851944d 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -1,42 +1,46 @@ use self::{custom::spawn_custom_node, runtime::spawn_runtime_node}; -use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor, NodeId}; -use dora_node_api::{communication, config::format_duration}; +use dora_core::{ + config::{format_duration, CommunicationConfig, NodeId}, + descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor}, +}; +use dora_node_api::communication; use eyre::{bail, eyre, WrapErr}; use futures::{stream::FuturesUnordered, StreamExt}; -use std::{ - env::consts::EXE_EXTENSION, - path::{Path, PathBuf}, -}; +use std::{env::consts::EXE_EXTENSION, path::Path}; use tokio_stream::wrappers::IntervalStream; +use uuid::Uuid; mod custom; mod runtime; -pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> { +pub async fn run_dataflow(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { + let tasks = spawn_dataflow(runtime, dataflow_path).await?.tasks; + await_tasks(tasks).await +} + +pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Result { let mut runtime = runtime.with_extension(EXE_EXTENSION); - let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| { + let descriptor = read_descriptor(dataflow_path).await.wrap_err_with(|| { format!( "failed to read dataflow descriptor at {}", dataflow_path.display() ) })?; - let working_dir = dataflow_path .canonicalize() .context("failed to canoncialize dataflow path")? .parent() .ok_or_else(|| eyre!("canonicalized dataflow path has no parent"))? .to_owned(); - let nodes = descriptor.resolve_aliases(); let dora_timers = collect_dora_timers(&nodes); + let uuid = Uuid::new_v4(); let communication_config = { let mut config = descriptor.communication; // add uuid as prefix to ensure isolation - config.add_topic_prefix(&uuid::Uuid::new_v4().to_string()); + config.add_topic_prefix(&uuid.to_string()); config }; - if nodes .iter() .any(|n| matches!(n.kind, CoreNodeKind::Runtime(_))) @@ -54,8 +58,7 @@ pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Resul } } } - - let mut tasks = FuturesUnordered::new(); + let tasks = FuturesUnordered::new(); for node in nodes { let node_id = node.id.clone(); @@ -81,7 +84,6 @@ pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Resul } } } - for interval in dora_timers { let communication_config = communication_config.clone(); let mut communication = @@ -107,13 +109,27 @@ pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Resul } }); } + Ok(SpawnedDataflow { + tasks, + communication_config, + uuid, + }) +} +pub struct SpawnedDataflow { + pub uuid: Uuid, + pub communication_config: CommunicationConfig, + pub tasks: FuturesUnordered>>, +} + +pub async fn await_tasks( + mut tasks: FuturesUnordered>>, +) -> eyre::Result<()> { while let Some(task_result) = tasks.next().await { task_result .wrap_err("failed to join async task")? .wrap_err("custom node failed")?; } - Ok(()) } @@ -129,7 +145,7 @@ async fn read_descriptor(file: &Path) -> Result { fn command_init_common_env( command: &mut tokio::process::Command, node_id: &NodeId, - communication: &dora_node_api::config::CommunicationConfig, + communication: &dora_core::config::CommunicationConfig, ) -> Result<(), eyre::Error> { command.env( "DORA_NODE_ID", diff --git a/binaries/coordinator/src/run/runtime.rs b/binaries/coordinator/src/run/runtime.rs index 4f7d7082..a0bf2797 100644 --- a/binaries/coordinator/src/run/runtime.rs +++ b/binaries/coordinator/src/run/runtime.rs @@ -1,6 +1,5 @@ use super::command_init_common_env; -use dora_core::descriptor; -use dora_node_api::config::NodeId; +use dora_core::{config::NodeId, descriptor}; use eyre::{eyre, WrapErr}; use std::path::Path; @@ -9,7 +8,7 @@ pub fn spawn_runtime_node( runtime: &Path, node_id: NodeId, node: &descriptor::RuntimeNode, - communication: &dora_node_api::config::CommunicationConfig, + communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { let mut command = tokio::process::Command::new(runtime); diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index fda84724..9253e798 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -1,10 +1,12 @@ #![warn(unsafe_op_in_unsafe_fn)] -use dora_core::descriptor::OperatorDefinition; +use dora_core::{ + config::{CommunicationConfig, DataId, NodeId, OperatorId}, + descriptor::OperatorDefinition, +}; use dora_node_api::{ self, communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, - config::{CommunicationConfig, DataId, NodeId, OperatorId}, }; use eyre::{bail, Context}; use futures::{Stream, StreamExt}; diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index 036ef926..f7801df6 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -1,8 +1,8 @@ -use dora_core::descriptor::{OperatorDefinition, OperatorSource}; -use dora_node_api::{ - communication::{self, CommunicationLayer}, +use dora_core::{ config::NodeId, + descriptor::{OperatorDefinition, OperatorSource}, }; +use dora_node_api::communication::{self, CommunicationLayer}; use eyre::Context; #[cfg(feature = "tracing")] use opentelemetry::sdk::trace::Tracer; diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index db4d5554..03362a4a 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -1,8 +1,9 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] use super::{OperatorEvent, Tracer}; +use dora_core::config::DataId; use dora_message::uhlc; -use dora_node_api::{communication::Publisher, config::DataId}; +use dora_node_api::communication::Publisher; use dora_operator_api_python::metadata_to_pydict; use eyre::{bail, eyre, Context}; use pyo3::{ diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 9c567bb7..8a399995 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,7 +1,7 @@ use super::{OperatorEvent, Tracer}; -use dora_core::adjust_shared_library_path; +use dora_core::{adjust_shared_library_path, config::DataId}; use dora_message::uhlc; -use dora_node_api::{communication::Publisher, config::DataId}; +use dora_node_api::communication::Publisher; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput, DoraResult, DoraStatus, Metadata, OnInputResult, Output, SendOutput, diff --git a/examples/iceoryx/node/src/main.rs b/examples/iceoryx/node/src/main.rs index c8f8b422..03692e9d 100644 --- a/examples/iceoryx/node/src/main.rs +++ b/examples/iceoryx/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, config::DataId, DoraNode}; +use dora_node_api::{self, core::config::DataId, DoraNode}; fn main() -> eyre::Result<()> { let output = DataId::from("random".to_owned()); diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index c8f8b422..03692e9d 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, config::DataId, DoraNode}; +use dora_node_api::{self, core::config::DataId, DoraNode}; fn main() -> eyre::Result<()> { let output = DataId::from("random".to_owned()); diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 793bc8ef..af7fe3fe 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -7,7 +7,8 @@ license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../apis/rust/node" } eyre = "0.6.8" serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.9.11" +once_cell = "1.13.0" +zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" } diff --git a/apis/rust/node/src/config.rs b/libraries/core/src/config.rs similarity index 99% rename from apis/rust/node/src/config.rs rename to libraries/core/src/config.rs index 201d6341..2a880802 100644 --- a/apis/rust/node/src/config.rs +++ b/libraries/core/src/config.rs @@ -1,4 +1,3 @@ -use communication_layer_pub_sub::zenoh::zenoh_config; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use std::{ @@ -10,15 +9,6 @@ use std::{ time::Duration, }; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] -pub struct NodeRunConfig { - #[serde(default)] - pub inputs: BTreeMap, - #[serde(default)] - pub outputs: BTreeSet, -} - #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] pub struct NodeId(String); @@ -160,22 +150,6 @@ impl fmt::Display for InputMapping { } } -pub struct FormattedDuration(pub Duration); - -impl fmt::Display for FormattedDuration { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if self.0.subsec_millis() == 0 { - write!(f, "secs/{}", self.0.as_secs()) - } else { - write!(f, "millis/{}", self.0.as_millis()) - } - } -} - -pub fn format_duration(interval: Duration) -> FormattedDuration { - FormattedDuration(interval) -} - impl Serialize for InputMapping { fn serialize(&self, serializer: S) -> Result where @@ -262,6 +236,31 @@ pub struct UserInputMapping { pub output: DataId, } +pub struct FormattedDuration(pub Duration); + +impl fmt::Display for FormattedDuration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.0.subsec_millis() == 0 { + write!(f, "secs/{}", self.0.as_secs()) + } else { + write!(f, "millis/{}", self.0.as_millis()) + } + } +} + +pub fn format_duration(interval: Duration) -> FormattedDuration { + FormattedDuration(interval) +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct NodeRunConfig { + #[serde(default)] + pub inputs: BTreeMap, + #[serde(default)] + pub outputs: BTreeSet, +} + #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(deny_unknown_fields, rename_all = "lowercase")] pub enum CommunicationConfig { diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 4d8ffb77..664184db 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,5 +1,3 @@ -use dora_node_api::config::{CommunicationConfig, NodeRunConfig}; -pub use dora_node_api::config::{DataId, InputMapping, NodeId, OperatorId, UserInputMapping}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; @@ -9,6 +7,8 @@ use std::{ }; pub use visualize::collect_dora_timers; +use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; + mod visualize; #[derive(Debug, Serialize, Deserialize)] diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index a3e25643..fac88464 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -1,5 +1,5 @@ use super::{CoreNodeKind, CustomNode, OperatorDefinition, ResolvedNode, RuntimeNode}; -use dora_node_api::config::{format_duration, DataId, InputMapping, NodeId, UserInputMapping}; +use crate::config::{format_duration, DataId, InputMapping, NodeId, UserInputMapping}; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, fmt::Write as _, diff --git a/libraries/core/src/lib.rs b/libraries/core/src/lib.rs index 74220733..997a3204 100644 --- a/libraries/core/src/lib.rs +++ b/libraries/core/src/lib.rs @@ -4,6 +4,7 @@ use std::{ path::Path, }; +pub mod config; pub mod descriptor; pub mod topics; diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index cc4a382e..8266f267 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,3 +1,18 @@ -pub const ZENOH_CONTROL_PREFIX: &str = "dora_control"; -pub const ZENOH_CONTROL_STOP_ALL: &str = "stop_all"; -pub const ZENOH_CONTROL_START_DATAFLOW: &str = "start_dataflow"; +pub const MANUAL_STOP: &str = "dora/stop"; + +pub const ZENOH_CONTROL_QUERYABLE: &str = "dora_control/*"; +pub const ZENOH_CONTROL_START: &str = "dora_control/start"; +pub const ZENOH_CONTROL_STOP: &str = "dora_control/stop"; +pub const ZENOH_CONTROL_DESTROY: &str = "dora_control/destroy"; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum StartDataflowResult { + Ok { uuid: String }, + Error(String), +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum StopDataflowResult { + Ok, + Error(String), +}