Browse Source

Create a request/reply communication layer abstraction

And a TCP-based implemenation.
tags/v0.1.0^2^2
Philipp Oppermann 3 years ago
parent
commit
fc50a8232f
Failed to extract signature
9 changed files with 245 additions and 2 deletions
  1. +1
    -1
      Cargo.toml
  2. +1
    -1
      apis/rust/node/Cargo.toml
  3. +0
    -0
      libraries/communication-layer/pub-sub/Cargo.toml
  4. +0
    -0
      libraries/communication-layer/pub-sub/src/iceoryx.rs
  5. +0
    -0
      libraries/communication-layer/pub-sub/src/lib.rs
  6. +0
    -0
      libraries/communication-layer/pub-sub/src/zenoh.rs
  7. +13
    -0
      libraries/communication-layer/request-reply/Cargo.toml
  8. +74
    -0
      libraries/communication-layer/request-reply/src/lib.rs
  9. +156
    -0
      libraries/communication-layer/request-reply/src/tcp.rs

+ 1
- 1
Cargo.toml View File

@@ -10,7 +10,7 @@ members = [
"binaries/*",
"examples/rust-dataflow/*",
"examples/iceoryx/*",
"libraries/communication-layer",
"libraries/communication-layer/*",
"libraries/core",
"libraries/message",
"libraries/extensions/download",


+ 1
- 1
apis/rust/node/Cargo.toml View File

@@ -19,7 +19,7 @@ thiserror = "1.0.30"
tracing = "0.1.33"
tracing-subscriber = { version = "0.3.15", optional = true }
flume = "0.10.14"
communication-layer-pub-sub = { path = "../../../libraries/communication-layer", default-features = false }
communication-layer-pub-sub = { path = "../../../libraries/communication-layer/pub-sub", default-features = false }
uuid = { version = "1.1.2", features = ["v4"] }
capnp = "0.14.9"
dora-message = { path = "../../../libraries/message" }


libraries/communication-layer/Cargo.toml → libraries/communication-layer/pub-sub/Cargo.toml View File


libraries/communication-layer/src/iceoryx.rs → libraries/communication-layer/pub-sub/src/iceoryx.rs View File


libraries/communication-layer/src/lib.rs → libraries/communication-layer/pub-sub/src/lib.rs View File


libraries/communication-layer/src/zenoh.rs → libraries/communication-layer/pub-sub/src/zenoh.rs View File


+ 13
- 0
libraries/communication-layer/request-reply/Cargo.toml View File

@@ -0,0 +1,13 @@
[package]
name = "communication-layer-request-reply"
version = "0.1.0"
edition = "2021"

[features]

[dependencies]
eyre = "0.6.8"

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

+ 74
- 0
libraries/communication-layer/request-reply/src/lib.rs View File

@@ -0,0 +1,74 @@
#![cfg_attr(docsrs, feature(doc_auto_cfg))]

//! Abstraction of various request/reply communication backends.
//!
//! Provides a [`RequestReplyLayer`] trait as an abstraction for different request/reply
//! systems. The following set of backends are currently supported:
//!
//! TODO

pub use tcp::*;

mod tcp;

/// Abstraction trait for different publisher/subscriber implementations.
pub trait RequestReplyLayer: Send + Sync {
type Address;
type RequestData;
type ReplyData;
type Error;

fn listen(
&mut self,
addr: Self::Address,
) -> Result<
Box<
dyn Iterator<
Item = Result<
Box<
dyn ListenConnection<
RequestData = Self::RequestData,
ReplyData = Self::ReplyData,
Error = Self::Error,
>,
>,
Self::Error,
>,
>,
>,
Self::Error,
>;

fn connect(
&mut self,
addr: Self::Address,
) -> Result<
Box<
dyn RequestReplyConnection<
RequestData = Self::RequestData,
ReplyData = Self::ReplyData,
Error = Self::Error,
>,
>,
Self::Error,
>;
}

pub trait ListenConnection: Send + Sync {
type RequestData;
type ReplyData;
type Error;

fn handle_next(
&mut self,
handler: Box<dyn FnOnce(Self::RequestData) -> Result<Self::ReplyData, Self::Error>>,
) -> Result<(), Self::Error>;
}

pub trait RequestReplyConnection: Send + Sync {
type RequestData;
type ReplyData;
type Error;

fn request(&mut self, request: &Self::RequestData) -> Result<Self::ReplyData, Self::Error>;
}

+ 156
- 0
libraries/communication-layer/request-reply/src/tcp.rs View File

@@ -0,0 +1,156 @@
use std::{
io::{Read, Write},
net::{SocketAddr, TcpListener, TcpStream},
};

use crate::{ListenConnection, RequestReplyConnection, RequestReplyLayer};

pub type TcpRequestReplyConnection =
dyn RequestReplyConnection<RequestData = Vec<u8>, ReplyData = Vec<u8>, Error = std::io::Error>;

pub struct TcpLayer {}

impl TcpLayer {
pub fn new() -> Self {
Self {}
}
}

impl RequestReplyLayer for TcpLayer {
type Address = SocketAddr;
type RequestData = Vec<u8>;
type ReplyData = Vec<u8>;
type Error = std::io::Error;

fn listen(
&mut self,
addr: Self::Address,
) -> Result<
Box<
dyn Iterator<
Item = Result<
Box<
dyn crate::ListenConnection<
RequestData = Self::RequestData,
ReplyData = Self::ReplyData,
Error = Self::Error,
>,
>,
Self::Error,
>,
>,
>,
Self::Error,
> {
let incoming: Box<dyn Iterator<Item = Result<_, _>>> = Box::new(
IntoIncoming {
listener: TcpListener::bind(addr)?,
}
.map(|r| {
r.map(|stream| {
let connection: Box<
dyn ListenConnection<
RequestData = Self::RequestData,
ReplyData = Self::ReplyData,
Error = Self::Error,
>,
> = Box::new(TcpConnection { stream });
connection
})
}),
);
Ok(incoming)
}

fn connect(
&mut self,
addr: Self::Address,
) -> Result<
Box<
dyn crate::RequestReplyConnection<
RequestData = Self::RequestData,
ReplyData = Self::ReplyData,
Error = Self::Error,
>,
>,
Self::Error,
> {
TcpStream::connect(addr).map(|s| {
let connection: Box<
dyn RequestReplyConnection<
RequestData = Self::RequestData,
ReplyData = Self::ReplyData,
Error = Self::Error,
>,
> = Box::new(TcpConnection { stream: s });
connection
})
}
}

struct TcpConnection {
stream: TcpStream,
}

impl ListenConnection for TcpConnection {
type RequestData = Vec<u8>;
type ReplyData = Vec<u8>;
type Error = std::io::Error;

fn handle_next(
&mut self,
handler: Box<dyn FnOnce(Self::RequestData) -> Result<Self::ReplyData, Self::Error>>,
) -> Result<(), Self::Error> {
let request = self.receive()?;
let reply = handler(request)?;
self.send(&reply)?;
Ok(())
}
}

impl RequestReplyConnection for TcpConnection {
type RequestData = Vec<u8>;
type ReplyData = Vec<u8>;
type Error = std::io::Error;

fn request(&mut self, request: &Self::RequestData) -> Result<Self::ReplyData, Self::Error> {
self.send(request)?;
let reply = self.receive()?;

Ok(reply)
}
}

impl TcpConnection {
fn send(&mut self, request: &[u8]) -> std::io::Result<()> {
let len_raw = (request.len() as u64).to_le_bytes();
self.stream.write_all(&len_raw)?;
self.stream.write_all(&request)?;
Ok(())
}

fn receive(&mut self) -> std::io::Result<Vec<u8>> {
let reply_len = {
let mut raw = [0; 8];
self.stream.read_exact(&mut raw)?;
u64::from_le_bytes(raw) as usize
};
let mut reply = vec![0; reply_len];
self.stream.read_exact(&mut reply)?;
Ok(reply)
}
}

// taken from `std::net::tcp` module (still unstable)
pub struct IntoIncoming {
listener: TcpListener,
}

impl Iterator for IntoIncoming {
type Item = std::io::Result<TcpStream>;
fn next(&mut self) -> Option<std::io::Result<TcpStream>> {
Some(self.listener.accept().map(|p| p.0))
}
}

impl std::iter::FusedIterator for IntoIncoming {}

Loading…
Cancel
Save