
Unify daemon listener implementations to avoid code duplication
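Both the TCP and shared-memory listeners previously carried their own copy of the node handshake, event queue, and drop-token handling. The new binaries/daemon/src/listener/mod.rs keeps that logic in one generic Listener and leaves tcp.rs and shmem.rs as thin implementations of a small Connection trait. A minimal sketch of the pattern, with simplified stand-in message types (the real code works with DaemonRequest/DaemonReply and assumes the async-trait, tokio, and eyre crates):

    use async_trait::async_trait;

    // Simplified stand-ins for the daemon's request/reply types.
    #[derive(Debug)]
    pub enum Request {
        Register { node_id: String },
        NextEvent,
    }

    #[derive(Debug)]
    pub enum Reply {
        Ok,
        Closed,
    }

    // The transport is abstracted behind one trait; each backend
    // (TCP socket, shared-memory server) only implements these two methods.
    #[async_trait]
    pub trait Connection {
        async fn receive_message(&mut self) -> eyre::Result<Option<Request>>;
        async fn send_reply(&mut self, reply: Reply) -> eyre::Result<()>;
    }

    // The listener state machine is written once, generic over the connection.
    pub struct Listener<C: Connection> {
        connection: C,
    }

    impl<C: Connection> Listener<C> {
        pub async fn run(mut self) -> eyre::Result<()> {
            // `None` from receive_message means the peer disconnected.
            while let Some(request) = self.connection.receive_message().await? {
                // The real handler covers Subscribe, PrepareOutputMessage, etc.;
                // this sketch only acknowledges each request.
                let reply = match request {
                    Request::Register { .. } => Reply::Ok,
                    Request::NextEvent => Reply::Closed,
                };
                self.connection.send_reply(reply).await?;
            }
            Ok(())
        }
    }

    // A backend then only wraps its transport, e.g. for TCP:
    pub struct TcpConnection(pub tokio::net::TcpStream);

    #[async_trait]
    impl Connection for TcpConnection {
        async fn receive_message(&mut self) -> eyre::Result<Option<Request>> {
            // Read a length-prefixed frame from self.0 and deserialize it (omitted in this sketch).
            Ok(None)
        }

        async fn send_reply(&mut self, _reply: Reply) -> eyre::Result<()> {
            // Serialize the reply and write it to self.0 (omitted in this sketch).
            Ok(())
        }
    }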

tags/v0.2.0-candidate
Philipp Oppermann, 2 years ago
commit 556e2e2ec2
7 changed files with 520 additions and 689 deletions
  1. Cargo.lock (+18, -6)
  2. binaries/daemon/Cargo.toml (+1, -0)
  3. binaries/daemon/src/listener/mod.rs (+408, -0)
  4. binaries/daemon/src/listener/shmem.rs (+54, -277)
  5. binaries/daemon/src/listener/tcp.rs (+30, -323)
  6. binaries/daemon/src/spawn.rs (+7, -83)
  7. examples/rust-dataflow/dataflow.yml (+2, -0)

Cargo.lock (+18, -6)

@@ -219,9 +219,9 @@ checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9"

[[package]]
name = "async-trait"
version = "0.1.53"
version = "0.1.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600"
checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2"
dependencies = [
"proc-macro2",
"quote",
@@ -716,7 +716,7 @@ dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
"lazy_static",
"memoffset",
"memoffset 0.6.5",
"scopeguard",
]

@@ -1008,6 +1008,7 @@ dependencies = [
name = "dora-daemon"
version = "0.1.0"
dependencies = [
"async-trait",
"bincode",
"clap 3.2.20",
"ctrlc",
@@ -2061,6 +2062,15 @@ dependencies = [
"autocfg 1.1.0",
]

[[package]]
name = "memoffset"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
dependencies = [
"autocfg 1.1.0",
]

[[package]]
name = "mime"
version = "0.3.16"
@@ -2301,7 +2311,7 @@ dependencies = [
"cc",
"cfg-if 1.0.0",
"libc",
"memoffset",
"memoffset 0.6.5",
]

[[package]]
@@ -2314,7 +2324,7 @@ dependencies = [
"cc",
"cfg-if 1.0.0",
"libc",
"memoffset",
"memoffset 0.6.5",
]

[[package]]
@@ -2326,6 +2336,8 @@ dependencies = [
"bitflags",
"cfg-if 1.0.0",
"libc",
"memoffset 0.7.1",
"pin-utils",
"static_assertions",
]

@@ -3175,7 +3187,7 @@ checksum = "2a34bde3561f980a51c70495164200569a11662644fe5af017f0b5d7015688cc"
dependencies = [
"cfg-if 0.1.10",
"libc",
"nix 0.23.1",
"nix 0.26.2",
"rand",
"winapi",
]


binaries/daemon/Cargo.toml (+1, -0)

@@ -25,3 +25,4 @@ clap = { version = "3.1.8", features = ["derive"] }
shared-memory-server = { path = "../../libraries/shared-memory-server" }
ctrlc = "3.2.5"
bincode = "1.3.3"
async-trait = "0.1.64"

binaries/daemon/src/listener/mod.rs (+408, -0)

@@ -1,2 +1,410 @@
use crate::{shared_mem_handler, DaemonNodeEvent, Event};
use dora_core::{
config::NodeId,
daemon_messages::{
DaemonCommunication, DaemonCommunicationConfig, DaemonReply, DaemonRequest, DataflowId,
DropEvent, NodeEvent,
},
};
use eyre::{eyre, Context};
use shared_memory_server::{ShmemConf, ShmemServer};
use std::{collections::VecDeque, net::Ipv4Addr};
use tokio::{
net::TcpListener,
sync::{mpsc, oneshot},
};

// TODO unify and avoid duplication;
pub mod shmem;
pub mod tcp;

pub async fn spawn_listener_loop(
dataflow_id: &DataflowId,
node_id: &NodeId,
daemon_tx: &mpsc::Sender<Event>,
shmem_handler_tx: &flume::Sender<shared_mem_handler::NodeEvent>,
config: DaemonCommunicationConfig,
) -> eyre::Result<DaemonCommunication> {
match config {
DaemonCommunicationConfig::Tcp => {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
let socket = match TcpListener::bind((localhost, 0)).await {
Ok(socket) => socket,
Err(err) => {
return Err(
eyre::Report::new(err).wrap_err("failed to create local TCP listener")
)
}
};
let socket_addr = socket
.local_addr()
.wrap_err("failed to get local addr of socket")?;

let event_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
let shmem_handler_tx = shmem_handler_tx.clone();
tokio::spawn(async move {
tcp::listener_loop(socket, daemon_tx, shmem_handler_tx).await;
tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
});

Ok(DaemonCommunication::Tcp { socket_addr })
}
DaemonCommunicationConfig::Shmem => {
let daemon_control_region = ShmemConf::new()
.size(4096)
.create()
.wrap_err("failed to allocate daemon_control_region")?;
let daemon_events_region = ShmemConf::new()
.size(4096)
.create()
.wrap_err("failed to allocate daemon_events_region")?;
let daemon_control_region_id = daemon_control_region.get_os_id().to_owned();
let daemon_events_region_id = daemon_events_region.get_os_id().to_owned();

{
let server = unsafe { ShmemServer::new(daemon_control_region) }
.wrap_err("failed to create control server")?;
let daemon_tx = daemon_tx.clone();
let shmem_handler_tx = shmem_handler_tx.clone();
tokio::spawn(shmem::listener_loop(server, daemon_tx, shmem_handler_tx));
}

{
let server = unsafe { ShmemServer::new(daemon_events_region) }
.wrap_err("failed to create events server")?;
let event_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
let shmem_handler_tx = shmem_handler_tx.clone();
tokio::task::spawn(async move {
shmem::listener_loop(server, daemon_tx, shmem_handler_tx).await;
tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
});
}

Ok(DaemonCommunication::Shmem {
daemon_control_region_id,
daemon_events_region_id,
})
}
}
}

struct Listener<C> {
dataflow_id: DataflowId,
node_id: NodeId,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
subscribed_events: Option<flume::Receiver<NodeEvent>>,
max_queue_len: usize,
queue: VecDeque<NodeEvent>,
connection: C,
}

impl<C> Listener<C>
where
C: Connection,
{
pub(crate) async fn run(
mut connection: C,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
) {
// receive the first message
let message = match connection
.receive_message()
.await
.wrap_err("failed to receive register message")
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::info!("channel disconnected before register message");
return;
} // disconnected
Err(err) => {
tracing::info!("{err:?}");
return;
}
};

match message {
DaemonRequest::Register {
dataflow_id,
node_id,
} => {
let reply = DaemonReply::Result(Ok(()));
match connection
.send_reply(reply)
.await
.wrap_err("failed to send register reply")
{
Ok(()) => {
let mut listener = Listener {
dataflow_id,
node_id,
connection,
daemon_tx,
shmem_handler_tx,
subscribed_events: None,
max_queue_len: 10, // TODO: make this configurable
queue: VecDeque::new(),
};
match listener.run_inner().await.wrap_err("listener failed") {
Ok(()) => {}
Err(err) => tracing::error!("{err:?}"),
}
}
Err(err) => {
tracing::warn!("{err:?}");
}
}
}
other => {
tracing::warn!("expected register message, got `{other:?}`");
let reply = DaemonReply::Result(Err("must send register message first".into()));
if let Err(err) = connection
.send_reply(reply)
.await
.wrap_err("failed to send reply")
{
tracing::warn!("{err:?}");
}
}
}
}

async fn run_inner(&mut self) -> eyre::Result<()> {
loop {
// receive the next node message
let message = match self
.connection
.receive_message()
.await
.wrap_err("failed to receive DaemonRequest")
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::info!(
"channel disconnected: {}/{}",
self.dataflow_id,
self.node_id
);
break;
} // disconnected
Err(err) => {
tracing::warn!("{err:?}");
continue;
}
};

// handle incoming events
self.handle_events().await?;

self.handle_message(message).await?;
}
Ok(())
}

async fn handle_events(&mut self) -> eyre::Result<()> {
if let Some(events) = &mut self.subscribed_events {
while let Ok(event) = events.try_recv() {
self.queue.push_back(event);
}

// drop oldest input events to maintain max queue length queue
let input_event_count = self
.queue
.iter()
.filter(|e| matches!(e, NodeEvent::Input { .. }))
.count();
let drop_n = input_event_count.saturating_sub(self.max_queue_len);
self.drop_oldest_inputs(drop_n).await?;
}
Ok(())
}

async fn drop_oldest_inputs(&mut self, number: usize) -> Result<(), eyre::ErrReport> {
let mut drop_tokens = Vec::new();
for i in 0..number {
// find index of oldest input event
let index = self
.queue
.iter()
.position(|e| matches!(e, NodeEvent::Input { .. }))
.unwrap_or_else(|| panic!("no input event found in drop iteration {i}"));

// remove that event
if let Some(event) = self.queue.remove(index) {
if let NodeEvent::Input {
data: Some(data), ..
} = event
{
drop_tokens.push(data.drop_token);
}
}
}
self.report_drop_tokens(drop_tokens).await?;
Ok(())
}

#[tracing::instrument(skip(self), fields(%self.dataflow_id, %self.node_id))]
async fn handle_message(&mut self, message: DaemonRequest) -> eyre::Result<()> {
match message {
DaemonRequest::Register { .. } => {
let reply = DaemonReply::Result(Err("unexpected register message".into()));
self.send_reply(reply).await?;
}
DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped).await?,
DaemonRequest::CloseOutputs(outputs) => {
self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs))
.await?
}
DaemonRequest::PrepareOutputMessage {
output_id,
metadata,
data_len,
} => {
let (reply_sender, reply) = oneshot::channel();
let event = shared_mem_handler::NodeEvent::PrepareOutputMessage {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
output_id,
metadata,
data_len,
reply_sender,
};
self.send_shared_memory_event(event).await?;
let reply = reply
.await
.wrap_err("failed to receive prepare output reply")?;
// tracing::debug!("prepare latency: {:?}", start.elapsed()?);
self.send_reply(reply).await?;
}
DaemonRequest::SendPreparedMessage { id } => {
let (reply_sender, reply) = oneshot::channel();
let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender };
self.send_shared_memory_event(event).await?;
self.send_reply(
reply
.await
.wrap_err("failed to receive send output reply")?,
)
.await?;
}
DaemonRequest::SendEmptyMessage {
output_id,
metadata,
} => {
// let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
// tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
output_id,
metadata,
data: None,
});
let result = self
.send_daemon_event(event)
.await
.map_err(|_| "failed to receive send_empty_message reply".to_owned());
self.send_reply(DaemonReply::Result(result)).await?;
}
DaemonRequest::Subscribe => {
let (tx, rx) = flume::bounded(100);
self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx })
.await?;
self.subscribed_events = Some(rx);
}
DaemonRequest::NextEvent { drop_tokens } => {
self.report_drop_tokens(drop_tokens).await?;

// try to take the latest queued event first
let queued_event = self.queue.pop_front().map(DaemonReply::NodeEvent);
let reply = match queued_event {
Some(reply) => reply,
None => {
match self.subscribed_events.as_mut() {
// wait for next event
Some(events) => match events.recv_async().await {
Ok(event) => DaemonReply::NodeEvent(event),
Err(flume::RecvError::Disconnected) => DaemonReply::Closed,
},
None => DaemonReply::Result(Err(
"Ignoring event request because no subscribe \
message was sent yet"
.into(),
)),
}
}
};

self.send_reply(reply).await?;
}
}
Ok(())
}

async fn report_drop_tokens(
&mut self,
drop_tokens: Vec<dora_core::daemon_messages::DropToken>,
) -> eyre::Result<()> {
if !drop_tokens.is_empty() {
let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent {
tokens: drop_tokens,
});
self.send_shared_memory_event(drop_event).await?;
}
Ok(())
}

async fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> {
// send NodeEvent to daemon main loop
let (reply_tx, reply) = oneshot::channel();
let event = Event::Node {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
event,
reply_sender: reply_tx,
};
self.daemon_tx
.send(event)
.await
.map_err(|_| eyre!("failed to send event to daemon"))?;
let reply = reply
.await
.map_err(|_| eyre!("failed to receive reply from daemon"))?;
self.send_reply(reply).await?;
Ok(())
}

async fn send_reply(&mut self, reply: DaemonReply) -> eyre::Result<()> {
self.connection
.send_reply(reply)
.await
.wrap_err("failed to send reply to node")
}

async fn send_shared_memory_event(
&self,
event: shared_mem_handler::NodeEvent,
) -> eyre::Result<()> {
self.shmem_handler_tx
.send_async(event)
.await
.map_err(|_| eyre!("failed to send event to shared_mem_handler"))
}

async fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> {
self.daemon_tx
.send(event)
.await
.map_err(|_| eyre!("failed to send event to daemon"))
}
}

#[async_trait::async_trait]
trait Connection {
async fn receive_message(&mut self) -> eyre::Result<Option<DaemonRequest>>;
async fn send_reply(&mut self, message: DaemonReply) -> eyre::Result<()>;
}

binaries/daemon/src/listener/shmem.rs (+54, -277)

@@ -1,298 +1,75 @@
use std::collections::VecDeque;

use crate::{shared_mem_handler, DaemonNodeEvent, Event};
use dora_core::{
config::NodeId,
daemon_messages::{DaemonReply, DaemonRequest, DataflowId, DropEvent, NodeEvent},
};
use eyre::{eyre, Context};
use super::Listener;
use crate::{shared_mem_handler, Event};
use dora_core::daemon_messages::{DaemonReply, DaemonRequest};
use eyre::eyre;
use shared_memory_server::ShmemServer;
use tokio::sync::{mpsc, oneshot};

#[tracing::instrument(skip(server, daemon_tx, shmem_handler_tx))]
pub fn listener_loop(
pub async fn listener_loop(
mut server: ShmemServer<DaemonRequest, DaemonReply>,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
) {
// receive the first message
let message = match server
.listen()
.wrap_err("failed to receive register message")
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::info!("channel disconnected before register message");
return;
} // disconnected
Err(err) => {
tracing::info!("{err:?}");
return;
}
};

match message {
DaemonRequest::Register {
dataflow_id,
node_id,
} => {
let reply = DaemonReply::Result(Ok(()));
match server
.send_reply(&reply)
.wrap_err("failed to send register reply")
{
Ok(()) => {
let mut listener = Listener {
dataflow_id,
node_id,
server,
daemon_tx,
shmem_handler_tx,
subscribed_events: None,
max_queue_len: 10, // TODO: make this configurable
queue: VecDeque::new(),
};
match listener.run().wrap_err("listener failed") {
Ok(()) => {}
Err(err) => tracing::error!("{err:?}"),
let (tx, rx) = flume::bounded(0);
tokio::task::spawn_blocking(move || {
while let Ok(operation) = rx.recv() {
match operation {
Operation::Receive(sender) => {
if sender.send(server.listen()).is_err() {
break;
}
}
Err(err) => {
tracing::warn!("{err:?}");
Operation::Send {
message,
result_sender,
} => {
let result = server.send_reply(&message);
if result_sender.send(result).is_err() {
break;
}
}
}
}
_ => {
let reply = DaemonReply::Result(Err("must send register message first".into()));
if let Err(err) = server.send_reply(&reply).wrap_err("failed to send reply") {
tracing::warn!("{err:?}");
}
}
}
});
let connection = ShmemConnection(tx);
Listener::run(connection, daemon_tx, shmem_handler_tx).await
}

struct Listener {
dataflow_id: DataflowId,
node_id: NodeId,
server: ShmemServer<DaemonRequest, DaemonReply>,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
subscribed_events: Option<flume::Receiver<NodeEvent>>,
max_queue_len: usize,
queue: VecDeque<NodeEvent>,
enum Operation {
Receive(oneshot::Sender<eyre::Result<Option<DaemonRequest>>>),
Send {
message: DaemonReply,
result_sender: oneshot::Sender<eyre::Result<()>>,
},
}

impl Listener {
fn run(&mut self) -> eyre::Result<()> {
loop {
// receive the next node message
let message = match self
.server
.listen()
.wrap_err("failed to receive DaemonRequest")
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::info!(
"channel disconnected: {}/{}",
self.dataflow_id,
self.node_id
);
break;
} // disconnected
Err(err) => {
tracing::warn!("{err:?}");
continue;
}
};

// handle incoming events
self.handle_events()?;

self.handle_message(message)?;
}
Ok(())
}

fn handle_events(&mut self) -> eyre::Result<()> {
if let Some(events) = &mut self.subscribed_events {
while let Ok(event) = events.try_recv() {
self.queue.push_back(event);
}

// drop oldest input events to maintain max queue length queue
let input_event_count = self
.queue
.iter()
.filter(|e| matches!(e, NodeEvent::Input { .. }))
.count();
let drop_n = input_event_count.saturating_sub(self.max_queue_len);
self.drop_oldest_inputs(drop_n)?;
}
Ok(())
}

fn drop_oldest_inputs(&mut self, number: usize) -> Result<(), eyre::ErrReport> {
let mut drop_tokens = Vec::new();
for i in 0..number {
// find index of oldest input event
let index = self
.queue
.iter()
.position(|e| matches!(e, NodeEvent::Input { .. }))
.expect(&format!("no input event found in drop iteration {i}"));

// remove that event
if let Some(event) = self.queue.remove(index) {
if let NodeEvent::Input {
data: Some(data), ..
} = event
{
drop_tokens.push(data.drop_token);
}
}
}
self.report_drop_tokens(drop_tokens)?;
Ok(())
}

fn handle_message(&mut self, message: DaemonRequest) -> eyre::Result<()> {
match message {
DaemonRequest::Register { .. } => {
let reply = DaemonReply::Result(Err("unexpected register message".into()));
self.send_reply(&reply)?;
}
DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped)?,
DaemonRequest::CloseOutputs(outputs) => {
self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs))?
}
DaemonRequest::PrepareOutputMessage {
output_id,
metadata,
data_len,
} => {
let (reply_sender, reply) = oneshot::channel();
let event = shared_mem_handler::NodeEvent::PrepareOutputMessage {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
output_id,
metadata,
data_len,
reply_sender,
};
self.send_shared_memory_event(event)?;
let reply = reply
.blocking_recv()
.wrap_err("failed to receive prepare output reply")?;
// tracing::debug!("prepare latency: {:?}", start.elapsed()?);
self.send_reply(&reply)?;
}
DaemonRequest::SendPreparedMessage { id } => {
let (reply_sender, reply) = oneshot::channel();
let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender };
self.send_shared_memory_event(event)?;
self.send_reply(
&reply
.blocking_recv()
.wrap_err("failed to receive send output reply")?,
)?;
}
DaemonRequest::SendEmptyMessage {
output_id,
metadata,
} => {
// let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
// tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
output_id,
metadata,
data: None,
});
let result = self
.send_daemon_event(event)
.map_err(|_| "failed to receive send_empty_message reply".to_owned());
self.send_reply(&DaemonReply::Result(result))?;
}
DaemonRequest::Subscribe => {
let (tx, rx) = flume::bounded(100);
self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx })?;
self.subscribed_events = Some(rx);
}
DaemonRequest::NextEvent { drop_tokens } => {
self.report_drop_tokens(drop_tokens)?;

// try to take the latest queued event first
let queued_event = self.queue.pop_front().map(DaemonReply::NodeEvent);
let reply = queued_event.unwrap_or_else(|| {
match self.subscribed_events.as_mut() {
// wait for next event
Some(events) => match events.recv() {
Ok(event) => DaemonReply::NodeEvent(event),
Err(flume::RecvError::Disconnected) => DaemonReply::Closed,
},
None => {
DaemonReply::Result(Err("Ignoring event request because no subscribe \
message was sent yet"
.into()))
}
}
});

self.send_reply(&reply)?;
}
}
Ok(())
}

fn report_drop_tokens(
&mut self,
drop_tokens: Vec<dora_core::daemon_messages::DropToken>,
) -> eyre::Result<()> {
if !drop_tokens.is_empty() {
let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent {
tokens: drop_tokens,
});
self.send_shared_memory_event(drop_event)?;
}
Ok(())
}

fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> {
// send NodeEvent to daemon main loop
let (reply_tx, reply) = oneshot::channel();
let event = Event::Node {
dataflow_id: self.dataflow_id.clone(),
node_id: self.node_id.clone(),
event,
reply_sender: reply_tx,
};
self.daemon_tx
.blocking_send(event)
.map_err(|_| eyre!("failed to send event to daemon"))?;
let reply = reply
.blocking_recv()
.map_err(|_| eyre!("failed to receive reply from daemon"))?;
self.send_reply(&reply)?;
Ok(())
}

fn send_reply(&mut self, reply: &DaemonReply) -> eyre::Result<()> {
self.server
.send_reply(&reply)
.wrap_err("failed to send reply to node")
}

fn send_shared_memory_event(&self, event: shared_mem_handler::NodeEvent) -> eyre::Result<()> {
self.shmem_handler_tx
.send(event)
.map_err(|_| eyre!("failed to send event to shared_mem_handler"))
struct ShmemConnection(flume::Sender<Operation>);

#[async_trait::async_trait]
impl super::Connection for ShmemConnection {
async fn receive_message(&mut self) -> eyre::Result<Option<DaemonRequest>> {
let (tx, rx) = oneshot::channel();
self.0
.send_async(Operation::Receive(tx))
.await
.map_err(|_| eyre!("failed send receive request to ShmemServer"))?;
rx.await
.map_err(|_| eyre!("failed to receive from ShmemServer"))
.and_then(|r| r)
}

fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> {
self.daemon_tx
.blocking_send(event)
.map_err(|_| eyre!("failed to send event to daemon"))
async fn send_reply(&mut self, reply: DaemonReply) -> eyre::Result<()> {
let (tx, rx) = oneshot::channel();
self.0
.send_async(Operation::Send {
message: reply,
result_sender: tx,
})
.await
.map_err(|_| eyre!("failed send send request to ShmemServer"))?;
rx.await
.map_err(|_| eyre!("failed to receive from ShmemServer"))
.and_then(|r| r)
}
}
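The shared-memory server is a blocking API, so the rewritten shmem.rs cannot implement the async Connection trait directly. Instead it parks the server on a tokio::task::spawn_blocking task and drives it through a channel of Operation requests, each carrying a oneshot sender for the result. A rough sketch of that bridge, with a stand-in BlockingServer in place of ShmemServer and String payloads instead of DaemonRequest/DaemonReply (assumes the flume, tokio, and eyre crates):

    use tokio::sync::oneshot;

    // Stand-in for the blocking shared-memory server.
    pub struct BlockingServer;

    impl BlockingServer {
        pub fn listen(&mut self) -> eyre::Result<Option<String>> {
            Ok(None)
        }
        pub fn send_reply(&mut self, _reply: &str) -> eyre::Result<()> {
            Ok(())
        }
    }

    // Each request to the blocking task carries a oneshot sender for its result.
    pub enum Operation {
        Receive(oneshot::Sender<eyre::Result<Option<String>>>),
        Send {
            message: String,
            result_sender: oneshot::Sender<eyre::Result<()>>,
        },
    }

    pub fn spawn_bridge(mut server: BlockingServer) -> flume::Sender<Operation> {
        // Capacity 0 makes this a rendezvous channel, keeping the async side
        // in lock-step with the blocking server (mirrors flume::bounded(0) in the diff).
        let (tx, rx) = flume::bounded(0);
        tokio::task::spawn_blocking(move || {
            while let Ok(operation) = rx.recv() {
                match operation {
                    Operation::Receive(sender) => {
                        if sender.send(server.listen()).is_err() {
                            break;
                        }
                    }
                    Operation::Send { message, result_sender } => {
                        if result_sender.send(server.send_reply(&message)).is_err() {
                            break;
                        }
                    }
                }
            }
        });
        tx
    }

    // The async side forwards an Operation and awaits the oneshot result;
    // this is essentially what ShmemConnection's Connection impl does.
    pub async fn receive_via_bridge(
        bridge: &flume::Sender<Operation>,
    ) -> eyre::Result<Option<String>> {
        let (tx, rx) = oneshot::channel();
        bridge
            .send_async(Operation::Receive(tx))
            .await
            .map_err(|_| eyre::eyre!("bridge task stopped"))?;
        rx.await
            .map_err(|_| eyre::eyre!("bridge task dropped the reply"))?
    }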

binaries/daemon/src/listener/tcp.rs (+30, -323)

@@ -1,17 +1,14 @@
use super::Listener;
use crate::{
shared_mem_handler,
tcp_utils::{tcp_receive, tcp_send},
DaemonNodeEvent, Event,
Event,
};
use dora_core::{
config::NodeId,
daemon_messages::{DaemonReply, DaemonRequest, DataflowId, DropEvent, NodeEvent},
};
use eyre::{eyre, Context};
use std::collections::VecDeque;
use dora_core::daemon_messages::{DaemonReply, DaemonRequest};
use eyre::Context;
use tokio::{
net::{TcpListener, TcpStream},
sync::{mpsc, oneshot},
sync::mpsc,
};

#[tracing::instrument(skip(listener, daemon_tx, shmem_handler_tx))]
@@ -41,8 +38,8 @@ pub async fn listener_loop(
}

#[tracing::instrument(skip(connection, daemon_tx, shmem_handler_tx))]
pub async fn handle_connection_loop(
mut connection: TcpStream,
async fn handle_connection_loop(
connection: TcpStream,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
) {
@@ -50,327 +47,37 @@ pub async fn handle_connection_loop(
tracing::warn!("failed to set nodelay for connection: {err}");
}

// receive the first message
let message = match receive_message(&mut connection)
.await
.wrap_err("failed to receive register message")
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::info!("channel disconnected before register message");
return;
} // disconnected
Err(err) => {
tracing::info!("{err:?}");
return;
}
};

match message {
DaemonRequest::Register {
dataflow_id,
node_id,
} => {
let reply = DaemonReply::Result(Ok(()));
match send_reply(&mut connection, &reply)
.await
.wrap_err("failed to send register reply")
{
Ok(()) => {
let mut listener = Listener {
dataflow_id,
node_id,
connection,
daemon_tx,
shmem_handler_tx,
subscribed_events: None,
max_queue_len: 10, // TODO: make this configurable
queue: VecDeque::new(),
};
match listener.run().await.wrap_err("listener failed") {
Ok(()) => {}
Err(err) => tracing::error!("{err:?}"),
}
}
Err(err) => {
tracing::warn!("{err:?}");
}
}
}
other => {
tracing::warn!("expected register message, got `{other:?}`");
let reply = DaemonReply::Result(Err("must send register message first".into()));
if let Err(err) = send_reply(&mut connection, &reply)
.await
.wrap_err("failed to send reply")
{
tracing::warn!("{err:?}");
}
}
}
}

async fn receive_message(connection: &mut TcpStream) -> eyre::Result<Option<DaemonRequest>> {
let raw = match tcp_receive(connection).await {
Ok(raw) => raw,
Err(err) => match err.kind() {
std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => {
return Ok(None)
}
_other => {
return Err(err)
.context("unexpected I/O error while trying to receive DaemonRequest")
}
},
};
bincode::deserialize(&raw)
.wrap_err("failed to deserialize DaemonRequest")
.map(Some)
Listener::run(TcpConnection(connection), daemon_tx, shmem_handler_tx).await
}

async fn send_reply(connection: &mut TcpStream, message: &DaemonReply) -> eyre::Result<()> {
let serialized = bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?;
tcp_send(connection, &serialized)
.await
.wrap_err("failed to send DaemonReply")?;
Ok(())
}
struct TcpConnection(TcpStream);

struct Listener {
dataflow_id: DataflowId,
node_id: NodeId,
connection: TcpStream,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
subscribed_events: Option<flume::Receiver<NodeEvent>>,
max_queue_len: usize,
queue: VecDeque<NodeEvent>,
}

impl Listener {
async fn run(&mut self) -> eyre::Result<()> {
loop {
// receive the next node message
let message = match receive_message(&mut self.connection)
.await
.wrap_err("failed to receive DaemonRequest")
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::info!(
"channel disconnected: {}/{}",
self.dataflow_id,
self.node_id
);
break;
} // disconnected
Err(err) => {
tracing::warn!("{err:?}");
continue;
#[async_trait::async_trait]
impl super::Connection for TcpConnection {
async fn receive_message(&mut self) -> eyre::Result<Option<DaemonRequest>> {
let raw = match tcp_receive(&mut self.0).await {
Ok(raw) => raw,
Err(err) => match err.kind() {
std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => {
return Ok(None)
}
};

// handle incoming events
self.handle_events().await?;

self.handle_message(message).await?;
}
Ok(())
}

async fn handle_events(&mut self) -> eyre::Result<()> {
if let Some(events) = &mut self.subscribed_events {
while let Ok(event) = events.try_recv() {
self.queue.push_back(event);
}

// drop oldest input events to maintain max queue length queue
let input_event_count = self
.queue
.iter()
.filter(|e| matches!(e, NodeEvent::Input { .. }))
.count();
let drop_n = input_event_count.saturating_sub(self.max_queue_len);
self.drop_oldest_inputs(drop_n).await?;
}
Ok(())
}

async fn drop_oldest_inputs(&mut self, number: usize) -> Result<(), eyre::ErrReport> {
let mut drop_tokens = Vec::new();
for i in 0..number {
// find index of oldest input event
let index = self
.queue
.iter()
.position(|e| matches!(e, NodeEvent::Input { .. }))
.unwrap_or_else(|| panic!("no input event found in drop iteration {i}"));

// remove that event
if let Some(event) = self.queue.remove(index) {
if let NodeEvent::Input {
data: Some(data), ..
} = event
{
drop_tokens.push(data.drop_token);
_other => {
return Err(err)
.context("unexpected I/O error while trying to receive DaemonRequest")
}
}
}
self.report_drop_tokens(drop_tokens).await?;
Ok(())
}

#[tracing::instrument(skip(self), fields(%self.dataflow_id, %self.node_id))]
async fn handle_message(&mut self, message: DaemonRequest) -> eyre::Result<()> {
match message {
DaemonRequest::Register { .. } => {
let reply = DaemonReply::Result(Err("unexpected register message".into()));
self.send_reply(&reply).await?;
}
DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped).await?,
DaemonRequest::CloseOutputs(outputs) => {
self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs))
.await?
}
DaemonRequest::PrepareOutputMessage {
output_id,
metadata,
data_len,
} => {
let (reply_sender, reply) = oneshot::channel();
let event = shared_mem_handler::NodeEvent::PrepareOutputMessage {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
output_id,
metadata,
data_len,
reply_sender,
};
self.send_shared_memory_event(event).await?;
let reply = reply
.await
.wrap_err("failed to receive prepare output reply")?;
// tracing::debug!("prepare latency: {:?}", start.elapsed()?);
self.send_reply(&reply).await?;
}
DaemonRequest::SendPreparedMessage { id } => {
let (reply_sender, reply) = oneshot::channel();
let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender };
self.send_shared_memory_event(event).await?;
self.send_reply(
&reply
.await
.wrap_err("failed to receive send output reply")?,
)
.await?;
}
DaemonRequest::SendEmptyMessage {
output_id,
metadata,
} => {
// let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
// tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
output_id,
metadata,
data: None,
});
let result = self
.send_daemon_event(event)
.await
.map_err(|_| "failed to receive send_empty_message reply".to_owned());
self.send_reply(&DaemonReply::Result(result)).await?;
}
DaemonRequest::Subscribe => {
let (tx, rx) = flume::bounded(100);
self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx })
.await?;
self.subscribed_events = Some(rx);
}
DaemonRequest::NextEvent { drop_tokens } => {
self.report_drop_tokens(drop_tokens).await?;

// try to take the latest queued event first
let queued_event = self.queue.pop_front().map(DaemonReply::NodeEvent);
let reply = match queued_event {
Some(reply) => reply,
None => {
match self.subscribed_events.as_mut() {
// wait for next event
Some(events) => match events.recv_async().await {
Ok(event) => DaemonReply::NodeEvent(event),
Err(flume::RecvError::Disconnected) => DaemonReply::Closed,
},
None => DaemonReply::Result(Err(
"Ignoring event request because no subscribe \
message was sent yet"
.into(),
)),
}
}
};

self.send_reply(&reply).await?;
}
}
Ok(())
}

async fn report_drop_tokens(
&mut self,
drop_tokens: Vec<dora_core::daemon_messages::DropToken>,
) -> eyre::Result<()> {
if !drop_tokens.is_empty() {
let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent {
tokens: drop_tokens,
});
self.send_shared_memory_event(drop_event).await?;
}
Ok(())
}

async fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> {
// send NodeEvent to daemon main loop
let (reply_tx, reply) = oneshot::channel();
let event = Event::Node {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
event,
reply_sender: reply_tx,
},
};
self.daemon_tx
.send(event)
.await
.map_err(|_| eyre!("failed to send event to daemon"))?;
let reply = reply
.await
.map_err(|_| eyre!("failed to receive reply from daemon"))?;
self.send_reply(&reply).await?;
Ok(())
}

async fn send_reply(&mut self, reply: &DaemonReply) -> eyre::Result<()> {
send_reply(&mut self.connection, reply)
.await
.wrap_err("failed to send reply to node")
bincode::deserialize(&raw)
.wrap_err("failed to deserialize DaemonRequest")
.map(Some)
}

async fn send_shared_memory_event(
&self,
event: shared_mem_handler::NodeEvent,
) -> eyre::Result<()> {
self.shmem_handler_tx
.send_async(event)
async fn send_reply(&mut self, message: DaemonReply) -> eyre::Result<()> {
let serialized =
bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?;
tcp_send(&mut self.0, &serialized)
.await
.map_err(|_| eyre!("failed to send event to shared_mem_handler"))
}

async fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> {
self.daemon_tx
.send(event)
.await
.map_err(|_| eyre!("failed to send event to daemon"))
.wrap_err("failed to send DaemonReply")?;
Ok(())
}
}

binaries/daemon/src/spawn.rs (+7, -83)

@@ -1,18 +1,16 @@
use crate::{
listener, runtime_node_inputs, runtime_node_outputs, shared_mem_handler, DoraEvent, Event,
listener::spawn_listener_loop, runtime_node_inputs, runtime_node_outputs, shared_mem_handler,
DoraEvent, Event,
};
use dora_core::{
config::{NodeId, NodeRunConfig},
daemon_messages::{
DaemonCommunication, DaemonCommunicationConfig, DataflowId, NodeConfig, RuntimeConfig,
},
config::NodeRunConfig,
daemon_messages::{DaemonCommunicationConfig, DataflowId, NodeConfig, RuntimeConfig},
descriptor::{resolve_path, source_is_url, OperatorSource, ResolvedNode},
};
use dora_download::download_file;
use eyre::{eyre, WrapErr};
use shared_memory_server::{ShmemConf, ShmemServer};
use std::{env::consts::EXE_EXTENSION, net::Ipv4Addr, path::Path, process::Stdio};
use tokio::{net::TcpListener, sync::mpsc};
use std::{env::consts::EXE_EXTENSION, path::Path, process::Stdio};
use tokio::sync::mpsc;

pub async fn spawn_node(
dataflow_id: DataflowId,
@@ -25,7 +23,7 @@ pub async fn spawn_node(
let node_id = node.id.clone();
tracing::debug!("Spawning node `{dataflow_id}/{node_id}`");

let daemon_communication = daemon_communication_config(
let daemon_communication = spawn_listener_loop(
&dataflow_id,
&node_id,
&daemon_tx,
@@ -150,77 +148,3 @@ pub async fn spawn_node(
});
Ok(())
}

async fn daemon_communication_config(
dataflow_id: &DataflowId,
node_id: &NodeId,
daemon_tx: &mpsc::Sender<Event>,
shmem_handler_tx: &flume::Sender<shared_mem_handler::NodeEvent>,
config: DaemonCommunicationConfig,
) -> eyre::Result<DaemonCommunication> {
match config {
DaemonCommunicationConfig::Tcp => {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
let socket = match TcpListener::bind((localhost, 0)).await {
Ok(socket) => socket,
Err(err) => {
return Err(
eyre::Report::new(err).wrap_err("failed to create local TCP listener")
)
}
};
let socket_addr = socket
.local_addr()
.wrap_err("failed to get local addr of socket")?;

let event_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
let shmem_handler_tx = shmem_handler_tx.clone();
tokio::spawn(async move {
listener::tcp::listener_loop(socket, daemon_tx, shmem_handler_tx).await;
tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
});

Ok(DaemonCommunication::Tcp { socket_addr })
}
DaemonCommunicationConfig::Shmem => {
let daemon_control_region = ShmemConf::new()
.size(4096)
.create()
.wrap_err("failed to allocate daemon_control_region")?;
let daemon_events_region = ShmemConf::new()
.size(4096)
.create()
.wrap_err("failed to allocate daemon_events_region")?;
let daemon_control_region_id = daemon_control_region.get_os_id().to_owned();
let daemon_events_region_id = daemon_events_region.get_os_id().to_owned();

{
let server = unsafe { ShmemServer::new(daemon_control_region) }
.wrap_err("failed to create control server")?;
let daemon_tx = daemon_tx.clone();
let shmem_handler_tx = shmem_handler_tx.clone();
tokio::task::spawn_blocking(move || {
listener::shmem::listener_loop(server, daemon_tx, shmem_handler_tx)
});
}

{
let server = unsafe { ShmemServer::new(daemon_events_region) }
.wrap_err("failed to create events server")?;
let event_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
let shmem_handler_tx = shmem_handler_tx.clone();
tokio::task::spawn_blocking(move || {
listener::shmem::listener_loop(server, daemon_tx, shmem_handler_tx);
tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
});
}

Ok(DaemonCommunication::Shmem {
daemon_control_region_id,
daemon_events_region_id,
})
}
}
}

examples/rust-dataflow/dataflow.yml (+2, -0)

@@ -2,6 +2,8 @@ communication:
zenoh:
prefix: /example-rust-dataflow

daemon_config: Tcp # or Shmem

nodes:
- id: rust-node
custom:
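The new daemon_config key lets the example dataflow pick the daemon-to-node transport: Tcp here, or Shmem, matching the two DaemonCommunicationConfig variants handled by spawn_listener_loop above.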

