Browse Source

Move communication layer implementation into separate library

To make it usable for other projects.
tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
848e136a75
Failed to extract signature
17 changed files with 342 additions and 254 deletions
  1. +26
    -9
      Cargo.lock
  2. +1
    -1
      Cargo.toml
  3. +0
    -1
      apis/c/node/Cargo.toml
  4. +4
    -8
      apis/rust/node/Cargo.toml
  5. +161
    -0
      apis/rust/node/src/communication.rs
  6. +0
    -174
      apis/rust/node/src/communication/mod.rs
  7. +1
    -0
      apis/rust/node/src/config.rs
  8. +10
    -21
      apis/rust/node/src/lib.rs
  9. +2
    -0
      binaries/runtime/src/main.rs
  10. +6
    -3
      binaries/runtime/src/operator/mod.rs
  11. +4
    -1
      binaries/runtime/src/operator/python.rs
  12. +6
    -8
      binaries/runtime/src/operator/shared_lib.rs
  13. +1
    -1
      examples/c++-dataflow/node-rust-api/src/main.rs
  14. +21
    -0
      libraries/communication-layer/Cargo.toml
  15. +23
    -13
      libraries/communication-layer/src/iceoryx.rs
  16. +54
    -0
      libraries/communication-layer/src/lib.rs
  17. +22
    -14
      libraries/communication-layer/src/zenoh.rs

+ 26
- 9
Cargo.lock View File

@@ -474,6 +474,16 @@ dependencies = [
"unicode-width",
]

[[package]]
name = "communication-layer-pub-sub"
version = "0.1.0"
dependencies = [
"eyre",
"iceoryx-rs",
"zenoh",
"zenoh-config",
]

[[package]]
name = "concurrent-queue"
version = "1.2.2"
@@ -794,7 +804,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"uuid",
"uuid 0.8.2",
]

