Browse Source

Add domain unix socket supports (#594)

tags/v0.3.6-rc0
chang xu GitHub 1 year ago
parent
commit
bccb1ae27d
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
19 changed files with 321 additions and 24 deletions
  1. +3
    -0
      .github/workflows/ci.yml
  2. +15
    -0
      apis/rust/node/src/daemon_connection/mod.rs
  3. +84
    -0
      apis/rust/node/src/daemon_connection/unix_domain.rs
  4. +12
    -0
      apis/rust/node/src/event_stream/mod.rs
  5. +5
    -0
      apis/rust/node/src/node/control_channel.rs
  6. +6
    -0
      apis/rust/node/src/node/drop_stream.rs
  7. +5
    -5
      binaries/daemon/src/coordinator.rs
  8. +3
    -3
      binaries/daemon/src/inter_daemon.rs
  9. +5
    -5
      binaries/daemon/src/lib.rs
  10. +3
    -3
      binaries/daemon/src/local_listener.rs
  11. +34
    -0
      binaries/daemon/src/node_communication/mod.rs
  12. +3
    -3
      binaries/daemon/src/node_communication/tcp.rs
  13. +93
    -0
      binaries/daemon/src/node_communication/unix_domain.rs
  14. +2
    -2
      binaries/daemon/src/pending.rs
  15. +4
    -2
      binaries/daemon/src/socket_stream_utils.rs
  16. +32
    -0
      examples/rust-dataflow/dataflow_socket.yml
  17. +7
    -1
      examples/rust-dataflow/run.rs
  18. +1
    -0
      libraries/core/src/config.rs
  19. +4
    -0
      libraries/core/src/daemon_messages.rs

+ 3
- 0
.github/workflows/ci.yml View File

@@ -129,6 +129,9 @@ jobs:
if: runner.os == 'Linux'
timeout-minutes: 30
run: cargo run --example cmake-dataflow
- name: "Unix Domain Socket example"
if: runner.os == 'Linux'
run: cargo run --example rust-dataflow -- dataflow_socket.yml

# python examples
- uses: actions/setup-python@v2


+ 15
- 0
apis/rust/node/src/daemon_connection/mod.rs View File

@@ -5,16 +5,22 @@ use dora_core::{
};
use eyre::{bail, eyre, Context};
use shared_memory_server::{ShmemClient, ShmemConf};
#[cfg(unix)]
use std::os::unix::net::UnixStream;
use std::{
net::{SocketAddr, TcpStream},
time::Duration,
};

mod tcp;
#[cfg(unix)]
mod unix_domain;

/// Channel used to talk to the daemon, with one variant per supported transport.
pub enum DaemonChannel {
/// Shared-memory client that sends `Timestamped<DaemonRequest>` and receives `DaemonReply`.
Shmem(ShmemClient<Timestamped<DaemonRequest>, DaemonReply>),
/// Blocking standard-library TCP stream.
Tcp(TcpStream),
/// Blocking standard-library Unix domain socket stream; only compiled on Unix targets.
#[cfg(unix)]
UnixDomain(UnixStream),
}

impl DaemonChannel {
@@ -38,6 +44,13 @@ impl DaemonChannel {
Ok(channel)
}

/// Connects to the Unix domain socket at `path` and wraps the resulting
/// stream in [`DaemonChannel::UnixDomain`].
///
/// The parameter is a `&Path` so that any path-like borrow can be passed;
/// callers that hold a `&PathBuf` keep compiling through deref coercion.
///
/// # Errors
///
/// Returns an error (with the context "failed to open Unix socket") when the
/// connection attempt fails.
#[cfg(unix)]
#[tracing::instrument(level = "trace")]
pub fn new_unix_socket(path: &std::path::Path) -> eyre::Result<Self> {
    UnixStream::connect(path)
        .map(DaemonChannel::UnixDomain)
        .wrap_err("failed to open Unix socket")
}

pub fn register(
&mut self,
dataflow_id: DataflowId,
@@ -69,6 +82,8 @@ impl DaemonChannel {
match self {
DaemonChannel::Shmem(client) => client.request(request),
DaemonChannel::Tcp(stream) => tcp::request(stream, request),
#[cfg(unix)]
DaemonChannel::UnixDomain(stream) => unix_domain::request(stream, request),
}
}
}

+ 84
- 0
apis/rust/node/src/daemon_connection/unix_domain.rs View File

@@ -0,0 +1,84 @@
use dora_core::daemon_messages::{DaemonReply, DaemonRequest, Timestamped};
use eyre::{eyre, Context};
use std::{
io::{Read, Write},
os::unix::net::UnixStream,
};

/// Wire format used to decode a reply received from the daemon.
enum Serializer {
/// Decode the reply with `bincode::deserialize`.
Bincode,
/// Decode the reply with `serde_json::from_slice`.
SerdeJson,
}
/// Sends `request` over the Unix socket and waits for the matching reply.
///
/// The request itself decides whether a reply is expected and how it is
/// encoded: bincode, JSON (used for messages with variable length), or no
/// reply at all, in which case `DaemonReply::Empty` is returned without
/// reading from the socket.
///
/// # Errors
///
/// Fails when sending or receiving fails, or when the peer closes the
/// connection before a reply arrives.
pub fn request(
    connection: &mut UnixStream,
    request: &Timestamped<DaemonRequest>,
) -> eyre::Result<DaemonReply> {
    send_message(connection, request)?;

    let serializer = if request.inner.expects_tcp_bincode_reply() {
        Serializer::Bincode
    } else if request.inner.expects_tcp_json_reply() {
        // Use serde json for message with variable length
        Serializer::SerdeJson
    } else {
        // Nothing will be sent back for this request.
        return Ok(DaemonReply::Empty);
    };

    match receive_reply(connection, serializer)? {
        Some(reply) => Ok(reply),
        None => Err(eyre!("server disconnected unexpectedly")),
    }
}

fn send_message(
connection: &mut UnixStream,
message: &Timestamped<DaemonRequest>,
) -> eyre::Result<()> {
let serialized = bincode::serialize(&message).wrap_err("failed to serialize DaemonRequest")?;
stream_send(connection, &serialized).wrap_err("failed to send DaemonRequest")?;
Ok(())
}

/// Reads one length-prefixed frame from `connection` and decodes it with the
/// given `serializer`.
///
/// Returns `Ok(None)` when the read fails with `UnexpectedEof` or
/// `ConnectionAborted`, which is treated as the peer having gone away.
///
/// # Errors
///
/// Fails on any other I/O error and when the payload cannot be decoded.
fn receive_reply(
    connection: &mut UnixStream,
    serializer: Serializer,
) -> eyre::Result<Option<DaemonReply>> {
    let raw = match stream_receive(connection) {
        Ok(bytes) => bytes,
        Err(err) => {
            let other = err.kind();
            if matches!(
                other,
                std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted
            ) {
                // The peer closed the connection; report "no reply" instead of an error.
                return Ok(None);
            }
            return Err(err).with_context(|| {
                format!(
                    "unexpected I/O error (kind {other:?}) while trying to receive DaemonReply"
                )
            });
        }
    };

    let reply: DaemonReply = match serializer {
        Serializer::Bincode => {
            bincode::deserialize(&raw).wrap_err("failed to deserialize DaemonReply")?
        }
        Serializer::SerdeJson => {
            serde_json::from_slice(&raw).wrap_err("failed to deserialize DaemonReply")?
        }
    };
    Ok(Some(reply))
}

/// Writes `message` to `connection` as a single length-prefixed frame.
///
/// The frame consists of the payload length as a little-endian `u64`
/// (8 bytes) followed by the payload bytes; the writer is flushed afterwards.
///
/// # Errors
///
/// Propagates any I/O error reported by the writer.
fn stream_send(connection: &mut (impl Write + Unpin), message: &[u8]) -> std::io::Result<()> {
    let header = u64::to_le_bytes(message.len() as u64);
    // Header first, then payload, each written in full.
    for part in [&header[..], message] {
        connection.write_all(part)?;
    }
    connection.flush()
}

/// Reads one length-prefixed frame from `connection` and returns its payload.
///
/// The frame layout is an 8-byte little-endian `u64` payload length followed
/// by exactly that many payload bytes.
///
/// # Errors
///
/// * Propagates any I/O error from the reader (`UnexpectedEof` when the
///   stream ends before the header or the payload is complete).
/// * Returns `InvalidData` when the announced length is larger than
///   `isize::MAX`. Such a length can never be allocated, so without this
///   check a corrupt header would panic with "capacity overflow" (or be
///   silently truncated by the cast on 32-bit targets) instead of surfacing
///   as an error.
fn stream_receive(connection: &mut (impl Read + Unpin)) -> std::io::Result<Vec<u8>> {
    let mut len_raw = [0; 8];
    connection.read_exact(&mut len_raw)?;
    let announced = u64::from_le_bytes(len_raw);

    if announced > isize::MAX as u64 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("announced message length {announced} exceeds the maximum buffer size"),
        ));
    }
    // Lossless: `announced` was just bounded by `isize::MAX`, which fits in `usize`.
    let reply_len = announced as usize;

    let mut reply = vec![0; reply_len];
    connection.read_exact(&mut reply)?;
    Ok(reply)
}

