diff --git a/Cargo.lock b/Cargo.lock index 700b6958..7656d737 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -474,6 +474,16 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "communication-layer-pub-sub" +version = "0.1.0" +dependencies = [ + "eyre", + "iceoryx-rs", + "zenoh", + "zenoh-config", +] + [[package]] name = "concurrent-queue" version = "1.2.2" @@ -794,7 +804,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util 0.7.1", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -839,18 +849,16 @@ dependencies = [ name = "dora-node-api" version = "0.1.0" dependencies = [ + "communication-layer-pub-sub", "eyre", "flume", - "iceoryx-rs", "once_cell", "serde", "serde_yaml", "thiserror", "tokio", "tracing", - "uuid", - "zenoh", - "zenoh-config", + "uuid 1.1.2", ] [[package]] @@ -3478,7 +3486,7 @@ dependencies = [ "lazy_static", "log", "serde", - "uuid", + "uuid 0.8.2", ] [[package]] @@ -3587,6 +3595,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "uuid" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" +dependencies = [ + "getrandom", +] + [[package]] name = "validated_struct" version = "2.1.0" @@ -3918,7 +3935,7 @@ dependencies = [ "socket2", "stop-token", "uhlc", - "uuid", + "uuid 0.8.2", "vec_map", "zenoh-buffers", "zenoh-cfg-properties", @@ -4130,7 +4147,7 @@ dependencies = [ "futures", "log", "nix 0.23.1", - "uuid", + "uuid 0.8.2", "zenoh-core", "zenoh-link-commons", "zenoh-protocol-core", @@ -4191,7 +4208,7 @@ dependencies = [ "lazy_static", "serde", "uhlc", - "uuid", + "uuid 0.8.2", "zenoh-core", ] diff --git a/Cargo.toml b/Cargo.toml index 1d083b1b..77c93fd3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,9 +10,9 @@ members = [ "examples/c++-dataflow/*-rust-*", "examples/iceoryx/*", "libraries/core", + "libraries/communication-layer", "libraries/extensions/message", "libraries/extensions/telemetry/*", - "libraries/extensions/telemetry/*", "libraries/extensions/zenoh-logger", ] diff --git a/apis/c/node/Cargo.toml b/apis/c/node/Cargo.toml index 113e12b4..77fcda37 100644 --- a/apis/c/node/Cargo.toml +++ b/apis/c/node/Cargo.toml @@ -16,4 +16,3 @@ flume = "0.10.14" [dependencies.dora-node-api] default-features = false path = "../../rust/node" -features = ["zenoh"] diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index 7b7a7d08..c750fa29 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -6,8 +6,8 @@ license = "Apache-2.0" [features] default = ["zenoh", "iceoryx"] -zenoh = ["dep:zenoh", "dep:zenoh-config"] -iceoryx = ["dep:iceoryx-rs"] +zenoh = ["communication-layer-pub-sub/zenoh"] +iceoryx = ["communication-layer-pub-sub/iceoryx"] [dependencies] eyre = "0.6.7" @@ -16,13 +16,9 @@ serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.8.23" thiserror = "1.0.30" tracing = "0.1.33" -uuid = "0.8.2" -zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", optional = true } -zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", optional = true } flume = "0.10.14" - -[target.'cfg(unix)'.dependencies] -iceoryx-rs = { git = "https://github.com/eclipse-iceoryx/iceoryx-rs.git", optional = true } +communication-layer-pub-sub = { path = "../../../libraries/communication-layer", default-features = false } +uuid = { version = "1.1.2", features = ["v4"] } [dev-dependencies] tokio = { version = "1.17.0", features = ["rt"] } diff --git a/apis/rust/node/src/communication.rs b/apis/rust/node/src/communication.rs new file mode 100644 index 00000000..fc944322 --- /dev/null +++ b/apis/rust/node/src/communication.rs @@ -0,0 +1,161 @@ +use crate::{ + config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}, + BoxError, +}; +use communication_layer_pub_sub::{ + iceoryx::IceoryxCommunicationLayer, zenoh::ZenohCommunicationLayer, +}; +pub use communication_layer_pub_sub::{CommunicationLayer, Publisher, Subscriber}; +use eyre::Context; +use std::{ + collections::{BTreeMap, HashSet}, + mem, thread, +}; +use uuid::Uuid; + +#[doc(hidden)] +pub const STOP_TOPIC: &str = "__dora_rs_internal__operator_stopped"; + +pub fn init( + communication_config: &CommunicationConfig, +) -> eyre::Result> { + match communication_config { + #[cfg(feature = "zenoh")] + CommunicationConfig::Zenoh { + config: zenoh_config, + prefix: zenoh_prefix, + } => { + let layer = ZenohCommunicationLayer::init(zenoh_config.clone(), zenoh_prefix.clone()) + .map_err(|err| eyre::eyre!(err))?; + + Ok(Box::new(layer)) + } + #[cfg(not(feature = "zenoh"))] + CommunicationConfig::Zenoh { .. } => { + eyre::bail!( + "cannot parse zenoh config because the compile-time `zenoh` feature \ + of `dora-node-api` was disabled" + ) + } + #[cfg(all(unix, feature = "iceoryx"))] + CommunicationConfig::Iceoryx { + app_name_prefix, + topic_prefix, + } => { + let app_name_prefix = app_name_prefix.clone(); + let topic_prefix = topic_prefix.clone(); + let app_name = format!("{app_name_prefix}-{}", Uuid::new_v4()); + let layer = IceoryxCommunicationLayer::init(app_name, topic_prefix) + .map_err(|err| eyre::eyre!(err))?; + + Ok(Box::new(layer)) + } + #[cfg(not(all(unix, feature = "iceoryx")))] + CommunicationConfig::Iceoryx { .. } => { + eyre::bail!( + "cannot parse iceoryx config because the compile-time `iceoryx` feature \ + of `dora-node-api` was disabled" + ) + } + } +} + +pub fn subscribe_all( + communication: &mut dyn CommunicationLayer, + inputs: &BTreeMap, +) -> eyre::Result> { + let (inputs_tx, inputs_rx) = flume::bounded(10); + for (input, mapping) in inputs { + let topic = mapping.to_string(); + let mut sub = communication + .subscribe(&topic) + .map_err(|err| eyre::eyre!(err)) + .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; + + let input_id = input.to_owned(); + let sender = inputs_tx.clone(); + thread::spawn(move || loop { + let event = match sub.recv().transpose() { + None => break, + Some(Ok(data)) => InputEvent::Input(Input { + id: input_id.clone(), + data, + }), + Some(Err(err)) => InputEvent::Error(err), + }; + match sender.send(event) { + Ok(()) => {} + Err(flume::SendError(_)) => break, + } + }); + } + + let mut sources: HashSet<_> = inputs + .values() + .map(|v| (v.source().to_owned(), v.operator().to_owned())) + .collect(); + for (source, operator) in &sources { + let topic = match operator { + Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"), + None => format!("{source}/{STOP_TOPIC}"), + }; + let mut sub = communication + .subscribe(&topic) + .map_err(|err| eyre::eyre!(err)) + .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; + + let source = source.to_owned(); + let operator = operator.clone(); + let sender = inputs_tx.clone(); + thread::spawn(move || loop { + let event = match sub.recv().transpose() { + None => break, + Some(Ok(_)) => InputEvent::SourceClosed { + source: source.clone(), + operator: operator.clone(), + }, + Some(Err(err)) => InputEvent::Error(err), + }; + match sender.send(event) { + Ok(()) => {} + Err(flume::SendError(_)) => break, + } + }); + } + mem::drop(inputs_tx); + + let (combined_tx, combined) = flume::bounded(1); + thread::spawn(move || loop { + match inputs_rx.recv() { + Ok(InputEvent::Input(input)) => match combined_tx.send(input) { + Ok(()) => {} + Err(flume::SendError(_)) => break, + }, + Ok(InputEvent::SourceClosed { source, operator }) => { + sources.remove(&(source, operator)); + if sources.is_empty() { + break; + } + } + Ok(InputEvent::Error(err)) => panic!("{err}"), + Err(_) => break, + } + }); + + Ok(combined) +} + +enum InputEvent { + Input(Input), + SourceClosed { + source: NodeId, + operator: Option, + }, + Error(BoxError), +} + +#[derive(Debug)] +pub struct Input { + pub id: DataId, + pub data: Vec, +} diff --git a/apis/rust/node/src/communication/mod.rs b/apis/rust/node/src/communication/mod.rs deleted file mode 100644 index 55de5235..00000000 --- a/apis/rust/node/src/communication/mod.rs +++ /dev/null @@ -1,174 +0,0 @@ -use crate::{ - config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}, - BoxError, -}; -use eyre::Context; -pub use flume::Receiver; -use std::{ - collections::{BTreeMap, HashSet}, - mem, thread, -}; - -#[doc(hidden)] -pub const STOP_TOPIC: &str = "__dora_rs_internal__operator_stopped"; - -#[cfg(all(unix, feature = "iceoryx"))] -pub mod iceoryx; -#[cfg(feature = "zenoh")] -pub mod zenoh; - -pub fn init( - communication_config: &CommunicationConfig, -) -> eyre::Result> { - match communication_config { - #[cfg(feature = "zenoh")] - CommunicationConfig::Zenoh { - config: zenoh_config, - prefix: zenoh_prefix, - } => { - let layer = - zenoh::ZenohCommunicationLayer::init(zenoh_config.clone(), zenoh_prefix.clone())?; - - Ok(Box::new(layer)) - } - #[cfg(not(feature = "zenoh"))] - CommunicationConfig::Zenoh { .. } => { - eyre::bail!( - "cannot parse zenoh config because the compile-time `zenoh` feature \ - of `dora-node-api` was disabled" - ) - } - #[cfg(all(unix, feature = "iceoryx"))] - CommunicationConfig::Iceoryx { - app_name_prefix, - topic_prefix, - } => { - let app_name_prefix = app_name_prefix.clone(); - let topic_prefix = topic_prefix.clone(); - let layer = iceoryx::IceoryxCommunicationLayer::init(app_name_prefix, topic_prefix)?; - - Ok(Box::new(layer)) - } - #[cfg(not(all(unix, feature = "iceoryx")))] - CommunicationConfig::Iceoryx { .. } => { - eyre::bail!( - "cannot parse iceoryx config because the compile-time `iceoryx` feature \ - of `dora-node-api` was disabled" - ) - } - } -} - -pub trait CommunicationLayer: Send + Sync { - fn publisher(&mut self, topic: &str) -> Result, BoxError>; - - fn subscribe(&mut self, topic: &str) -> Result, BoxError>; - - fn subscribe_all( - &mut self, - inputs: &BTreeMap, - ) -> eyre::Result> { - let (inputs_tx, inputs_rx) = flume::bounded(10); - for (input, mapping) in inputs { - let topic = mapping.to_string(); - let mut sub = self - .subscribe(&topic) - .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; - - let input_id = input.to_owned(); - let sender = inputs_tx.clone(); - thread::spawn(move || loop { - let event = match sub.recv().transpose() { - None => break, - Some(Ok(data)) => InputEvent::Input(Input { - id: input_id.clone(), - data, - }), - Some(Err(err)) => InputEvent::Error(err), - }; - match sender.send(event) { - Ok(()) => {} - Err(flume::SendError(_)) => break, - } - }); - } - - let mut sources: HashSet<_> = inputs - .values() - .map(|v| (v.source().to_owned(), v.operator().to_owned())) - .collect(); - for (source, operator) in &sources { - let topic = match operator { - Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"), - None => format!("{source}/{STOP_TOPIC}"), - }; - let mut sub = self - .subscribe(&topic) - .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; - - let source = source.to_owned(); - let operator = operator.clone(); - let sender = inputs_tx.clone(); - thread::spawn(move || loop { - let event = match sub.recv().transpose() { - None => break, - Some(Ok(_)) => InputEvent::SourceClosed { - source: source.clone(), - operator: operator.clone(), - }, - Some(Err(err)) => InputEvent::Error(err), - }; - match sender.send(event) { - Ok(()) => {} - Err(flume::SendError(_)) => break, - } - }); - } - mem::drop(inputs_tx); - - let (combined_tx, combined) = flume::bounded(1); - thread::spawn(move || loop { - match inputs_rx.recv() { - Ok(InputEvent::Input(input)) => match combined_tx.send(input) { - Ok(()) => {} - Err(flume::SendError(_)) => break, - }, - Ok(InputEvent::SourceClosed { source, operator }) => { - sources.remove(&(source, operator)); - if sources.is_empty() { - break; - } - } - Ok(InputEvent::Error(err)) => panic!("{err}"), - Err(_) => break, - } - }); - - Ok(combined) - } -} - -pub trait Publisher: Send + Sync { - fn publish(&self, data: &[u8]) -> Result<(), BoxError>; - - fn boxed_clone(&self) -> Box; -} - -pub trait Subscriber: Send + Sync { - fn recv(&mut self) -> Result>, BoxError>; -} - -enum InputEvent { - Input(Input), - SourceClosed { - source: NodeId, - operator: Option, - }, - Error(BoxError), -} - -#[derive(Debug)] -pub struct Input { - pub id: DataId, - pub data: Vec, -} diff --git a/apis/rust/node/src/config.rs b/apis/rust/node/src/config.rs index 9acc19e9..697910f7 100644 --- a/apis/rust/node/src/config.rs +++ b/apis/rust/node/src/config.rs @@ -1,3 +1,4 @@ +use communication_layer_pub_sub::zenoh::zenoh_config; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use std::{ diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index cdc5ab7a..3b933940 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -1,5 +1,8 @@ pub use communication::Input; -use communication::{CommunicationLayer, STOP_TOPIC}; +pub use flume::Receiver; + +use communication::STOP_TOPIC; +use communication_layer_pub_sub::CommunicationLayer; use config::{CommunicationConfig, DataId, NodeId, NodeRunConfig}; use eyre::WrapErr; @@ -46,7 +49,7 @@ impl DoraNode { } pub fn inputs(&mut self) -> eyre::Result> { - self.communication.subscribe_all(&self.node_config.inputs) + communication::subscribe_all(self.communication.as_mut(), &self.node_config.inputs) } pub fn send_output(&mut self, output_id: &DataId, data: &[u8]) -> eyre::Result<()> { @@ -59,8 +62,10 @@ impl DoraNode { let topic = format!("{self_id}/{output_id}"); self.communication .publisher(&topic) + .map_err(|err| eyre::eyre!(err)) .wrap_err_with(|| format!("failed create publisher for output {output_id}"))? .publish(data) + .map_err(|err| eyre::eyre!(err)) .wrap_err_with(|| format!("failed to send data for output {output_id}"))?; Ok(()) } @@ -81,11 +86,13 @@ impl Drop for DoraNode { let result = self .communication .publisher(&topic) + .map_err(|err| eyre::eyre!(err)) .wrap_err_with(|| { format!("failed to create publisher for stop message for node `{self_id}`") }) .and_then(|p| { p.publish(&[]) + .map_err(|err| eyre::eyre!(err)) .wrap_err_with(|| format!("failed to send stop message for node `{self_id}`")) }); match result { @@ -97,25 +104,7 @@ impl Drop for DoraNode { } } -pub struct BoxError(pub Box); - -impl std::fmt::Debug for BoxError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Debug::fmt(&self.0, f) - } -} - -impl std::fmt::Display for BoxError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.0, f) - } -} - -impl std::error::Error for BoxError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - self.0.source() - } -} +pub type BoxError = Box; #[cfg(test)] mod tests { diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index 35affe23..564b1b08 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -100,6 +100,7 @@ async fn run( tokio::task::spawn_blocking(move || stop_publisher.publish(&[])) .await .wrap_err("failed to join stop publish task")? + .map_err(|err| eyre::eyre!(err)) .with_context(|| { format!( "failed to send stop message for operator `{node_id}/{id}`" @@ -131,6 +132,7 @@ fn publisher( let topic = format!("{self_id}/{operator_id}/{output_id}"); communication .publisher(&topic) + .map_err(|err| eyre::eyre!(err)) .wrap_err_with(|| format!("failed to create publisher for output {output_id}")) } diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index 3ef9f5a3..e9f0c7e4 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -1,5 +1,8 @@ use dora_core::descriptor::{OperatorDefinition, OperatorSource}; -use dora_node_api::{communication::CommunicationLayer, config::NodeId}; +use dora_node_api::{ + communication::{self, CommunicationLayer}, + config::NodeId, +}; use eyre::Context; use std::any::Any; use tokio::sync::mpsc::Sender; @@ -13,8 +16,7 @@ pub fn spawn_operator( events_tx: Sender, communication: &mut dyn CommunicationLayer, ) -> eyre::Result<()> { - let inputs = communication - .subscribe_all(&operator_definition.config.inputs) + let inputs = communication::subscribe_all(communication, &operator_definition.config.inputs) .wrap_err_with(|| { format!( "failed to subscribe to inputs of operator {}", @@ -33,6 +35,7 @@ pub fn spawn_operator( ); communication .publisher(&topic) + .map_err(|err| eyre::eyre!(err)) .wrap_err_with(|| format!("failed to create publisher for output {output_id}")) .map(|p| (output_id.to_owned(), p)) }) diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 3efe892f..1dd47c24 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -156,7 +156,10 @@ mod callback_impl { impl SendOutputCallback { fn __call__(&mut self, output: &str, data: &[u8]) -> PyResult<()> { match self.publishers.get(output) { - Some(publisher) => publisher.publish(data).context("publish failed"), + Some(publisher) => publisher + .publish(data) + .map_err(|err| eyre::eyre!(err)) + .context("publish failed"), None => Err(eyre!( "unexpected output {output} (not defined in dataflow config)" )), diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 47146d53..e1c1674a 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,5 +1,5 @@ use super::OperatorEvent; -use dora_node_api::{communication::Publisher, config::DataId, BoxError}; +use dora_node_api::{communication::Publisher, config::DataId}; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput, DoraResult, DoraStatus, Metadata, OnInputResult, Output, SendOutput, @@ -92,13 +92,11 @@ impl<'lib> SharedLibraryOperator<'lib> { let send_output_closure = Arc::new(move |output: Output| { let result = match publishers.get(output.id.deref()) { Some(publisher) => publisher.publish(&output.data), - None => Err(BoxError( - eyre!( - "unexpected output {} (not defined in dataflow config)", - output.id.deref() - ) - .into(), - )), + None => Err(eyre!( + "unexpected output {} (not defined in dataflow config)", + output.id.deref() + ) + .into()), }; let error = match result { diff --git a/examples/c++-dataflow/node-rust-api/src/main.rs b/examples/c++-dataflow/node-rust-api/src/main.rs index 33b60641..95c8a42c 100644 --- a/examples/c++-dataflow/node-rust-api/src/main.rs +++ b/examples/c++-dataflow/node-rust-api/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, communication::Receiver, DoraNode, Input}; +use dora_node_api::{self, DoraNode, Input, Receiver}; #[cxx::bridge] mod ffi { diff --git a/libraries/communication-layer/Cargo.toml b/libraries/communication-layer/Cargo.toml new file mode 100644 index 00000000..488d21c6 --- /dev/null +++ b/libraries/communication-layer/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "communication-layer-pub-sub" +version = "0.1.0" +edition = "2021" + +[features] +default = ["zenoh", "iceoryx"] +zenoh = ["dep:zenoh", "dep:zenoh-config"] +iceoryx = ["dep:iceoryx-rs"] + +[dependencies] +eyre = "0.6.8" +zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", optional = true } +zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", optional = true } + +[target.'cfg(unix)'.dependencies] +iceoryx-rs = { git = "https://github.com/eclipse-iceoryx/iceoryx-rs.git", optional = true } + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] diff --git a/apis/rust/node/src/communication/iceoryx.rs b/libraries/communication-layer/src/iceoryx.rs similarity index 73% rename from apis/rust/node/src/communication/iceoryx.rs rename to libraries/communication-layer/src/iceoryx.rs index f997ab75..99100179 100644 --- a/apis/rust/node/src/communication/iceoryx.rs +++ b/libraries/communication-layer/src/iceoryx.rs @@ -1,19 +1,29 @@ -use eyre::Context; -use std::{collections::HashMap, sync::Arc, time::Duration}; -use uuid::Uuid; - -use crate::BoxError; +//! Provides [`IceoryxCommunicationLayer`] to communicate over `iceoryx`. use super::{CommunicationLayer, Publisher, Subscriber}; +use crate::BoxError; +use eyre::Context; +use std::{collections::HashMap, sync::Arc, time::Duration}; +/// Enables local communication based on `iceoryx`. pub struct IceoryxCommunicationLayer { topic_prefix: Arc, publishers: HashMap>>, } impl IceoryxCommunicationLayer { - pub fn init(app_name_prefix: String, topic_prefix: String) -> eyre::Result { - let app_name = format!("{app_name_prefix}-{}", Uuid::new_v4()); + /// Initializes a new `iceoryx` connection with default configuration. + /// + /// The given `app_name` must be unique. The `topic_prefix` is used as _instance name_ + /// when using the [`publisher`][Self::publisher] and [`subscriber`][Self::subscribe] + /// methods. + /// + /// Note: In order to use iceoryx, you need to start its broker deamon called + /// [_RouDi_](https://iceoryx.io/v2.0.2/getting-started/overview/#roudi) first. + /// Its executable name is `iox-roudi`. See the + /// [`iceoryx` installation chapter](https://iceoryx.io/v2.0.2/getting-started/installation/) + /// for ways to install it. + pub fn init(app_name: String, topic_prefix: String) -> Result { iceoryx_rs::Runtime::init(&app_name); Ok(Self { @@ -53,7 +63,7 @@ impl CommunicationLayer for IceoryxCommunicationLayer { fn publisher(&mut self, topic: &str) -> Result, crate::BoxError> { let publisher = self .get_or_create_publisher(topic) - .map_err(|err| BoxError(err.into()))?; + .map_err(BoxError::from)?; Ok(Box::new(IceoryxPublisher { publisher })) } @@ -64,7 +74,7 @@ impl CommunicationLayer for IceoryxCommunicationLayer { .queue_capacity(5) .create_mt() .context("failed to create iceoryx subscriber") - .map_err(|err| BoxError(err.into()))?; + .map_err(BoxError::from)?; let receiver = subscriber.get_sample_receiver(token); Ok(Box::new(IceoryxReceiver { receiver })) @@ -72,7 +82,7 @@ impl CommunicationLayer for IceoryxCommunicationLayer { } #[derive(Clone)] -pub struct IceoryxPublisher { +struct IceoryxPublisher { publisher: Arc>, } @@ -82,18 +92,18 @@ impl Publisher for IceoryxPublisher { .publisher .loan_slice(data.len()) .context("failed to loan iceoryx slice for publishing") - .map_err(|err| BoxError(err.into()))?; + .map_err(BoxError::from)?; sample.copy_from_slice(data); self.publisher.publish(sample); Ok(()) } - fn boxed_clone(&self) -> Box { + fn dyn_clone(&self) -> Box { Box::new(self.clone()) } } -pub struct IceoryxReceiver { +struct IceoryxReceiver { receiver: iceoryx_rs::mt::SampleReceiver<[u8]>, } diff --git a/libraries/communication-layer/src/lib.rs b/libraries/communication-layer/src/lib.rs new file mode 100644 index 00000000..298a4eb2 --- /dev/null +++ b/libraries/communication-layer/src/lib.rs @@ -0,0 +1,54 @@ +#![warn(missing_docs)] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] + +//! Abstraction of various publisher/subscriber communication backends. +//! +//! Provides a [`CommunicationLayer`] trait as an abstraction for different publisher/subscriber +//! systems. The following set of backends are currently supported: +//! +//! - **[Zenoh](https://zenoh.io/):** The zenoh project implements a distributed +//! publisher/subscriber system with automated routing. To use zenoh, use the +//! [`ZenohCommunicationLayer`][zenoh::ZenohCommunicationLayer] struct. +//! - **[Iceoryx](https://iceoryx.io/):** The Eclipse iceoryxâ„¢ project provides an IPC middleware +//! based on shared memory. It is very fast, but it only supports local communication. To use +//! iceoryx, use the [`IceoryxCommunicationLayer`][iceoryx::IceoryxCommunicationLayer] struct. + +#[cfg(all(unix, feature = "iceoryx"))] +pub mod iceoryx; +#[cfg(feature = "zenoh")] +pub mod zenoh; + +type BoxError = Box; + +/// Abstraction trait for different publisher/subscriber implementations. +pub trait CommunicationLayer: Send + Sync { + /// Creates a publisher for the given topic. + fn publisher(&mut self, topic: &str) -> Result, BoxError>; + + /// Subscribe to the given topic. + fn subscribe(&mut self, topic: &str) -> Result, BoxError>; +} + +/// Allows publishing messages to subscribers. +pub trait Publisher: Send + Sync { + /// Publish the given data to subscribers. + /// + /// The data is published to the topic that was used to create this publisher + /// (see [`CommunicationLayer::publisher`]). + fn publish(&self, data: &[u8]) -> Result<(), BoxError>; + + /// Clone this publisher, returning the clone as a + /// [trait object](https://doc.rust-lang.org/book/ch17-02-trait-objects.html). + fn dyn_clone(&self) -> Box; +} + +/// Allows receiving messages published on a topic. +pub trait Subscriber: Send + Sync { + /// Receives the next message. + /// + /// Blocks until the next message is available. + /// + /// Depending on the chosen communication backend, some messages might be dropped if + /// the publisher is faster than the subscriber. + fn recv(&mut self) -> Result>, BoxError>; +} diff --git a/apis/rust/node/src/communication/zenoh.rs b/libraries/communication-layer/src/zenoh.rs similarity index 69% rename from apis/rust/node/src/communication/zenoh.rs rename to libraries/communication-layer/src/zenoh.rs index 03f0a295..5e42cf74 100644 --- a/apis/rust/node/src/communication/zenoh.rs +++ b/libraries/communication-layer/src/zenoh.rs @@ -1,3 +1,7 @@ +//! Provides [`ZenohCommunicationLayer`] to communicate over `zenoh`. + +pub use zenoh_config; + use super::{CommunicationLayer, Publisher, Subscriber}; use crate::BoxError; use std::{sync::Arc, time::Duration}; @@ -6,16 +10,22 @@ use zenoh::{ publication::CongestionControl, }; +/// Allows communication over `zenoh`. pub struct ZenohCommunicationLayer { zenoh: Arc, topic_prefix: String, } impl ZenohCommunicationLayer { - pub fn init(config: zenoh_config::Config, prefix: String) -> eyre::Result { + /// Initializes a new `zenoh` session with the given configuration. + /// + /// The `prefix` is added to all topic names when using the [`publisher`][Self::publisher] + /// and [`subscriber`][Self::subscribe] methods. Pass an empty string if no prefix is + /// desired. + pub fn init(config: zenoh_config::Config, prefix: String) -> Result { let zenoh = ::zenoh::open(config) .wait() - .map_err(|err| BoxError(err.into()))? + .map_err(BoxError::from)? .into_arc(); Ok(Self { zenoh, @@ -29,14 +39,14 @@ impl ZenohCommunicationLayer { } impl CommunicationLayer for ZenohCommunicationLayer { - fn publisher(&mut self, topic: &str) -> Result, crate::BoxError> { + fn publisher(&mut self, topic: &str) -> Result, BoxError> { let publisher = self .zenoh .publish(self.prefixed(topic)) .congestion_control(CongestionControl::Block) .priority(Priority::RealTime) .wait() - .map_err(|err| BoxError(err.into()))?; + .map_err(BoxError::from)?; Ok(Box::new(ZenohPublisher { publisher })) } @@ -47,7 +57,7 @@ impl CommunicationLayer for ZenohCommunicationLayer { .subscribe(self.prefixed(topic)) .reliable() .wait() - .map_err(|err| BoxError(err.into()))?; + .map_err(BoxError::from)?; Ok(Box::new(ZenohReceiver(subscriber))) } @@ -65,29 +75,27 @@ impl Drop for ZenohCommunicationLayer { } #[derive(Clone)] -pub struct ZenohPublisher { +struct ZenohPublisher { publisher: zenoh::publication::Publisher<'static>, } impl Publisher for ZenohPublisher { - fn publish(&self, data: &[u8]) -> Result<(), crate::BoxError> { - self.publisher - .send(data) - .map_err(|err| BoxError(err.into())) + fn publish(&self, data: &[u8]) -> Result<(), BoxError> { + self.publisher.send(data).map_err(BoxError::from) } - fn boxed_clone(&self) -> Box { + fn dyn_clone(&self) -> Box { Box::new(self.clone()) } } -pub struct ZenohReceiver(zenoh::subscriber::Subscriber<'static>); +struct ZenohReceiver(zenoh::subscriber::Subscriber<'static>); impl Subscriber for ZenohReceiver { - fn recv(&mut self) -> Result>, crate::BoxError> { + fn recv(&mut self) -> Result>, BoxError> { match self.0.recv() { Ok(sample) => Ok(Some(sample.value.payload.contiguous().into_owned())), - Err(flume::RecvError::Disconnected) => Ok(None), + Err(_) => Ok(None), } } }