From bccb1ae27dbaf59d53818eae5241f8be70ad3082 Mon Sep 17 00:00:00 2001 From: chang xu Date: Fri, 19 Jul 2024 11:22:54 -0400 Subject: [PATCH] Add domain unix socket supports (#594) --- .github/workflows/ci.yml | 3 + apis/rust/node/src/daemon_connection/mod.rs | 15 +++ .../node/src/daemon_connection/unix_domain.rs | 84 +++++++++++++++++ apis/rust/node/src/event_stream/mod.rs | 12 +++ apis/rust/node/src/node/control_channel.rs | 5 + apis/rust/node/src/node/drop_stream.rs | 6 ++ binaries/daemon/src/coordinator.rs | 10 +- binaries/daemon/src/inter_daemon.rs | 6 +- binaries/daemon/src/lib.rs | 10 +- binaries/daemon/src/local_listener.rs | 6 +- binaries/daemon/src/node_communication/mod.rs | 34 +++++++ binaries/daemon/src/node_communication/tcp.rs | 6 +- .../src/node_communication/unix_domain.rs | 93 +++++++++++++++++++ binaries/daemon/src/pending.rs | 4 +- .../{tcp_utils.rs => socket_stream_utils.rs} | 6 +- examples/rust-dataflow/dataflow_socket.yml | 32 +++++++ examples/rust-dataflow/run.rs | 8 +- libraries/core/src/config.rs | 1 + libraries/core/src/daemon_messages.rs | 4 + 19 files changed, 321 insertions(+), 24 deletions(-) create mode 100644 apis/rust/node/src/daemon_connection/unix_domain.rs create mode 100644 binaries/daemon/src/node_communication/unix_domain.rs rename binaries/daemon/src/{tcp_utils.rs => socket_stream_utils.rs} (80%) create mode 100644 examples/rust-dataflow/dataflow_socket.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8708ba96..e2f45f19 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -129,6 +129,9 @@ jobs: if: runner.os == 'Linux' timeout-minutes: 30 run: cargo run --example cmake-dataflow + - name: "Unix Domain Socket example" + if: runner.os == 'Linux' + run: cargo run --example rust-dataflow -- dataflow_socket.yml # python examples - uses: actions/setup-python@v2 diff --git a/apis/rust/node/src/daemon_connection/mod.rs b/apis/rust/node/src/daemon_connection/mod.rs index 7d778fb4..d21607f1 100644 --- a/apis/rust/node/src/daemon_connection/mod.rs +++ b/apis/rust/node/src/daemon_connection/mod.rs @@ -5,16 +5,22 @@ use dora_core::{ }; use eyre::{bail, eyre, Context}; use shared_memory_server::{ShmemClient, ShmemConf}; +#[cfg(unix)] +use std::os::unix::net::UnixStream; use std::{ net::{SocketAddr, TcpStream}, time::Duration, }; mod tcp; +#[cfg(unix)] +mod unix_domain; pub enum DaemonChannel { Shmem(ShmemClient, DaemonReply>), Tcp(TcpStream), + #[cfg(unix)] + UnixDomain(UnixStream), } impl DaemonChannel { @@ -38,6 +44,13 @@ impl DaemonChannel { Ok(channel) } + #[cfg(unix)] + #[tracing::instrument(level = "trace")] + pub fn new_unix_socket(path: &std::path::PathBuf) -> eyre::Result { + let stream = UnixStream::connect(path).wrap_err("failed to open Unix socket")?; + Ok(DaemonChannel::UnixDomain(stream)) + } + pub fn register( &mut self, dataflow_id: DataflowId, @@ -69,6 +82,8 @@ impl DaemonChannel { match self { DaemonChannel::Shmem(client) => client.request(request), DaemonChannel::Tcp(stream) => tcp::request(stream, request), + #[cfg(unix)] + DaemonChannel::UnixDomain(stream) => unix_domain::request(stream, request), } } } diff --git a/apis/rust/node/src/daemon_connection/unix_domain.rs b/apis/rust/node/src/daemon_connection/unix_domain.rs new file mode 100644 index 00000000..dfcb17a2 --- /dev/null +++ b/apis/rust/node/src/daemon_connection/unix_domain.rs @@ -0,0 +1,84 @@ +use dora_core::daemon_messages::{DaemonReply, DaemonRequest, Timestamped}; +use eyre::{eyre, Context}; +use std::{ + io::{Read, Write}, + os::unix::net::UnixStream, +}; + +enum Serializer { + Bincode, + SerdeJson, +} +pub fn request( + connection: &mut UnixStream, + request: &Timestamped, +) -> eyre::Result { + send_message(connection, request)?; + if request.inner.expects_tcp_bincode_reply() { + receive_reply(connection, Serializer::Bincode) + .and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly"))) + // Use serde json for message with variable length + } else if request.inner.expects_tcp_json_reply() { + receive_reply(connection, Serializer::SerdeJson) + .and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly"))) + } else { + Ok(DaemonReply::Empty) + } +} + +fn send_message( + connection: &mut UnixStream, + message: &Timestamped, +) -> eyre::Result<()> { + let serialized = bincode::serialize(&message).wrap_err("failed to serialize DaemonRequest")?; + stream_send(connection, &serialized).wrap_err("failed to send DaemonRequest")?; + Ok(()) +} + +fn receive_reply( + connection: &mut UnixStream, + serializer: Serializer, +) -> eyre::Result> { + let raw = match stream_receive(connection) { + Ok(raw) => raw, + Err(err) => match err.kind() { + std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => { + return Ok(None) + } + other => { + return Err(err).with_context(|| { + format!( + "unexpected I/O error (kind {other:?}) while trying to receive DaemonReply" + ) + }) + } + }, + }; + match serializer { + Serializer::Bincode => bincode::deserialize(&raw) + .wrap_err("failed to deserialize DaemonReply") + .map(Some), + Serializer::SerdeJson => serde_json::from_slice(&raw) + .wrap_err("failed to deserialize DaemonReply") + .map(Some), + } +} + +fn stream_send(connection: &mut (impl Write + Unpin), message: &[u8]) -> std::io::Result<()> { + let len_raw = (message.len() as u64).to_le_bytes(); + connection.write_all(&len_raw)?; + connection.write_all(message)?; + connection.flush()?; + Ok(()) +} + +fn stream_receive(connection: &mut (impl Read + Unpin)) -> std::io::Result> { + let reply_len = { + let mut raw = [0; 8]; + connection.read_exact(&mut raw)?; + u64::from_le_bytes(raw) as usize + }; + let mut reply = vec![0; reply_len]; + connection.read_exact(&mut reply)?; + Ok(reply) +} diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 9575a8d7..2a11503b 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -50,6 +50,12 @@ impl EventStream { )?, DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr) .wrap_err_with(|| format!("failed to connect event stream for node `{node_id}`"))?, + #[cfg(unix)] + DaemonCommunication::UnixDomain { socket_file } => { + DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| { + format!("failed to connect event stream for node `{node_id}`") + })? + } }; let close_channel = match daemon_communication { @@ -63,6 +69,12 @@ impl EventStream { .wrap_err_with(|| { format!("failed to connect event close channel for node `{node_id}`") })?, + #[cfg(unix)] + DaemonCommunication::UnixDomain { socket_file } => { + DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| { + format!("failed to connect event close channel for node `{node_id}`") + })? + } }; Self::init_on_channel(dataflow_id, node_id, channel, close_channel, clock) diff --git a/apis/rust/node/src/node/control_channel.rs b/apis/rust/node/src/node/control_channel.rs index 69329578..28826e95 100644 --- a/apis/rust/node/src/node/control_channel.rs +++ b/apis/rust/node/src/node/control_channel.rs @@ -29,6 +29,11 @@ impl ControlChannel { .wrap_err("failed to create shmem control channel")?, DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr) .wrap_err("failed to connect control channel")?, + #[cfg(unix)] + DaemonCommunication::UnixDomain { socket_file } => { + DaemonChannel::new_unix_socket(socket_file) + .wrap_err("failed to connect control channel")? + } }; Self::init_on_channel(dataflow_id, node_id, channel, clock) diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index efe6c796..80947ac9 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -36,6 +36,12 @@ impl DropStream { } DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr) .wrap_err_with(|| format!("failed to connect drop stream for node `{node_id}`"))?, + #[cfg(unix)] + DaemonCommunication::UnixDomain { socket_file } => { + DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| { + format!("failed to connect drop stream for node `{node_id}`") + })? + } }; Self::init_on_channel(dataflow_id, node_id, channel, hlc) diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index d2f86b3c..895bf49b 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -1,5 +1,5 @@ use crate::{ - tcp_utils::{tcp_receive, tcp_send}, + socket_stream_utils::{socket_stream_receive, socket_stream_send}, DaemonCoordinatorEvent, }; use dora_core::{ @@ -41,10 +41,10 @@ pub async fn register( }, timestamp: clock.new_timestamp(), })?; - tcp_send(&mut stream, ®ister) + socket_stream_send(&mut stream, ®ister) .await .wrap_err("failed to send register request to dora-coordinator")?; - let reply_raw = tcp_receive(&mut stream) + let reply_raw = socket_stream_receive(&mut stream) .await .wrap_err("failed to register reply from dora-coordinator")?; let result: Timestamped = serde_json::from_slice(&reply_raw) @@ -59,7 +59,7 @@ pub async fn register( let (tx, rx) = mpsc::channel(1); tokio::spawn(async move { loop { - let event = match tcp_receive(&mut stream).await { + let event = match socket_stream_receive(&mut stream).await { Ok(raw) => match serde_json::from_slice(&raw) { Ok(event) => event, Err(err) => { @@ -109,7 +109,7 @@ pub async fn register( continue; } }; - if let Err(err) = tcp_send(&mut stream, &serialized).await { + if let Err(err) = socket_stream_send(&mut stream, &serialized).await { tracing::warn!("failed to send reply to coordinator: {err}"); continue; }; diff --git a/binaries/daemon/src/inter_daemon.rs b/binaries/daemon/src/inter_daemon.rs index 7eb4b948..21cce12a 100644 --- a/binaries/daemon/src/inter_daemon.rs +++ b/binaries/daemon/src/inter_daemon.rs @@ -1,4 +1,4 @@ -use crate::tcp_utils::{tcp_receive, tcp_send}; +use crate::socket_stream_utils::{socket_stream_receive, socket_stream_send}; use dora_core::daemon_messages::{InterDaemonEvent, Timestamped}; use eyre::{Context, ContextCompat}; use std::{collections::BTreeMap, io::ErrorKind, net::SocketAddr}; @@ -52,7 +52,7 @@ pub async fn send_inter_daemon_event( .connect() .await .wrap_err_with(|| format!("failed to connect to machine `{target_machine}`"))?; - tcp_send(connection, &message) + socket_stream_send(connection, &message) .await .wrap_err_with(|| format!("failed to send event to machine `{target_machine}`"))?; } @@ -131,7 +131,7 @@ async fn handle_connection_loop( async fn receive_message( connection: &mut TcpStream, ) -> eyre::Result>> { - let raw = match tcp_receive(connection).await { + let raw = match socket_stream_receive(connection).await { Ok(raw) => raw, Err(err) => match err.kind() { ErrorKind::UnexpectedEof diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 54d7284e..85cff217 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -30,6 +30,7 @@ use inter_daemon::InterDaemonConnection; use local_listener::DynamicNodeEventWrapper; use pending::PendingNodes; use shared_memory_server::ShmemConf; +use socket_stream_utils::socket_stream_send; use std::sync::Arc; use std::time::Instant; use std::{ @@ -39,7 +40,6 @@ use std::{ time::Duration, }; use sysinfo::Pid; -use tcp_utils::tcp_send; use tokio::fs::File; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; @@ -56,8 +56,8 @@ mod local_listener; mod log; mod node_communication; mod pending; +mod socket_stream_utils; mod spawn; -mod tcp_utils; #[cfg(feature = "telemetry")] use dora_tracing::telemetry::serialize_context; @@ -314,7 +314,7 @@ impl Daemon { }, timestamp: self.clock.new_timestamp(), })?; - tcp_send(connection, &msg) + socket_stream_send(connection, &msg) .await .wrap_err("failed to send watchdog message to dora-coordinator")?; @@ -345,7 +345,7 @@ impl Daemon { }, timestamp: self.clock.new_timestamp(), })?; - tcp_send(connection, &msg) + socket_stream_send(connection, &msg) .await .wrap_err("failed to send watchdog message to dora-coordinator")?; @@ -1103,7 +1103,7 @@ impl Daemon { }, timestamp: self.clock.new_timestamp(), })?; - tcp_send(connection, &msg) + socket_stream_send(connection, &msg) .await .wrap_err("failed to report dataflow finish to dora-coordinator")?; } diff --git a/binaries/daemon/src/local_listener.rs b/binaries/daemon/src/local_listener.rs index dbffe39e..d9ff858a 100644 --- a/binaries/daemon/src/local_listener.rs +++ b/binaries/daemon/src/local_listener.rs @@ -1,4 +1,4 @@ -use crate::tcp_utils::{tcp_receive, tcp_send}; +use crate::socket_stream_utils::{socket_stream_receive, socket_stream_send}; use dora_core::daemon_messages::{DaemonReply, DaemonRequest, DynamicNodeEvent, Timestamped}; use eyre::Context; use std::{io::ErrorKind, net::SocketAddr}; @@ -99,7 +99,7 @@ async fn handle_connection_loop( continue; } }; - if let Err(err) = tcp_send(&mut connection, &serialized).await { + if let Err(err) = socket_stream_send(&mut connection, &serialized).await { tracing::warn!("failed to send reply: {err}"); continue; }; @@ -120,7 +120,7 @@ async fn handle_connection_loop( async fn receive_message( connection: &mut TcpStream, ) -> eyre::Result>> { - let raw = match tcp_receive(connection).await { + let raw = match socket_stream_receive(connection).await { Ok(raw) => raw, Err(err) => match err.kind() { ErrorKind::UnexpectedEof diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index 738fa65e..c05078a3 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -17,6 +17,8 @@ use std::{ sync::Arc, task::Poll, }; +#[cfg(unix)] +use tokio::net::UnixListener; use tokio::{ net::TcpListener, sync::{ @@ -28,6 +30,8 @@ use tokio::{ // TODO unify and avoid duplication; pub mod shmem; pub mod tcp; +#[cfg(unix)] +pub mod unix_domain; pub async fn spawn_listener_loop( dataflow_id: &DataflowId, @@ -138,6 +142,36 @@ pub async fn spawn_listener_loop( daemon_events_close_region_id, }) } + #[cfg(unix)] + LocalCommunicationConfig::UnixDomain => { + use std::path::Path; + let tmpfile_dir = Path::new("/tmp"); + let tmpfile_dir = tmpfile_dir.join(dataflow_id.to_string()); + if !tmpfile_dir.exists() { + std::fs::create_dir_all(&tmpfile_dir).context("could not create tmp dir")?; + } + let socket_file = tmpfile_dir.join(format!("{}.sock", node_id)); + let socket = match UnixListener::bind(&socket_file) { + Ok(socket) => socket, + Err(err) => { + return Err(eyre::Report::new(err) + .wrap_err("failed to create local Unix domain socket")) + } + }; + + let event_loop_node_id = format!("{dataflow_id}/{node_id}"); + let daemon_tx = daemon_tx.clone(); + tokio::spawn(async move { + unix_domain::listener_loop(socket, daemon_tx, queue_sizes, clock).await; + tracing::debug!("event listener loop finished for `{event_loop_node_id}`"); + }); + + Ok(DaemonCommunication::UnixDomain { socket_file }) + } + #[cfg(not(unix))] + LocalCommunicationConfig::UnixDomain => { + eyre::bail!("Communication via UNIX domain sockets is only supported on UNIX systems") + } } } diff --git a/binaries/daemon/src/node_communication/tcp.rs b/binaries/daemon/src/node_communication/tcp.rs index c4d6122c..2d787d8d 100644 --- a/binaries/daemon/src/node_communication/tcp.rs +++ b/binaries/daemon/src/node_communication/tcp.rs @@ -2,7 +2,7 @@ use std::{collections::BTreeMap, io::ErrorKind, sync::Arc}; use super::{Connection, Listener}; use crate::{ - tcp_utils::{tcp_receive, tcp_send}, + socket_stream_utils::{socket_stream_receive, socket_stream_send}, Event, }; use dora_core::{ @@ -63,7 +63,7 @@ struct TcpConnection(TcpStream); #[async_trait::async_trait] impl Connection for TcpConnection { async fn receive_message(&mut self) -> eyre::Result>> { - let raw = match tcp_receive(&mut self.0).await { + let raw = match socket_stream_receive(&mut self.0).await { Ok(raw) => raw, Err(err) => match err.kind() { ErrorKind::UnexpectedEof @@ -87,7 +87,7 @@ impl Connection for TcpConnection { } let serialized = bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?; - tcp_send(&mut self.0, &serialized) + socket_stream_send(&mut self.0, &serialized) .await .wrap_err("failed to send DaemonReply")?; Ok(()) diff --git a/binaries/daemon/src/node_communication/unix_domain.rs b/binaries/daemon/src/node_communication/unix_domain.rs new file mode 100644 index 00000000..1268821e --- /dev/null +++ b/binaries/daemon/src/node_communication/unix_domain.rs @@ -0,0 +1,93 @@ +use std::{collections::BTreeMap, io::ErrorKind, sync::Arc}; + +use dora_core::{ + config::DataId, + daemon_messages::{DaemonReply, DaemonRequest, Timestamped}, + message::uhlc::HLC, +}; +use eyre::Context; +use tokio::{ + net::{UnixListener, UnixStream}, + sync::mpsc, +}; + +use crate::{ + socket_stream_utils::{socket_stream_receive, socket_stream_send}, + Event, +}; + +use super::{Connection, Listener}; + +#[tracing::instrument(skip(listener, daemon_tx, clock), level = "trace")] +pub async fn listener_loop( + listener: UnixListener, + daemon_tx: mpsc::Sender>, + queue_sizes: BTreeMap, + clock: Arc, +) { + loop { + match listener + .accept() + .await + .wrap_err("failed to accept new connection") + { + Err(err) => { + tracing::info!("{err}"); + } + Ok((connection, _)) => { + tokio::spawn(handle_connection_loop( + connection, + daemon_tx.clone(), + queue_sizes.clone(), + clock.clone(), + )); + } + } + } +} + +#[tracing::instrument(skip(connection, daemon_tx, clock), level = "trace")] +async fn handle_connection_loop( + connection: UnixStream, + daemon_tx: mpsc::Sender>, + queue_sizes: BTreeMap, + clock: Arc, +) { + Listener::run(UnixConnection(connection), daemon_tx, queue_sizes, clock).await +} + +struct UnixConnection(UnixStream); + +#[async_trait::async_trait] +impl Connection for UnixConnection { + async fn receive_message(&mut self) -> eyre::Result>> { + let raw = match socket_stream_receive(&mut self.0).await { + Ok(raw) => raw, + Err(err) => match err.kind() { + ErrorKind::UnexpectedEof + | ErrorKind::ConnectionAborted + | ErrorKind::ConnectionReset => return Ok(None), + _other => { + return Err(err) + .context("unexpected I/O error while trying to receive DaemonRequest") + } + }, + }; + bincode::deserialize(&raw) + .wrap_err("failed to deserialize DaemonRequest") + .map(Some) + } + + async fn send_reply(&mut self, message: DaemonReply) -> eyre::Result<()> { + if matches!(message, DaemonReply::Empty) { + // don't send empty replies + return Ok(()); + } + let serialized = + bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?; + socket_stream_send(&mut self.0, &serialized) + .await + .wrap_err("failed to send DaemonReply")?; + Ok(()) + } +} diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index 0e42dca3..e4037fdc 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -9,7 +9,7 @@ use dora_core::{ use eyre::{bail, Context}; use tokio::{net::TcpStream, sync::oneshot}; -use crate::{tcp_utils::tcp_send, CascadingErrorCauses}; +use crate::{socket_stream_utils::socket_stream_send, CascadingErrorCauses}; pub struct PendingNodes { dataflow_id: DataflowId, @@ -210,7 +210,7 @@ impl PendingNodes { }, timestamp, })?; - tcp_send(connection, &msg) + socket_stream_send(connection, &msg) .await .wrap_err("failed to send AllNodesReady message to dora-coordinator")?; Ok(()) diff --git a/binaries/daemon/src/tcp_utils.rs b/binaries/daemon/src/socket_stream_utils.rs similarity index 80% rename from binaries/daemon/src/tcp_utils.rs rename to binaries/daemon/src/socket_stream_utils.rs index db327c58..b2e9e903 100644 --- a/binaries/daemon/src/tcp_utils.rs +++ b/binaries/daemon/src/socket_stream_utils.rs @@ -1,6 +1,6 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -pub async fn tcp_send( +pub async fn socket_stream_send( connection: &mut (impl AsyncWrite + Unpin), message: &[u8], ) -> std::io::Result<()> { @@ -11,7 +11,9 @@ pub async fn tcp_send( Ok(()) } -pub async fn tcp_receive(connection: &mut (impl AsyncRead + Unpin)) -> std::io::Result> { +pub async fn socket_stream_receive( + connection: &mut (impl AsyncRead + Unpin), +) -> std::io::Result> { let reply_len = { let mut raw = [0; 8]; connection.read_exact(&mut raw).await?; diff --git a/examples/rust-dataflow/dataflow_socket.yml b/examples/rust-dataflow/dataflow_socket.yml new file mode 100644 index 00000000..ce998a8c --- /dev/null +++ b/examples/rust-dataflow/dataflow_socket.yml @@ -0,0 +1,32 @@ +communication: + _unstable_local: + UnixDomain + +nodes: + - id: rust-node + build: cargo build -p rust-dataflow-example-node + path: ../../target/debug/rust-dataflow-example-node + inputs: + tick: dora/timer/millis/10 + outputs: + - random + - id: rust-status-node + custom: + build: cargo build -p rust-dataflow-example-status-node + source: ../../target/debug/rust-dataflow-example-status-node + inputs: + tick: dora/timer/millis/100 + random: rust-node/random + outputs: + - status + - id: rust-sink + build: cargo build -p rust-dataflow-example-sink + path: ../../target/debug/rust-dataflow-example-sink + inputs: + message: rust-status-node/status + - id: dora-record + build: cargo build -p dora-record + path: ../../target/debug/dora-record + inputs: + message: rust-status-node/status + random: rust-node/random diff --git a/examples/rust-dataflow/run.rs b/examples/rust-dataflow/run.rs index f5e035a5..213b65a0 100644 --- a/examples/rust-dataflow/run.rs +++ b/examples/rust-dataflow/run.rs @@ -10,7 +10,13 @@ async fn main() -> eyre::Result<()> { std::env::set_current_dir(root.join(file!()).parent().unwrap()) .wrap_err("failed to set working dir")?; - let dataflow = Path::new("dataflow.yml"); + let args: Vec = std::env::args().collect(); + let dataflow = if args.len() > 1 { + Path::new(&args[1]) + } else { + Path::new("dataflow.yml") + }; + build_dataflow(dataflow).await?; run_dataflow(dataflow).await?; diff --git a/libraries/core/src/config.rs b/libraries/core/src/config.rs index 6b8f2ac1..0fcf11f2 100644 --- a/libraries/core/src/config.rs +++ b/libraries/core/src/config.rs @@ -348,6 +348,7 @@ pub struct CommunicationConfig { pub enum LocalCommunicationConfig { Tcp, Shmem, + UnixDomain, } impl Default for LocalCommunicationConfig { diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index ce6c459a..0bda5b42 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -35,6 +35,10 @@ pub enum DaemonCommunication { Tcp { socket_addr: SocketAddr, }, + #[cfg(unix)] + UnixDomain { + socket_file: PathBuf, + }, } #[derive(Debug, serde::Serialize, serde::Deserialize)]