[[package]]
@@ -839,18 +849,16 @@ dependencies = [
name = "dora-node-api"
version = "0.1.0"
dependencies = [
"communication-layer-pub-sub",
"eyre",
"flume",
"iceoryx-rs",
"once_cell",
"serde",
"serde_yaml",
"thiserror",
"tokio",
"tracing",
"uuid",
"zenoh",
"zenoh-config",
"uuid 1.1.2",
]

[[package]]
@@ -3478,7 +3486,7 @@ dependencies = [
"lazy_static",
"log",
"serde",
"uuid",
"uuid 0.8.2",
]

[[package]]
@@ -3587,6 +3595,15 @@ dependencies = [
"getrandom",
]

[[package]]
name = "uuid"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f"
dependencies = [
"getrandom",
]

[[package]]
name = "validated_struct"
version = "2.1.0"
@@ -3918,7 +3935,7 @@ dependencies = [
"socket2",
"stop-token",
"uhlc",
"uuid",
"uuid 0.8.2",
"vec_map",
"zenoh-buffers",
"zenoh-cfg-properties",
@@ -4130,7 +4147,7 @@ dependencies = [
"futures",
"log",
"nix 0.23.1",
"uuid",
"uuid 0.8.2",
"zenoh-core",
"zenoh-link-commons",
"zenoh-protocol-core",
@@ -4191,7 +4208,7 @@ dependencies = [
"lazy_static",
"serde",
"uhlc",
"uuid",
"uuid 0.8.2",
"zenoh-core",
]



+ 1
- 1
Cargo.toml View File

@@ -10,9 +10,9 @@ members = [
"examples/c++-dataflow/*-rust-*",
"examples/iceoryx/*",
"libraries/core",
"libraries/communication-layer",
"libraries/extensions/message",
"libraries/extensions/telemetry/*",
"libraries/extensions/telemetry/*",
"libraries/extensions/zenoh-logger",
]



+ 0
- 1
apis/c/node/Cargo.toml View File

@@ -16,4 +16,3 @@ flume = "0.10.14"
[dependencies.dora-node-api]
default-features = false
path = "../../rust/node"
features = ["zenoh"]

+ 4
- 8
apis/rust/node/Cargo.toml View File

@@ -6,8 +6,8 @@ license = "Apache-2.0"

[features]
default = ["zenoh", "iceoryx"]
zenoh = ["dep:zenoh", "dep:zenoh-config"]
iceoryx = ["dep:iceoryx-rs"]
zenoh = ["communication-layer-pub-sub/zenoh"]
iceoryx = ["communication-layer-pub-sub/iceoryx"]

[dependencies]
eyre = "0.6.7"
@@ -16,13 +16,9 @@ serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.8.23"
thiserror = "1.0.30"
tracing = "0.1.33"
uuid = "0.8.2"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", optional = true }
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", optional = true }
flume = "0.10.14"

[target.'cfg(unix)'.dependencies]
iceoryx-rs = { git = "https://github.com/eclipse-iceoryx/iceoryx-rs.git", optional = true }
communication-layer-pub-sub = { path = "../../../libraries/communication-layer", default-features = false }
uuid = { version = "1.1.2", features = ["v4"] }

[dev-dependencies]
tokio = { version = "1.17.0", features = ["rt"] }

+ 161
- 0
apis/rust/node/src/communication.rs View File

@@ -0,0 +1,161 @@
use crate::{
config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId},
BoxError,
};
use communication_layer_pub_sub::{
iceoryx::IceoryxCommunicationLayer, zenoh::ZenohCommunicationLayer,
};
pub use communication_layer_pub_sub::{CommunicationLayer, Publisher, Subscriber};
use eyre::Context;
use std::{
collections::{BTreeMap, HashSet},
mem, thread,
};
use uuid::Uuid;

#[doc(hidden)]
pub const STOP_TOPIC: &str = "__dora_rs_internal__operator_stopped";

pub fn init(
communication_config: &CommunicationConfig,
) -> eyre::Result<Box<dyn CommunicationLayer>> {
match communication_config {
#[cfg(feature = "zenoh")]
CommunicationConfig::Zenoh {
config: zenoh_config,
prefix: zenoh_prefix,
} => {
let layer = ZenohCommunicationLayer::init(zenoh_config.clone(), zenoh_prefix.clone())
.map_err(|err| eyre::eyre!(err))?;

Ok(Box::new(layer))
}
#[cfg(not(feature = "zenoh"))]
CommunicationConfig::Zenoh { .. } => {
eyre::bail!(
"cannot parse zenoh config because the compile-time `zenoh` feature \
of `dora-node-api` was disabled"
)
}
#[cfg(all(unix, feature = "iceoryx"))]
CommunicationConfig::Iceoryx {
app_name_prefix,
topic_prefix,
} => {
let app_name_prefix = app_name_prefix.clone();
let topic_prefix = topic_prefix.clone();
let app_name = format!("{app_name_prefix}-{}", Uuid::new_v4());
let layer = IceoryxCommunicationLayer::init(app_name, topic_prefix)
.map_err(|err| eyre::eyre!(err))?;

Ok(Box::new(layer))
}
#[cfg(not(all(unix, feature = "iceoryx")))]
CommunicationConfig::Iceoryx { .. } => {
eyre::bail!(
"cannot parse iceoryx config because the compile-time `iceoryx` feature \
of `dora-node-api` was disabled"
)
}
}
}

pub fn subscribe_all(
communication: &mut dyn CommunicationLayer,
inputs: &BTreeMap<DataId, InputMapping>,
) -> eyre::Result<flume::Receiver<Input>> {
let (inputs_tx, inputs_rx) = flume::bounded(10);
for (input, mapping) in inputs {
let topic = mapping.to_string();
let mut sub = communication
.subscribe(&topic)
.map_err(|err| eyre::eyre!(err))
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?;

let input_id = input.to_owned();
let sender = inputs_tx.clone();
thread::spawn(move || loop {
let event = match sub.recv().transpose() {
None => break,
Some(Ok(data)) => InputEvent::Input(Input {
id: input_id.clone(),
data,
}),
Some(Err(err)) => InputEvent::Error(err),
};
match sender.send(event) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
}
});
}

let mut sources: HashSet<_> = inputs
.values()
.map(|v| (v.source().to_owned(), v.operator().to_owned()))
.collect();
for (source, operator) in &sources {
let topic = match operator {
Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"),
None => format!("{source}/{STOP_TOPIC}"),
};
let mut sub = communication
.subscribe(&topic)
.map_err(|err| eyre::eyre!(err))
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?;

let source = source.to_owned();
let operator = operator.clone();
let sender = inputs_tx.clone();
thread::spawn(move || loop {
let event = match sub.recv().transpose() {
None => break,
Some(Ok(_)) => InputEvent::SourceClosed {
source: source.clone(),
operator: operator.clone(),
},
Some(Err(err)) => InputEvent::Error(err),
};
match sender.send(event) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
}
});
}
mem::drop(inputs_tx);

let (combined_tx, combined) = flume::bounded(1);
thread::spawn(move || loop {
match inputs_rx.recv() {
Ok(InputEvent::Input(input)) => match combined_tx.send(input) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
},
Ok(InputEvent::SourceClosed { source, operator }) => {
sources.remove(&(source, operator));
if sources.is_empty() {
break;
}
}
Ok(InputEvent::Error(err)) => panic!("{err}"),
Err(_) => break,
}
});

