diff --git a/Cargo.lock b/Cargo.lock index 85e40216..58af7d2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1008,6 +1008,7 @@ dependencies = [ name = "dora-daemon" version = "0.1.0" dependencies = [ + "bincode", "clap 3.2.20", "ctrlc", "dora-core", diff --git a/apis/rust/node/src/daemon.rs b/apis/rust/node/src/daemon.rs index d428aa94..2feae668 100644 --- a/apis/rust/node/src/daemon.rs +++ b/apis/rust/node/src/daemon.rs @@ -1,6 +1,6 @@ use dora_core::{ config::{DataId, NodeId}, - daemon_messages::{DaemonReply, DaemonRequest, DataflowId, NodeEvent}, + daemon_messages::{DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, NodeEvent}, }; use dora_message::Metadata; use eyre::{bail, eyre, Context}; @@ -17,21 +17,29 @@ impl DaemonConnection { pub(crate) fn init( dataflow_id: DataflowId, node_id: &NodeId, - daemon_control_region_id: &str, - daemon_events_region_id: &str, + daemon_communication: &DaemonCommunication, ) -> eyre::Result { - let control_channel = ControlChannel::init(dataflow_id, node_id, daemon_control_region_id) - .wrap_err("failed to init control stream")?; - - let (event_stream, event_stream_thread) = - EventStream::init(dataflow_id, node_id, daemon_events_region_id) - .wrap_err("failed to init event stream")?; - - Ok(Self { - control_channel, - event_stream, - event_stream_thread, - }) + match daemon_communication { + DaemonCommunication::Shmem { + daemon_control_region_id, + daemon_events_region_id, + } => { + let control_channel = + ControlChannel::init(dataflow_id, node_id, daemon_control_region_id) + .wrap_err("failed to init control stream")?; + + let (event_stream, event_stream_thread) = + EventStream::init(dataflow_id, node_id, daemon_events_region_id) + .wrap_err("failed to init event stream")?; + + Ok(Self { + control_channel, + event_stream, + event_stream_thread, + }) + } + DaemonCommunication::Tcp { socket_addr } => todo!(), + } } } diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index d21ed053..8bb22514 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -39,21 +39,15 @@ impl DoraNode { dataflow_id, node_id, run_config, - daemon_control_region_id, - daemon_events_region_id, + daemon_communication, } = node_config; let DaemonConnection { control_channel, event_stream, event_stream_thread, - } = DaemonConnection::init( - dataflow_id, - &node_id, - &daemon_control_region_id, - &daemon_events_region_id, - ) - .wrap_err("failed to connect to dora-daemon")?; + } = DaemonConnection::init(dataflow_id, &node_id, &daemon_communication) + .wrap_err("failed to connect to dora-daemon")?; let node = Self { id: node_id, diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 51e280d7..6d164ea8 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -6,7 +6,7 @@ use dora_core::{ topics::ControlRequest, }; use eyre::{bail, eyre, Context}; -use std::{env::consts::EXE_EXTENSION, io::Write, path::Path, str::FromStr}; +use std::{env::consts::EXE_EXTENSION, io::Write, path::Path}; use termcolor::{Color, ColorChoice, ColorSpec, WriteColor}; pub fn check_environment() -> eyre::Result<()> { diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index f3da0ecf..b0e331dc 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -65,6 +65,7 @@ pub async fn spawn_dataflow( dataflow_id: uuid, working_dir, nodes, + daemon_communication: descriptor.daemon_config, }; let message = serde_json::to_vec(&DaemonCoordinatorEvent::Spawn(spawn_command))?; diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 50928b6c..67470957 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -24,3 +24,4 @@ futures = "0.3.25" clap = { version = "3.1.8", features = ["derive"] } shared-memory-server = { path = "../../libraries/shared-memory-server" } ctrlc = "3.2.5" +bincode = "1.3.3" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index b46b8fe2..c2824980 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -3,8 +3,8 @@ use dora_core::{ config::{DataId, InputMapping, NodeId}, coordinator_messages::DaemonEvent, daemon_messages::{ - self, DaemonCoordinatorEvent, DaemonCoordinatorReply, DaemonReply, DataflowId, DropToken, - SpawnDataflowNodes, + self, DaemonCommunicationConfig, DaemonCoordinatorEvent, DaemonCoordinatorReply, + DaemonReply, DataflowId, DropToken, SpawnDataflowNodes, }, descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, }; @@ -68,12 +68,14 @@ impl Daemon { .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))? .to_owned(); - let nodes = read_descriptor(dataflow_path).await?.resolve_aliases(); + let descriptor = read_descriptor(dataflow_path).await?; + let nodes = descriptor.resolve_aliases(); let spawn_command = SpawnDataflowNodes { dataflow_id: Uuid::new_v4(), working_dir, nodes, + daemon_communication: descriptor.daemon_config, }; let exit_when_done = spawn_command @@ -234,8 +236,11 @@ impl Daemon { dataflow_id, working_dir, nodes, + daemon_communication, }) => { - let result = self.spawn_dataflow(dataflow_id, working_dir, nodes).await; + let result = self + .spawn_dataflow(dataflow_id, working_dir, nodes, daemon_communication) + .await; if let Err(err) = &result { tracing::error!("{err:?}"); } @@ -276,6 +281,7 @@ impl Daemon { dataflow_id: uuid::Uuid, working_dir: PathBuf, nodes: Vec, + daemon_communication_config: DaemonCommunicationConfig, ) -> eyre::Result<()> { let dataflow = match self.running.entry(dataflow_id) { std::collections::hash_map::Entry::Vacant(entry) => entry.insert(Default::default()), @@ -318,6 +324,7 @@ impl Daemon { node, self.events_tx.clone(), self.shared_memory_handler_node.clone(), + daemon_communication_config, ) .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?; diff --git a/binaries/daemon/src/listener/mod.rs b/binaries/daemon/src/listener/mod.rs index 9e12a2de..70fbb7d5 100644 --- a/binaries/daemon/src/listener/mod.rs +++ b/binaries/daemon/src/listener/mod.rs @@ -1,298 +1,2 @@ -use std::collections::VecDeque; - -use crate::{shared_mem_handler, DaemonNodeEvent, Event}; -use dora_core::{ - config::NodeId, - daemon_messages::{DaemonReply, DaemonRequest, DataflowId, DropEvent, NodeEvent}, -}; -use eyre::{eyre, Context}; -use shared_memory_server::ShmemServer; -use tokio::sync::{mpsc, oneshot}; - -#[tracing::instrument(skip(server, daemon_tx, shmem_handler_tx))] -pub fn listener_loop( - mut server: ShmemServer, - daemon_tx: mpsc::Sender, - shmem_handler_tx: flume::Sender, -) { - // receive the first message - let message = match server - .listen() - .wrap_err("failed to receive register message") - { - Ok(Some(m)) => m, - Ok(None) => { - tracing::info!("channel disconnected before register message"); - return; - } // disconnected - Err(err) => { - tracing::info!("{err:?}"); - return; - } - }; - - match message { - DaemonRequest::Register { - dataflow_id, - node_id, - } => { - let reply = DaemonReply::Result(Ok(())); - match server - .send_reply(&reply) - .wrap_err("failed to send register reply") - { - Ok(()) => { - let mut listener = Listener { - dataflow_id, - node_id, - server, - daemon_tx, - shmem_handler_tx, - subscribed_events: None, - max_queue_len: 10, // TODO: make this configurable - queue: VecDeque::new(), - }; - match listener.run().wrap_err("listener failed") { - Ok(()) => {} - Err(err) => tracing::error!("{err:?}"), - } - } - Err(err) => { - tracing::warn!("{err:?}"); - } - } - } - _ => { - let reply = DaemonReply::Result(Err("must send register message first".into())); - if let Err(err) = server.send_reply(&reply).wrap_err("failed to send reply") { - tracing::warn!("{err:?}"); - } - } - } -} - -struct Listener { - dataflow_id: DataflowId, - node_id: NodeId, - server: ShmemServer, - daemon_tx: mpsc::Sender, - shmem_handler_tx: flume::Sender, - subscribed_events: Option>, - max_queue_len: usize, - queue: VecDeque, -} - -impl Listener { - fn run(&mut self) -> eyre::Result<()> { - loop { - // receive the next node message - let message = match self - .server - .listen() - .wrap_err("failed to receive DaemonRequest") - { - Ok(Some(m)) => m, - Ok(None) => { - tracing::info!( - "channel disconnected: {}/{}", - self.dataflow_id, - self.node_id - ); - break; - } // disconnected - Err(err) => { - tracing::warn!("{err:?}"); - continue; - } - }; - - // handle incoming events - self.handle_events()?; - - self.handle_message(message)?; - } - Ok(()) - } - - fn handle_events(&mut self) -> eyre::Result<()> { - if let Some(events) = &mut self.subscribed_events { - while let Ok(event) = events.try_recv() { - self.queue.push_back(event); - } - - // drop oldest input events to maintain max queue length queue - let input_event_count = self - .queue - .iter() - .filter(|e| matches!(e, NodeEvent::Input { .. })) - .count(); - let drop_n = input_event_count.saturating_sub(self.max_queue_len); - self.drop_oldest_inputs(drop_n)?; - } - Ok(()) - } - - fn drop_oldest_inputs(&mut self, number: usize) -> Result<(), eyre::ErrReport> { - let mut drop_tokens = Vec::new(); - for i in 0..number { - // find index of oldest input event - let index = self - .queue - .iter() - .position(|e| matches!(e, NodeEvent::Input { .. })) - .expect(&format!("no input event found in drop iteration {i}")); - - // remove that event - if let Some(event) = self.queue.remove(index) { - if let NodeEvent::Input { - data: Some(data), .. - } = event - { - drop_tokens.push(data.drop_token); - } - } - } - self.report_drop_tokens(drop_tokens)?; - Ok(()) - } - - fn handle_message(&mut self, message: DaemonRequest) -> eyre::Result<()> { - match message { - DaemonRequest::Register { .. } => { - let reply = DaemonReply::Result(Err("unexpected register message".into())); - self.send_reply(&reply)?; - } - DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped)?, - DaemonRequest::CloseOutputs(outputs) => { - self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs))? - } - DaemonRequest::PrepareOutputMessage { - output_id, - metadata, - data_len, - } => { - let (reply_sender, reply) = oneshot::channel(); - let event = shared_mem_handler::NodeEvent::PrepareOutputMessage { - dataflow_id: self.dataflow_id, - node_id: self.node_id.clone(), - output_id, - metadata, - data_len, - reply_sender, - }; - self.send_shared_memory_event(event)?; - let reply = reply - .blocking_recv() - .wrap_err("failed to receive prepare output reply")?; - // tracing::debug!("prepare latency: {:?}", start.elapsed()?); - self.send_reply(&reply)?; - } - DaemonRequest::SendPreparedMessage { id } => { - let (reply_sender, reply) = oneshot::channel(); - let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender }; - self.send_shared_memory_event(event)?; - self.send_reply( - &reply - .blocking_recv() - .wrap_err("failed to receive send output reply")?, - )?; - } - DaemonRequest::SendEmptyMessage { - output_id, - metadata, - } => { - // let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?; - // tracing::debug!("listener SendEmptyMessage: {elapsed:?}"); - let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut { - dataflow_id: self.dataflow_id, - node_id: self.node_id.clone(), - output_id, - metadata, - data: None, - }); - let result = self - .send_daemon_event(event) - .map_err(|_| "failed to receive send_empty_message reply".to_owned()); - self.send_reply(&DaemonReply::Result(result))?; - } - DaemonRequest::Subscribe => { - let (tx, rx) = flume::bounded(100); - self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx })?; - self.subscribed_events = Some(rx); - } - DaemonRequest::NextEvent { drop_tokens } => { - self.report_drop_tokens(drop_tokens)?; - - // try to take the latest queued event first - let queued_event = self.queue.pop_front().map(DaemonReply::NodeEvent); - let reply = queued_event.unwrap_or_else(|| { - match self.subscribed_events.as_mut() { - // wait for next event - Some(events) => match events.recv() { - Ok(event) => DaemonReply::NodeEvent(event), - Err(flume::RecvError::Disconnected) => DaemonReply::Closed, - }, - None => { - DaemonReply::Result(Err("Ignoring event request because no subscribe \ - message was sent yet" - .into())) - } - } - }); - - self.send_reply(&reply)?; - } - } - Ok(()) - } - - fn report_drop_tokens( - &mut self, - drop_tokens: Vec, - ) -> eyre::Result<()> { - if !drop_tokens.is_empty() { - let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent { - tokens: drop_tokens, - }); - self.send_shared_memory_event(drop_event)?; - } - Ok(()) - } - - fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> { - // send NodeEvent to daemon main loop - let (reply_tx, reply) = oneshot::channel(); - let event = Event::Node { - dataflow_id: self.dataflow_id.clone(), - node_id: self.node_id.clone(), - event, - reply_sender: reply_tx, - }; - self.daemon_tx - .blocking_send(event) - .map_err(|_| eyre!("failed to send event to daemon"))?; - let reply = reply - .blocking_recv() - .map_err(|_| eyre!("failed to receive reply from daemon"))?; - self.send_reply(&reply)?; - Ok(()) - } - - fn send_reply(&mut self, reply: &DaemonReply) -> eyre::Result<()> { - self.server - .send_reply(&reply) - .wrap_err("failed to send reply to node") - } - - fn send_shared_memory_event(&self, event: shared_mem_handler::NodeEvent) -> eyre::Result<()> { - self.shmem_handler_tx - .send(event) - .map_err(|_| eyre!("failed to send event to shared_mem_handler")) - } - - fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> { - self.daemon_tx - .blocking_send(event) - .map_err(|_| eyre!("failed to send event to daemon")) - } -} +pub mod shmem; +pub mod tcp; diff --git a/binaries/daemon/src/listener/shmem.rs b/binaries/daemon/src/listener/shmem.rs new file mode 100644 index 00000000..9e12a2de --- /dev/null +++ b/binaries/daemon/src/listener/shmem.rs @@ -0,0 +1,298 @@ +use std::collections::VecDeque; + +use crate::{shared_mem_handler, DaemonNodeEvent, Event}; +use dora_core::{ + config::NodeId, + daemon_messages::{DaemonReply, DaemonRequest, DataflowId, DropEvent, NodeEvent}, +}; +use eyre::{eyre, Context}; +use shared_memory_server::ShmemServer; +use tokio::sync::{mpsc, oneshot}; + +#[tracing::instrument(skip(server, daemon_tx, shmem_handler_tx))] +pub fn listener_loop( + mut server: ShmemServer, + daemon_tx: mpsc::Sender, + shmem_handler_tx: flume::Sender, +) { + // receive the first message + let message = match server + .listen() + .wrap_err("failed to receive register message") + { + Ok(Some(m)) => m, + Ok(None) => { + tracing::info!("channel disconnected before register message"); + return; + } // disconnected + Err(err) => { + tracing::info!("{err:?}"); + return; + } + }; + + match message { + DaemonRequest::Register { + dataflow_id, + node_id, + } => { + let reply = DaemonReply::Result(Ok(())); + match server + .send_reply(&reply) + .wrap_err("failed to send register reply") + { + Ok(()) => { + let mut listener = Listener { + dataflow_id, + node_id, + server, + daemon_tx, + shmem_handler_tx, + subscribed_events: None, + max_queue_len: 10, // TODO: make this configurable + queue: VecDeque::new(), + }; + match listener.run().wrap_err("listener failed") { + Ok(()) => {} + Err(err) => tracing::error!("{err:?}"), + } + } + Err(err) => { + tracing::warn!("{err:?}"); + } + } + } + _ => { + let reply = DaemonReply::Result(Err("must send register message first".into())); + if let Err(err) = server.send_reply(&reply).wrap_err("failed to send reply") { + tracing::warn!("{err:?}"); + } + } + } +} + +struct Listener { + dataflow_id: DataflowId, + node_id: NodeId, + server: ShmemServer, + daemon_tx: mpsc::Sender, + shmem_handler_tx: flume::Sender, + subscribed_events: Option>, + max_queue_len: usize, + queue: VecDeque, +} + +impl Listener { + fn run(&mut self) -> eyre::Result<()> { + loop { + // receive the next node message + let message = match self + .server + .listen() + .wrap_err("failed to receive DaemonRequest") + { + Ok(Some(m)) => m, + Ok(None) => { + tracing::info!( + "channel disconnected: {}/{}", + self.dataflow_id, + self.node_id + ); + break; + } // disconnected + Err(err) => { + tracing::warn!("{err:?}"); + continue; + } + }; + + // handle incoming events + self.handle_events()?; + + self.handle_message(message)?; + } + Ok(()) + } + + fn handle_events(&mut self) -> eyre::Result<()> { + if let Some(events) = &mut self.subscribed_events { + while let Ok(event) = events.try_recv() { + self.queue.push_back(event); + } + + // drop oldest input events to maintain max queue length queue + let input_event_count = self + .queue + .iter() + .filter(|e| matches!(e, NodeEvent::Input { .. })) + .count(); + let drop_n = input_event_count.saturating_sub(self.max_queue_len); + self.drop_oldest_inputs(drop_n)?; + } + Ok(()) + } + + fn drop_oldest_inputs(&mut self, number: usize) -> Result<(), eyre::ErrReport> { + let mut drop_tokens = Vec::new(); + for i in 0..number { + // find index of oldest input event + let index = self + .queue + .iter() + .position(|e| matches!(e, NodeEvent::Input { .. })) + .expect(&format!("no input event found in drop iteration {i}")); + + // remove that event + if let Some(event) = self.queue.remove(index) { + if let NodeEvent::Input { + data: Some(data), .. + } = event + { + drop_tokens.push(data.drop_token); + } + } + } + self.report_drop_tokens(drop_tokens)?; + Ok(()) + } + + fn handle_message(&mut self, message: DaemonRequest) -> eyre::Result<()> { + match message { + DaemonRequest::Register { .. } => { + let reply = DaemonReply::Result(Err("unexpected register message".into())); + self.send_reply(&reply)?; + } + DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped)?, + DaemonRequest::CloseOutputs(outputs) => { + self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs))? + } + DaemonRequest::PrepareOutputMessage { + output_id, + metadata, + data_len, + } => { + let (reply_sender, reply) = oneshot::channel(); + let event = shared_mem_handler::NodeEvent::PrepareOutputMessage { + dataflow_id: self.dataflow_id, + node_id: self.node_id.clone(), + output_id, + metadata, + data_len, + reply_sender, + }; + self.send_shared_memory_event(event)?; + let reply = reply + .blocking_recv() + .wrap_err("failed to receive prepare output reply")?; + // tracing::debug!("prepare latency: {:?}", start.elapsed()?); + self.send_reply(&reply)?; + } + DaemonRequest::SendPreparedMessage { id } => { + let (reply_sender, reply) = oneshot::channel(); + let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender }; + self.send_shared_memory_event(event)?; + self.send_reply( + &reply + .blocking_recv() + .wrap_err("failed to receive send output reply")?, + )?; + } + DaemonRequest::SendEmptyMessage { + output_id, + metadata, + } => { + // let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?; + // tracing::debug!("listener SendEmptyMessage: {elapsed:?}"); + let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut { + dataflow_id: self.dataflow_id, + node_id: self.node_id.clone(), + output_id, + metadata, + data: None, + }); + let result = self + .send_daemon_event(event) + .map_err(|_| "failed to receive send_empty_message reply".to_owned()); + self.send_reply(&DaemonReply::Result(result))?; + } + DaemonRequest::Subscribe => { + let (tx, rx) = flume::bounded(100); + self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx })?; + self.subscribed_events = Some(rx); + } + DaemonRequest::NextEvent { drop_tokens } => { + self.report_drop_tokens(drop_tokens)?; + + // try to take the latest queued event first + let queued_event = self.queue.pop_front().map(DaemonReply::NodeEvent); + let reply = queued_event.unwrap_or_else(|| { + match self.subscribed_events.as_mut() { + // wait for next event + Some(events) => match events.recv() { + Ok(event) => DaemonReply::NodeEvent(event), + Err(flume::RecvError::Disconnected) => DaemonReply::Closed, + }, + None => { + DaemonReply::Result(Err("Ignoring event request because no subscribe \ + message was sent yet" + .into())) + } + } + }); + + self.send_reply(&reply)?; + } + } + Ok(()) + } + + fn report_drop_tokens( + &mut self, + drop_tokens: Vec, + ) -> eyre::Result<()> { + if !drop_tokens.is_empty() { + let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent { + tokens: drop_tokens, + }); + self.send_shared_memory_event(drop_event)?; + } + Ok(()) + } + + fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> { + // send NodeEvent to daemon main loop + let (reply_tx, reply) = oneshot::channel(); + let event = Event::Node { + dataflow_id: self.dataflow_id.clone(), + node_id: self.node_id.clone(), + event, + reply_sender: reply_tx, + }; + self.daemon_tx + .blocking_send(event) + .map_err(|_| eyre!("failed to send event to daemon"))?; + let reply = reply + .blocking_recv() + .map_err(|_| eyre!("failed to receive reply from daemon"))?; + self.send_reply(&reply)?; + Ok(()) + } + + fn send_reply(&mut self, reply: &DaemonReply) -> eyre::Result<()> { + self.server + .send_reply(&reply) + .wrap_err("failed to send reply to node") + } + + fn send_shared_memory_event(&self, event: shared_mem_handler::NodeEvent) -> eyre::Result<()> { + self.shmem_handler_tx + .send(event) + .map_err(|_| eyre!("failed to send event to shared_mem_handler")) + } + + fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> { + self.daemon_tx + .blocking_send(event) + .map_err(|_| eyre!("failed to send event to daemon")) + } +} diff --git a/binaries/daemon/src/listener/tcp.rs b/binaries/daemon/src/listener/tcp.rs new file mode 100644 index 00000000..c34b3aa9 --- /dev/null +++ b/binaries/daemon/src/listener/tcp.rs @@ -0,0 +1,368 @@ +use std::collections::VecDeque; + +use crate::{ + shared_mem_handler, + tcp_utils::{tcp_receive, tcp_send}, + DaemonNodeEvent, Event, +}; +use dora_core::{ + config::NodeId, + daemon_messages::{DaemonReply, DaemonRequest, DataflowId, DropEvent, NodeEvent}, +}; +use eyre::{eyre, Context}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::{mpsc, oneshot}, +}; + +#[tracing::instrument(skip(listener, daemon_tx, shmem_handler_tx))] +pub async fn listener_loop( + listener: TcpListener, + daemon_tx: mpsc::Sender, + shmem_handler_tx: flume::Sender, +) { + loop { + match listener + .accept() + .await + .wrap_err("failed to accept new connection") + { + Err(err) => { + tracing::info!("{err}"); + } + Ok((connection, _)) => { + tokio::spawn(handle_connection_loop( + connection, + daemon_tx.clone(), + shmem_handler_tx.clone(), + )); + } + } + } +} + +#[tracing::instrument(skip(connection, daemon_tx, shmem_handler_tx))] +pub async fn handle_connection_loop( + mut connection: TcpStream, + daemon_tx: mpsc::Sender, + shmem_handler_tx: flume::Sender, +) { + if let Err(err) = connection.set_nodelay(true) { + tracing::warn!("failed to set nodelay for connection: {err}"); + } + + // receive the first message + let message = match receive_message(&mut connection) + .await + .wrap_err("failed to receive register message") + { + Ok(Some(m)) => m, + Ok(None) => { + tracing::info!("channel disconnected before register message"); + return; + } // disconnected + Err(err) => { + tracing::info!("{err:?}"); + return; + } + }; + + match message { + DaemonRequest::Register { + dataflow_id, + node_id, + } => { + let reply = DaemonReply::Result(Ok(())); + match send_reply(&mut connection, &reply) + .await + .wrap_err("failed to send register reply") + { + Ok(()) => { + let mut listener = Listener { + dataflow_id, + node_id, + connection, + daemon_tx, + shmem_handler_tx, + subscribed_events: None, + max_queue_len: 10, // TODO: make this configurable + queue: VecDeque::new(), + }; + match listener.run().await.wrap_err("listener failed") { + Ok(()) => {} + Err(err) => tracing::error!("{err:?}"), + } + } + Err(err) => { + tracing::warn!("{err:?}"); + } + } + } + _ => { + let reply = DaemonReply::Result(Err("must send register message first".into())); + if let Err(err) = send_reply(&mut connection, &reply) + .await + .wrap_err("failed to send reply") + { + tracing::warn!("{err:?}"); + } + } + } +} + +async fn receive_message(connection: &mut TcpStream) -> eyre::Result> { + let raw = match tcp_receive(connection).await { + Ok(raw) => raw, + Err(err) => match err.kind() { + std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => { + return Ok(None) + } + other => { + return Err(err) + .context("unexpected I/O error while trying to receive DaemonRequest") + } + }, + }; + bincode::deserialize(&raw) + .wrap_err("failed to deserialize DaemonRequest") + .map(Some) +} + +async fn send_reply(connection: &mut TcpStream, message: &DaemonReply) -> eyre::Result<()> { + let serialized = bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?; + tcp_send(connection, &serialized) + .await + .wrap_err("failed to send DaemonReply")?; + Ok(()) +} + +struct Listener { + dataflow_id: DataflowId, + node_id: NodeId, + connection: TcpStream, + daemon_tx: mpsc::Sender, + shmem_handler_tx: flume::Sender, + subscribed_events: Option>, + max_queue_len: usize, + queue: VecDeque, +} + +impl Listener { + async fn run(&mut self) -> eyre::Result<()> { + loop { + // receive the next node message + let message = match receive_message(&mut self.connection) + .await + .wrap_err("failed to receive DaemonRequest") + { + Ok(Some(m)) => m, + Ok(None) => { + tracing::info!( + "channel disconnected: {}/{}", + self.dataflow_id, + self.node_id + ); + break; + } // disconnected + Err(err) => { + tracing::warn!("{err:?}"); + continue; + } + }; + + // handle incoming events + self.handle_events()?; + + self.handle_message(message).await?; + } + Ok(()) + } + + fn handle_events(&mut self) -> eyre::Result<()> { + if let Some(events) = &mut self.subscribed_events { + while let Ok(event) = events.try_recv() { + self.queue.push_back(event); + } + + // drop oldest input events to maintain max queue length queue + let input_event_count = self + .queue + .iter() + .filter(|e| matches!(e, NodeEvent::Input { .. })) + .count(); + let drop_n = input_event_count.saturating_sub(self.max_queue_len); + self.drop_oldest_inputs(drop_n)?; + } + Ok(()) + } + + fn drop_oldest_inputs(&mut self, number: usize) -> Result<(), eyre::ErrReport> { + let mut drop_tokens = Vec::new(); + for i in 0..number { + // find index of oldest input event + let index = self + .queue + .iter() + .position(|e| matches!(e, NodeEvent::Input { .. })) + .expect(&format!("no input event found in drop iteration {i}")); + + // remove that event + if let Some(event) = self.queue.remove(index) { + if let NodeEvent::Input { + data: Some(data), .. + } = event + { + drop_tokens.push(data.drop_token); + } + } + } + self.report_drop_tokens(drop_tokens)?; + Ok(()) + } + + async fn handle_message(&mut self, message: DaemonRequest) -> eyre::Result<()> { + match message { + DaemonRequest::Register { .. } => { + let reply = DaemonReply::Result(Err("unexpected register message".into())); + self.send_reply(&reply).await?; + } + DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped).await?, + DaemonRequest::CloseOutputs(outputs) => { + self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs)) + .await? + } + DaemonRequest::PrepareOutputMessage { + output_id, + metadata, + data_len, + } => { + let (reply_sender, reply) = oneshot::channel(); + let event = shared_mem_handler::NodeEvent::PrepareOutputMessage { + dataflow_id: self.dataflow_id, + node_id: self.node_id.clone(), + output_id, + metadata, + data_len, + reply_sender, + }; + self.send_shared_memory_event(event)?; + let reply = reply + .await + .wrap_err("failed to receive prepare output reply")?; + // tracing::debug!("prepare latency: {:?}", start.elapsed()?); + self.send_reply(&reply).await?; + } + DaemonRequest::SendPreparedMessage { id } => { + let (reply_sender, reply) = oneshot::channel(); + let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender }; + self.send_shared_memory_event(event)?; + self.send_reply( + &reply + .await + .wrap_err("failed to receive send output reply")?, + ) + .await?; + } + DaemonRequest::SendEmptyMessage { + output_id, + metadata, + } => { + // let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?; + // tracing::debug!("listener SendEmptyMessage: {elapsed:?}"); + let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut { + dataflow_id: self.dataflow_id, + node_id: self.node_id.clone(), + output_id, + metadata, + data: None, + }); + let result = self + .send_daemon_event(event) + .await + .map_err(|_| "failed to receive send_empty_message reply".to_owned()); + self.send_reply(&DaemonReply::Result(result)).await?; + } + DaemonRequest::Subscribe => { + let (tx, rx) = flume::bounded(100); + self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx }) + .await?; + self.subscribed_events = Some(rx); + } + DaemonRequest::NextEvent { drop_tokens } => { + self.report_drop_tokens(drop_tokens)?; + + // try to take the latest queued event first + let queued_event = self.queue.pop_front().map(DaemonReply::NodeEvent); + let reply = queued_event.unwrap_or_else(|| { + match self.subscribed_events.as_mut() { + // wait for next event + Some(events) => match events.recv() { + Ok(event) => DaemonReply::NodeEvent(event), + Err(flume::RecvError::Disconnected) => DaemonReply::Closed, + }, + None => { + DaemonReply::Result(Err("Ignoring event request because no subscribe \ + message was sent yet" + .into())) + } + } + }); + + self.send_reply(&reply).await?; + } + } + Ok(()) + } + + fn report_drop_tokens( + &mut self, + drop_tokens: Vec, + ) -> eyre::Result<()> { + if !drop_tokens.is_empty() { + let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent { + tokens: drop_tokens, + }); + self.send_shared_memory_event(drop_event)?; + } + Ok(()) + } + + async fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> { + // send NodeEvent to daemon main loop + let (reply_tx, reply) = oneshot::channel(); + let event = Event::Node { + dataflow_id: self.dataflow_id.clone(), + node_id: self.node_id.clone(), + event, + reply_sender: reply_tx, + }; + self.daemon_tx + .send(event) + .await + .map_err(|_| eyre!("failed to send event to daemon"))?; + let reply = reply + .await + .map_err(|_| eyre!("failed to receive reply from daemon"))?; + self.send_reply(&reply).await?; + Ok(()) + } + + async fn send_reply(&mut self, reply: &DaemonReply) -> eyre::Result<()> { + send_reply(&mut self.connection, reply) + .await + .wrap_err("failed to send reply to node") + } + + fn send_shared_memory_event(&self, event: shared_mem_handler::NodeEvent) -> eyre::Result<()> { + self.shmem_handler_tx + .send(event) + .map_err(|_| eyre!("failed to send event to shared_mem_handler")) + } + + async fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> { + self.daemon_tx + .send(event) + .await + .map_err(|_| eyre!("failed to send event to daemon")) + } +} diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 6616b30c..de016221 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -1,17 +1,18 @@ use crate::{ - listener::listener_loop, runtime_node_inputs, runtime_node_outputs, shared_mem_handler, - DoraEvent, Event, + listener, runtime_node_inputs, runtime_node_outputs, shared_mem_handler, DoraEvent, Event, }; use dora_core::{ - config::NodeRunConfig, - daemon_messages::{DataflowId, NodeConfig, RuntimeConfig}, + config::{NodeId, NodeRunConfig}, + daemon_messages::{ + DaemonCommunication, DaemonCommunicationConfig, DataflowId, NodeConfig, RuntimeConfig, + }, descriptor::{resolve_path, source_is_url, OperatorSource, ResolvedNode}, }; use dora_download::download_file; use eyre::{eyre, WrapErr}; use shared_memory_server::{ShmemConf, ShmemServer}; -use std::{env::consts::EXE_EXTENSION, path::Path, process::Stdio}; -use tokio::sync::mpsc; +use std::{env::consts::EXE_EXTENSION, net::Ipv4Addr, path::Path, process::Stdio}; +use tokio::{net::TcpListener, sync::mpsc}; pub async fn spawn_node( dataflow_id: DataflowId, @@ -19,38 +20,19 @@ pub async fn spawn_node( node: ResolvedNode, daemon_tx: mpsc::Sender, shmem_handler_tx: flume::Sender, + config: DaemonCommunicationConfig, ) -> eyre::Result<()> { let node_id = node.id.clone(); tracing::debug!("Spawning node `{dataflow_id}/{node_id}`"); - let daemon_control_region = ShmemConf::new() - .size(4096) - .create() - .wrap_err("failed to allocate daemon_control_region")?; - let daemon_events_region = ShmemConf::new() - .size(4096) - .create() - .wrap_err("failed to allocate daemon_events_region")?; - let daemon_control_region_id = daemon_control_region.get_os_id().to_owned(); - let daemon_events_region_id = daemon_events_region.get_os_id().to_owned(); - { - let server = unsafe { ShmemServer::new(daemon_control_region) } - .wrap_err("failed to create control server")?; - let daemon_tx = daemon_tx.clone(); - let shmem_handler_tx = shmem_handler_tx.clone(); - tokio::task::spawn_blocking(move || listener_loop(server, daemon_tx, shmem_handler_tx)); - } - { - let server = unsafe { ShmemServer::new(daemon_events_region) } - .wrap_err("failed to create events server")?; - let event_loop_node_id = format!("{dataflow_id}/{node_id}"); - let daemon_tx = daemon_tx.clone(); - let shmem_handler_tx = shmem_handler_tx.clone(); - tokio::task::spawn_blocking(move || { - listener_loop(server, daemon_tx, shmem_handler_tx); - tracing::debug!("event listener loop finished for `{event_loop_node_id}`"); - }); - } + let daemon_communication = daemon_communication_config( + &dataflow_id, + &node_id, + &daemon_tx, + &shmem_handler_tx, + config, + ) + .await?; let mut child = match node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { @@ -64,7 +46,7 @@ pub async fn spawn_node( .wrap_err("failed to download custom node")?; target_path.clone() } else { - resolve_path(&n.source, &working_dir) + resolve_path(&n.source, working_dir) .wrap_err_with(|| format!("failed to resolve node source `{}`", n.source))? }; @@ -76,8 +58,7 @@ pub async fn spawn_node( dataflow_id, node_id: node_id.clone(), run_config: n.run_config.clone(), - daemon_control_region_id, - daemon_events_region_id, + daemon_communication, }; if let Some(args) = &n.args { command.args(args.split_ascii_whitespace()); @@ -133,8 +114,7 @@ pub async fn spawn_node( inputs: runtime_node_inputs(&n), outputs: runtime_node_outputs(&n), }, - daemon_control_region_id, - daemon_events_region_id, + daemon_communication, }, operators: n.operators, }; @@ -170,3 +150,69 @@ pub async fn spawn_node( }); Ok(()) } + +async fn daemon_communication_config( + dataflow_id: &DataflowId, + node_id: &NodeId, + daemon_tx: &mpsc::Sender, + shmem_handler_tx: &flume::Sender, + config: DaemonCommunicationConfig, +) -> eyre::Result { + match config { + DaemonCommunicationConfig::Tcp => { + let localhost = Ipv4Addr::new(127, 0, 0, 1); + let socket = match TcpListener::bind((localhost, 0)).await { + Ok(socket) => socket, + Err(err) => { + return Err( + eyre::Report::new(err).wrap_err("failed to create local TCP listener") + ) + } + }; + let socket_addr = socket + .local_addr() + .wrap_err("failed to get local addr of socket")?; + + Ok(DaemonCommunication::Tcp { socket_addr }) + } + DaemonCommunicationConfig::Shmem => { + let daemon_control_region = ShmemConf::new() + .size(4096) + .create() + .wrap_err("failed to allocate daemon_control_region")?; + let daemon_events_region = ShmemConf::new() + .size(4096) + .create() + .wrap_err("failed to allocate daemon_events_region")?; + let daemon_control_region_id = daemon_control_region.get_os_id().to_owned(); + let daemon_events_region_id = daemon_events_region.get_os_id().to_owned(); + + { + let server = unsafe { ShmemServer::new(daemon_control_region) } + .wrap_err("failed to create control server")?; + let daemon_tx = daemon_tx.clone(); + let shmem_handler_tx = shmem_handler_tx.clone(); + tokio::task::spawn_blocking(move || { + listener::shmem::listener_loop(server, daemon_tx, shmem_handler_tx) + }); + } + + { + let server = unsafe { ShmemServer::new(daemon_events_region) } + .wrap_err("failed to create events server")?; + let event_loop_node_id = format!("{dataflow_id}/{node_id}"); + let daemon_tx = daemon_tx.clone(); + let shmem_handler_tx = shmem_handler_tx.clone(); + tokio::task::spawn_blocking(move || { + listener::shmem::listener_loop(server, daemon_tx, shmem_handler_tx); + tracing::debug!("event listener loop finished for `{event_loop_node_id}`"); + }); + } + + Ok(DaemonCommunication::Shmem { + daemon_control_region_id, + daemon_events_region_id, + }) + } + } +} diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 1013f0ce..c1bb4714 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -1,8 +1,8 @@ -use std::{collections::BTreeMap, path::PathBuf}; +use std::{net::SocketAddr, path::PathBuf}; use crate::{ config::{DataId, NodeId, NodeRunConfig}, - descriptor::{self, OperatorDefinition, ResolvedNode}, + descriptor::{OperatorDefinition, ResolvedNode}, }; use dora_message::Metadata; use uuid::Uuid; @@ -12,8 +12,18 @@ pub struct NodeConfig { pub dataflow_id: DataflowId, pub node_id: NodeId, pub run_config: NodeRunConfig, - pub daemon_control_region_id: SharedMemoryId, - pub daemon_events_region_id: SharedMemoryId, + pub daemon_communication: DaemonCommunication, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum DaemonCommunication { + Shmem { + daemon_control_region_id: SharedMemoryId, + daemon_events_region_id: SharedMemoryId, + }, + Tcp { + socket_addr: SocketAddr, + }, } #[derive(Debug, serde::Serialize, serde::Deserialize)] @@ -117,4 +127,17 @@ pub struct SpawnDataflowNodes { pub dataflow_id: DataflowId, pub working_dir: PathBuf, pub nodes: Vec, + pub daemon_communication: DaemonCommunicationConfig, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum DaemonCommunicationConfig { + Tcp, + Shmem, +} + +impl Default for DaemonCommunicationConfig { + fn default() -> Self { + Self::Shmem // TODO change to TCP + } } diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 238ea75b..41cbde15 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,4 +1,7 @@ -use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; +use crate::{ + config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}, + daemon_messages::DaemonCommunicationConfig, +}; use eyre::{bail, Result}; use serde::{Deserialize, Serialize}; use std::{ @@ -18,6 +21,8 @@ pub struct Descriptor { #[serde(with = "serde_yaml::with::singleton_map")] pub communication: CommunicationConfig, pub nodes: Vec, + #[serde(default)] + pub daemon_config: DaemonCommunicationConfig, } pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op";