From 963f39fa99a93dca5ebb66be54e734031d6044d7 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 11 Oct 2022 18:48:11 +0200 Subject: [PATCH] Implement cli `destroy` command using zenoh-based control channel --- Cargo.lock | 79 +++++--- binaries/cli/Cargo.toml | 3 + binaries/cli/src/main.rs | 25 ++- binaries/coordinator/Cargo.toml | 5 +- binaries/coordinator/src/control.rs | 112 +++++++++++ binaries/coordinator/src/lib.rs | 253 ++++-------------------- binaries/coordinator/src/run/custom.rs | 60 ++++++ binaries/coordinator/src/run/mod.rs | 123 ++++++++++++ binaries/coordinator/src/run/runtime.rs | 39 ++++ libraries/core/src/lib.rs | 4 +- libraries/core/src/topics.rs | 3 + 11 files changed, 459 insertions(+), 247 deletions(-) create mode 100644 binaries/coordinator/src/control.rs create mode 100644 binaries/coordinator/src/run/custom.rs create mode 100644 binaries/coordinator/src/run/mod.rs create mode 100644 binaries/coordinator/src/run/runtime.rs create mode 100644 libraries/core/src/topics.rs diff --git a/Cargo.lock b/Cargo.lock index 6e1c7e5b..ca2fb719 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -886,6 +886,7 @@ name = "dora-cli" version = "0.1.0" dependencies = [ "clap 4.0.3", + "communication-layer-pub-sub", "dora-core", "eyre", "serde_yaml 0.9.11", @@ -899,6 +900,7 @@ version = "0.1.0" dependencies = [ "bincode", "clap 3.2.20", + "communication-layer-pub-sub", "dora-core", "dora-message", "dora-node-api", @@ -1886,25 +1888,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.2" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" +checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" dependencies = [ "libc", "log", - "miow", - "ntapi", "wasi 0.11.0+wasi-snapshot-preview1", - "winapi", -] - -[[package]] -name = "miow" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" -dependencies = [ - "winapi", + "windows-sys 0.36.1", ] [[package]] @@ -2351,7 +2342,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-sys", + "windows-sys 0.32.0", ] [[package]] @@ -3570,9 +3561,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.20.1" +version = "1.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a8325f63a7d4774dd041e363b2409ed1c5cbbd0f867795e661df066b2b0a581" +checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" dependencies = [ "autocfg 1.1.0", "bytes", @@ -3580,7 +3571,6 @@ dependencies = [ "memchr", "mio", "num_cpus", - "once_cell", "parking_lot", "pin-project-lite", "signal-hook-registry", @@ -4232,11 +4222,24 @@ version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6" dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", + "windows_aarch64_msvc 0.32.0", + "windows_i686_gnu 0.32.0", + "windows_i686_msvc 0.32.0", + "windows_x86_64_gnu 0.32.0", + "windows_x86_64_msvc 0.32.0", +] + +[[package]] +name = "windows-sys" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +dependencies = [ + "windows_aarch64_msvc 0.36.1", + "windows_i686_gnu 0.36.1", + "windows_i686_msvc 0.36.1", + "windows_x86_64_gnu 0.36.1", + "windows_x86_64_msvc 0.36.1", ] [[package]] @@ -4245,30 +4248,60 @@ version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5" +[[package]] +name = "windows_aarch64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" + [[package]] name = "windows_i686_gnu" version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615" +[[package]] +name = "windows_i686_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" + [[package]] name = "windows_i686_msvc" version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172" +[[package]] +name = "windows_i686_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" + [[package]] name = "windows_x86_64_gnu" version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc" +[[package]] +name = "windows_x86_64_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" + [[package]] name = "windows_x86_64_msvc" version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" +[[package]] +name = "windows_x86_64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" + [[package]] name = "with_builtin_macros" version = "0.0.3" diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 485b22f5..befc7c7b 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -16,3 +16,6 @@ dora-core = { path = "../../libraries/core" } serde_yaml = "0.9.11" tempfile = "3.3.0" webbrowser = "0.8.0" +communication-layer-pub-sub = { path = "../../libraries/communication-layer", default-features = false, features = [ + "zenoh", +] } diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 583f402f..130e4609 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,5 +1,7 @@ use clap::Parser; -use eyre::Context; +use communication_layer_pub_sub::CommunicationLayer; +use dora_core::topics::{ZENOH_CONTROL_PREFIX, ZENOH_CONTROL_STOP_ALL}; +use eyre::{eyre, Context}; use std::{io::Write, path::PathBuf}; use tempfile::NamedTempFile; @@ -36,6 +38,8 @@ enum Command { args: CommandNew, }, Dashboard, + Up, + Destroy, Start, Stop, Logs, @@ -73,6 +77,14 @@ enum Lang { fn main() -> eyre::Result<()> { let args = Args::parse(); + let mut zenoh_control_session = + communication_layer_pub_sub::zenoh::ZenohCommunicationLayer::init( + Default::default(), + ZENOH_CONTROL_PREFIX.into(), + ) + .map_err(|err| eyre!(err)) + .wrap_err("failed to open zenoh control session")?; + match args.command { Command::Check { dataflow, @@ -113,8 +125,19 @@ fn main() -> eyre::Result<()> { } Command::New { args } => template::create(args)?, Command::Dashboard => todo!(), + Command::Up => todo!(), Command::Start => todo!(), Command::Stop => todo!(), + Command::Destroy => { + let publisher = zenoh_control_session + .publisher(ZENOH_CONTROL_STOP_ALL) + .map_err(|err| eyre!(err)) + .wrap_err("failed to create publisher for stop message")?; + publisher + .publish(&[]) + .map_err(|err| eyre!(err)) + .wrap_err("failed to publish stop message")?; + } Command::Logs => todo!(), Command::Metrics => todo!(), Command::Stats => todo!(), diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index ed9ac418..04731d68 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -13,7 +13,7 @@ eyre = "0.6.7" futures = "0.3.21" serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.8.23" -tokio = { version = "1.17.0", features = ["full"] } +tokio = { version = "1.21.2", features = ["full"] } tokio-stream = { version = "0.1.8", features = ["io-util"] } tokio-util = { version = "0.7.1", features = ["codec"] } clap = { version = "3.1.8", features = ["derive"] } @@ -25,3 +25,6 @@ dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" futures-concurrency = "5.0.1" +communication-layer-pub-sub = { path = "../../libraries/communication-layer", default-features = false, features = [ + "zenoh", +] } diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs new file mode 100644 index 00000000..9dd28029 --- /dev/null +++ b/binaries/coordinator/src/control.rs @@ -0,0 +1,112 @@ +use crate::Event; +use communication_layer_pub_sub::{CommunicationLayer, Subscriber}; +use dora_core::topics::{ + ZENOH_CONTROL_PREFIX, ZENOH_CONTROL_START_DATAFLOW, ZENOH_CONTROL_STOP_ALL, +}; +use eyre::{eyre, WrapErr}; +use futures::{Stream, StreamExt}; +use std::path::Path; +use tokio_stream::wrappers::ReceiverStream; + +pub(crate) fn control_events() -> impl Stream { + let (tx, rx) = tokio::sync::mpsc::channel(2); + + tokio::task::spawn_blocking(move || { + let result = subscribe_control_sync(tx.clone()); + match result { + Ok(()) => {} + Err(error) => { + let _ = tx.blocking_send(Event::ControlChannelError(error)); + } + } + }); + + ReceiverStream::new(rx).chain(futures::stream::iter(std::iter::from_fn(|| { + tracing::info!("control channel closed"); + None + }))) +} + +fn subscribe_control_sync(tx: tokio::sync::mpsc::Sender) -> eyre::Result<()> { + let mut zenoh_control_session = + communication_layer_pub_sub::zenoh::ZenohCommunicationLayer::init( + Default::default(), + ZENOH_CONTROL_PREFIX.into(), + ) + .map_err(|err| eyre!(err)) + .wrap_err("failed to open zenoh control session")?; + + let start_dataflow = zenoh_control_session + .subscribe(ZENOH_CONTROL_START_DATAFLOW) + .map_err(|err| eyre!(err)) + .wrap_err("failed to subscribe to start dataflow topic")?; + let start_tx = tx.downgrade(); + let _start_dataflow_thread = + std::thread::spawn(move || start_dataflow_handler(start_dataflow, start_tx)); + + let stop_tx = tx; + let stop = zenoh_control_session + .subscribe(ZENOH_CONTROL_STOP_ALL) + .map_err(|err| eyre!(err)) + .wrap_err("failed to subscribe to stop all topic")?; + let _stop_thread = tokio::task::spawn_blocking(move || { + stop_handler(stop, stop_tx); + }); + + Ok(()) +} + +fn stop_handler(mut stop: Box, stop_tx: tokio::sync::mpsc::Sender) { + let send_result = match stop.recv().map_err(|err| eyre!(err)) { + Ok(_) => stop_tx.blocking_send(Event::Stop), + Err(err) => stop_tx.blocking_send(Event::ControlChannelError( + err.wrap_err("failed to receive on control channel"), + )), + }; + let _ = send_result; +} + +fn start_dataflow_handler( + mut start_dataflow: Box, + start_tx: tokio::sync::mpsc::WeakSender, +) { + loop { + let recv_result = start_dataflow.recv(); + let start_tx = match start_tx.upgrade() { + Some(tx) => tx, + None => { + // control channel was closed after receiving stop message + break; + } + }; + let message = match recv_result { + Ok(Some(message)) => message, + Ok(None) => break, + Err(err) => { + let send_result = start_tx.blocking_send(Event::ControlChannelError( + eyre!(err).wrap_err("failed to receive on start_dataflow topic"), + )); + match send_result { + Ok(()) => continue, + Err(_) => break, + } + } + }; + let data = message.get(); + let path = match std::str::from_utf8(&data) { + Ok(path) => Path::new(path), + Err(err) => { + let send_result = start_tx.blocking_send(Event::ParseError( + eyre!(err).wrap_err("failed to parse start_dataflow message"), + )); + match send_result { + Ok(()) => continue, + Err(_) => break, + } + } + }; + let _ = start_tx.blocking_send(Event::StartDataflow { + path: path.to_owned(), + }); + } +} diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 6de19cfb..3657eb82 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -1,17 +1,11 @@ -use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor}; -use dora_node_api::{ - communication, - config::{format_duration, NodeId}, -}; -use eyre::{bail, eyre, WrapErr}; -use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use eyre::WrapErr; +use futures::StreamExt; use futures_concurrency::stream::Merge; -use std::{ - env::consts::EXE_EXTENSION, - path::{Path, PathBuf}, - time::Duration, -}; -use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; +use std::path::{Path, PathBuf}; +use tokio_stream::wrappers::ReceiverStream; + +mod control; +mod run; #[derive(Debug, Clone, clap::Parser)] #[clap(about = "Dora coordinator")] @@ -37,7 +31,7 @@ pub async fn run(args: Args) -> eyre::Result<()> { match run_dataflow { Some(path) => { // start the given dataflow directly - self::run_dataflow(path.clone(), &runtime_path) + run::run_dataflow(path.clone(), &runtime_path) .await .wrap_err_with(|| format!("failed to run dataflow at {}", path.display()))?; } @@ -55,17 +49,25 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { let mut dataflow_errors_tx = Some(dataflow_errors_tx); let dataflow_error_events = ReceiverStream::new(dataflow_errors).map(Event::DataflowError); - let stop_events = tokio::time::sleep(Duration::from_secs(5)) - .into_stream() - .map(|()| Event::Stop); + let control_events = control::control_events(); - let mut events = (dataflow_error_events, stop_events).merge(); + let mut events = (dataflow_error_events, control_events).merge(); while let Some(event) = events.next().await { + let mut stop = false; match event { Event::DataflowError(err) => { tracing::error!("{err:?}"); } + Event::ParseError(err) => { + let err = err.wrap_err("failed to parse message"); + tracing::error!("{err:?}"); + } + Event::ControlChannelError(err) => { + let err = err.wrap_err("Stopping because of fatal control channel error"); + tracing::error!("{err:?}"); + stop = true; + } Event::StartDataflow { path } => { let runtime_path = runtime_path.to_owned(); let dataflow_errors_tx = match &dataflow_errors_tx { @@ -76,7 +78,7 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { } }; let task = async move { - let result = run_dataflow(path.clone(), &runtime_path) + let result = run::run_dataflow(path.clone(), &runtime_path) .await .wrap_err_with(|| format!("failed to run dataflow at {}", path.display())); match result { @@ -90,216 +92,27 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { } Event::Stop => { tracing::info!("Received stop command"); - // ensure that no new dataflows can be started - dataflow_errors_tx = None; + stop = true; } } - } - - Ok(()) -} - -enum Event { - DataflowError(eyre::Report), - StartDataflow { path: PathBuf }, - Stop, -} - -async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> { - let runtime = runtime.with_extension(EXE_EXTENSION); - let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| { - format!( - "failed to read dataflow descriptor at {}", - dataflow_path.display() - ) - })?; - - let nodes = descriptor.resolve_aliases(); - let dora_timers = collect_dora_timers(&nodes); - let communication_config = { - let mut config = descriptor.communication; - // add uuid as prefix to ensure isolation - config.add_topic_prefix(&uuid::Uuid::new_v4().to_string()); - config - }; - - if nodes - .iter() - .any(|n| matches!(n.kind, CoreNodeKind::Runtime(_))) - && !runtime.is_file() - { - bail!( - "There is no runtime at {}, or it is not a file", - runtime.display() - ); - } - let mut tasks = FuturesUnordered::new(); - for node in nodes { - let node_id = node.id.clone(); + if stop { + tracing::info!("stopping..."); - match node.kind { - descriptor::CoreNodeKind::Custom(node) => { - let result = spawn_custom_node(node_id.clone(), &node, &communication_config) - .wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?; - tasks.push(result); - } - descriptor::CoreNodeKind::Runtime(node) => { - if !node.operators.is_empty() { - let result = - spawn_runtime_node(&runtime, node_id.clone(), &node, &communication_config) - .wrap_err_with(|| format!("failed to spawn runtime node {node_id}"))?; - tasks.push(result); - } - } + // ensure that no new dataflows can be started + dataflow_errors_tx = None; } } - for interval in dora_timers { - let communication_config = communication_config.clone(); - let mut communication = - tokio::task::spawn_blocking(move || communication::init(&communication_config)) - .await - .wrap_err("failed to join communication layer init task")? - .wrap_err("failed to init communication layer")?; - tokio::spawn(async move { - let topic = { - let duration = format_duration(interval); - format!("dora/timer/{duration}") - }; - let metadata = dora_message::Metadata::default(); - let data = metadata.serialize().unwrap(); - let mut stream = IntervalStream::new(tokio::time::interval(interval)); - while (stream.next().await).is_some() { - communication - .publisher(&topic) - .unwrap() - .publish(&data) - .expect("failed to publish timer tick message"); - } - }); - } - - while let Some(task_result) = tasks.next().await { - task_result - .wrap_err("failed to join async task")? - .wrap_err("custom node failed")?; - } + tracing::info!("stopped"); Ok(()) } -#[tracing::instrument] -fn spawn_custom_node( - node_id: NodeId, - node: &descriptor::CustomNode, - communication: &dora_node_api::config::CommunicationConfig, -) -> eyre::Result>> { - let mut args = node.run.split_ascii_whitespace(); - let cmd = { - let raw = Path::new( - args.next() - .ok_or_else(|| eyre!("`run` field must not be empty"))?, - ); - let path = if raw.extension().is_none() { - raw.with_extension(EXE_EXTENSION) - } else { - raw.to_owned() - }; - path.canonicalize() - .wrap_err_with(|| format!("no node exists at `{}`", path.display()))? - }; - - let mut command = tokio::process::Command::new(cmd); - command.args(args); - command_init_common_env(&mut command, &node_id, communication)?; - command.env( - "DORA_NODE_RUN_CONFIG", - serde_yaml::to_string(&node.run_config) - .wrap_err("failed to serialize custom node run config")?, - ); - - // Injecting the env variable defined in the `yaml` into - // the node runtime. - if let Some(envs) = &node.env { - for (key, value) in envs { - command.env(key, value.to_string()); - } - } - - let mut child = command - .spawn() - .wrap_err_with(|| format!("failed to run command `{}`", &node.run))?; - let result = tokio::spawn(async move { - let status = child.wait().await.context("child process failed")?; - if status.success() { - tracing::info!("node {node_id} finished"); - Ok(()) - } else if let Some(code) = status.code() { - Err(eyre!("node {node_id} failed with exit code: {code}")) - } else { - Err(eyre!("node {node_id} failed (unknown exit code)")) - } - }); - Ok(result) -} - -#[tracing::instrument(skip(node))] -fn spawn_runtime_node( - runtime: &Path, - node_id: NodeId, - node: &descriptor::RuntimeNode, - communication: &dora_node_api::config::CommunicationConfig, -) -> eyre::Result>> { - let mut command = tokio::process::Command::new(runtime); - command_init_common_env(&mut command, &node_id, communication)?; - command.env( - "DORA_OPERATORS", - serde_yaml::to_string(&node.operators) - .wrap_err("failed to serialize custom node run config")?, - ); - - let mut child = command - .spawn() - .wrap_err_with(|| format!("failed to run runtime at `{}`", runtime.display()))?; - let result = tokio::spawn(async move { - let status = child.wait().await.context("child process failed")?; - if status.success() { - tracing::info!("runtime node {node_id} finished"); - Ok(()) - } else if let Some(code) = status.code() { - Err(eyre!( - "runtime node {node_id} failed with exit code: {code}" - )) - } else { - Err(eyre!("runtime node {node_id} failed (unknown exit code)")) - } - }); - Ok(result) -} - -fn command_init_common_env( - command: &mut tokio::process::Command, - node_id: &NodeId, - communication: &dora_node_api::config::CommunicationConfig, -) -> Result<(), eyre::Error> { - command.env( - "DORA_NODE_ID", - serde_yaml::to_string(&node_id).wrap_err("failed to serialize custom node ID")?, - ); - command.env( - "DORA_COMMUNICATION_CONFIG", - serde_yaml::to_string(communication) - .wrap_err("failed to serialize communication config")?, - ); - Ok(()) -} - -async fn read_descriptor(file: &Path) -> Result { - let descriptor_file = tokio::fs::read(file) - .await - .context("failed to open given file")?; - let descriptor: Descriptor = - serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?; - Ok(descriptor) +enum Event { + DataflowError(eyre::Report), + ControlChannelError(eyre::Report), + StartDataflow { path: PathBuf }, + ParseError(eyre::Report), + Stop, } diff --git a/binaries/coordinator/src/run/custom.rs b/binaries/coordinator/src/run/custom.rs new file mode 100644 index 00000000..ec2d6120 --- /dev/null +++ b/binaries/coordinator/src/run/custom.rs @@ -0,0 +1,60 @@ +use super::command_init_common_env; +use dora_core::descriptor; +use dora_node_api::config::NodeId; +use eyre::{eyre, WrapErr}; +use std::{env::consts::EXE_EXTENSION, path::Path}; + +#[tracing::instrument] +pub(super) fn spawn_custom_node( + node_id: NodeId, + node: &descriptor::CustomNode, + communication: &dora_node_api::config::CommunicationConfig, +) -> eyre::Result>> { + let mut args = node.run.split_ascii_whitespace(); + let cmd = { + let raw = Path::new( + args.next() + .ok_or_else(|| eyre!("`run` field must not be empty"))?, + ); + let path = if raw.extension().is_none() { + raw.with_extension(EXE_EXTENSION) + } else { + raw.to_owned() + }; + path.canonicalize() + .wrap_err_with(|| format!("no node exists at `{}`", path.display()))? + }; + + let mut command = tokio::process::Command::new(cmd); + command.args(args); + command_init_common_env(&mut command, &node_id, communication)?; + command.env( + "DORA_NODE_RUN_CONFIG", + serde_yaml::to_string(&node.run_config) + .wrap_err("failed to serialize custom node run config")?, + ); + + // Injecting the env variable defined in the `yaml` into + // the node runtime. + if let Some(envs) = &node.env { + for (key, value) in envs { + command.env(key, value.to_string()); + } + } + + let mut child = command + .spawn() + .wrap_err_with(|| format!("failed to run command `{}`", &node.run))?; + let result = tokio::spawn(async move { + let status = child.wait().await.context("child process failed")?; + if status.success() { + tracing::info!("node {node_id} finished"); + Ok(()) + } else if let Some(code) = status.code() { + Err(eyre!("node {node_id} failed with exit code: {code}")) + } else { + Err(eyre!("node {node_id} failed (unknown exit code)")) + } + }); + Ok(result) +} diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs new file mode 100644 index 00000000..6e0f5e1c --- /dev/null +++ b/binaries/coordinator/src/run/mod.rs @@ -0,0 +1,123 @@ +use self::{custom::spawn_custom_node, runtime::spawn_runtime_node}; +use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor, NodeId}; +use dora_node_api::{communication, config::format_duration}; +use eyre::{bail, WrapErr}; +use futures::{stream::FuturesUnordered, StreamExt}; +use std::{ + env::consts::EXE_EXTENSION, + path::{Path, PathBuf}, +}; +use tokio_stream::wrappers::IntervalStream; + +mod custom; +mod runtime; + +pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> { + let runtime = runtime.with_extension(EXE_EXTENSION); + let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| { + format!( + "failed to read dataflow descriptor at {}", + dataflow_path.display() + ) + })?; + + let nodes = descriptor.resolve_aliases(); + let dora_timers = collect_dora_timers(&nodes); + let communication_config = { + let mut config = descriptor.communication; + // add uuid as prefix to ensure isolation + config.add_topic_prefix(&uuid::Uuid::new_v4().to_string()); + config + }; + + if nodes + .iter() + .any(|n| matches!(n.kind, CoreNodeKind::Runtime(_))) + && !runtime.is_file() + { + bail!( + "There is no runtime at {}, or it is not a file", + runtime.display() + ); + } + + let mut tasks = FuturesUnordered::new(); + for node in nodes { + let node_id = node.id.clone(); + + match node.kind { + descriptor::CoreNodeKind::Custom(node) => { + let result = spawn_custom_node(node_id.clone(), &node, &communication_config) + .wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?; + tasks.push(result); + } + descriptor::CoreNodeKind::Runtime(node) => { + if !node.operators.is_empty() { + let result = + spawn_runtime_node(&runtime, node_id.clone(), &node, &communication_config) + .wrap_err_with(|| format!("failed to spawn runtime node {node_id}"))?; + tasks.push(result); + } + } + } + } + + for interval in dora_timers { + let communication_config = communication_config.clone(); + let mut communication = + tokio::task::spawn_blocking(move || communication::init(&communication_config)) + .await + .wrap_err("failed to join communication layer init task")? + .wrap_err("failed to init communication layer")?; + tokio::spawn(async move { + let topic = { + let duration = format_duration(interval); + format!("dora/timer/{duration}") + }; + let metadata = dora_message::Metadata::default(); + let data = metadata.serialize().unwrap(); + let mut stream = IntervalStream::new(tokio::time::interval(interval)); + while (stream.next().await).is_some() { + communication + .publisher(&topic) + .unwrap() + .publish(&data) + .expect("failed to publish timer tick message"); + } + }); + } + + while let Some(task_result) = tasks.next().await { + task_result + .wrap_err("failed to join async task")? + .wrap_err("custom node failed")?; + } + + Ok(()) +} + +async fn read_descriptor(file: &Path) -> Result { + let descriptor_file = tokio::fs::read(file) + .await + .context("failed to open given file")?; + let descriptor: Descriptor = + serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?; + Ok(descriptor) +} + +fn command_init_common_env( + command: &mut tokio::process::Command, + node_id: &NodeId, + communication: &dora_node_api::config::CommunicationConfig, +) -> Result<(), eyre::Error> { + command.env( + "DORA_NODE_ID", + serde_yaml::to_string(&node_id).wrap_err("failed to serialize custom node ID")?, + ); + command.env( + "DORA_COMMUNICATION_CONFIG", + serde_yaml::to_string(communication) + .wrap_err("failed to serialize communication config")?, + ); + Ok(()) +} diff --git a/binaries/coordinator/src/run/runtime.rs b/binaries/coordinator/src/run/runtime.rs new file mode 100644 index 00000000..90aee416 --- /dev/null +++ b/binaries/coordinator/src/run/runtime.rs @@ -0,0 +1,39 @@ +use super::command_init_common_env; +use dora_core::descriptor; +use dora_node_api::config::NodeId; +use eyre::{eyre, WrapErr}; +use std::path::Path; + +#[tracing::instrument(skip(node))] +pub fn spawn_runtime_node( + runtime: &Path, + node_id: NodeId, + node: &descriptor::RuntimeNode, + communication: &dora_node_api::config::CommunicationConfig, +) -> eyre::Result>> { + let mut command = tokio::process::Command::new(runtime); + command_init_common_env(&mut command, &node_id, communication)?; + command.env( + "DORA_OPERATORS", + serde_yaml::to_string(&node.operators) + .wrap_err("failed to serialize custom node run config")?, + ); + + let mut child = command + .spawn() + .wrap_err_with(|| format!("failed to run runtime at `{}`", runtime.display()))?; + let result = tokio::spawn(async move { + let status = child.wait().await.context("child process failed")?; + if status.success() { + tracing::info!("runtime node {node_id} finished"); + Ok(()) + } else if let Some(code) = status.code() { + Err(eyre!( + "runtime node {node_id} failed with exit code: {code}" + )) + } else { + Err(eyre!("runtime node {node_id} failed (unknown exit code)")) + } + }); + Ok(result) +} diff --git a/libraries/core/src/lib.rs b/libraries/core/src/lib.rs index 1f2dd9df..74220733 100644 --- a/libraries/core/src/lib.rs +++ b/libraries/core/src/lib.rs @@ -1,11 +1,11 @@ +use eyre::{bail, eyre}; use std::{ env::consts::{DLL_PREFIX, DLL_SUFFIX}, path::Path, }; -use eyre::{bail, eyre}; - pub mod descriptor; +pub mod topics; pub fn adjust_shared_library_path(path: &Path) -> Result { let file_name = path diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs new file mode 100644 index 00000000..cc4a382e --- /dev/null +++ b/libraries/core/src/topics.rs @@ -0,0 +1,3 @@ +pub const ZENOH_CONTROL_PREFIX: &str = "dora_control"; +pub const ZENOH_CONTROL_STOP_ALL: &str = "stop_all"; +pub const ZENOH_CONTROL_START_DATAFLOW: &str = "start_dataflow";