Ok(combined)
}

enum InputEvent {
Input(Input),
SourceClosed {
source: NodeId,
operator: Option<OperatorId>,
},
Error(BoxError),
}

#[derive(Debug)]
pub struct Input {
pub id: DataId,
pub data: Vec<u8>,
}

+ 0
- 174
apis/rust/node/src/communication/mod.rs View File

@@ -1,174 +0,0 @@
use crate::{
config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId},
BoxError,
};
use eyre::Context;
pub use flume::Receiver;
use std::{
collections::{BTreeMap, HashSet},
mem, thread,
};

#[doc(hidden)]
pub const STOP_TOPIC: &str = "__dora_rs_internal__operator_stopped";

#[cfg(all(unix, feature = "iceoryx"))]
pub mod iceoryx;
#[cfg(feature = "zenoh")]
pub mod zenoh;

pub fn init(
communication_config: &CommunicationConfig,
) -> eyre::Result<Box<dyn CommunicationLayer>> {
match communication_config {
#[cfg(feature = "zenoh")]
CommunicationConfig::Zenoh {
config: zenoh_config,
prefix: zenoh_prefix,
} => {
let layer =
zenoh::ZenohCommunicationLayer::init(zenoh_config.clone(), zenoh_prefix.clone())?;

Ok(Box::new(layer))
}
#[cfg(not(feature = "zenoh"))]
CommunicationConfig::Zenoh { .. } => {
eyre::bail!(
"cannot parse zenoh config because the compile-time `zenoh` feature \
of `dora-node-api` was disabled"
)
}
#[cfg(all(unix, feature = "iceoryx"))]
CommunicationConfig::Iceoryx {
app_name_prefix,
topic_prefix,
} => {
let app_name_prefix = app_name_prefix.clone();
let topic_prefix = topic_prefix.clone();
let layer = iceoryx::IceoryxCommunicationLayer::init(app_name_prefix, topic_prefix)?;

Ok(Box::new(layer))
}
#[cfg(not(all(unix, feature = "iceoryx")))]
CommunicationConfig::Iceoryx { .. } => {
eyre::bail!(
"cannot parse iceoryx config because the compile-time `iceoryx` feature \
of `dora-node-api` was disabled"
)
}
}
}

pub trait CommunicationLayer: Send + Sync {
fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, BoxError>;

fn subscribe(&mut self, topic: &str) -> Result<Box<dyn Subscriber>, BoxError>;

fn subscribe_all(
&mut self,
inputs: &BTreeMap<DataId, InputMapping>,
) -> eyre::Result<flume::Receiver<Input>> {
let (inputs_tx, inputs_rx) = flume::bounded(10);
for (input, mapping) in inputs {
let topic = mapping.to_string();
let mut sub = self
.subscribe(&topic)
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?;

let input_id = input.to_owned();
let sender = inputs_tx.clone();
thread::spawn(move || loop {
let event = match sub.recv().transpose() {
None => break,
Some(Ok(data)) => InputEvent::Input(Input {
id: input_id.clone(),
data,
}),
Some(Err(err)) => InputEvent::Error(err),
};
match sender.send(event) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
}
});
}

let mut sources: HashSet<_> = inputs
.values()
.map(|v| (v.source().to_owned(), v.operator().to_owned()))
.collect();
for (source, operator) in &sources {
let topic = match operator {
Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"),
None => format!("{source}/{STOP_TOPIC}"),
};
let mut sub = self
.subscribe(&topic)
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?;

let source = source.to_owned();
let operator = operator.clone();
let sender = inputs_tx.clone();
thread::spawn(move || loop {
let event = match sub.recv().transpose() {
None => break,
Some(Ok(_)) => InputEvent::SourceClosed {
source: source.clone(),
operator: operator.clone(),
},
Some(Err(err)) => InputEvent::Error(err),
};
match sender.send(event) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
}
});
}
mem::drop(inputs_tx);

