From 556e2e2ec221b863f833a66a23b49f37385f2561 Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Tue, 21 Feb 2023 14:48:35 +0100
Subject: [PATCH] Unify daemon listener implementations to avoid code
 duplication

---
 Cargo.lock                            |  24 +-
 binaries/daemon/Cargo.toml            |   1 +
 binaries/daemon/src/listener/mod.rs   | 408 ++++++++++++++++++++++++++
 binaries/daemon/src/listener/shmem.rs | 331 ++++-----------------
 binaries/daemon/src/listener/tcp.rs   | 353 ++--------------------
 binaries/daemon/src/spawn.rs          |  90 +-----
 examples/rust-dataflow/dataflow.yml   |   2 +
 7 files changed, 520 insertions(+), 689 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 696e6a19..8f807965 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -219,9 +219,9 @@ checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9"
 
 [[package]]
 name = "async-trait"
-version = "0.1.53"
+version = "0.1.64"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600"
+checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -716,7 +716,7 @@ dependencies = [
  "cfg-if 1.0.0",
  "crossbeam-utils",
  "lazy_static",
- "memoffset",
+ "memoffset 0.6.5",
  "scopeguard",
 ]
 
@@ -1008,6 +1008,7 @@ dependencies = [
 name = "dora-daemon"
 version = "0.1.0"
 dependencies = [
+ "async-trait",
  "bincode",
  "clap 3.2.20",
  "ctrlc",
@@ -2061,6 +2062,15 @@ dependencies = [
  "autocfg 1.1.0",
 ]
 
+[[package]]
+name = "memoffset"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
+dependencies = [
+ "autocfg 1.1.0",
+]
+
 [[package]]
 name = "mime"
 version = "0.3.16"
@@ -2301,7 +2311,7 @@ dependencies = [
  "cc",
  "cfg-if 1.0.0",
  "libc",
- "memoffset",
+ "memoffset 0.6.5",
 ]
 
 [[package]]
@@ -2314,7 +2324,7 @@ dependencies = [
  "cc",
  "cfg-if 1.0.0",
  "libc",
- "memoffset",
+ "memoffset 0.6.5",
 ]
 
 [[package]]
@@ -2326,6 +2336,8 @@ dependencies = [
  "bitflags",
  "cfg-if 1.0.0",
  "libc",
+ "memoffset 0.7.1",
+ "pin-utils",
  "static_assertions",
 ]
 
@@ -3175,7 +3187,7 @@ checksum = "2a34bde3561f980a51c70495164200569a11662644fe5af017f0b5d7015688cc"
 dependencies = [
  "cfg-if 0.1.10",
  "libc",
- "nix 0.23.1",
+ "nix 0.26.2",
  "rand",
  "winapi",
 ]
diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml
index 67470957..0945a616 100644
--- a/binaries/daemon/Cargo.toml
+++ b/binaries/daemon/Cargo.toml
@@ -25,3 +25,4 @@ clap = { version = "3.1.8", features = ["derive"] }
 shared-memory-server = { path = "../../libraries/shared-memory-server" }
 ctrlc = "3.2.5"
 bincode = "1.3.3"
+async-trait = "0.1.64"
diff --git a/binaries/daemon/src/listener/mod.rs b/binaries/daemon/src/listener/mod.rs
index 70fbb7d5..c0803853 100644
--- a/binaries/daemon/src/listener/mod.rs
+++ b/binaries/daemon/src/listener/mod.rs
@@ -1,2 +1,410 @@
+use crate::{shared_mem_handler, DaemonNodeEvent, Event};
+use dora_core::{
+    config::NodeId,
+    daemon_messages::{
+        DaemonCommunication, DaemonCommunicationConfig, DaemonReply, DaemonRequest, DataflowId,
+        DropEvent, NodeEvent,
+    },
+};
+use eyre::{eyre, Context};
+use shared_memory_server::{ShmemConf, ShmemServer};
+use std::{collections::VecDeque, net::Ipv4Addr};
+use tokio::{
+    net::TcpListener,
+    sync::{mpsc, oneshot},
+};
+
+// TODO unify and avoid duplication
 pub mod shmem;
 pub mod tcp;
+
+pub async fn spawn_listener_loop(
+    dataflow_id: &DataflowId,
+    node_id: &NodeId,
+    daemon_tx: &mpsc::Sender<Event>,
+    shmem_handler_tx: &flume::Sender<shared_mem_handler::NodeEvent>,
+    config: DaemonCommunicationConfig,
+) -> eyre::Result<DaemonCommunication> {
+    match config {
+        DaemonCommunicationConfig::Tcp => {
+            let localhost = Ipv4Addr::new(127, 0, 0, 1);
+            let socket = match TcpListener::bind((localhost, 0)).await {
+                Ok(socket) => socket,
+                Err(err) => {
+                    return Err(
+                        eyre::Report::new(err).wrap_err("failed to create local TCP listener")
+                    )
+                }
+            };
+            let socket_addr = socket
+                .local_addr()
+                .wrap_err("failed to get local addr of socket")?;
+
+            let event_loop_node_id = format!("{dataflow_id}/{node_id}");
+            let daemon_tx = daemon_tx.clone();
+            let shmem_handler_tx = shmem_handler_tx.clone();
+            tokio::spawn(async move {
+                tcp::listener_loop(socket, daemon_tx, shmem_handler_tx).await;
+                tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
+            });
+
+            Ok(DaemonCommunication::Tcp { socket_addr })
+        }
+        DaemonCommunicationConfig::Shmem => {
+            let daemon_control_region = ShmemConf::new()
+                .size(4096)
+                .create()
+                .wrap_err("failed to allocate daemon_control_region")?;
+            let daemon_events_region = ShmemConf::new()
+                .size(4096)
+                .create()
+                .wrap_err("failed to allocate daemon_events_region")?;
+            let daemon_control_region_id = daemon_control_region.get_os_id().to_owned();
+            let daemon_events_region_id = daemon_events_region.get_os_id().to_owned();
+
+            {
+                let server = unsafe { ShmemServer::new(daemon_control_region) }
+                    .wrap_err("failed to create control server")?;
+                let daemon_tx = daemon_tx.clone();
+                let shmem_handler_tx = shmem_handler_tx.clone();
+                tokio::spawn(shmem::listener_loop(server, daemon_tx, shmem_handler_tx));
+            }
+
+            {
+                let server = unsafe { ShmemServer::new(daemon_events_region) }
+                    .wrap_err("failed to create events server")?;
+                let event_loop_node_id = format!("{dataflow_id}/{node_id}");
+                let daemon_tx = daemon_tx.clone();
+                let shmem_handler_tx = shmem_handler_tx.clone();
+                tokio::task::spawn(async move {
+                    shmem::listener_loop(server, daemon_tx, shmem_handler_tx).await;
+                    tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
+                });
+            }
+
+            Ok(DaemonCommunication::Shmem {
+                daemon_control_region_id,
+                daemon_events_region_id,
+            })
+        }
+    }
+}
+
+struct Listener<C> {
+    dataflow_id: DataflowId,
+    node_id: NodeId,
+    daemon_tx: mpsc::Sender<Event>,
+    shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
+    subscribed_events: Option<flume::Receiver<NodeEvent>>,
+    max_queue_len: usize,
+    queue: VecDeque<NodeEvent>,
+    connection: C,
+}
+
+impl<C> Listener<C>
+where
+    C: Connection,
+{
+    pub(crate) async fn run(
+        mut connection: C,
+        daemon_tx: mpsc::Sender<Event>,
+        shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
+    ) {
+        // receive the first message
+        let message = match connection
+            .receive_message()
+            .await
+            .wrap_err("failed to receive register message")
+        {
+            Ok(Some(m)) => m,
+            Ok(None) => {
+                tracing::info!("channel disconnected before register message");
+                return;
+            } // disconnected
+            Err(err) => {
+                tracing::info!("{err:?}");
+                return;
+            }
+        };
+
+        match message {
+            DaemonRequest::Register {
+                dataflow_id,
+                node_id,
+            } => {
+                let reply = DaemonReply::Result(Ok(()));
+                match connection
+                    .send_reply(reply)
+                    .await
+                    .wrap_err("failed to send register reply")
+                {
+                    Ok(()) => {
+                        let mut listener = Listener {
+                            dataflow_id,
+                            node_id,
+                            connection,
+                            daemon_tx,
+                            shmem_handler_tx,
+                            subscribed_events: None,
+                            max_queue_len: 10, // TODO: make this configurable
+                            queue: VecDeque::new(),
+                        };
+                        match listener.run_inner().await.wrap_err("listener failed") {
+                            Ok(()) => {}
+                            Err(err) => tracing::error!("{err:?}"),
+                        }
+                    }
+                    Err(err) => {
+                        tracing::warn!("{err:?}");
+                    }
+                }
+            }
+            other => {
+                tracing::warn!("expected register message, got `{other:?}`");
+                let reply = DaemonReply::Result(Err("must send register message first".into()));
+                if let Err(err) = connection
+                    .send_reply(reply)
+                    .await
+                    .wrap_err("failed to send reply")
+                {
+                    tracing::warn!("{err:?}");
+                }
+            }
+        }
+    }
+
+    async fn run_inner(&mut self) -> eyre::Result<()> {
+        loop {
+            // receive the next node message
+            let message = match self
+                .connection
+                .receive_message()
+                .await
+                .wrap_err("failed to receive DaemonRequest")
+            {
+                Ok(Some(m)) => m,
+                Ok(None) => {
+                    tracing::info!(
+                        "channel disconnected: {}/{}",
+                        self.dataflow_id,
+                        self.node_id
+                    );
+                    break;
+                } // disconnected
+                Err(err) => {
+                    tracing::warn!("{err:?}");
+                    continue;
+                }
+            };
+
+            // handle incoming events
+            self.handle_events().await?;
+
+            self.handle_message(message).await?;
+        }
+        Ok(())
+    }
+
+    async fn handle_events(&mut self) -> eyre::Result<()> {
+        if let Some(events) = &mut self.subscribed_events {
+            while let Ok(event) = events.try_recv() {
+                self.queue.push_back(event);
+            }
+
+            // drop oldest input events to maintain the max queue length
+            let input_event_count = self
+                .queue
+                .iter()
+                .filter(|e| matches!(e, NodeEvent::Input { .. }))
+                .count();
+            let drop_n = input_event_count.saturating_sub(self.max_queue_len);
+            self.drop_oldest_inputs(drop_n).await?;
+        }
+        Ok(())
+    }
+
+    async fn drop_oldest_inputs(&mut self, number: usize) -> Result<(), eyre::ErrReport> {
+        let mut drop_tokens = Vec::new();
+        for i in 0..number {
+            // find index of oldest input event
+            let index = self
+                .queue
+                .iter()
+                .position(|e| matches!(e, NodeEvent::Input { .. }))
+                .unwrap_or_else(|| panic!("no input event found in drop iteration {i}"));
+
+            // remove that event
+            if let Some(event) = self.queue.remove(index) {
+                if let NodeEvent::Input {
+                    data: Some(data), ..
+                } = event
+                {
+                    drop_tokens.push(data.drop_token);
+                }
+            }
+        }
+        self.report_drop_tokens(drop_tokens).await?;
+        Ok(())
+    }
+
+    #[tracing::instrument(skip(self), fields(%self.dataflow_id, %self.node_id))]
+    async fn handle_message(&mut self, message: DaemonRequest) -> eyre::Result<()> {
+        match message {
+            DaemonRequest::Register { .. } => {
+                let reply = DaemonReply::Result(Err("unexpected register message".into()));
+                self.send_reply(reply).await?;
+            }
+            DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped).await?,
+            DaemonRequest::CloseOutputs(outputs) => {
+                self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs))
+                    .await?
+            }
+            DaemonRequest::PrepareOutputMessage {
+                output_id,
+                metadata,
+                data_len,
+            } => {
+                let (reply_sender, reply) = oneshot::channel();
+                let event = shared_mem_handler::NodeEvent::PrepareOutputMessage {
+                    dataflow_id: self.dataflow_id,
+                    node_id: self.node_id.clone(),
+                    output_id,
+                    metadata,
+                    data_len,
+                    reply_sender,
+                };
+                self.send_shared_memory_event(event).await?;
+                let reply = reply
+                    .await
+                    .wrap_err("failed to receive prepare output reply")?;
+                // tracing::debug!("prepare latency: {:?}", start.elapsed()?);
+                self.send_reply(reply).await?;
+            }
+            DaemonRequest::SendPreparedMessage { id } => {
+                let (reply_sender, reply) = oneshot::channel();
+                let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender };
+                self.send_shared_memory_event(event).await?;
+                self.send_reply(
+                    reply
+                        .await
+                        .wrap_err("failed to receive send output reply")?,
+                )
+                .await?;
+            }
+            DaemonRequest::SendEmptyMessage {
+                output_id,
+                metadata,
+            } => {
+                // let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
+                // tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
+                let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut {
+                    dataflow_id: self.dataflow_id,
+                    node_id: self.node_id.clone(),
+                    output_id,
+                    metadata,
+                    data: None,
+                });
+                let result = self
+                    .send_daemon_event(event)
+                    .await
+                    .map_err(|_| "failed to receive send_empty_message reply".to_owned());
+                self.send_reply(DaemonReply::Result(result)).await?;
+            }
+            DaemonRequest::Subscribe => {
+                let (tx, rx) = flume::bounded(100);
+                self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx })
+                    .await?;
+                self.subscribed_events = Some(rx);
+            }
+            DaemonRequest::NextEvent { drop_tokens } => {
+                self.report_drop_tokens(drop_tokens).await?;
+
+                // try to take the latest queued event first
+                let queued_event = self.queue.pop_front().map(DaemonReply::NodeEvent);
+                let reply = match queued_event {
+                    Some(reply) => reply,
+                    None => {
+                        match self.subscribed_events.as_mut() {
+                            // wait for next event
+                            Some(events) => match events.recv_async().await {
+                                Ok(event) => DaemonReply::NodeEvent(event),
+                                Err(flume::RecvError::Disconnected) => DaemonReply::Closed,
+                            },
+                            None => DaemonReply::Result(Err(
+                                "Ignoring event request because no subscribe \
+                                message was sent yet"
+                                    .into(),
+                            )),
+                        }
+                    }
+                };
+
+                self.send_reply(reply).await?;
+            }
+        }
+        Ok(())
+    }
+
+    async fn report_drop_tokens(
+        &mut self,
+        drop_tokens: Vec<dora_core::daemon_messages::DropToken>,
+    ) -> eyre::Result<()> {
+        if !drop_tokens.is_empty() {
+            let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent {
+                tokens: drop_tokens,
+            });
+            self.send_shared_memory_event(drop_event).await?;
+        }
+        Ok(())
+    }
+
+    async fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> {
+        // send NodeEvent to daemon main loop
+        let (reply_tx, reply) = oneshot::channel();
+        let event = Event::Node {
+            dataflow_id: self.dataflow_id,
+            node_id: self.node_id.clone(),
+            event,
+            reply_sender: reply_tx,
+        };
+        self.daemon_tx
+            .send(event)
+            .await
+            .map_err(|_| eyre!("failed to send event to daemon"))?;
+        let reply = reply
+            .await
+            .map_err(|_| eyre!("failed to receive reply from daemon"))?;
+        self.send_reply(reply).await?;
+        Ok(())
+    }
+
+    async fn send_reply(&mut self, reply: DaemonReply) -> eyre::Result<()> {
+        self.connection
+            .send_reply(reply)
+            .await
+            .wrap_err("failed to send reply to node")
+    }
+
+    async fn send_shared_memory_event(
+        &self,
+        event: shared_mem_handler::NodeEvent,
+    ) -> eyre::Result<()> {
+        self.shmem_handler_tx
+            .send_async(event)
+            .await
+            .map_err(|_| eyre!("failed to send event to shared_mem_handler"))
+    }
+
+    async fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> {
+        self.daemon_tx
+            .send(event)
+            .await
+            .map_err(|_| eyre!("failed to send event to daemon"))
+    }
+}
+
+#[async_trait::async_trait]
+trait Connection {
+    async fn receive_message(&mut self) -> eyre::Result<Option<DaemonRequest>>;
+    async fn send_reply(&mut self, message: DaemonReply) -> eyre::Result<()>;
+}
diff --git a/binaries/daemon/src/listener/shmem.rs b/binaries/daemon/src/listener/shmem.rs
index 9e12a2de..c05f1d86 100644
--- a/binaries/daemon/src/listener/shmem.rs
+++ b/binaries/daemon/src/listener/shmem.rs
@@ -1,298 +1,75 @@
-use std::collections::VecDeque;
-
-use crate::{shared_mem_handler, DaemonNodeEvent, Event};
-use dora_core::{
-    config::NodeId,
-    daemon_messages::{DaemonReply, DaemonRequest, DataflowId, DropEvent, NodeEvent},
-};
-use eyre::{eyre, Context};
+use super::Listener;
+use crate::{shared_mem_handler, Event};
+use dora_core::daemon_messages::{DaemonReply, DaemonRequest};
+use eyre::eyre;
 use shared_memory_server::ShmemServer;
 use tokio::sync::{mpsc, oneshot};
 
 #[tracing::instrument(skip(server, daemon_tx, shmem_handler_tx))]
-pub fn listener_loop(
+pub async fn listener_loop(
     mut server: ShmemServer<DaemonRequest, DaemonReply>,
     daemon_tx: mpsc::Sender<Event>,
     shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
 ) {
-    // receive the first message
-    let message = match server
-        .listen()
-        .wrap_err("failed to receive register message")
-    {
-        Ok(Some(m)) => m,
-        Ok(None) => {
-            tracing::info!("channel disconnected before register message");
-            return;
-        } // disconnected
-        Err(err) => {
-            tracing::info!("{err:?}");
-            return;
-        }
-    };
-
-    match message {
-        DaemonRequest::Register {
-            dataflow_id,
-            node_id,
-        } => {
-            let reply = DaemonReply::Result(Ok(()));
-            match server
-                .send_reply(&reply)
-                .wrap_err("failed to send register reply")
-            {
-                Ok(()) => {
-                    let mut listener = Listener {
-                        dataflow_id,
-                        node_id,
-                        server,
-                        daemon_tx,
-                        shmem_handler_tx,
-                        subscribed_events: None,
-                        max_queue_len: 10, // TODO: make this configurable
-                        queue: VecDeque::new(),
-                    };
-                    match listener.run().wrap_err("listener failed") {
-                        Ok(()) => {}
-                        Err(err) => tracing::error!("{err:?}"),
+    let (tx, rx) = flume::bounded(0);
+    tokio::task::spawn_blocking(move || {
+        while let Ok(operation) = rx.recv() {
+            match operation {
+                Operation::Receive(sender) => {
+                    if sender.send(server.listen()).is_err() {
+                        break;
                     }
                 }
-                Err(err) => {
-                    tracing::warn!("{err:?}");
+                Operation::Send {
+                    message,
+                    result_sender,
+                } => {
+                    let result = server.send_reply(&message);
+                    if result_sender.send(result).is_err() {
+                        break;
+                    }
                 }
             }
         }
-        _ => {
-            let reply = DaemonReply::Result(Err("must send register message first".into()));
-            if let Err(err) = server.send_reply(&reply).wrap_err("failed to send reply") {
-                tracing::warn!("{err:?}");
-            }
-        }
-    }
+    });
+    let connection = ShmemConnection(tx);
+    Listener::run(connection, daemon_tx, shmem_handler_tx).await
 }
 
-struct Listener {
-    dataflow_id: DataflowId,
-    node_id: NodeId,
-    server: ShmemServer<DaemonRequest, DaemonReply>,
-    daemon_tx: mpsc::Sender<Event>,
-    shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
-    subscribed_events: Option<flume::Receiver<NodeEvent>>,
-    max_queue_len: usize,
-    queue: VecDeque<NodeEvent>,
+enum Operation {
+    Receive(oneshot::Sender<eyre::Result<Option<DaemonRequest>>>),
+    Send {
+        message: DaemonReply,
+        result_sender: oneshot::Sender<eyre::Result<()>>,
+    },
 }
 
-impl Listener {
-    fn run(&mut self) -> eyre::Result<()> {
-        loop {
-            // receive the next node message
-            let message = match self
-                .server
-                .listen()
-                .wrap_err("failed to receive DaemonRequest")
DaemonRequest") - { - Ok(Some(m)) => m, - Ok(None) => { - tracing::info!( - "channel disconnected: {}/{}", - self.dataflow_id, - self.node_id - ); - break; - } // disconnected - Err(err) => { - tracing::warn!("{err:?}"); - continue; - } - }; - - // handle incoming events - self.handle_events()?; - - self.handle_message(message)?; - } - Ok(()) - } - - fn handle_events(&mut self) -> eyre::Result<()> { - if let Some(events) = &mut self.subscribed_events { - while let Ok(event) = events.try_recv() { - self.queue.push_back(event); - } - - // drop oldest input events to maintain max queue length queue - let input_event_count = self - .queue - .iter() - .filter(|e| matches!(e, NodeEvent::Input { .. })) - .count(); - let drop_n = input_event_count.saturating_sub(self.max_queue_len); - self.drop_oldest_inputs(drop_n)?; - } - Ok(()) - } - - fn drop_oldest_inputs(&mut self, number: usize) -> Result<(), eyre::ErrReport> { - let mut drop_tokens = Vec::new(); - for i in 0..number { - // find index of oldest input event - let index = self - .queue - .iter() - .position(|e| matches!(e, NodeEvent::Input { .. })) - .expect(&format!("no input event found in drop iteration {i}")); - - // remove that event - if let Some(event) = self.queue.remove(index) { - if let NodeEvent::Input { - data: Some(data), .. - } = event - { - drop_tokens.push(data.drop_token); - } - } - } - self.report_drop_tokens(drop_tokens)?; - Ok(()) - } - - fn handle_message(&mut self, message: DaemonRequest) -> eyre::Result<()> { - match message { - DaemonRequest::Register { .. } => { - let reply = DaemonReply::Result(Err("unexpected register message".into())); - self.send_reply(&reply)?; - } - DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped)?, - DaemonRequest::CloseOutputs(outputs) => { - self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs))? 
-            }
-            DaemonRequest::PrepareOutputMessage {
-                output_id,
-                metadata,
-                data_len,
-            } => {
-                let (reply_sender, reply) = oneshot::channel();
-                let event = shared_mem_handler::NodeEvent::PrepareOutputMessage {
-                    dataflow_id: self.dataflow_id,
-                    node_id: self.node_id.clone(),
-                    output_id,
-                    metadata,
-                    data_len,
-                    reply_sender,
-                };
-                self.send_shared_memory_event(event)?;
-                let reply = reply
-                    .blocking_recv()
-                    .wrap_err("failed to receive prepare output reply")?;
-                // tracing::debug!("prepare latency: {:?}", start.elapsed()?);
-                self.send_reply(&reply)?;
-            }
-            DaemonRequest::SendPreparedMessage { id } => {
-                let (reply_sender, reply) = oneshot::channel();
-                let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender };
-                self.send_shared_memory_event(event)?;
-                self.send_reply(
-                    &reply
-                        .blocking_recv()
-                        .wrap_err("failed to receive send output reply")?,
-                )?;
-            }
-            DaemonRequest::SendEmptyMessage {
-                output_id,
-                metadata,
-            } => {
-                // let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
-                // tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
-                let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut {
-                    dataflow_id: self.dataflow_id,
-                    node_id: self.node_id.clone(),
-                    output_id,
-                    metadata,
-                    data: None,
-                });
-                let result = self
-                    .send_daemon_event(event)
-                    .map_err(|_| "failed to receive send_empty_message reply".to_owned());
-                self.send_reply(&DaemonReply::Result(result))?;
-            }
-            DaemonRequest::Subscribe => {
-                let (tx, rx) = flume::bounded(100);
-                self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx })?;
-                self.subscribed_events = Some(rx);
-            }
-            DaemonRequest::NextEvent { drop_tokens } => {
-                self.report_drop_tokens(drop_tokens)?;
-
-                // try to take the latest queued event first
-                let queued_event = self.queue.pop_front().map(DaemonReply::NodeEvent);
-                let reply = queued_event.unwrap_or_else(|| {
-                    match self.subscribed_events.as_mut() {
-                        // wait for next event
-                        Some(events) => match events.recv() {
-                            Ok(event) => DaemonReply::NodeEvent(event),
-                            Err(flume::RecvError::Disconnected) => DaemonReply::Closed,
-                        },
-                        None => {
-                            DaemonReply::Result(Err("Ignoring event request because no subscribe \
-                                message was sent yet"
-                                .into()))
-                        }
-                    }
-                });
-
-                self.send_reply(&reply)?;
-            }
-        }
-        Ok(())
-    }
-
-    fn report_drop_tokens(
-        &mut self,
-        drop_tokens: Vec<dora_core::daemon_messages::DropToken>,
-    ) -> eyre::Result<()> {
-        if !drop_tokens.is_empty() {
-            let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent {
-                tokens: drop_tokens,
-            });
-            self.send_shared_memory_event(drop_event)?;
-        }
-        Ok(())
-    }
-
-    fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> {
-        // send NodeEvent to daemon main loop
-        let (reply_tx, reply) = oneshot::channel();
-        let event = Event::Node {
-            dataflow_id: self.dataflow_id.clone(),
-            node_id: self.node_id.clone(),
-            event,
-            reply_sender: reply_tx,
-        };
-        self.daemon_tx
-            .blocking_send(event)
-            .map_err(|_| eyre!("failed to send event to daemon"))?;
-        let reply = reply
-            .blocking_recv()
-            .map_err(|_| eyre!("failed to receive reply from daemon"))?;
-        self.send_reply(&reply)?;
-        Ok(())
-    }
-
-    fn send_reply(&mut self, reply: &DaemonReply) -> eyre::Result<()> {
-        self.server
-            .send_reply(&reply)
-            .wrap_err("failed to send reply to node")
-    }
-
-    fn send_shared_memory_event(&self, event: shared_mem_handler::NodeEvent) -> eyre::Result<()> {
-        self.shmem_handler_tx
-            .send(event)
-            .map_err(|_| eyre!("failed to send event to shared_mem_handler"))
+struct ShmemConnection(flume::Sender<Operation>);
+
+#[async_trait::async_trait]
+impl super::Connection for ShmemConnection {
+    async fn receive_message(&mut self) -> eyre::Result<Option<DaemonRequest>> {
+        let (tx, rx) = oneshot::channel();
+        self.0
+            .send_async(Operation::Receive(tx))
+            .await
+            .map_err(|_| eyre!("failed to send receive request to ShmemServer"))?;
+        rx.await
+            .map_err(|_| eyre!("failed to receive from ShmemServer"))
+            .and_then(|r| r)
     }
 
-    fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> {
-        self.daemon_tx
-            .blocking_send(event)
-            .map_err(|_| eyre!("failed to send event to daemon"))
+    async fn send_reply(&mut self, reply: DaemonReply) -> eyre::Result<()> {
+        let (tx, rx) = oneshot::channel();
+        self.0
+            .send_async(Operation::Send {
+                message: reply,
+                result_sender: tx,
+            })
+            .await
+            .map_err(|_| eyre!("failed to send send request to ShmemServer"))?;
+        rx.await
+            .map_err(|_| eyre!("failed to receive from ShmemServer"))
+            .and_then(|r| r)
     }
 }
diff --git a/binaries/daemon/src/listener/tcp.rs b/binaries/daemon/src/listener/tcp.rs
index c3ca824f..cc216cca 100644
--- a/binaries/daemon/src/listener/tcp.rs
+++ b/binaries/daemon/src/listener/tcp.rs
@@ -1,17 +1,14 @@
+use super::Listener;
 use crate::{
     shared_mem_handler,
     tcp_utils::{tcp_receive, tcp_send},
-    DaemonNodeEvent, Event,
+    Event,
 };
-use dora_core::{
-    config::NodeId,
-    daemon_messages::{DaemonReply, DaemonRequest, DataflowId, DropEvent, NodeEvent},
-};
-use eyre::{eyre, Context};
-use std::collections::VecDeque;
+use dora_core::daemon_messages::{DaemonReply, DaemonRequest};
+use eyre::Context;
 use tokio::{
     net::{TcpListener, TcpStream},
-    sync::{mpsc, oneshot},
+    sync::mpsc,
 };
 
 #[tracing::instrument(skip(listener, daemon_tx, shmem_handler_tx))]
@@ -41,8 +38,8 @@ pub async fn listener_loop(
 }
 
 #[tracing::instrument(skip(connection, daemon_tx, shmem_handler_tx))]
-pub async fn handle_connection_loop(
-    mut connection: TcpStream,
+async fn handle_connection_loop(
+    connection: TcpStream,
     daemon_tx: mpsc::Sender<Event>,
     shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
 ) {
@@ -50,327 +47,37 @@
         tracing::warn!("failed to set nodelay for connection: {err}");
     }
 
-    // receive the first message
-    let message = match receive_message(&mut connection)
-        .await
-        .wrap_err("failed to receive register message")
-    {
-        Ok(Some(m)) => m,
-        Ok(None) => {
-            tracing::info!("channel disconnected before register message");
-            return;
-        } // disconnected
-        Err(err) => {
-            tracing::info!("{err:?}");
-            return;
-        }
-    };
-
-    match message {
-        DaemonRequest::Register {
-            dataflow_id,
-            node_id,
-        } => {
-            let reply = DaemonReply::Result(Ok(()));
-            match send_reply(&mut connection, &reply)
-                .await
-                .wrap_err("failed to send register reply")
-            {
-                Ok(()) => {
-                    let mut listener = Listener {
-                        dataflow_id,
-                        node_id,
-                        connection,
-                        daemon_tx,
-                        shmem_handler_tx,
-                        subscribed_events: None,
-                        max_queue_len: 10, // TODO: make this configurable
-                        queue: VecDeque::new(),
-                    };
-                    match listener.run().await.wrap_err("listener failed") {
-                        Ok(()) => {}
-                        Err(err) => tracing::error!("{err:?}"),
-                    }
-                }
-                Err(err) => {
-                    tracing::warn!("{err:?}");
-                }
-            }
-        }
-        other => {
-            tracing::warn!("expected register message, got `{other:?}`");
-            let reply = DaemonReply::Result(Err("must send register message first".into()));
-            if let Err(err) = send_reply(&mut connection, &reply)
-                .await
-                .wrap_err("failed to send reply")
-            {
-                tracing::warn!("{err:?}");
-            }
-        }
-    }
-}
-
-async fn receive_message(connection: &mut TcpStream) -> eyre::Result<Option<DaemonRequest>> {
-    let raw = match tcp_receive(connection).await {
-        Ok(raw) => raw,
-        Err(err) => match err.kind() {
-            std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => {
-                return Ok(None)
-            }
-            _other => {
-                return Err(err)
-                    .context("unexpected I/O error while trying to receive DaemonRequest")
-            }
-        },
-    };
-    bincode::deserialize(&raw)
-        .wrap_err("failed to deserialize DaemonRequest")
-        .map(Some)
+    Listener::run(TcpConnection(connection), daemon_tx, shmem_handler_tx).await
 }
 
-async fn send_reply(connection: &mut TcpStream, message: &DaemonReply) -> eyre::Result<()> {
-    let serialized = bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?;
-    tcp_send(connection, &serialized)
-        .await
-        .wrap_err("failed to send DaemonReply")?;
-    Ok(())
-}
+struct TcpConnection(TcpStream);
 
-struct Listener {
-    dataflow_id: DataflowId,
-    node_id: NodeId,
-    connection: TcpStream,
-    daemon_tx: mpsc::Sender<Event>,
-    shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
-    subscribed_events: Option<flume::Receiver<NodeEvent>>,
-    max_queue_len: usize,
-    queue: VecDeque<NodeEvent>,
-}
-
-impl Listener {
-    async fn run(&mut self) -> eyre::Result<()> {
-        loop {
-            // receive the next node message
-            let message = match receive_message(&mut self.connection)
-                .await
-                .wrap_err("failed to receive DaemonRequest")
-            {
-                Ok(Some(m)) => m,
-                Ok(None) => {
-                    tracing::info!(
-                        "channel disconnected: {}/{}",
-                        self.dataflow_id,
-                        self.node_id
-                    );
-                    break;
-                } // disconnected
-                Err(err) => {
-                    tracing::warn!("{err:?}");
-                    continue;
+#[async_trait::async_trait]
+impl super::Connection for TcpConnection {
+    async fn receive_message(&mut self) -> eyre::Result<Option<DaemonRequest>> {
+        let raw = match tcp_receive(&mut self.0).await {
+            Ok(raw) => raw,
+            Err(err) => match err.kind() {
+                std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => {
+                    return Ok(None)
                 }
-            };
-
-            // handle incoming events
-            self.handle_events().await?;
-
-            self.handle_message(message).await?;
-        }
-        Ok(())
-    }
-
-    async fn handle_events(&mut self) -> eyre::Result<()> {
-        if let Some(events) = &mut self.subscribed_events {
-            while let Ok(event) = events.try_recv() {
-                self.queue.push_back(event);
-            }
-
-            // drop oldest input events to maintain max queue length queue
-            let input_event_count = self
-                .queue
-                .iter()
-                .filter(|e| matches!(e, NodeEvent::Input { .. }))
-                .count();
-            let drop_n = input_event_count.saturating_sub(self.max_queue_len);
-            self.drop_oldest_inputs(drop_n).await?;
-        }
-        Ok(())
-    }
-
-    async fn drop_oldest_inputs(&mut self, number: usize) -> Result<(), eyre::ErrReport> {
-        let mut drop_tokens = Vec::new();
-        for i in 0..number {
-            // find index of oldest input event
-            let index = self
-                .queue
-                .iter()
-                .position(|e| matches!(e, NodeEvent::Input { .. }))
-                .unwrap_or_else(|| panic!("no input event found in drop iteration {i}"));
-
-            // remove that event
-            if let Some(event) = self.queue.remove(index) {
-                if let NodeEvent::Input {
-                    data: Some(data), ..
-                } = event
-                {
-                    drop_tokens.push(data.drop_token);
+                _other => {
+                    return Err(err)
+                        .context("unexpected I/O error while trying to receive DaemonRequest")
                 }
-            }
-        }
-        self.report_drop_tokens(drop_tokens).await?;
-        Ok(())
-    }
-
-    #[tracing::instrument(skip(self), fields(%self.dataflow_id, %self.node_id))]
-    async fn handle_message(&mut self, message: DaemonRequest) -> eyre::Result<()> {
-        match message {
-            DaemonRequest::Register { .. } => {
-                let reply = DaemonReply::Result(Err("unexpected register message".into()));
-                self.send_reply(&reply).await?;
-            }
-            DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped).await?,
-            DaemonRequest::CloseOutputs(outputs) => {
-                self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs))
-                    .await?
-            }
-            DaemonRequest::PrepareOutputMessage {
-                output_id,
-                metadata,
-                data_len,
-            } => {
-                let (reply_sender, reply) = oneshot::channel();
-                let event = shared_mem_handler::NodeEvent::PrepareOutputMessage {
-                    dataflow_id: self.dataflow_id,
-                    node_id: self.node_id.clone(),
-                    output_id,
-                    metadata,
-                    data_len,
-                    reply_sender,
-                };
-                self.send_shared_memory_event(event).await?;
-                let reply = reply
-                    .await
-                    .wrap_err("failed to receive prepare output reply")?;
-                // tracing::debug!("prepare latency: {:?}", start.elapsed()?);
-                self.send_reply(&reply).await?;
-            }
-            DaemonRequest::SendPreparedMessage { id } => {
-                let (reply_sender, reply) = oneshot::channel();
-                let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender };
-                self.send_shared_memory_event(event).await?;
-                self.send_reply(
-                    &reply
-                        .await
-                        .wrap_err("failed to receive send output reply")?,
-                )
-                .await?;
-            }
-            DaemonRequest::SendEmptyMessage {
-                output_id,
-                metadata,
-            } => {
-                // let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
-                // tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
-                let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut {
-                    dataflow_id: self.dataflow_id,
-                    node_id: self.node_id.clone(),
-                    output_id,
-                    metadata,
-                    data: None,
-                });
-                let result = self
-                    .send_daemon_event(event)
-                    .await
-                    .map_err(|_| "failed to receive send_empty_message reply".to_owned());
-                self.send_reply(&DaemonReply::Result(result)).await?;
-            }
-            DaemonRequest::Subscribe => {
-                let (tx, rx) = flume::bounded(100);
-                self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx })
-                    .await?;
-                self.subscribed_events = Some(rx);
-            }
-            DaemonRequest::NextEvent { drop_tokens } => {
-                self.report_drop_tokens(drop_tokens).await?;
-
-                // try to take the latest queued event first
-                let queued_event = self.queue.pop_front().map(DaemonReply::NodeEvent);
-                let reply = match queued_event {
-                    Some(reply) => reply,
-                    None => {
-                        match self.subscribed_events.as_mut() {
-                            // wait for next event
-                            Some(events) => match events.recv_async().await {
-                                Ok(event) => DaemonReply::NodeEvent(event),
-                                Err(flume::RecvError::Disconnected) => DaemonReply::Closed,
-                            },
-                            None => DaemonReply::Result(Err(
-                                "Ignoring event request because no subscribe \
-                                message was sent yet"
-                                    .into(),
-                            )),
-                        }
-                    }
-                };
-
-                self.send_reply(&reply).await?;
-            }
-        }
-        Ok(())
-    }
-
-    async fn report_drop_tokens(
-        &mut self,
-        drop_tokens: Vec<dora_core::daemon_messages::DropToken>,
-    ) -> eyre::Result<()> {
-        if !drop_tokens.is_empty() {
-            let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent {
-                tokens: drop_tokens,
-            });
-            self.send_shared_memory_event(drop_event).await?;
-        }
-        Ok(())
-    }
-
-    async fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> {
-        // send NodeEvent to daemon main loop
-        let (reply_tx, reply) = oneshot::channel();
-        let event = Event::Node {
-            dataflow_id: self.dataflow_id,
-            node_id: self.node_id.clone(),
-            event,
-            reply_sender: reply_tx,
-        };
-        self.daemon_tx
-            .send(event)
-            .await
-            .map_err(|_| eyre!("failed to send event to daemon"))?;
-        let reply = reply
-            .await
-            .map_err(|_| eyre!("failed to receive reply from daemon"))?;
-        self.send_reply(&reply).await?;
-        Ok(())
-    }
-
-    async fn send_reply(&mut self, reply: &DaemonReply) -> eyre::Result<()> {
-        send_reply(&mut self.connection, reply)
-            .await
-            .wrap_err("failed to send reply to node")
+            },
+        };
+        bincode::deserialize(&raw)
+            .wrap_err("failed to deserialize DaemonRequest")
+            .map(Some)
     }
 
-    async fn send_shared_memory_event(
-        &self,
-        event: shared_mem_handler::NodeEvent,
-    ) -> eyre::Result<()> {
-        self.shmem_handler_tx
-            .send_async(event)
+    async fn send_reply(&mut self, message: DaemonReply) -> eyre::Result<()> {
+        let serialized =
+            bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?;
+        tcp_send(&mut self.0, &serialized)
             .await
-            .map_err(|_| eyre!("failed to send event to shared_mem_handler"))
-    }
-
-    async fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> {
-        self.daemon_tx
-            .send(event)
-            .await
-            .map_err(|_| eyre!("failed to send event to daemon"))
+            .wrap_err("failed to send DaemonReply")?;
+        Ok(())
     }
 }
diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs
index 4a8fe635..d2c7170e 100644
--- a/binaries/daemon/src/spawn.rs
+++ b/binaries/daemon/src/spawn.rs
@@ -1,18 +1,16 @@
 use crate::{
-    listener, runtime_node_inputs, runtime_node_outputs, shared_mem_handler, DoraEvent, Event,
+    listener::spawn_listener_loop, runtime_node_inputs, runtime_node_outputs, shared_mem_handler,
+    DoraEvent, Event,
 };
 use dora_core::{
-    config::{NodeId, NodeRunConfig},
-    daemon_messages::{
-        DaemonCommunication, DaemonCommunicationConfig, DataflowId, NodeConfig, RuntimeConfig,
-    },
+    config::NodeRunConfig,
+    daemon_messages::{DaemonCommunicationConfig, DataflowId, NodeConfig, RuntimeConfig},
     descriptor::{resolve_path, source_is_url, OperatorSource, ResolvedNode},
 };
 use dora_download::download_file;
 use eyre::{eyre, WrapErr};
-use shared_memory_server::{ShmemConf, ShmemServer};
-use std::{env::consts::EXE_EXTENSION, net::Ipv4Addr, path::Path, process::Stdio};
-use tokio::{net::TcpListener, sync::mpsc};
+use std::{env::consts::EXE_EXTENSION, path::Path, process::Stdio};
+use tokio::sync::mpsc;
 
 pub async fn spawn_node(
     dataflow_id: DataflowId,
@@ -25,7 +23,7 @@ pub async fn spawn_node(
     let node_id = node.id.clone();
     tracing::debug!("Spawning node `{dataflow_id}/{node_id}`");
 
-    let daemon_communication = daemon_communication_config(
+    let daemon_communication = spawn_listener_loop(
         &dataflow_id,
         &node_id,
         &daemon_tx,
@@ -150,77 +148,3 @@
     });
     Ok(())
 }
-
-async fn daemon_communication_config(
-    dataflow_id: &DataflowId,
-    node_id: &NodeId,
-    daemon_tx: &mpsc::Sender<Event>,
-    shmem_handler_tx: &flume::Sender<shared_mem_handler::NodeEvent>,
-    config: DaemonCommunicationConfig,
-) -> eyre::Result<DaemonCommunication> {
-    match config {
-        DaemonCommunicationConfig::Tcp => {
-            let localhost = Ipv4Addr::new(127, 0, 0, 1);
-            let socket = match TcpListener::bind((localhost, 0)).await {
-                Ok(socket) => socket,
-                Err(err) => {
-                    return Err(
-                        eyre::Report::new(err).wrap_err("failed to create local TCP listener")
-                    )
-                }
-            };
-            let socket_addr = socket
-                .local_addr()
-                .wrap_err("failed to get local addr of socket")?;
-
-            let event_loop_node_id = format!("{dataflow_id}/{node_id}");
-            let daemon_tx = daemon_tx.clone();
-            let shmem_handler_tx = shmem_handler_tx.clone();
-            tokio::spawn(async move {
-                listener::tcp::listener_loop(socket, daemon_tx, shmem_handler_tx).await;
-                tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
-            });
-
-            Ok(DaemonCommunication::Tcp { socket_addr })
-        }
-        DaemonCommunicationConfig::Shmem => {
-            let daemon_control_region = ShmemConf::new()
-                .size(4096)
-                .create()
-                .wrap_err("failed to allocate daemon_control_region")?;
-            let daemon_events_region = ShmemConf::new()
-                .size(4096)
-                .create()
-                .wrap_err("failed to allocate daemon_events_region")?;
-            let daemon_control_region_id = daemon_control_region.get_os_id().to_owned();
-            let daemon_events_region_id = daemon_events_region.get_os_id().to_owned();
-
-            {
-                let server = unsafe { ShmemServer::new(daemon_control_region) }
-                    .wrap_err("failed to create control server")?;
-                let daemon_tx = daemon_tx.clone();
-                let shmem_handler_tx = shmem_handler_tx.clone();
-                tokio::task::spawn_blocking(move || {
-                    listener::shmem::listener_loop(server, daemon_tx, shmem_handler_tx)
-                });
-            }
-
-            {
-                let server = unsafe { ShmemServer::new(daemon_events_region) }
-                    .wrap_err("failed to create events server")?;
-                let event_loop_node_id = format!("{dataflow_id}/{node_id}");
-                let daemon_tx = daemon_tx.clone();
-                let shmem_handler_tx = shmem_handler_tx.clone();
-                tokio::task::spawn_blocking(move || {
-                    listener::shmem::listener_loop(server, daemon_tx, shmem_handler_tx);
-                    tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
-                });
-            }
-
-            Ok(DaemonCommunication::Shmem {
-                daemon_control_region_id,
-                daemon_events_region_id,
-            })
-        }
-    }
-}
diff --git a/examples/rust-dataflow/dataflow.yml b/examples/rust-dataflow/dataflow.yml
index 110dc327..d3777d97 100644
--- a/examples/rust-dataflow/dataflow.yml
+++ b/examples/rust-dataflow/dataflow.yml
@@ -2,6 +2,8 @@ communication:
   zenoh:
     prefix: /example-rust-dataflow
 
+daemon_config: Tcp # or Shmem
+
 nodes:
   - id: rust-node
     custom:
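
Note (not part of the patch): the heart of this refactor is the `Connection` trait at the bottom of `listener/mod.rs` — `Listener` is generic over the transport, so `tcp.rs` and `shmem.rs` shrink to pure message framing. The following self-contained sketch illustrates the same pattern with simplified stand-ins; `Request`, `Reply`, and `MockConnection` are hypothetical names, not types from this codebase, and it assumes the `tokio`, `async-trait`, and `eyre` crates as dependencies:

use async_trait::async_trait;
use std::collections::VecDeque;

// Simplified, hypothetical stand-ins for DaemonRequest/DaemonReply.
#[derive(Debug)]
enum Request {
    Register { node_id: String },
    Stopped,
}

#[derive(Debug)]
enum Reply {
    Result(Result<(), String>),
}

// Same shape as the trait introduced in listener/mod.rs.
#[async_trait]
trait Connection {
    async fn receive_message(&mut self) -> eyre::Result<Option<Request>>;
    async fn send_reply(&mut self, message: Reply) -> eyre::Result<()>;
}

// An in-memory transport: useful for exercising the listener logic
// without opening sockets or shared-memory regions.
struct MockConnection {
    incoming: VecDeque<Request>,
}

#[async_trait]
impl Connection for MockConnection {
    async fn receive_message(&mut self) -> eyre::Result<Option<Request>> {
        // `None` signals that the peer disconnected.
        Ok(self.incoming.pop_front())
    }

    async fn send_reply(&mut self, message: Reply) -> eyre::Result<()> {
        println!("reply sent: {message:?}");
        Ok(())
    }
}

// Generic driver, analogous to Listener::run: identical for every transport.
async fn run<C: Connection>(mut conn: C) -> eyre::Result<()> {
    while let Some(request) = conn.receive_message().await? {
        match request {
            Request::Register { node_id } => {
                println!("registered `{node_id}`");
                conn.send_reply(Reply::Result(Ok(()))).await?;
            }
            Request::Stopped => conn.send_reply(Reply::Result(Ok(()))).await?,
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
    let conn = MockConnection {
        incoming: VecDeque::from([
            Request::Register { node_id: "node-1".into() },
            Request::Stopped,
        ]),
    };
    run(conn).await
}

An in-memory transport like this is also what makes the unified listener straightforward to unit-test, which the two duplicated implementations were not.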
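Note (not part of the patch): `ShmemServer::listen` blocks, so the rewritten `shmem.rs` parks the server on a `spawn_blocking` thread and drives it through a zero-capacity flume channel of `Operation` values, each carrying a oneshot sender for the result. Below is a self-contained sketch of that bridge under the same assumptions (`tokio`, `flume`, and `eyre` as dependencies); `BlockingServer` is a hypothetical stand-in for `ShmemServer`, and the real code adds a second `Operation` variant for sending replies:

use eyre::eyre;
use tokio::sync::oneshot;

// Hypothetical stand-in for the blocking ShmemServer.
struct BlockingServer {
    counter: u64,
}

impl BlockingServer {
    fn listen(&mut self) -> eyre::Result<Option<u64>> {
        // Imagine this blocking on a shared-memory queue;
        // here it just yields three messages, then "disconnects".
        self.counter += 1;
        Ok(if self.counter <= 3 { Some(self.counter) } else { None })
    }
}

// Requests that the async side sends to the blocking thread,
// mirroring the `Operation` enum in the new shmem.rs.
enum Operation {
    Receive(oneshot::Sender<eyre::Result<Option<u64>>>),
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
    let mut server = BlockingServer { counter: 0 };

    // Capacity 0 = rendezvous channel: the blocking thread only works
    // while the async side is actually waiting for a result.
    let (tx, rx) = flume::bounded::<Operation>(0);

    tokio::task::spawn_blocking(move || {
        while let Ok(Operation::Receive(reply)) = rx.recv() {
            if reply.send(server.listen()).is_err() {
                break; // async side hung up
            }
        }
    });

    // Async wrapper: send a request, then await the oneshot reply.
    loop {
        let (reply_tx, reply_rx) = oneshot::channel();
        tx.send_async(Operation::Receive(reply_tx))
            .await
            .map_err(|_| eyre!("blocking thread gone"))?;
        match reply_rx.await.map_err(|_| eyre!("no reply"))?? {
            Some(msg) => println!("received {msg}"),
            None => break, // disconnected
        }
    }
    Ok(())
}

The rendezvous channel mirrors the patch's design: requests and replies stay strictly paired, so the blocking server thread can never run ahead of the async listener that consumes its results.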