From 03348c5be86527d65b12111fc546e56353a39623 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 2 Sep 2022 12:32:34 +0200 Subject: [PATCH] Redesign communication layer to be synchronous and add support for iceoryx --- Cargo.lock | 126 ++++++++-- Cargo.toml | 5 + apis/c/node/Cargo.toml | 2 +- apis/c/node/src/lib.rs | 24 +- apis/python/node/src/lib.rs | 52 ++-- apis/rust/node/Cargo.toml | 15 +- apis/rust/node/src/communication.rs | 108 -------- apis/rust/node/src/communication/iceoryx.rs | 109 ++++++++ apis/rust/node/src/communication/mod.rs | 172 +++++++++++++ apis/rust/node/src/communication/zenoh.rs | 82 +++++++ apis/rust/node/src/config.rs | 36 +++ apis/rust/node/src/lib.rs | 107 ++------ binaries/coordinator/src/lib.rs | 24 +- binaries/runtime/Cargo.toml | 1 + binaries/runtime/src/main.rs | 259 +++++--------------- binaries/runtime/src/operator/mod.rs | 123 ++++------ binaries/runtime/src/operator/python.rs | 33 +-- binaries/runtime/src/operator/shared_lib.rs | 44 ++-- examples/iceoryx/dataflow.yml | 26 ++ examples/iceoryx/node/Cargo.toml | 11 + examples/iceoryx/node/src/main.rs | 26 ++ examples/iceoryx/operator/Cargo.toml | 13 + examples/iceoryx/operator/src/lib.rs | 47 ++++ examples/iceoryx/run.rs | 33 +++ examples/iceoryx/sink/Cargo.toml | 12 + examples/iceoryx/sink/src/main.rs | 32 +++ examples/rust-dataflow/node/src/main.rs | 20 +- examples/rust-dataflow/sink/Cargo.toml | 2 - examples/rust-dataflow/sink/src/main.rs | 17 +- libraries/core/src/lib.rs | 20 -- 30 files changed, 965 insertions(+), 616 deletions(-) delete mode 100644 apis/rust/node/src/communication.rs create mode 100644 apis/rust/node/src/communication/iceoryx.rs create mode 100644 apis/rust/node/src/communication/mod.rs create mode 100644 apis/rust/node/src/communication/zenoh.rs create mode 100644 examples/iceoryx/dataflow.yml create mode 100644 examples/iceoryx/node/Cargo.toml create mode 100644 examples/iceoryx/node/src/main.rs create mode 100644 examples/iceoryx/operator/Cargo.toml create mode 100644 examples/iceoryx/operator/src/lib.rs create mode 100644 examples/iceoryx/run.rs create mode 100644 examples/iceoryx/sink/Cargo.toml create mode 100644 examples/iceoryx/sink/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 39509e11..b1ff1318 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -505,6 +505,56 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +[[package]] +name = "cpp" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dec5e86d4f6547f0218ad923d9508244a71ef83b763196e6698b4f70f3595185" +dependencies = [ + "cpp_macros", +] + +[[package]] +name = "cpp_build" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16f4d303b8ec35fb3afd7e963e2c898117f1e49930becb703e4a7ac528ad2dd0" +dependencies = [ + "cc", + "cpp_common", + "lazy_static", + "proc-macro2", + "regex", + "syn", + "unicode-xid", +] + +[[package]] +name = "cpp_common" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76071bb9c8c4dd2b5eb209907deab7b031323cf1be3dfdc6ec5d37f4f187d8a1" +dependencies = [ + "lazy_static", + "proc-macro2", + "syn", +] + +[[package]] +name = "cpp_macros" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fdaa01904c12a8989dbfa110b41ef27efc432ac9934f691b9732f01cb64dc01" +dependencies = [ + "aho-corasick", + "byteorder", + 
"cpp_common", + "lazy_static", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "cpufeatures" version = "0.2.2" @@ -791,11 +841,9 @@ dependencies = [ name = "dora-node-api" version = "0.1.0" dependencies = [ - "async-trait", "eyre", - "futures", - "futures-concurrency", - "futures-time", + "flume", + "iceoryx-rs", "once_cell", "serde", "serde_yaml", @@ -813,7 +861,7 @@ version = "0.1.0" dependencies = [ "dora-node-api", "eyre", - "futures", + "flume", ] [[package]] @@ -871,6 +919,7 @@ dependencies = [ "dora-operator-api-types", "eyre", "fern", + "flume", "futures", "futures-concurrency", "libloading", @@ -975,9 +1024,9 @@ dependencies = [ [[package]] name = "flume" -version = "0.10.12" +version = "0.10.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" dependencies = [ "futures-core", "futures-sink", @@ -1099,18 +1148,6 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" -[[package]] -name = "futures-time" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "553673e17c187f65e79ed63a9e58b148560fd5982e1d739f37913f320139edb0" -dependencies = [ - "async-channel", - "async-io", - "futures-core", - "pin-project-lite", -] - [[package]] name = "futures-util" version = "0.3.21" @@ -1371,6 +1408,51 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "iceoryx-example-node" +version = "0.1.0" +dependencies = [ + "dora-node-api", + "eyre", + "rand", +] + +[[package]] +name = "iceoryx-example-operator" +version = "0.1.0" +dependencies = [ + "dora-operator-api", +] + +[[package]] +name = "iceoryx-example-sink" +version = "0.1.0" +dependencies = [ + "dora-node-api", + "eyre", + "futures", + "tokio", +] + +[[package]] +name = "iceoryx-rs" +version = "0.1.0" +source = "git+https://github.com/eclipse-iceoryx/iceoryx-rs.git#68fbd034a77c6ed98cb75a939d406429ef26b0dd" +dependencies = [ + "iceoryx-sys", + "thiserror", +] + +[[package]] +name = "iceoryx-sys" +version = "0.1.0" +source = "git+https://github.com/eclipse-iceoryx/iceoryx-rs.git#68fbd034a77c6ed98cb75a939d406429ef26b0dd" +dependencies = [ + "cpp", + "cpp_build", + "thiserror", +] + [[package]] name = "idna" version = "0.2.3" @@ -2658,8 +2740,6 @@ version = "0.1.0" dependencies = [ "dora-node-api", "eyre", - "futures", - "tokio", ] [[package]] @@ -3217,9 +3297,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" dependencies = [ "futures-core", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 3e268403..1d083b1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "binaries/*", "examples/rust-dataflow/*", "examples/c++-dataflow/*-rust-*", + "examples/iceoryx/*", "libraries/core", "libraries/extensions/message", "libraries/extensions/telemetry/*", @@ -42,3 +43,7 @@ path = "examples/c++-dataflow/run.rs" [[example]] name = "python-dataflow" path = "examples/python-dataflow/run.rs" + +[[example]] +name = "iceoryx" +path = "examples/iceoryx/run.rs" diff --git a/apis/c/node/Cargo.toml b/apis/c/node/Cargo.toml index 
aa95e458..84bea3c9 100644 --- a/apis/c/node/Cargo.toml +++ b/apis/c/node/Cargo.toml @@ -12,4 +12,4 @@ crate-type = ["staticlib"] [dependencies] dora-node-api = { path = "../../rust/node" } eyre = "0.6.8" -futures = "0.3.21" +flume = "0.10.14" diff --git a/apis/c/node/src/lib.rs b/apis/c/node/src/lib.rs index 5e12a938..4a16b7c6 100644 --- a/apis/c/node/src/lib.rs +++ b/apis/c/node/src/lib.rs @@ -1,12 +1,11 @@ #![deny(unsafe_op_in_unsafe_fn)] use dora_node_api::{DoraNode, Input}; -use futures::{executor::block_on, Stream, StreamExt}; -use std::{pin::Pin, ptr, slice}; +use std::{ptr, slice}; struct DoraContext { - node: &'static DoraNode, - inputs: Pin>>, + node: &'static mut DoraNode, + inputs: flume::Receiver, } /// Initializes a dora context from the environment variables that were set by @@ -20,12 +19,13 @@ struct DoraContext { /// On error, a null pointer is returned. #[no_mangle] pub extern "C" fn init_dora_context_from_env() -> *mut () { - let context = match block_on(async { - let node = DoraNode::init_from_env().await?; + let context = || { + let node = DoraNode::init_from_env()?; let node = Box::leak(Box::new(node)); - let inputs: Pin>> = Box::pin(node.inputs().await?); + let inputs = node.inputs()?; Ok(DoraContext { node, inputs }) - }) { + }; + let context = match context() { Ok(n) => n, Err(err) => { let err: eyre::Error = err; @@ -71,9 +71,9 @@ pub unsafe extern "C" fn free_dora_context(context: *mut ()) { #[no_mangle] pub unsafe extern "C" fn dora_next_input(context: *mut ()) -> *mut () { let context: &mut DoraContext = unsafe { &mut *context.cast() }; - match block_on(context.inputs.next()) { - Some(input) => Box::into_raw(Box::new(input)).cast(), - None => ptr::null_mut(), + match context.inputs.recv() { + Ok(input) => Box::into_raw(Box::new(input)).cast(), + Err(flume::RecvError::Disconnected) => ptr::null_mut(), } } @@ -191,5 +191,5 @@ unsafe fn try_send_output( let id = std::str::from_utf8(unsafe { slice::from_raw_parts(id_ptr, id_len) })?; let output_id = id.to_owned().into(); let data = unsafe { slice::from_raw_parts(data_ptr, data_len) }; - block_on(context.node.send_output(&output_id, data)) + context.node.send_output(&output_id, data) } diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 276ce3b8..c8e58ab3 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,7 +1,6 @@ use dora_node_api::config::{DataId, NodeId}; use dora_node_api::{DoraNode, Input}; use eyre::{Context, Result}; -use futures::StreamExt; use pyo3::prelude::*; use pyo3::types::PyBytes; use std::sync::Arc; @@ -42,32 +41,31 @@ impl Node { // It would have been difficult to expose the FutureStream of Dora directly. thread::spawn(move || -> Result<()> { let rt = tokio::runtime::Builder::new_multi_thread().build()?; - rt.block_on(async move { - let node = Arc::new(DoraNode::init_from_env().await?); - let _node = node.clone(); - let receive_handle = tokio::spawn(async move { - let mut inputs = _node.inputs().await.unwrap(); - while let Some(input) = inputs.next().await { - tx_input.send(input).await? - } - Result::<_, eyre::Error>::Ok(()) - }); - let send_handle = tokio::spawn(async move { - while let Some((output_str, data)) = rx_output.recv().await { - let output_id = DataId::from(output_str); - node.send_output(&output_id, data.as_slice()).await? - } - Result::<_, eyre::Error>::Ok(()) - }); - let (receiver, sender) = tokio::join!(receive_handle, send_handle); - receiver - .wrap_err("Handle to the receiver failed")? 
- .wrap_err("Receiving messages from receiver channel failed")?; - sender - .wrap_err("Handle to the sender failed")? - .wrap_err("Sending messages using sender channel failed")?; - Ok(()) - }) + + let node = Arc::new(DoraNode::init_from_env()?); + let _node = node.clone(); + let receive_handle = tokio::task::spawn_blocking(async move { + let mut inputs = _node.inputs().unwrap(); + while let Ok(input) = inputs.recv() { + tx_input.blocking_send(input)? + } + Result::<_, eyre::Error>::Ok(()) + }); + let send_handle = tokio::task::spawn_blocking(async move { + while let Some((output_str, data)) = rx_output.recv().await { + let output_id = DataId::from(output_str); + node.send_output(&output_id, data.as_slice())? + } + Result::<_, eyre::Error>::Ok(()) + }); + let (receiver, sender) = tokio::join!(receive_handle, send_handle); + receiver + .wrap_err("Handle to the receiver failed")? + .wrap_err("Receiving messages from receiver channel failed")?; + sender + .wrap_err("Handle to the sender failed")? + .wrap_err("Sending messages using sender channel failed")?; + Ok(()) }); Ok(Node { diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index 7023da85..1919ea79 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -4,22 +4,23 @@ version = "0.1.0" edition = "2021" license = "Apache-2.0" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["zenoh", "iceoryx"] +zenoh = ["dep:zenoh", "dep:zenoh-config"] +iceoryx = ["dep:iceoryx-rs"] [dependencies] -async-trait = "0.1.53" eyre = "0.6.7" -futures = "0.3.21" -futures-concurrency = "2.0.3" -futures-time = "1.0.0" once_cell = "1.13.0" serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.8.23" thiserror = "1.0.30" tracing = "0.1.33" uuid = "0.8.2" -zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" } -zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" } +zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", optional = true } +zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", optional = true } +iceoryx-rs = { git = "https://github.com/eclipse-iceoryx/iceoryx-rs.git", optional = true } +flume = "0.10.14" [dev-dependencies] tokio = { version = "1.17.0", features = ["rt"] } diff --git a/apis/rust/node/src/communication.rs b/apis/rust/node/src/communication.rs deleted file mode 100644 index 62af0ce7..00000000 --- a/apis/rust/node/src/communication.rs +++ /dev/null @@ -1,108 +0,0 @@ -use async_trait::async_trait; -use eyre::Context; -use futures::StreamExt; -use futures_time::future::FutureExt; -use std::pin::Pin; -use zenoh::{ - prelude::{Priority, SplitBuffer, ZFuture}, - publication::CongestionControl, -}; - -use crate::{config::CommunicationConfig, BoxError}; - -pub async fn init( - communication_config: &CommunicationConfig, -) -> eyre::Result> { - match communication_config { - CommunicationConfig::Zenoh { - config: zenoh_config, - prefix: zenoh_prefix, - } => { - let zenoh = zenoh::open(zenoh_config.clone()) - .await - .map_err(BoxError) - .wrap_err("failed to create zenoh session")?; - let layer = ZenohCommunicationLayer { - zenoh, - topic_prefix: zenoh_prefix.clone(), - }; - Ok(Box::new(layer)) - } - } -} - -#[async_trait] -pub trait CommunicationLayer: Send + Sync { - async fn subscribe<'a>( - &'a self, - topic: &str, - ) -> Result> + Send + 'a>>, BoxError>; - - async fn publish(&self, topic: &str, data: &[u8]) -> Result<(), BoxError>; - - fn publish_sync(&self, topic: &str, data: 
&[u8]) -> Result<(), BoxError>; - - async fn close(self: Box) -> Result<(), BoxError>; -} - -struct ZenohCommunicationLayer { - zenoh: zenoh::Session, - topic_prefix: String, -} - -impl ZenohCommunicationLayer { - fn prefixed(&self, topic: &str) -> String { - format!("{}/{topic}", self.topic_prefix) - } -} - -#[async_trait] -impl CommunicationLayer for ZenohCommunicationLayer { - async fn subscribe<'a>( - &'a self, - topic: &str, - ) -> Result> + Send + 'a>>, BoxError> { - zenoh::Session::subscribe(&self.zenoh, self.prefixed(topic)) - .reliable() - .await - .map(|s| { - let trait_object: Pin> + Send + 'a>> = - Box::pin(s.map(|s| s.value.payload.contiguous().into_owned())); - trait_object - }) - .map_err(BoxError) - } - - async fn publish(&self, topic: &str, data: &[u8]) -> Result<(), BoxError> { - let writer = self - .zenoh - .put(self.prefixed(topic), data) - .congestion_control(CongestionControl::Block) - .priority(Priority::RealTime); - - let result = writer.await.map_err(BoxError); - result - } - - fn publish_sync(&self, topic: &str, data: &[u8]) -> Result<(), BoxError> { - let writer = self - .zenoh - .put(self.prefixed(topic), data) - .congestion_control(CongestionControl::Block) - .priority(Priority::RealTime); - - writer.wait().map_err(BoxError) - } - - async fn close(self: Box) -> Result<(), BoxError> { - zenoh::Session::close(self.zenoh) - // wait a bit before closing to ensure that remaining published - // messages are sent out - // - // TODO: create a minimal example to reproduce the dropped messages - // and report this issue in the zenoh repo - .delay(futures_time::time::Duration::from_secs_f32(0.5)) - .await - .map_err(BoxError) - } -} diff --git a/apis/rust/node/src/communication/iceoryx.rs b/apis/rust/node/src/communication/iceoryx.rs new file mode 100644 index 00000000..f997ab75 --- /dev/null +++ b/apis/rust/node/src/communication/iceoryx.rs @@ -0,0 +1,109 @@ +use eyre::Context; +use std::{collections::HashMap, sync::Arc, time::Duration}; +use uuid::Uuid; + +use crate::BoxError; + +use super::{CommunicationLayer, Publisher, Subscriber}; + +pub struct IceoryxCommunicationLayer { + topic_prefix: Arc, + publishers: HashMap>>, +} + +impl IceoryxCommunicationLayer { + pub fn init(app_name_prefix: String, topic_prefix: String) -> eyre::Result { + let app_name = format!("{app_name_prefix}-{}", Uuid::new_v4()); + iceoryx_rs::Runtime::init(&app_name); + + Ok(Self { + topic_prefix: Arc::new(topic_prefix.clone()), + publishers: Default::default(), + }) + } +} + +impl IceoryxCommunicationLayer { + fn get_or_create_publisher( + &mut self, + topic: &str, + ) -> eyre::Result>> { + match self.publishers.get(topic) { + Some(p) => Ok(p.clone()), + None => { + let publisher = Self::create_publisher(&self.topic_prefix, topic) + .context("failed to create iceoryx publisher")?; + + let publisher = Arc::new(publisher); + self.publishers.insert(topic.to_owned(), publisher.clone()); + Ok(publisher) + } + } + } + + fn create_publisher( + topic_prefix: &str, + topic: &str, + ) -> Result, iceoryx_rs::IceoryxError> { + iceoryx_rs::PublisherBuilder::new("dora", &topic_prefix, &topic).create() + } +} + +impl CommunicationLayer for IceoryxCommunicationLayer { + fn publisher(&mut self, topic: &str) -> Result, crate::BoxError> { + let publisher = self + .get_or_create_publisher(topic) + .map_err(|err| BoxError(err.into()))?; + + Ok(Box::new(IceoryxPublisher { publisher })) + } + + fn subscribe(&mut self, topic: &str) -> Result, crate::BoxError> { + let (subscriber, token) = + 
iceoryx_rs::SubscriberBuilder::new("dora", &self.topic_prefix, topic) + .queue_capacity(5) + .create_mt() + .context("failed to create iceoryx subscriber") + .map_err(|err| BoxError(err.into()))?; + let receiver = subscriber.get_sample_receiver(token); + + Ok(Box::new(IceoryxReceiver { receiver })) + } +} + +#[derive(Clone)] +pub struct IceoryxPublisher { + publisher: Arc>, +} + +impl Publisher for IceoryxPublisher { + fn publish(&self, data: &[u8]) -> Result<(), crate::BoxError> { + let mut sample = self + .publisher + .loan_slice(data.len()) + .context("failed to loan iceoryx slice for publishing") + .map_err(|err| BoxError(err.into()))?; + sample.copy_from_slice(data); + self.publisher.publish(sample); + Ok(()) + } + + fn boxed_clone(&self) -> Box { + Box::new(self.clone()) + } +} + +pub struct IceoryxReceiver { + receiver: iceoryx_rs::mt::SampleReceiver<[u8]>, +} + +impl Subscriber for IceoryxReceiver { + fn recv(&mut self) -> Result>, crate::BoxError> { + self.receiver + .wait_for_samples(Duration::from_secs(u64::MAX)); + match self.receiver.take() { + Some(sample) => Ok(Some(sample.to_owned())), + None => Ok(None), + } + } +} diff --git a/apis/rust/node/src/communication/mod.rs b/apis/rust/node/src/communication/mod.rs new file mode 100644 index 00000000..509ba836 --- /dev/null +++ b/apis/rust/node/src/communication/mod.rs @@ -0,0 +1,172 @@ +use crate::{ + config::{CommunicationConfig, DataId, InputMapping}, + BoxError, +}; +use eyre::{eyre, Context}; +use std::{ + collections::{BTreeMap, HashMap}, + mem, thread, +}; + +#[doc(hidden)] +pub const STOP_TOPIC: &str = "__dora_rs_internal__operator_stopped"; + +pub mod iceoryx; +pub mod zenoh; + +pub fn init( + communication_config: &CommunicationConfig, +) -> eyre::Result> { + match communication_config { + CommunicationConfig::Zenoh { + config: zenoh_config, + prefix: zenoh_prefix, + } => { + let layer = + zenoh::ZenohCommunicationLayer::init(zenoh_config.clone(), zenoh_prefix.clone())?; + + Ok(Box::new(layer)) + } + CommunicationConfig::Iceoryx { + app_name_prefix, + topic_prefix, + } => { + let app_name_prefix = app_name_prefix.clone(); + let topic_prefix = topic_prefix.clone(); + let layer = iceoryx::IceoryxCommunicationLayer::init(app_name_prefix, topic_prefix)?; + + Ok(Box::new(layer)) + } + } +} + +pub trait CommunicationLayer: Send + Sync { + fn publisher(&mut self, topic: &str) -> Result, BoxError>; + + fn subscribe(&mut self, topic: &str) -> Result, BoxError>; + + fn subscribe_all( + &mut self, + inputs: &BTreeMap, + ) -> eyre::Result> { + let (inputs_tx, inputs_rx) = flume::bounded(10); + for (input, mapping) in inputs { + let topic = mapping.to_string(); + let mut sub = self + .subscribe(&topic) + .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; + + let input_id = input.to_owned(); + let sender = inputs_tx.clone(); + thread::spawn(move || loop { + match sub.recv().transpose() { + None => break, + Some(value) => { + let input = value.map(|data| Input { + id: input_id.clone(), + data, + }); + match sender.send(input) { + Ok(()) => {} + Err(flume::SendError(_)) => break, + } + } + } + }); + } + mem::drop(inputs_tx); + + let (stop_tx, stop_rx) = flume::bounded(10); + let mut sources: HashMap<_, _> = inputs + .values() + .map(|v| (v.source().to_owned(), v.operator().to_owned())) + .collect(); + for (source, operator) in &sources { + let topic = match operator { + Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"), + None => format!("{source}/{STOP_TOPIC}"), + }; + let mut sub = self + .subscribe(&topic) + 
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?; + + let source_id = source.to_owned(); + let sender = stop_tx.clone(); + thread::spawn(move || loop { + match sub.recv().transpose() { + None => break, + Some(value) => { + let input = value.map(|_| source_id.clone()); + match sender.send(input) { + Ok(()) => {} + Err(flume::SendError(_)) => break, + } + } + } + }); + } + mem::drop(stop_tx); + + let (combined_tx, combined) = flume::bounded(1); + thread::spawn(move || loop { + let selector = flume::Selector::new() + .recv(&inputs_rx, |v| match v { + Ok(Ok(value)) => InputEvent::Input(value), + Ok(Err(err)) => InputEvent::Error(err), + Err(flume::RecvError::Disconnected) => InputEvent::Error(BoxError( + eyre!("input stream was disconnected unexpectedly").into(), + )), + }) + .recv(&stop_rx, |v| match v { + Ok(Ok(stopped_source)) => { + sources.remove(&stopped_source); + InputEvent::InputClosed { + number_of_remaining_sources: sources.len(), + } + } + Ok(Err(err)) => InputEvent::Error(err), + Err(flume::RecvError::Disconnected) => InputEvent::Error(BoxError( + eyre!("stop stream was disconnected unexpectedly").into(), + )), + }); + match selector.wait() { + InputEvent::Input(input) => match combined_tx.send(input) { + Ok(()) => {} + Err(flume::SendError(_)) => break, + }, + InputEvent::InputClosed { + number_of_remaining_sources, + } => { + if number_of_remaining_sources == 0 { + break; + } + } + InputEvent::Error(err) => panic!("{err}"), + } + }); + + Ok(combined) + } +} + +pub trait Publisher: Send + Sync { + fn publish(&self, data: &[u8]) -> Result<(), BoxError>; + + fn boxed_clone(&self) -> Box; +} + +pub trait Subscriber: Send + Sync { + fn recv(&mut self) -> Result>, BoxError>; +} + +enum InputEvent { + Input(Input), + InputClosed { number_of_remaining_sources: usize }, + Error(BoxError), +} + +#[derive(Debug)] +pub struct Input { + pub id: DataId, + pub data: Vec, +} diff --git a/apis/rust/node/src/communication/zenoh.rs b/apis/rust/node/src/communication/zenoh.rs new file mode 100644 index 00000000..b0342260 --- /dev/null +++ b/apis/rust/node/src/communication/zenoh.rs @@ -0,0 +1,82 @@ +use super::{CommunicationLayer, Publisher, Subscriber}; +use crate::BoxError; +use std::sync::Arc; +use zenoh::{ + prelude::{EntityFactory, Priority, Receiver as _, SplitBuffer, ZFuture}, + publication::CongestionControl, +}; + +pub struct ZenohCommunicationLayer { + zenoh: Arc, + topic_prefix: String, +} + +impl ZenohCommunicationLayer { + pub fn init(config: zenoh_config::Config, prefix: String) -> eyre::Result { + let zenoh = ::zenoh::open(config) + .wait() + .map_err(|err| BoxError(err.into()))? 
+ .into_arc(); + Ok(Self { + zenoh, + topic_prefix: prefix, + }) + } + + fn prefixed(&self, topic: &str) -> String { + format!("{}/{topic}", self.topic_prefix) + } +} + +impl CommunicationLayer for ZenohCommunicationLayer { + fn publisher(&mut self, topic: &str) -> Result, crate::BoxError> { + let publisher = self + .zenoh + .publish(self.prefixed(topic)) + .congestion_control(CongestionControl::Block) + .priority(Priority::RealTime) + .wait() + .map_err(|err| BoxError(err.into()))?; + + Ok(Box::new(ZenohPublisher { publisher })) + } + + fn subscribe(&mut self, topic: &str) -> Result, BoxError> { + let subscriber = self + .zenoh + .subscribe(self.prefixed(topic)) + .reliable() + .wait() + .map_err(|err| BoxError(err.into()))?; + + Ok(Box::new(ZenohReceiver(subscriber))) + } +} + +#[derive(Clone)] +pub struct ZenohPublisher { + publisher: zenoh::publication::Publisher<'static>, +} + +impl Publisher for ZenohPublisher { + fn publish(&self, data: &[u8]) -> Result<(), crate::BoxError> { + self.publisher + .send(data) + .map_err(|err| BoxError(err.into())) + } + + fn boxed_clone(&self) -> Box { + Box::new(self.clone()) + } +} + +pub struct ZenohReceiver(zenoh::subscriber::Subscriber<'static>); + +impl Subscriber for ZenohReceiver { + fn recv(&mut self) -> Result>, crate::BoxError> { + match self.0.recv() { + Ok(sample) => Ok(Some(sample.value.payload.contiguous().into_owned())), + Err(flume::RecvError::Disconnected) => Ok(None), + } + } +} diff --git a/apis/rust/node/src/config.rs b/apis/rust/node/src/config.rs index dccae1d1..7fa51b8f 100644 --- a/apis/rust/node/src/config.rs +++ b/apis/rust/node/src/config.rs @@ -1,6 +1,7 @@ use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use std::{ + borrow::Borrow, collections::{BTreeMap, BTreeSet}, convert::Infallible, fmt::{self, Write as _}, @@ -92,6 +93,30 @@ impl std::ops::Deref for DataId { } } +impl AsRef for DataId { + fn as_ref(&self) -> &String { + &self.0 + } +} + +impl AsRef for DataId { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl Borrow for DataId { + fn borrow(&self) -> &String { + &self.0 + } +} + +impl Borrow for DataId { + fn borrow(&self) -> &str { + &self.0 + } +} + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum InputMapping { Timer { interval: Duration }, @@ -244,6 +269,11 @@ pub enum CommunicationConfig { config: zenoh_config::Config, prefix: String, }, + Iceoryx { + app_name_prefix: String, + #[serde(skip)] + topic_prefix: String, + }, } impl CommunicationConfig { @@ -255,6 +285,12 @@ impl CommunicationConfig { } => { write!(zenoh_prefix, "/{}", prefix).unwrap(); } + CommunicationConfig::Iceoryx { topic_prefix, .. 
} => { + if !topic_prefix.is_empty() { + topic_prefix.push_str("-"); + } + topic_prefix.push_str(prefix); + } } } } diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 168611c6..566784d9 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -1,16 +1,11 @@ -use communication::CommunicationLayer; +pub use communication::Input; +use communication::{CommunicationLayer, STOP_TOPIC}; use config::{CommunicationConfig, DataId, NodeId, NodeRunConfig}; use eyre::WrapErr; -use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; -use futures_concurrency::Merge; -use std::collections::HashSet; pub mod communication; pub mod config; -#[doc(hidden)] -pub const STOP_TOPIC: &str = "__dora_rs_internal__operator_stopped"; - pub struct DoraNode { id: NodeId, node_config: NodeRunConfig, @@ -18,7 +13,7 @@ pub struct DoraNode { } impl DoraNode { - pub async fn init_from_env() -> eyre::Result { + pub fn init_from_env() -> eyre::Result { let id = { let raw = std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; @@ -34,15 +29,15 @@ impl DoraNode { .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; serde_yaml::from_str(&raw).context("failed to deserialize communication config")? }; - Self::init(id, node_config, communication_config).await + Self::init(id, node_config, communication_config) } - pub async fn init( + pub fn init( id: NodeId, node_config: NodeRunConfig, communication_config: CommunicationConfig, ) -> eyre::Result { - let communication = communication::init(&communication_config).await?; + let communication = communication::init(&communication_config)?; Ok(Self { id, node_config, @@ -50,51 +45,11 @@ impl DoraNode { }) } - pub async fn inputs(&self) -> eyre::Result + '_> { - let mut streams = Vec::new(); - for (input, mapping) in &self.node_config.inputs { - let topic = mapping.to_string(); - let sub = self - .communication - .subscribe(&topic) - .await - .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; - streams.push(sub.map(|data| Input { - id: input.clone(), - data, - })) - } - - let stop_messages = FuturesUnordered::new(); - let sources: HashSet<_> = self - .node_config - .inputs - .values() - .map(|v| (v.source(), v.operator())) - .collect(); - for (source, operator) in &sources { - let topic = match operator { - Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"), - None => format!("{source}/{STOP_TOPIC}"), - }; - let sub = self - .communication - .subscribe(&topic) - .await - .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; - stop_messages.push(sub.into_future()); - } - let node_id = self.id.clone(); - let finished = Box::pin( - stop_messages - .all(|_| async { true }) - .map(move |_| println!("all inputs finished for node {node_id}")), - ); - - Ok(streams.merge().take_until(finished)) + pub fn inputs(&mut self) -> eyre::Result> { + self.communication.subscribe_all(&self.node_config.inputs) } - pub async fn send_output(&self, output_id: &DataId, data: &[u8]) -> eyre::Result<()> { + pub fn send_output(&mut self, output_id: &DataId, data: &[u8]) -> eyre::Result<()> { if !self.node_config.outputs.contains(output_id) { eyre::bail!("unknown output"); } @@ -103,8 +58,9 @@ impl DoraNode { let topic = format!("{self_id}/{output_id}"); self.communication - .publish(&topic, data) - .await + .publisher(&topic) + .wrap_err_with(|| format!("failed create publisher for output {output_id}"))? 
+ .publish(data) .wrap_err_with(|| format!("failed to send data for output {output_id}"))?; Ok(()) } @@ -124,8 +80,14 @@ impl Drop for DoraNode { let topic = format!("{self_id}/{STOP_TOPIC}"); let result = self .communication - .publish_sync(&topic, &[]) - .wrap_err_with(|| format!("failed to send stop message for source `{self_id}`")); + .publisher(&topic) + .wrap_err_with(|| { + format!("failed to create publisher for stop message for node `{self_id}`") + }) + .and_then(|p| { + p.publish(&[]) + .wrap_err_with(|| format!("failed to send stop message for node `{self_id}`")) + }); match result { Ok(()) => println!("sent stop message for {self_id}"), Err(err) => { @@ -136,13 +98,7 @@ impl Drop for DoraNode { } } -#[derive(Debug)] -pub struct Input { - pub id: DataId, - pub data: Vec, -} - -pub struct BoxError(Box); +pub struct BoxError(pub Box); impl std::fmt::Debug for BoxError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -166,16 +122,6 @@ impl std::error::Error for BoxError { mod tests { use super::*; - fn run(future: F) -> O - where - F: std::future::Future, - { - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); - rt.block_on(future) - } - #[test] fn no_op_operator() { let id = uuid::Uuid::new_v4().to_string().into(); @@ -188,12 +134,9 @@ mod tests { prefix: format!("/{}", uuid::Uuid::new_v4()), }; - run(async { - let operator = DoraNode::init(id, node_config, communication_config) - .await - .unwrap(); - let mut inputs = operator.inputs().await.unwrap(); - assert!(inputs.next().await.is_none()); - }); + let mut node = DoraNode::init(id, node_config, communication_config).unwrap(); + + let inputs = node.inputs().unwrap(); + assert!(inputs.recv().is_err()); } } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index d718b5fc..2c564348 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -64,7 +64,7 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() let nodes = descriptor.resolve_aliases(); let dora_timers = collect_dora_timers(&nodes); - let mut communication = descriptor.communication; + let mut communication_config = descriptor.communication; if nodes .iter() @@ -78,7 +78,7 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() } // add uuid as prefix to ensure isolation - communication.add_topic_prefix(&uuid::Uuid::new_v4().to_string()); + communication_config.add_topic_prefix(&uuid::Uuid::new_v4().to_string()); let mut tasks = FuturesUnordered::new(); for node in nodes { @@ -86,14 +86,14 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() match node.kind { descriptor::CoreNodeKind::Custom(node) => { - let result = spawn_custom_node(node_id.clone(), &node, &communication) + let result = spawn_custom_node(node_id.clone(), &node, &communication_config) .wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?; tasks.push(result); } descriptor::CoreNodeKind::Runtime(node) => { if !node.operators.is_empty() { let result = - spawn_runtime_node(&runtime, node_id.clone(), &node, &communication) + spawn_runtime_node(&runtime, node_id.clone(), &node, &communication_config) .wrap_err_with(|| format!("failed to spawn runtime node {node_id}"))?; tasks.push(result); } @@ -102,9 +102,12 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() } for interval in dora_timers { - let communication = communication::init(&communication) - .await - 
.wrap_err("failed to init communication layer")?; + let communication_config = communication_config.clone(); + let mut communication = + tokio::task::spawn_blocking(move || communication::init(&communication_config)) + .await + .wrap_err("failed to join communication layer init task")? + .wrap_err("failed to init communication layer")?; tokio::spawn(async move { let topic = { let duration = format_duration(interval); @@ -112,8 +115,11 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() }; let mut stream = IntervalStream::new(tokio::time::interval(interval)); while let Some(_) = stream.next().await { - let publish = communication.publish(&topic, &[]); - publish.await.expect("failed to publish timer tick message"); + communication + .publisher(&topic) + .unwrap() + .publish(&[]) + .expect("failed to publish timer tick message"); } }); } diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index b94b9d57..bd5fd6b8 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -23,3 +23,4 @@ zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" } log = "0.4.17" fern = "0.6.1" pyo3 = { version = "0.16.5", features = ["auto-initialize", "eyre"] } +flume = "0.10.14" diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index cb05a6ff..35affe23 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -3,29 +3,22 @@ use dora_core::descriptor::OperatorDefinition; use dora_node_api::{ self, - communication::{self, CommunicationLayer}, - config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId, UserInputMapping}, - STOP_TOPIC, + communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, + config::{CommunicationConfig, DataId, NodeId, OperatorId}, }; -use eyre::{bail, eyre, Context}; -use futures::{ - stream::{self, FuturesUnordered}, - Future, FutureExt, StreamExt, -}; -use futures_concurrency::Merge; -use operator::{Operator, OperatorEvent}; +use eyre::{bail, Context}; +use futures::{Stream, StreamExt}; +use operator::{spawn_operator, OperatorEvent}; use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, + collections::{BTreeSet, HashMap}, mem, - pin::Pin, }; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamMap}; mod operator; -#[tokio::main] -async fn main() -> eyre::Result<()> { +fn main() -> eyre::Result<()> { set_up_logger()?; let node_id = { @@ -44,112 +37,79 @@ async fn main() -> eyre::Result<()> { serde_yaml::from_str(&raw).context("failed to deserialize operator config")? 
}; - let mut operator_map = BTreeMap::new(); - let mut stopped_operators = BTreeSet::new(); + let mut communication: Box = + communication::init(&communication_config)?; + let mut operator_events = StreamMap::new(); - let mut operator_events_tx = HashMap::new(); + let mut operator_stop_publishers = HashMap::new(); for operator_config in &operators { let (events_tx, events) = mpsc::channel(1); - let operator = Operator::init(operator_config.clone(), events_tx.clone()) - .await - .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; - operator_map.insert(&operator_config.id, operator); + spawn_operator( + &node_id, + operator_config.clone(), + events_tx.clone(), + communication.as_mut(), + ) + .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); - operator_events_tx.insert(operator_config.id.clone(), events_tx); + + let stop_publisher = publisher( + &node_id, + operator_config.id.clone(), + STOP_TOPIC.to_owned().into(), + communication.as_mut(), + ) + .with_context(|| { + format!( + "failed to create stop publisher for operator {}", + operator_config.id + ) + })?; + operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher); } - let communication: Box = - communication::init(&communication_config).await?; + let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); - let inputs = subscribe(&operators, communication.as_ref()) - .await - .context("failed to subscribe")?; + tokio::runtime::Runtime::new()?.block_on(run( + node_id, + operator_events, + operator_stop_publishers, + )) +} - let input_events = inputs.map(Event::External); - let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event }); - let mut events = (input_events, operator_events).merge(); +async fn run( + node_id: NodeId, + mut events: impl Stream + Unpin, + mut operator_stop_publishers: HashMap>, +) -> eyre::Result<()> { + let mut stopped_operators = BTreeSet::new(); while let Some(event) = events.next().await { match event { - Event::External(event) => match event { - SubscribeEvent::Input(input) => { - let operator = match operator_map.get_mut(&input.target_operator) { - Some(op) => op, - None => { - if stopped_operators.contains(&input.target_operator) { - continue; // operator was stopped already -> ignore input - } else { - bail!( - "received input for unexpected operator `{}`", - input.target_operator - ); - } - } - }; - - operator - .handle_input(input.id.clone(), input.data) - .wrap_err_with(|| { - format!( - "operator {} failed to handle input {}", - input.target_operator, input.id - ) - })?; - } - SubscribeEvent::InputsStopped { target_operator } => { - println!("all inputs finished for operator {node_id}/{target_operator}"); - match operator_map.get_mut(&target_operator) { - Some(op) => op.close_input_stream(), - None => { - if !stopped_operators.contains(&target_operator) { - bail!( - "received InputsStopped event for unknown operator `{}`", - target_operator - ); - } - } - } - } - }, Event::Operator { id, event } => { - let operator = operator_map - .get(&id) - .ok_or_else(|| eyre!("received event from unknown operator {id}"))?; match event { - OperatorEvent::Output { id: data_id, value } => { - if !operator.definition().config.outputs.contains(&data_id) { - eyre::bail!("unknown output {data_id} for operator {id}"); - } - publish(&node_id, id, data_id, &value, communication.as_ref()) - .await - .context("failed to 
publish operator output")?; - } OperatorEvent::Error(err) => { bail!(err.wrap_err(format!("operator {id} failed"))) } OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), OperatorEvent::Finished => { - if operator_map.remove(&id).is_some() { + if let Some(stop_publisher) = operator_stop_publishers.remove(&id) { println!("operator {node_id}/{id} finished"); stopped_operators.insert(id.clone()); // send stopped message - publish( - &node_id, - id.clone(), - STOP_TOPIC.to_owned().into(), - &[], - communication.as_ref(), - ) - .await - .with_context(|| { - format!("failed to send stop message for operator `{node_id}/{id}`") - })?; - - operator_events_tx.remove(&id); - } - - if operator_map.is_empty() { - break; + tokio::task::spawn_blocking(move || stop_publisher.publish(&[])) + .await + .wrap_err("failed to join stop publish task")? + .with_context(|| { + format!( + "failed to send stop message for operator `{node_id}/{id}`" + ) + })?; + if operator_stop_publishers.is_empty() { + break; + } + } else { + log::warn!("no stop publisher for {id}"); } } } @@ -159,119 +119,28 @@ async fn main() -> eyre::Result<()> { mem::drop(events); - communication.close().await?; - Ok(()) } -async fn subscribe<'a>( - operators: &'a [OperatorDefinition], - communication: &'a dyn CommunicationLayer, -) -> eyre::Result + 'a> { - let mut streams = Vec::new(); - - for operator in operators { - let events = subscribe_operator(operator, communication).await?; - streams.push(events); - } - - Ok(streams.merge()) -} - -async fn subscribe_operator<'a>( - operator: &'a OperatorDefinition, - communication: &'a dyn CommunicationLayer, -) -> Result + 'a, eyre::Error> { - let stop_messages: FuturesUnordered>>> = - FuturesUnordered::new(); - for mapping in operator.config.inputs.values() { - match mapping { - InputMapping::User(UserInputMapping { - source, operator, .. - }) => { - let topic = match operator { - Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"), - None => format!("{source}/{STOP_TOPIC}"), - }; - let sub = communication - .subscribe(&topic) - .await - .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; - stop_messages.push(Box::pin(sub.into_future().map(|_| ()))); - } - InputMapping::Timer { .. 
} => { - // dora timer inputs run forever - stop_messages.push(Box::pin(futures::future::pending())); - } - } - } - let finished = Box::pin(stop_messages.all(|()| async { true }).shared()); - - let mut streams = Vec::new(); - for (input, mapping) in &operator.config.inputs { - let topic = mapping.to_string(); - let sub = communication - .subscribe(&topic) - .await - .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; - let stream = sub - .map(|data| OperatorInput { - target_operator: operator.id.clone(), - id: input.clone(), - data, - }) - .map(SubscribeEvent::Input) - .take_until(finished.clone()); - streams.push(stream); - } - - Ok(streams.merge().chain(stream::once(async { - SubscribeEvent::InputsStopped { - target_operator: operator.id.clone(), - } - }))) -} - -async fn publish( +fn publisher( self_id: &NodeId, operator_id: OperatorId, output_id: DataId, - value: &[u8], - communication: &dyn CommunicationLayer, -) -> eyre::Result<()> { + communication: &mut dyn CommunicationLayer, +) -> eyre::Result> { let topic = format!("{self_id}/{operator_id}/{output_id}"); communication - .publish(&topic, value) - .await - .wrap_err_with(|| format!("failed to send data for output {output_id}"))?; - - Ok(()) + .publisher(&topic) + .wrap_err_with(|| format!("failed to create publisher for output {output_id}")) } enum Event { - External(SubscribeEvent), Operator { id: OperatorId, event: OperatorEvent, }, } -enum SubscribeEvent { - /// New input for an operator - Input(OperatorInput), - /// All input streams for an operator are finished. - InputsStopped { - /// The operator whose inputs are all finished. - target_operator: OperatorId, - }, -} - -struct OperatorInput { - pub target_operator: OperatorId, - pub id: DataId, - pub data: Vec, -} - fn set_up_logger() -> Result<(), fern::InitError> { fern::Dispatch::new() .format(|out, message, record| { diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index 7a49f514..3ef9f5a3 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -1,90 +1,69 @@ use dora_core::descriptor::{OperatorDefinition, OperatorSource}; -use dora_node_api::config::DataId; -use eyre::{eyre, Context}; -use log::warn; +use dora_node_api::{communication::CommunicationLayer, config::NodeId}; +use eyre::Context; use std::any::Any; -use tokio::sync::mpsc::{self, Sender}; +use tokio::sync::mpsc::Sender; mod python; mod shared_lib; -pub struct Operator { - operator_task: Option>, - definition: OperatorDefinition, -} - -impl Operator { - pub async fn init( - operator_definition: OperatorDefinition, - events_tx: Sender, - ) -> eyre::Result { - let (operator_task, operator_rx) = mpsc::channel(10); +pub fn spawn_operator( + node_id: &NodeId, + operator_definition: OperatorDefinition, + events_tx: Sender, + communication: &mut dyn CommunicationLayer, +) -> eyre::Result<()> { + let inputs = communication + .subscribe_all(&operator_definition.config.inputs) + .wrap_err_with(|| { + format!( + "failed to subscribe to inputs of operator {}", + operator_definition.id + ) + })?; - match &operator_definition.config.source { - OperatorSource::SharedLibrary(path) => { - shared_lib::spawn(path, events_tx, operator_rx).wrap_err_with(|| { - format!( - "failed to spawn shared library operator for {}", - operator_definition.id - ) - })?; - } - OperatorSource::Python(path) => { - python::spawn(path, events_tx, operator_rx).wrap_err_with(|| { - format!( - "failed to spawn Python operator for {}", - operator_definition.id - ) - 
})?; - } - OperatorSource::Wasm(_path) => { - eprintln!("WARNING: WASM operators are not supported yet"); - } - } - Ok(Self { - operator_task: Some(operator_task), - definition: operator_definition, + let publishers = operator_definition + .config + .outputs + .iter() + .map(|output_id| { + let topic = format!( + "{node_id}/{operator_id}/{output_id}", + operator_id = operator_definition.id + ); + communication + .publisher(&topic) + .wrap_err_with(|| format!("failed to create publisher for output {output_id}")) + .map(|p| (output_id.to_owned(), p)) }) - } + .collect::>()?; - pub fn handle_input(&mut self, id: DataId, value: Vec) -> eyre::Result<()> { - self.operator_task - .as_mut() - .ok_or_else(|| { - eyre!( - "input channel for {} was already closed", - self.definition.id + match &operator_definition.config.source { + OperatorSource::SharedLibrary(path) => { + shared_lib::spawn(path, events_tx, inputs, publishers).wrap_err_with(|| { + format!( + "failed to spawn shared library operator for {}", + operator_definition.id ) - })? - .try_send(OperatorInput { id, value }) - .or_else(|err| match err { - tokio::sync::mpsc::error::TrySendError::Closed(_) => Err(eyre!("operator crashed")), - tokio::sync::mpsc::error::TrySendError::Full(_) => { - warn!("operator queue full"); - Ok(()) - } - }) - } - - pub fn close_input_stream(&mut self) { - self.operator_task = None; - } - - /// Get a reference to the operator's definition. - #[must_use] - pub fn definition(&self) -> &OperatorDefinition { - &self.definition + })?; + } + OperatorSource::Python(path) => { + python::spawn(path, events_tx, inputs, publishers).wrap_err_with(|| { + format!( + "failed to spawn Python operator for {}", + operator_definition.id + ) + })?; + } + OperatorSource::Wasm(_path) => { + eprintln!("WARNING: WASM operators are not supported yet"); + } } + Ok(()) } pub enum OperatorEvent { - Output { id: DataId, value: Vec }, Error(eyre::Error), Panic(Box), Finished, } - -pub struct OperatorInput { - id: DataId, - value: Vec, -} diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index b4f3ba29..3efe892f 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -1,12 +1,15 @@ -use super::{OperatorEvent, OperatorInput}; +use super::OperatorEvent; +use dora_node_api::{communication::Publisher, config::DataId}; use eyre::{bail, eyre, Context}; use pyo3::{pyclass, types::IntoPyDict, types::PyBytes, Py, Python}; use std::{ + collections::HashMap, panic::{catch_unwind, AssertUnwindSafe}, path::Path, + sync::Arc, thread, }; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::mpsc::Sender; fn traceback(err: pyo3::PyErr) -> eyre::Report { Python::with_gil(|py| { @@ -23,7 +26,8 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report { pub fn spawn( path: &Path, events_tx: Sender, - mut inputs: Receiver, + inputs: flume::Receiver, + publishers: HashMap>, ) -> eyre::Result<()> { if !path.exists() { bail!("No python file exists at {}", path.display()); @@ -34,7 +38,7 @@ pub fn spawn( let path_cloned = path.clone(); let send_output = SendOutputCallback { - events_tx: events_tx.clone(), + publishers: Arc::new(publishers), }; let init_operator = move |py: Python| { @@ -75,7 +79,7 @@ pub fn spawn( let operator = Python::with_gil(init_operator).wrap_err("failed to init python operator")?; - while let Some(input) = inputs.blocking_recv() { + while let Ok(input) = inputs.recv() { let status_enum = Python::with_gil(|py| { operator .call_method1( @@ -83,7 +87,7 @@ 
pub fn spawn( "on_input", ( input.id.to_string(), - PyBytes::new(py, &input.value), + PyBytes::new(py, &input.data), send_output.clone(), ), ) @@ -139,24 +143,25 @@ pub fn spawn( #[pyclass] #[derive(Clone)] struct SendOutputCallback { - events_tx: Sender, + publishers: Arc>>, } #[allow(unsafe_op_in_unsafe_fn)] mod callback_impl { use super::SendOutputCallback; - use crate::operator::OperatorEvent; + use eyre::{eyre, Context}; use pyo3::{pymethods, PyResult}; #[pymethods] impl SendOutputCallback { fn __call__(&mut self, output: &str, data: &[u8]) -> PyResult<()> { - let result = self.events_tx.blocking_send(OperatorEvent::Output { - id: output.to_owned().into(), - value: data.to_owned(), - }); - result - .map_err(|_| eyre::eyre!("channel to dora runtime was closed unexpectedly").into()) + match self.publishers.get(output) { + Some(publisher) => publisher.publish(data).context("publish failed"), + None => Err(eyre!( + "unexpected output {output} (not defined in dataflow config)" + )), + } + .map_err(|err| err.into()) } } } diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index a7ced706..47146d53 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,23 +1,28 @@ -use super::{OperatorEvent, OperatorInput}; +use super::OperatorEvent; +use dora_node_api::{communication::Publisher, config::DataId, BoxError}; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput, DoraResult, DoraStatus, Metadata, OnInputResult, Output, SendOutput, }; use eyre::{bail, eyre, Context}; +use flume::Receiver; use libloading::Symbol; use std::{ + collections::HashMap, ffi::c_void, + ops::Deref, panic::{catch_unwind, AssertUnwindSafe}, path::Path, sync::Arc, thread, }; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::mpsc::Sender; pub fn spawn( path: &Path, events_tx: Sender, - inputs: Receiver, + inputs: Receiver, + publishers: HashMap>, ) -> eyre::Result<()> { let file_name = path .file_name() @@ -41,13 +46,9 @@ pub fn spawn( let closure = AssertUnwindSafe(|| { let bindings = Bindings::init(&library).context("failed to init operator")?; - let operator = SharedLibraryOperator { - events_tx: events_tx.clone(), - inputs, - bindings, - }; + let operator = SharedLibraryOperator { inputs, bindings }; - operator.run() + operator.run(publishers) }); match catch_unwind(closure) { Ok(Ok(())) => { @@ -66,14 +67,13 @@ pub fn spawn( } struct SharedLibraryOperator<'lib> { - events_tx: Sender, - inputs: Receiver, + inputs: Receiver, bindings: Bindings<'lib>, } impl<'lib> SharedLibraryOperator<'lib> { - fn run(mut self) -> eyre::Result<()> { + fn run(self, publishers: HashMap>) -> eyre::Result<()> { let operator_context = { let DoraInitResult { result, @@ -89,13 +89,17 @@ impl<'lib> SharedLibraryOperator<'lib> { } }; - let closure_events_tx = self.events_tx.clone(); let send_output_closure = Arc::new(move |output: Output| { - let id: String = output.id.into(); - let result = closure_events_tx.blocking_send(OperatorEvent::Output { - id: id.into(), - value: output.data.into(), - }); + let result = match publishers.get(output.id.deref()) { + Some(publisher) => publisher.publish(&output.data), + None => Err(BoxError( + eyre!( + "unexpected output {} (not defined in dataflow config)", + output.id.deref() + ) + .into(), + )), + }; let error = match result { Ok(()) => None, @@ -105,10 +109,10 @@ impl<'lib> SharedLibraryOperator<'lib> { DoraResult { 
error } }); - while let Some(input) = self.inputs.blocking_recv() { + while let Ok(input) = self.inputs.recv() { let operator_input = dora_operator_api_types::Input { id: String::from(input.id).into(), - data: input.value.into(), + data: input.data.into(), metadata: Metadata { open_telemetry_context: String::new().into(), }, diff --git a/examples/iceoryx/dataflow.yml b/examples/iceoryx/dataflow.yml new file mode 100644 index 00000000..f8b447db --- /dev/null +++ b/examples/iceoryx/dataflow.yml @@ -0,0 +1,26 @@ +communication: + iceoryx: + app_name_prefix: dora-iceoryx-example + +nodes: + - id: rust-node + custom: + run: ../../target/debug/iceoryx-example-node + inputs: + tick: dora/timer/millis/300 + outputs: + - random + - id: runtime-node + operators: + - id: rust-operator + shared-library: ../../target/debug/iceoryx_example_operator + inputs: + tick: dora/timer/millis/100 + random: rust-node/random + outputs: + - status + - id: rust-sink + custom: + run: ../../target/debug/iceoryx-example-sink + inputs: + message: runtime-node/rust-operator/status diff --git a/examples/iceoryx/node/Cargo.toml b/examples/iceoryx/node/Cargo.toml new file mode 100644 index 00000000..dafe3892 --- /dev/null +++ b/examples/iceoryx/node/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "iceoryx-example-node" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +eyre = "0.6.8" +rand = "0.8.5" diff --git a/examples/iceoryx/node/src/main.rs b/examples/iceoryx/node/src/main.rs new file mode 100644 index 00000000..a69cb172 --- /dev/null +++ b/examples/iceoryx/node/src/main.rs @@ -0,0 +1,26 @@ +use dora_node_api::{self, config::DataId, DoraNode}; + +fn main() -> eyre::Result<()> { + let output = DataId::from("random".to_owned()); + + let mut operator = DoraNode::init_from_env()?; + + let inputs = operator.inputs()?; + + for _ in 0..20 { + let input = match inputs.recv() { + Ok(input) => input, + Err(_) => break, + }; + + match input.id.as_str() { + "tick" => { + let random: u64 = rand::random(); + operator.send_output(&output, &random.to_le_bytes())?; + } + other => eprintln!("Ignoring unexpected input `{other}`"), + } + } + + Ok(()) +} diff --git a/examples/iceoryx/operator/Cargo.toml b/examples/iceoryx/operator/Cargo.toml new file mode 100644 index 00000000..474d6103 --- /dev/null +++ b/examples/iceoryx/operator/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "iceoryx-example-operator" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +crate-type = ["cdylib"] + +[dependencies] +dora-operator-api = { path = "../../../apis/rust/operator" } diff --git a/examples/iceoryx/operator/src/lib.rs b/examples/iceoryx/operator/src/lib.rs new file mode 100644 index 00000000..3c0713a1 --- /dev/null +++ b/examples/iceoryx/operator/src/lib.rs @@ -0,0 +1,47 @@ +#![warn(unsafe_op_in_unsafe_fn)] + +use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus}; +use std::time::{Duration, Instant}; + +register_operator!(ExampleOperator); + +#[derive(Debug, Default)] +struct ExampleOperator { + ticks: usize, + last_random_at: Option, +} + +impl DoraOperator for ExampleOperator { + fn on_input( + &mut self, + id: &str, + data: &[u8], + output_sender: &mut DoraOutputSender, + ) -> Result { + match id { + "tick" => { + self.ticks 
+= 1; + } + "random" => { + let parsed = { + let data: [u8; 8] = data.try_into().map_err(|_| "unexpected random data")?; + u64::from_le_bytes(data) + }; + let output = format!( + "operator received random value {parsed} after {} ticks", + self.ticks + ); + output_sender.send("status".into(), output.into_bytes())?; + self.last_random_at = Some(Instant::now()); + } + other => eprintln!("ignoring unexpected input {other}"), + } + if let Some(last_random_at) = self.last_random_at { + if last_random_at.elapsed() > Duration::from_secs(1) { + // looks like the node sending the random values finished -> exit too + return Ok(DoraStatus::Stop); + } + } + Ok(DoraStatus::Continue) + } +} diff --git a/examples/iceoryx/run.rs b/examples/iceoryx/run.rs new file mode 100644 index 00000000..e6562fa6 --- /dev/null +++ b/examples/iceoryx/run.rs @@ -0,0 +1,33 @@ +use eyre::{bail, Context}; +use std::path::Path; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + build_package("iceoryx-example-node").await?; + build_package("iceoryx-example-operator").await?; + build_package("iceoryx-example-sink").await?; + build_package("dora-runtime").await?; + + dora_coordinator::run(dora_coordinator::Command::Run { + dataflow: Path::new("dataflow.yml").to_owned(), + runtime: Some(root.join("target").join("debug").join("dora-runtime")), + }) + .await?; + + Ok(()) +} + +async fn build_package(package: &str) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("build"); + cmd.arg("--package").arg(package); + if !cmd.status().await?.success() { + bail!("failed to build {package}"); + }; + Ok(()) +} diff --git a/examples/iceoryx/sink/Cargo.toml b/examples/iceoryx/sink/Cargo.toml new file mode 100644 index 00000000..fa48b258 --- /dev/null +++ b/examples/iceoryx/sink/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "iceoryx-example-sink" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +eyre = "0.6.8" +futures = "0.3.21" +tokio = { version = "1.20.1", features = ["macros"] } diff --git a/examples/iceoryx/sink/src/main.rs b/examples/iceoryx/sink/src/main.rs new file mode 100644 index 00000000..0fd5099b --- /dev/null +++ b/examples/iceoryx/sink/src/main.rs @@ -0,0 +1,32 @@ +use dora_node_api::{self, DoraNode}; +use eyre::{bail, Context}; + +fn main() -> eyre::Result<()> { + let mut operator = DoraNode::init_from_env()?; + + let inputs = operator.inputs()?; + + loop { + let input = match inputs.recv() { + Ok(input) => input, + Err(_) => break, + }; + + match input.id.as_str() { + "message" => { + let received_string = String::from_utf8(input.data) + .wrap_err("received message was not utf8-encoded")?; + println!("received message: {}", received_string); + if !received_string.starts_with("operator received random value ") { + bail!("unexpected message format (should start with 'operator received random value')") + } + if !received_string.ends_with(" ticks") { + bail!("unexpected message format (should end with 'ticks')") + } + } + other => eprintln!("Ignoring unexpected input `{other}`"), + } + } + + Ok(()) +} diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index 
edb349f7..a69cb172 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -1,28 +1,22 @@ use dora_node_api::{self, config::DataId, DoraNode}; -use eyre::bail; -use futures::StreamExt; -use std::time::Duration; -#[tokio::main] -async fn main() -> eyre::Result<()> { +fn main() -> eyre::Result<()> { let output = DataId::from("random".to_owned()); - let operator = DoraNode::init_from_env().await?; + let mut operator = DoraNode::init_from_env()?; - let mut inputs = operator.inputs().await?; + let inputs = operator.inputs()?; for _ in 0..20 { - let timeout = Duration::from_secs(3); - let input = match tokio::time::timeout(timeout, inputs.next()).await { - Ok(Some(input)) => input, - Ok(None) => break, - Err(_) => bail!("timeout while waiting for input"), + let input = match inputs.recv() { + Ok(input) => input, + Err(_) => break, }; match input.id.as_str() { "tick" => { let random: u64 = rand::random(); - operator.send_output(&output, &random.to_le_bytes()).await?; + operator.send_output(&output, &random.to_le_bytes())?; } other => eprintln!("Ignoring unexpected input `{other}`"), } diff --git a/examples/rust-dataflow/sink/Cargo.toml b/examples/rust-dataflow/sink/Cargo.toml index 284ed20a..e80b5a61 100644 --- a/examples/rust-dataflow/sink/Cargo.toml +++ b/examples/rust-dataflow/sink/Cargo.toml @@ -8,5 +8,3 @@ edition = "2021" [dependencies] dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } eyre = "0.6.8" -futures = "0.3.21" -tokio = { version = "1.20.1", features = ["macros"] } diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index 0bd3929f..0fd5099b 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -1,20 +1,15 @@ use dora_node_api::{self, DoraNode}; use eyre::{bail, Context}; -use futures::StreamExt; -use std::time::Duration; -#[tokio::main] -async fn main() -> eyre::Result<()> { - let operator = DoraNode::init_from_env().await?; +fn main() -> eyre::Result<()> { + let mut operator = DoraNode::init_from_env()?; - let mut inputs = operator.inputs().await?; + let inputs = operator.inputs()?; loop { - let timeout = Duration::from_secs(5); - let input = match tokio::time::timeout(timeout, inputs.next()).await { - Ok(Some(input)) => input, - Ok(None) => break, - Err(_) => bail!("timeout while waiting for input"), + let input = match inputs.recv() { + Ok(input) => input, + Err(_) => break, }; match input.id.as_str() { diff --git a/libraries/core/src/lib.rs b/libraries/core/src/lib.rs index 70e14260..f0d46f32 100644 --- a/libraries/core/src/lib.rs +++ b/libraries/core/src/lib.rs @@ -1,21 +1 @@ pub mod descriptor; - -pub struct BoxError(pub Box); - -impl std::fmt::Debug for BoxError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Debug::fmt(&self.0, f) - } -} - -impl std::fmt::Display for BoxError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.0, f) - } -} - -impl std::error::Error for BoxError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - self.0.source() - } -}
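
Appended note (illustrative sketch, not part of the patch): both the zenoh and the iceoryx backends above plug into the new `CommunicationLayer`, `Publisher`, and `Subscriber` traits in `apis/rust/node/src/communication/mod.rs`, and the provided `subscribe_all` default method is what `DoraNode::inputs` and the runtime's `spawn_operator` build on. The sketch below shows a third, purely in-process backend backed by flume channels, only to make the intended trait surface concrete. Everything in it is hypothetical: `ChannelCommunicationLayer`, `ChannelPublisher`, and `ChannelSubscriber` are invented names; the trait signatures are assumed to be `publisher(&mut self, &str) -> Result<Box<dyn Publisher>, BoxError>`, `subscribe(&mut self, &str) -> Result<Box<dyn Subscriber>, BoxError>`, and `Subscriber::recv(&mut self) -> Result<Option<Vec<u8>>, BoxError>`, matching how the two real backends use them; and `BoxError` is assumed to still wrap a boxed `std::error::Error` as in the previous `libraries/core` definition.

use std::collections::HashMap;
use std::sync::Mutex;

use dora_node_api::communication::{CommunicationLayer, Publisher, Subscriber};
use dora_node_api::BoxError;

/// Hypothetical test-only backend: every topic is a flume MPMC channel.
/// Unlike the real pub/sub backends this is not a broadcast: if two
/// subscribers share a topic, each message is delivered to only one of
/// them, which is acceptable for simple single-consumer tests.
#[derive(Default)]
struct ChannelCommunicationLayer {
    // Keeps one (sender, receiver) pair per topic alive, so that later
    // publishers and subscribers attach to the same channel and never
    // observe a disconnection.
    topics: Mutex<HashMap<String, (flume::Sender<Vec<u8>>, flume::Receiver<Vec<u8>>)>>,
}

impl ChannelCommunicationLayer {
    fn channel(&self, topic: &str) -> (flume::Sender<Vec<u8>>, flume::Receiver<Vec<u8>>) {
        let mut topics = self.topics.lock().unwrap();
        topics
            .entry(topic.to_owned())
            .or_insert_with(flume::unbounded)
            .clone()
    }
}

impl CommunicationLayer for ChannelCommunicationLayer {
    fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, BoxError> {
        let (tx, _) = self.channel(topic);
        Ok(Box::new(ChannelPublisher { tx }))
    }

    fn subscribe(&mut self, topic: &str) -> Result<Box<dyn Subscriber>, BoxError> {
        let (_, rx) = self.channel(topic);
        Ok(Box::new(ChannelSubscriber { rx }))
    }
}

#[derive(Clone)]
struct ChannelPublisher {
    tx: flume::Sender<Vec<u8>>,
}

impl Publisher for ChannelPublisher {
    fn publish(&self, data: &[u8]) -> Result<(), BoxError> {
        self.tx
            .send(data.to_owned())
            .map_err(|_| BoxError("all subscribers for this topic were dropped".into()))
    }

    fn boxed_clone(&self) -> Box<dyn Publisher> {
        Box::new(self.clone())
    }
}

struct ChannelSubscriber {
    rx: flume::Receiver<Vec<u8>>,
}

impl Subscriber for ChannelSubscriber {
    fn recv(&mut self) -> Result<Option<Vec<u8>>, BoxError> {
        // `None` signals "stream closed" to `subscribe_all`, mirroring the
        // zenoh backend's behaviour on disconnection.
        Ok(self.rx.recv().ok())
    }
}

With a layer like this, the default `subscribe_all` implementation still merges all mapped input topics plus the `__dora_rs_internal__operator_stopped` topics into a single `flume::Receiver<Input>`, so code written against the new synchronous `DoraNode::inputs()` API could be exercised without a zenoh session or a running iceoryx RouDi daemon. The node-side usage of that API is already demonstrated by `examples/iceoryx/node/src/main.rs` in the patch itself.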