let (combined_tx, combined) = flume::bounded(1);
thread::spawn(move || loop {
match inputs_rx.recv() {
Ok(InputEvent::Input(input)) => match combined_tx.send(input) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
},
Ok(InputEvent::SourceClosed { source, operator }) => {
sources.remove(&(source, operator));
if sources.is_empty() {
break;
}
}
Ok(InputEvent::Error(err)) => panic!("{err}"),
Err(_) => break,
}
});

Ok(combined)
}
}

pub trait Publisher: Send + Sync {
fn publish(&self, data: &[u8]) -> Result<(), BoxError>;

fn boxed_clone(&self) -> Box<dyn Publisher>;
}

pub trait Subscriber: Send + Sync {
fn recv(&mut self) -> Result<Option<Vec<u8>>, BoxError>;
}

enum InputEvent {
Input(Input),
SourceClosed {
source: NodeId,
operator: Option<OperatorId>,
},
Error(BoxError),
}

#[derive(Debug)]
pub struct Input {
pub id: DataId,
pub data: Vec<u8>,
}

+ 1
- 0
apis/rust/node/src/config.rs View File

@@ -1,3 +1,4 @@
use communication_layer_pub_sub::zenoh::zenoh_config;
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use std::{


+ 10
- 21
apis/rust/node/src/lib.rs View File

@@ -1,5 +1,8 @@
pub use communication::Input;
use communication::{CommunicationLayer, STOP_TOPIC};
pub use flume::Receiver;

use communication::STOP_TOPIC;
use communication_layer_pub_sub::CommunicationLayer;
use config::{CommunicationConfig, DataId, NodeId, NodeRunConfig};
use eyre::WrapErr;

@@ -46,7 +49,7 @@ impl DoraNode {
}

pub fn inputs(&mut self) -> eyre::Result<flume::Receiver<Input>> {
self.communication.subscribe_all(&self.node_config.inputs)
communication::subscribe_all(self.communication.as_mut(), &self.node_config.inputs)
}

pub fn send_output(&mut self, output_id: &DataId, data: &[u8]) -> eyre::Result<()> {
@@ -59,8 +62,10 @@ impl DoraNode {
let topic = format!("{self_id}/{output_id}");
self.communication
.publisher(&topic)
.map_err(|err| eyre::eyre!(err))
.wrap_err_with(|| format!("failed create publisher for output {output_id}"))?
.publish(data)
.map_err(|err| eyre::eyre!(err))
.wrap_err_with(|| format!("failed to send data for output {output_id}"))?;
Ok(())
}
@@ -81,11 +86,13 @@ impl Drop for DoraNode {
let result = self
.communication
.publisher(&topic)
.map_err(|err| eyre::eyre!(err))
.wrap_err_with(|| {
format!("failed to create publisher for stop message for node `{self_id}`")
})
.and_then(|p| {
p.publish(&[])
.map_err(|err| eyre::eyre!(err))
.wrap_err_with(|| format!("failed to send stop message for node `{self_id}`"))
});
match result {
@@ -97,25 +104,7 @@ impl Drop for DoraNode {
}
}

pub struct BoxError(pub Box<dyn std::error::Error + Send + Sync + 'static>);

impl std::fmt::Debug for BoxError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(&self.0, f)
}
}

impl std::fmt::Display for BoxError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
}
}

impl std::error::Error for BoxError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.0.source()
}
}
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