+ 12
- 0
apis/rust/node/src/event_stream/mod.rs View File

@@ -50,6 +50,12 @@ impl EventStream {
)?,
DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr)
.wrap_err_with(|| format!("failed to connect event stream for node `{node_id}`"))?,
#[cfg(unix)]
DaemonCommunication::UnixDomain { socket_file } => {
DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
format!("failed to connect event stream for node `{node_id}`")
})?
}
};

let close_channel = match daemon_communication {
@@ -63,6 +69,12 @@ impl EventStream {
.wrap_err_with(|| {
format!("failed to connect event close channel for node `{node_id}`")
})?,
#[cfg(unix)]
DaemonCommunication::UnixDomain { socket_file } => {
DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
format!("failed to connect event close channel for node `{node_id}`")
})?
}
};

Self::init_on_channel(dataflow_id, node_id, channel, close_channel, clock)


+ 5
- 0
apis/rust/node/src/node/control_channel.rs View File

@@ -29,6 +29,11 @@ impl ControlChannel {
.wrap_err("failed to create shmem control channel")?,
DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr)
.wrap_err("failed to connect control channel")?,
#[cfg(unix)]
DaemonCommunication::UnixDomain { socket_file } => {
DaemonChannel::new_unix_socket(socket_file)
.wrap_err("failed to connect control channel")?
}
};

