diff --git a/Cargo.toml b/Cargo.toml index ea5de4ae..d98953ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ members = [ "binaries/*", "examples/rust-dataflow/*", "examples/iceoryx/*", - "libraries/communication-layer", + "libraries/communication-layer/*", "libraries/core", "libraries/message", "libraries/extensions/download", diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index bc7f2f34..4ae127d8 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -19,7 +19,7 @@ thiserror = "1.0.30" tracing = "0.1.33" tracing-subscriber = { version = "0.3.15", optional = true } flume = "0.10.14" -communication-layer-pub-sub = { path = "../../../libraries/communication-layer", default-features = false } +communication-layer-pub-sub = { path = "../../../libraries/communication-layer/pub-sub", default-features = false } uuid = { version = "1.1.2", features = ["v4"] } capnp = "0.14.9" dora-message = { path = "../../../libraries/message" } diff --git a/libraries/communication-layer/Cargo.toml b/libraries/communication-layer/pub-sub/Cargo.toml similarity index 100% rename from libraries/communication-layer/Cargo.toml rename to libraries/communication-layer/pub-sub/Cargo.toml diff --git a/libraries/communication-layer/src/iceoryx.rs b/libraries/communication-layer/pub-sub/src/iceoryx.rs similarity index 100% rename from libraries/communication-layer/src/iceoryx.rs rename to libraries/communication-layer/pub-sub/src/iceoryx.rs diff --git a/libraries/communication-layer/src/lib.rs b/libraries/communication-layer/pub-sub/src/lib.rs similarity index 100% rename from libraries/communication-layer/src/lib.rs rename to libraries/communication-layer/pub-sub/src/lib.rs diff --git a/libraries/communication-layer/src/zenoh.rs b/libraries/communication-layer/pub-sub/src/zenoh.rs similarity index 100% rename from libraries/communication-layer/src/zenoh.rs rename to libraries/communication-layer/pub-sub/src/zenoh.rs diff --git a/libraries/communication-layer/request-reply/Cargo.toml b/libraries/communication-layer/request-reply/Cargo.toml new file mode 100644 index 00000000..8f4a9392 --- /dev/null +++ b/libraries/communication-layer/request-reply/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "communication-layer-request-reply" +version = "0.1.0" +edition = "2021" + +[features] + +[dependencies] +eyre = "0.6.8" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] diff --git a/libraries/communication-layer/request-reply/src/lib.rs b/libraries/communication-layer/request-reply/src/lib.rs new file mode 100644 index 00000000..6f205462 --- /dev/null +++ b/libraries/communication-layer/request-reply/src/lib.rs @@ -0,0 +1,74 @@ +#![cfg_attr(docsrs, feature(doc_auto_cfg))] + +//! Abstraction of various request/reply communication backends. +//! +//! Provides a [`RequestReplyLayer`] trait as an abstraction for different request/reply +//! systems. The following set of backends are currently supported: +//! +//! TODO + +pub use tcp::*; + +mod tcp; + +/// Abstraction trait for different publisher/subscriber implementations. +pub trait RequestReplyLayer: Send + Sync { + type Address; + type RequestData; + type ReplyData; + type Error; + + fn listen( + &mut self, + addr: Self::Address, + ) -> Result< + Box< + dyn Iterator< + Item = Result< + Box< + dyn ListenConnection< + RequestData = Self::RequestData, + ReplyData = Self::ReplyData, + Error = Self::Error, + >, + >, + Self::Error, + >, + >, + >, + Self::Error, + >; + + fn connect( + &mut self, + addr: Self::Address, + ) -> Result< + Box< + dyn RequestReplyConnection< + RequestData = Self::RequestData, + ReplyData = Self::ReplyData, + Error = Self::Error, + >, + >, + Self::Error, + >; +} + +pub trait ListenConnection: Send + Sync { + type RequestData; + type ReplyData; + type Error; + + fn handle_next( + &mut self, + handler: Box Result>, + ) -> Result<(), Self::Error>; +} + +pub trait RequestReplyConnection: Send + Sync { + type RequestData; + type ReplyData; + type Error; + + fn request(&mut self, request: &Self::RequestData) -> Result; +} diff --git a/libraries/communication-layer/request-reply/src/tcp.rs b/libraries/communication-layer/request-reply/src/tcp.rs new file mode 100644 index 00000000..515b5684 --- /dev/null +++ b/libraries/communication-layer/request-reply/src/tcp.rs @@ -0,0 +1,156 @@ +use std::{ + io::{Read, Write}, + net::{SocketAddr, TcpListener, TcpStream}, +}; + +use crate::{ListenConnection, RequestReplyConnection, RequestReplyLayer}; + +pub type TcpRequestReplyConnection = + dyn RequestReplyConnection, ReplyData = Vec, Error = std::io::Error>; + +pub struct TcpLayer {} + +impl TcpLayer { + pub fn new() -> Self { + Self {} + } +} + +impl RequestReplyLayer for TcpLayer { + type Address = SocketAddr; + type RequestData = Vec; + type ReplyData = Vec; + type Error = std::io::Error; + + fn listen( + &mut self, + addr: Self::Address, + ) -> Result< + Box< + dyn Iterator< + Item = Result< + Box< + dyn crate::ListenConnection< + RequestData = Self::RequestData, + ReplyData = Self::ReplyData, + Error = Self::Error, + >, + >, + Self::Error, + >, + >, + >, + Self::Error, + > { + let incoming: Box>> = Box::new( + IntoIncoming { + listener: TcpListener::bind(addr)?, + } + .map(|r| { + r.map(|stream| { + let connection: Box< + dyn ListenConnection< + RequestData = Self::RequestData, + ReplyData = Self::ReplyData, + Error = Self::Error, + >, + > = Box::new(TcpConnection { stream }); + connection + }) + }), + ); + Ok(incoming) + } + + fn connect( + &mut self, + addr: Self::Address, + ) -> Result< + Box< + dyn crate::RequestReplyConnection< + RequestData = Self::RequestData, + ReplyData = Self::ReplyData, + Error = Self::Error, + >, + >, + Self::Error, + > { + TcpStream::connect(addr).map(|s| { + let connection: Box< + dyn RequestReplyConnection< + RequestData = Self::RequestData, + ReplyData = Self::ReplyData, + Error = Self::Error, + >, + > = Box::new(TcpConnection { stream: s }); + connection + }) + } +} + +struct TcpConnection { + stream: TcpStream, +} + +impl ListenConnection for TcpConnection { + type RequestData = Vec; + type ReplyData = Vec; + type Error = std::io::Error; + + fn handle_next( + &mut self, + handler: Box Result>, + ) -> Result<(), Self::Error> { + let request = self.receive()?; + let reply = handler(request)?; + self.send(&reply)?; + Ok(()) + } +} + +impl RequestReplyConnection for TcpConnection { + type RequestData = Vec; + type ReplyData = Vec; + type Error = std::io::Error; + + fn request(&mut self, request: &Self::RequestData) -> Result { + self.send(request)?; + let reply = self.receive()?; + + Ok(reply) + } +} + +impl TcpConnection { + fn send(&mut self, request: &[u8]) -> std::io::Result<()> { + let len_raw = (request.len() as u64).to_le_bytes(); + self.stream.write_all(&len_raw)?; + self.stream.write_all(&request)?; + Ok(()) + } + + fn receive(&mut self) -> std::io::Result> { + let reply_len = { + let mut raw = [0; 8]; + self.stream.read_exact(&mut raw)?; + u64::from_le_bytes(raw) as usize + }; + let mut reply = vec![0; reply_len]; + self.stream.read_exact(&mut reply)?; + Ok(reply) + } +} + +// taken from `std::net::tcp` module (still unstable) +pub struct IntoIncoming { + listener: TcpListener, +} + +impl Iterator for IntoIncoming { + type Item = std::io::Result; + fn next(&mut self) -> Option> { + Some(self.listener.accept().map(|p| p.0)) + } +} + +impl std::iter::FusedIterator for IntoIncoming {}