#[cfg(test)]
mod tests {


+ 2
- 0
binaries/runtime/src/main.rs View File

@@ -100,6 +100,7 @@ async fn run(
tokio::task::spawn_blocking(move || stop_publisher.publish(&[]))
.await
.wrap_err("failed to join stop publish task")?
.map_err(|err| eyre::eyre!(err))
.with_context(|| {
format!(
"failed to send stop message for operator `{node_id}/{id}`"
@@ -131,6 +132,7 @@ fn publisher(
let topic = format!("{self_id}/{operator_id}/{output_id}");
communication
.publisher(&topic)
.map_err(|err| eyre::eyre!(err))
.wrap_err_with(|| format!("failed to create publisher for output {output_id}"))
}



+ 6
- 3
binaries/runtime/src/operator/mod.rs View File

@@ -1,5 +1,8 @@
use dora_core::descriptor::{OperatorDefinition, OperatorSource};
use dora_node_api::{communication::CommunicationLayer, config::NodeId};
use dora_node_api::{
communication::{self, CommunicationLayer},
config::NodeId,
};
use eyre::Context;
use std::any::Any;
use tokio::sync::mpsc::Sender;
@@ -13,8 +16,7 @@ pub fn spawn_operator(
events_tx: Sender<OperatorEvent>,
communication: &mut dyn CommunicationLayer,
) -> eyre::Result<()> {
let inputs = communication
.subscribe_all(&operator_definition.config.inputs)
let inputs = communication::subscribe_all(communication, &operator_definition.config.inputs)
.wrap_err_with(|| {
format!(
"failed to subscribe to inputs of operator {}",
@@ -33,6 +35,7 @@ pub fn spawn_operator(
);
communication
.publisher(&topic)
.map_err(|err| eyre::eyre!(err))
.wrap_err_with(|| format!("failed to create publisher for output {output_id}"))
.map(|p| (output_id.to_owned(), p))
})


+ 4
- 1
binaries/runtime/src/operator/python.rs View File

@@ -156,7 +156,10 @@ mod callback_impl {
impl SendOutputCallback {
fn __call__(&mut self, output: &str, data: &[u8]) -> PyResult<()> {
match self.publishers.get(output) {
Some(publisher) => publisher.publish(data).context("publish failed"),
Some(publisher) => publisher
.publish(data)
.map_err(|err| eyre::eyre!(err))
.context("publish failed"),
None => Err(eyre!(
"unexpected output {output} (not defined in dataflow config)"
)),


+ 6
- 8
binaries/runtime/src/operator/shared_lib.rs View File

@@ -1,5 +1,5 @@
use super::OperatorEvent;
use dora_node_api::{communication::Publisher, config::DataId, BoxError};
use dora_node_api::{communication::Publisher, config::DataId};
use dora_operator_api_types::{
safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput,
DoraResult, DoraStatus, Metadata, OnInputResult, Output, SendOutput,
@@ -92,13 +92,11 @@ impl<'lib> SharedLibraryOperator<'lib> {
let send_output_closure = Arc::new(move |output: Output| {
let result = match publishers.get(output.id.deref()) {
Some(publisher) => publisher.publish(&output.data),
None => Err(BoxError(
eyre!(
"unexpected output {} (not defined in dataflow config)",
output.id.deref()
)
.into(),
)),
None => Err(eyre!(
"unexpected output {} (not defined in dataflow config)",
output.id.deref()
)
.into()),
};

let error = match result {


+ 1
- 1
examples/c++-dataflow/node-rust-api/src/main.rs View File

@@ -1,4 +1,4 @@
use dora_node_api::{self, communication::Receiver, DoraNode, Input};
use dora_node_api::{self, DoraNode, Input, Receiver};

#[cxx::bridge]
mod ffi {


+ 21
- 0
libraries/communication-layer/Cargo.toml View File

@@ -0,0 +1,21 @@
[package]
name = "communication-layer-pub-sub"
version = "0.1.0"
edition = "2021"

[features]
default = ["zenoh", "iceoryx"]
zenoh = ["dep:zenoh", "dep:zenoh-config"]
iceoryx = ["dep:iceoryx-rs"]

[dependencies]
eyre = "0.6.8"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", optional = true }
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", optional = true }

[target.'cfg(unix)'.dependencies]
iceoryx-rs = { git = "https://github.com/eclipse-iceoryx/iceoryx-rs.git", optional = true }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

apis/rust/node/src/communication/iceoryx.rs → libraries/communication-layer/src/iceoryx.rs View File

@@ -1,19 +1,29 @@
use eyre::Context;
use std::{collections::HashMap, sync::Arc, time::Duration};
use uuid::Uuid;

use crate::BoxError;
//! Provides [`IceoryxCommunicationLayer`] to communicate over `iceoryx`.

use super::{CommunicationLayer, Publisher, Subscriber};
use crate::BoxError;
use eyre::Context;
use std::{collections::HashMap, sync::Arc, time::Duration};

/// Enables local communication based on `iceoryx`.
pub struct IceoryxCommunicationLayer {
topic_prefix: Arc<String>,
publishers: HashMap<String, Arc<iceoryx_rs::Publisher<[u8]>>>,
}

impl IceoryxCommunicationLayer {
pub fn init(app_name_prefix: String, topic_prefix: String) -> eyre::Result<Self> {
let app_name = format!("{app_name_prefix}-{}", Uuid::new_v4());
/// Initializes a new `iceoryx` connection with default configuration.
///
/// The given `app_name` must be unique. The `topic_prefix` is used as _instance name_
/// when using the [`publisher`][Self::publisher] and [`subscriber`][Self::subscribe]
/// methods.
///
/// Note: In order to use iceoryx, you need to start its broker deamon called
/// [_RouDi_](https://iceoryx.io/v2.0.2/getting-started/overview/#roudi) first.
/// Its executable name is `iox-roudi`. See the
/// [`iceoryx` installation chapter](https://iceoryx.io/v2.0.2/getting-started/installation/)
/// for ways to install it.
pub fn init(app_name: String, topic_prefix: String) -> Result<Self, BoxError> {
iceoryx_rs::Runtime::init(&app_name);

Ok(Self {
@@ -53,7 +63,7 @@ impl CommunicationLayer for IceoryxCommunicationLayer {
fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, crate::BoxError> {
let publisher = self
.get_or_create_publisher(topic)
.map_err(|err| BoxError(err.into()))?;
.map_err(BoxError::from)?;

Ok(Box::new(IceoryxPublisher { publisher }))
}
@@ -64,7 +74,7 @@ impl CommunicationLayer for IceoryxCommunicationLayer {
.queue_capacity(5)
.create_mt()
.context("failed to create iceoryx subscriber")
.map_err(|err| BoxError(err.into()))?;
.map_err(BoxError::from)?;
let receiver = subscriber.get_sample_receiver(token);

Ok(Box::new(IceoryxReceiver { receiver }))
@@ -72,7 +82,7 @@ impl CommunicationLayer for IceoryxCommunicationLayer {
}

#[derive(Clone)]
pub struct IceoryxPublisher {
struct IceoryxPublisher {
publisher: Arc<iceoryx_rs::Publisher<[u8]>>,
}

@@ -82,18 +92,18 @@ impl Publisher for IceoryxPublisher {
.publisher
.loan_slice(data.len())
.context("failed to loan iceoryx slice for publishing")
.map_err(|err| BoxError(err.into()))?;
.map_err(BoxError::from)?;
sample.copy_from_slice(data);
self.publisher.publish(sample);
Ok(())
}

fn boxed_clone(&self) -> Box<dyn Publisher> {
fn dyn_clone(&self) -> Box<dyn Publisher> {
Box::new(self.clone())
}
}

pub struct IceoryxReceiver {
struct IceoryxReceiver {
receiver: iceoryx_rs::mt::SampleReceiver<[u8]>,
}


+ 54
- 0
libraries/communication-layer/src/lib.rs View File

@@ -0,0 +1,54 @@
#![warn(missing_docs)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]

//! Abstraction of various publisher/subscriber communication backends.
//!
//! Provides a [`CommunicationLayer`] trait as an abstraction for different publisher/subscriber
//! systems. The following set of backends are currently supported:
//!
//! - **[Zenoh](https://zenoh.io/):** The zenoh project implements a distributed
//! publisher/subscriber system with automated routing. To use zenoh, use the
//! [`ZenohCommunicationLayer`][zenoh::ZenohCommunicationLayer] struct.
//! - **[Iceoryx](https://iceoryx.io/):** The Eclipse iceoryx™ project provides an IPC middleware
//! based on shared memory. It is very fast, but it only supports local communication. To use
//! iceoryx, use the [`IceoryxCommunicationLayer`][iceoryx::IceoryxCommunicationLayer] struct.

#[cfg(all(unix, feature = "iceoryx"))]
pub mod iceoryx;
#[cfg(feature = "zenoh")]
pub mod zenoh;

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Abstraction trait for different publisher/subscriber implementations.
pub trait CommunicationLayer: Send + Sync {
/// Creates a publisher for the given topic.
fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, BoxError>;

/// Subscribe to the given topic.
fn subscribe(&mut self, topic: &str) -> Result<Box<dyn Subscriber>, BoxError>;
}

/// Allows publishing messages to subscribers.
pub trait Publisher: Send + Sync {
/// Publish the given data to subscribers.
///
/// The data is published to the topic that was used to create this publisher
/// (see [`CommunicationLayer::publisher`]).
fn publish(&self, data: &[u8]) -> Result<(), BoxError>;

/// Clone this publisher, returning the clone as a
/// [trait object](https://doc.rust-lang.org/book/ch17-02-trait-objects.html).
fn dyn_clone(&self) -> Box<dyn Publisher>;
}

/// Allows receiving messages published on a topic.
pub trait Subscriber: Send + Sync {
/// Receives the next message.
///
/// Blocks until the next message is available.
///
/// Depending on the chosen communication backend, some messages might be dropped if
/// the publisher is faster than the subscriber.
fn recv(&mut self) -> Result<Option<Vec<u8>>, BoxError>;
}

apis/rust/node/src/communication/zenoh.rs → libraries/communication-layer/src/zenoh.rs View File

@@ -1,3 +1,7 @@
//! Provides [`ZenohCommunicationLayer`] to communicate over `zenoh`.

pub use zenoh_config;

use super::{CommunicationLayer, Publisher, Subscriber};
use crate::BoxError;
use std::{sync::Arc, time::Duration};
@@ -6,16 +10,22 @@ use zenoh::{
publication::CongestionControl,
};

/// Allows communication over `zenoh`.
pub struct ZenohCommunicationLayer {
zenoh: Arc<zenoh::Session>,
topic_prefix: String,
}

impl ZenohCommunicationLayer {
pub fn init(config: zenoh_config::Config, prefix: String) -> eyre::Result<Self> {
/// Initializes a new `zenoh` session with the given configuration.
///
/// The `prefix` is added to all topic names when using the [`publisher`][Self::publisher]
/// and [`subscriber`][Self::subscribe] methods. Pass an empty string if no prefix is
/// desired.
pub fn init(config: zenoh_config::Config, prefix: String) -> Result<Self, BoxError> {
let zenoh = ::zenoh::open(config)
.wait()
.map_err(|err| BoxError(err.into()))?
.map_err(BoxError::from)?
.into_arc();
Ok(Self {
zenoh,
@@ -29,14 +39,14 @@ impl ZenohCommunicationLayer {
}

impl CommunicationLayer for ZenohCommunicationLayer {
fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, crate::BoxError> {
fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, BoxError> {
let publisher = self
.zenoh
.publish(self.prefixed(topic))
.congestion_control(CongestionControl::Block)
.priority(Priority::RealTime)
.wait()
.map_err(|err| BoxError(err.into()))?;
.map_err(BoxError::from)?;

Ok(Box::new(ZenohPublisher { publisher }))
}
@@ -47,7 +57,7 @@ impl CommunicationLayer for ZenohCommunicationLayer {
.subscribe(self.prefixed(topic))
.reliable()
.wait()
.map_err(|err| BoxError(err.into()))?;
.map_err(BoxError::from)?;

Ok(Box::new(ZenohReceiver(subscriber)))
}
@@ -65,29 +75,27 @@ impl Drop for ZenohCommunicationLayer {
}

#[derive(Clone)]
pub struct ZenohPublisher {
struct ZenohPublisher {
publisher: zenoh::publication::Publisher<'static>,
}

impl Publisher for ZenohPublisher {
fn publish(&self, data: &[u8]) -> Result<(), crate::BoxError> {
self.publisher
.send(data)
.map_err(|err| BoxError(err.into()))
fn publish(&self, data: &[u8]) -> Result<(), BoxError> {
self.publisher.send(data).map_err(BoxError::from)
}

fn boxed_clone(&self) -> Box<dyn Publisher> {
fn dyn_clone(&self) -> Box<dyn Publisher> {
Box::new(self.clone())
}
}

pub struct ZenohReceiver(zenoh::subscriber::Subscriber<'static>);
struct ZenohReceiver(zenoh::subscriber::Subscriber<'static>);

impl Subscriber for ZenohReceiver {
fn recv(&mut self) -> Result<Option<Vec<u8>>, crate::BoxError> {
fn recv(&mut self) -> Result<Option<Vec<u8>>, BoxError> {
match self.0.recv() {
Ok(sample) => Ok(Some(sample.value.payload.contiguous().into_owned())),
Err(flume::RecvError::Disconnected) => Ok(None),
Err(_) => Ok(None),
}
}
}

Loading…
Cancel
Save