Self::init_on_channel(dataflow_id, node_id, channel, clock)


+ 6
- 0
apis/rust/node/src/node/drop_stream.rs View File

@@ -36,6 +36,12 @@ impl DropStream {
}
DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr)
.wrap_err_with(|| format!("failed to connect drop stream for node `{node_id}`"))?,
#[cfg(unix)]
DaemonCommunication::UnixDomain { socket_file } => {
DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| {
format!("failed to connect drop stream for node `{node_id}`")
})?
}
};

Self::init_on_channel(dataflow_id, node_id, channel, hlc)


+ 5
- 5
binaries/daemon/src/coordinator.rs View File

@@ -1,5 +1,5 @@
use crate::{
tcp_utils::{tcp_receive, tcp_send},
socket_stream_utils::{socket_stream_receive, socket_stream_send},
DaemonCoordinatorEvent,
};
use dora_core::{
@@ -41,10 +41,10 @@ pub async fn register(
},
timestamp: clock.new_timestamp(),
})?;
tcp_send(&mut stream, &register)
socket_stream_send(&mut stream, &register)
.await
.wrap_err("failed to send register request to dora-coordinator")?;
let reply_raw = tcp_receive(&mut stream)
let reply_raw = socket_stream_receive(&mut stream)
.await
.wrap_err("failed to register reply from dora-coordinator")?;
let result: Timestamped<RegisterResult> = serde_json::from_slice(&reply_raw)
@@ -59,7 +59,7 @@ pub async fn register(
let (tx, rx) = mpsc::channel(1);
tokio::spawn(async move {
loop {
let event = match tcp_receive(&mut stream).await {
let event = match socket_stream_receive(&mut stream).await {
Ok(raw) => match serde_json::from_slice(&raw) {
Ok(event) => event,
Err(err) => {
@@ -109,7 +109,7 @@ pub async fn register(
continue;
}
};
if let Err(err) = tcp_send(&mut stream, &serialized).await {
if let Err(err) = socket_stream_send(&mut stream, &serialized).await {
tracing::warn!("failed to send reply to coordinator: {err}");
continue;
};


+ 3
- 3
binaries/daemon/src/inter_daemon.rs View File

@@ -1,4 +1,4 @@
use crate::tcp_utils::{tcp_receive, tcp_send};
use crate::socket_stream_utils::{socket_stream_receive, socket_stream_send};
use dora_core::daemon_messages::{InterDaemonEvent, Timestamped};
use eyre::{Context, ContextCompat};
use std::{collections::BTreeMap, io::ErrorKind, net::SocketAddr};
@@ -52,7 +52,7 @@ pub async fn send_inter_daemon_event(
.connect()
.await
.wrap_err_with(|| format!("failed to connect to machine `{target_machine}`"))?;
tcp_send(connection, &message)
socket_stream_send(connection, &message)
.await
.wrap_err_with(|| format!("failed to send event to machine `{target_machine}`"))?;
}
@@ -131,7 +131,7 @@ async fn handle_connection_loop(
async fn receive_message(
connection: &mut TcpStream,
) -> eyre::Result<Option<Timestamped<InterDaemonEvent>>> {
let raw = match tcp_receive(connection).await {
let raw = match socket_stream_receive(connection).await {
Ok(raw) => raw,
Err(err) => match err.kind() {
ErrorKind::UnexpectedEof


+ 5
- 5
binaries/daemon/src/lib.rs View File

@@ -30,6 +30,7 @@ use inter_daemon::InterDaemonConnection;
use local_listener::DynamicNodeEventWrapper;
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use socket_stream_utils::socket_stream_send;
use std::sync::Arc;
use std::time::Instant;
use std::{
@@ -39,7 +40,6 @@ use std::{
time::Duration,
};
use sysinfo::Pid;
use tcp_utils::tcp_send;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
@@ -56,8 +56,8 @@ mod local_listener;
mod log;
mod node_communication;
mod pending;
mod socket_stream_utils;
mod spawn;
mod tcp_utils;

#[cfg(feature = "telemetry")]
use dora_tracing::telemetry::serialize_context;
@@ -314,7 +314,7 @@ impl Daemon {
},
timestamp: self.clock.new_timestamp(),
})?;
tcp_send(connection, &msg)
socket_stream_send(connection, &msg)
.await
.wrap_err("failed to send watchdog message to dora-coordinator")?;

@@ -345,7 +345,7 @@ impl Daemon {
},
timestamp: self.clock.new_timestamp(),
})?;
tcp_send(connection, &msg)
socket_stream_send(connection, &msg)
.await
.wrap_err("failed to send watchdog message to dora-coordinator")?;

@@ -1103,7 +1103,7 @@ impl Daemon {
},
timestamp: self.clock.new_timestamp(),
})?;
tcp_send(connection, &msg)
socket_stream_send(connection, &msg)
.await
.wrap_err("failed to report dataflow finish to dora-coordinator")?;
}


+ 3
- 3
binaries/daemon/src/local_listener.rs View File

@@ -1,4 +1,4 @@
use crate::tcp_utils::{tcp_receive, tcp_send};
use crate::socket_stream_utils::{socket_stream_receive, socket_stream_send};
use dora_core::daemon_messages::{DaemonReply, DaemonRequest, DynamicNodeEvent, Timestamped};
use eyre::Context;
use std::{io::ErrorKind, net::SocketAddr};
@@ -99,7 +99,7 @@ async fn handle_connection_loop(
continue;
}
};
if let Err(err) = tcp_send(&mut connection, &serialized).await {
if let Err(err) = socket_stream_send(&mut connection, &serialized).await {
tracing::warn!("failed to send reply: {err}");
continue;
};
@@ -120,7 +120,7 @@ async fn handle_connection_loop(
async fn receive_message(
connection: &mut TcpStream,
) -> eyre::Result<Option<Timestamped<DaemonRequest>>> {
let raw = match tcp_receive(connection).await {
let raw = match socket_stream_receive(connection).await {
Ok(raw) => raw,
Err(err) => match err.kind() {
ErrorKind::UnexpectedEof


+ 34
- 0
binaries/daemon/src/node_communication/mod.rs View File

@@ -17,6 +17,8 @@ use std::{
sync::Arc,
task::Poll,
};
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::{
net::TcpListener,
sync::{
@@ -28,6 +30,8 @@ use tokio::{
// TODO unify and avoid duplication;
pub mod shmem;
pub mod tcp;
#[cfg(unix)]
pub mod unix_domain;

pub async fn spawn_listener_loop(
dataflow_id: &DataflowId,
@@ -138,6 +142,36 @@ pub async fn spawn_listener_loop(
daemon_events_close_region_id,
})
}
#[cfg(unix)]
LocalCommunicationConfig::UnixDomain => {
use std::path::Path;
let tmpfile_dir = Path::new("/tmp");
let tmpfile_dir = tmpfile_dir.join(dataflow_id.to_string());
if !tmpfile_dir.exists() {
std::fs::create_dir_all(&tmpfile_dir).context("could not create tmp dir")?;
}
let socket_file = tmpfile_dir.join(format!("{}.sock", node_id));
let socket = match UnixListener::bind(&socket_file) {
Ok(socket) => socket,
Err(err) => {
return Err(eyre::Report::new(err)
.wrap_err("failed to create local Unix domain socket"))
}
};

let event_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
tokio::spawn(async move {
unix_domain::listener_loop(socket, daemon_tx, queue_sizes, clock).await;
tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
});

Ok(DaemonCommunication::UnixDomain { socket_file })
}
#[cfg(not(unix))]
LocalCommunicationConfig::UnixDomain => {
eyre::bail!("Communication via UNIX domain sockets is only supported on UNIX systems")
}
}
}



+ 3
- 3
binaries/daemon/src/node_communication/tcp.rs View File

@@ -2,7 +2,7 @@ use std::{collections::BTreeMap, io::ErrorKind, sync::Arc};

use super::{Connection, Listener};
use crate::{
tcp_utils::{tcp_receive, tcp_send},
socket_stream_utils::{socket_stream_receive, socket_stream_send},
Event,
};
use dora_core::{
@@ -63,7 +63,7 @@ struct TcpConnection(TcpStream);
#[async_trait::async_trait]
impl Connection for TcpConnection {
async fn receive_message(&mut self) -> eyre::Result<Option<Timestamped<DaemonRequest>>> {
let raw = match tcp_receive(&mut self.0).await {
let raw = match socket_stream_receive(&mut self.0).await {
Ok(raw) => raw,
Err(err) => match err.kind() {
ErrorKind::UnexpectedEof
@@ -87,7 +87,7 @@ impl Connection for TcpConnection {
}
let serialized =
bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?;
tcp_send(&mut self.0, &serialized)
socket_stream_send(&mut self.0, &serialized)
.await
.wrap_err("failed to send DaemonReply")?;
Ok(())


+ 93
- 0
binaries/daemon/src/node_communication/unix_domain.rs View File

@@ -0,0 +1,93 @@
use std::{collections::BTreeMap, io::ErrorKind, sync::Arc};

use dora_core::{
config::DataId,
daemon_messages::{DaemonReply, DaemonRequest, Timestamped},
message::uhlc::HLC,
};
use eyre::Context;
use tokio::{
net::{UnixListener, UnixStream},
sync::mpsc,
};

use crate::{
socket_stream_utils::{socket_stream_receive, socket_stream_send},
Event,
};

use super::{Connection, Listener};

/// Accepts connections on `listener` forever and spawns one task per
/// accepted connection.
///
/// Each task receives its own clone of `daemon_tx`, `queue_sizes` and
/// `clock`. A failed `accept` is logged at info level and the loop keeps
/// going; the function itself never returns.
#[tracing::instrument(skip(listener, daemon_tx, clock), level = "trace")]
pub async fn listener_loop(
    listener: UnixListener,
    daemon_tx: mpsc::Sender<Timestamped<Event>>,
    queue_sizes: BTreeMap<DataId, usize>,
    clock: Arc<HLC>,
) {
    loop {
        let accepted = listener
            .accept()
            .await
            .wrap_err("failed to accept new connection");

        let connection = match accepted {
            Ok((stream, _peer)) => stream,
            Err(err) => {
                tracing::info!("{err}");
                continue;
            }
        };

        tokio::spawn(handle_connection_loop(
            connection,
            daemon_tx.clone(),
            queue_sizes.clone(),
            clock.clone(),
        ));
    }
}

/// Serves a single accepted Unix socket connection by handing it, wrapped in
/// a `UnixConnection`, to `Listener::run`.
#[tracing::instrument(skip(connection, daemon_tx, clock), level = "trace")]
async fn handle_connection_loop(
    connection: UnixStream,
    daemon_tx: mpsc::Sender<Timestamped<Event>>,
    queue_sizes: BTreeMap<DataId, usize>,
    clock: Arc<HLC>,
) {
    let wrapped = UnixConnection(connection);
    Listener::run(wrapped, daemon_tx, queue_sizes, clock).await
}

/// `Connection` implementation backed by a tokio Unix domain socket stream.
struct UnixConnection(UnixStream);

#[async_trait::async_trait]
impl Connection for UnixConnection {
    /// Receives and bincode-decodes the next request.
    ///
    /// Returns `Ok(None)` when the read fails with `UnexpectedEof`,
    /// `ConnectionAborted` or `ConnectionReset`, i.e. when the peer is gone.
    async fn receive_message(&mut self) -> eyre::Result<Option<Timestamped<DaemonRequest>>> {
        let raw = match socket_stream_receive(&mut self.0).await {
            Ok(bytes) => bytes,
            Err(err)
                if matches!(
                    err.kind(),
                    ErrorKind::UnexpectedEof
                        | ErrorKind::ConnectionAborted
                        | ErrorKind::ConnectionReset
                ) =>
            {
                return Ok(None)
            }
            Err(err) => {
                return Err(err)
                    .context("unexpected I/O error while trying to receive DaemonRequest")
            }
        };

        let request: Timestamped<DaemonRequest> =
            bincode::deserialize(&raw).wrap_err("failed to deserialize DaemonRequest")?;
        Ok(Some(request))
    }

    /// Bincode-encodes `message` and sends it as one frame.
    ///
    /// `DaemonReply::Empty` is never put on the wire.
    async fn send_reply(&mut self, message: DaemonReply) -> eyre::Result<()> {
        // don't send empty replies
        if let DaemonReply::Empty = message {
            return Ok(());
        }

        let payload = bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?;
        socket_stream_send(&mut self.0, &payload)
            .await
            .wrap_err("failed to send DaemonReply")
    }
}

+ 2
- 2
binaries/daemon/src/pending.rs View File

@@ -9,7 +9,7 @@ use dora_core::{
use eyre::{bail, Context};
use tokio::{net::TcpStream, sync::oneshot};

use crate::{tcp_utils::tcp_send, CascadingErrorCauses};
use crate::{socket_stream_utils::socket_stream_send, CascadingErrorCauses};

pub struct PendingNodes {
dataflow_id: DataflowId,
@@ -210,7 +210,7 @@ impl PendingNodes {
},
timestamp,
})?;
tcp_send(connection, &msg)
socket_stream_send(connection, &msg)
.await
.wrap_err("failed to send AllNodesReady message to dora-coordinator")?;
Ok(())


binaries/daemon/src/tcp_utils.rs → binaries/daemon/src/socket_stream_utils.rs View File

@@ -1,6 +1,6 @@
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

pub async fn tcp_send(
pub async fn socket_stream_send(
connection: &mut (impl AsyncWrite + Unpin),
message: &[u8],
) -> std::io::Result<()> {
@@ -11,7 +11,9 @@ pub async fn tcp_send(
Ok(())
}

pub async fn tcp_receive(connection: &mut (impl AsyncRead + Unpin)) -> std::io::Result<Vec<u8>> {
pub async fn socket_stream_receive(
connection: &mut (impl AsyncRead + Unpin),
) -> std::io::Result<Vec<u8>> {
let reply_len = {
let mut raw = [0; 8];
connection.read_exact(&mut raw).await?;

+ 32
- 0
examples/rust-dataflow/dataflow_socket.yml View File

@@ -0,0 +1,32 @@
# Example dataflow that selects Unix domain sockets for local communication.
# The CI step "Unix Domain Socket example" runs it on Linux via
# `cargo run --example rust-dataflow -- dataflow_socket.yml`.
communication:
# `_unstable_local` picks the local communication variant; `UnixDomain` is only
# supported on Unix systems (the daemon returns an error for it elsewhere).
_unstable_local:
UnixDomain
nodes:
# Emits `random`, driven by a 10 ms timer tick.
- id: rust-node
build: cargo build -p rust-dataflow-example-node
path: ../../target/debug/rust-dataflow-example-node
inputs:
tick: dora/timer/millis/10
outputs:
- random
# Consumes `rust-node/random` plus a 100 ms timer tick and emits `status`.
# NOTE(review): this node uses the `custom:`/`source:` form while the others use `path:` - confirm this is intentional.
- id: rust-status-node
custom:
build: cargo build -p rust-dataflow-example-status-node
source: ../../target/debug/rust-dataflow-example-status-node
inputs:
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- status
# Consumes `rust-status-node/status`.
- id: rust-sink
build: cargo build -p rust-dataflow-example-sink
path: ../../target/debug/rust-dataflow-example-sink
inputs:
message: rust-status-node/status
# Subscribes to both the `status` and the `random` outputs.
- id: dora-record
build: cargo build -p dora-record
path: ../../target/debug/dora-record
inputs:
message: rust-status-node/status
random: rust-node/random

+ 7
- 1
examples/rust-dataflow/run.rs View File

@@ -10,7 +10,13 @@ async fn main() -> eyre::Result<()> {
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

let dataflow = Path::new("dataflow.yml");
let args: Vec<String> = std::env::args().collect();
let dataflow = if args.len() > 1 {
Path::new(&args[1])
} else {
Path::new("dataflow.yml")
};

build_dataflow(dataflow).await?;

run_dataflow(dataflow).await?;


+ 1
- 0
libraries/core/src/config.rs View File

@@ -348,6 +348,7 @@ pub struct CommunicationConfig {
pub enum LocalCommunicationConfig {
Tcp,
Shmem,
/// Unix domain sockets. The daemon only supports this on Unix systems and
/// returns an error for it on other targets.
UnixDomain,
}

impl Default for LocalCommunicationConfig {


+ 4
- 0
libraries/core/src/daemon_messages.rs View File

@@ -35,6 +35,10 @@ pub enum DaemonCommunication {
Tcp {
socket_addr: SocketAddr,
},
#[cfg(unix)]
UnixDomain {
socket_file: PathBuf,
},
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]


Loading…
Cancel
Save