diff --git a/apis/rust/node/src/daemon.rs b/apis/rust/node/src/daemon.rs index ce92abe1..68bc0377 100644 --- a/apis/rust/node/src/daemon.rs +++ b/apis/rust/node/src/daemon.rs @@ -7,7 +7,7 @@ use std::{ use dora_core::{ config::{DataId, NodeId}, - daemon_messages::{ControlRequest, DataflowId, NodeEvent}, + daemon_messages::{ControlRequest, DataflowId, DropEvent, NodeEvent}, }; use dora_message::Metadata; use eyre::{bail, eyre, Context}; @@ -21,14 +21,14 @@ pub struct DaemonConnection { impl DaemonConnection { pub fn init(dataflow_id: DataflowId, node_id: &NodeId, daemon_port: u16) -> eyre::Result { let daemon_addr = (Ipv4Addr::new(127, 0, 0, 1), daemon_port).into(); - let control_stream = init_control_stream(daemon_addr, dataflow_id, &node_id) + let control_channel = ControlChannel::init(daemon_addr, dataflow_id, node_id) .wrap_err("failed to init control stream")?; - let event_stream = init_event_stream(daemon_addr, dataflow_id, &node_id) + let event_stream = EventStream::init(daemon_addr, dataflow_id, node_id) .wrap_err("failed to init event stream")?; Ok(Self { - control_channel: ControlChannel(control_stream), + control_channel, event_stream, }) } @@ -37,6 +37,35 @@ impl DaemonConnection { pub struct ControlChannel(TcpStream); impl ControlChannel { + fn init( + daemon_addr: SocketAddr, + dataflow_id: DataflowId, + node_id: &NodeId, + ) -> eyre::Result { + let mut control_stream = + TcpStream::connect(daemon_addr).wrap_err("failed to connect to dora-daemon")?; + control_stream + .set_nodelay(true) + .wrap_err("failed to set TCP_NODELAY")?; + tcp_send( + &mut control_stream, + &ControlRequest::Register { + dataflow_id, + node_id: node_id.clone(), + }, + ) + .wrap_err("failed to send register request to dora-daemon")?; + match tcp_receive(&mut control_stream) + .wrap_err("failed to receive register reply from dora-daemon")? + { + dora_core::daemon_messages::ControlReply::Result(result) => result + .map_err(|e| eyre!(e)) + .wrap_err("failed to register node with dora-daemon")?, + other => bail!("unexpected register reply: {other:?}"), + } + Ok(Self(control_stream)) + } + pub fn report_stop(&mut self) -> eyre::Result<()> { tcp_send(&mut self.0, &ControlRequest::Stopped) .wrap_err("failed to send subscribe request to dora-daemon")?; @@ -101,8 +130,84 @@ pub struct EventStream { } impl EventStream { + fn init( + daemon_addr: SocketAddr, + dataflow_id: DataflowId, + node_id: &NodeId, + ) -> eyre::Result { + let mut event_stream = + TcpStream::connect(daemon_addr).wrap_err("failed to connect to dora-daemon")?; + event_stream + .set_nodelay(true) + .wrap_err("failed to set TCP_NODELAY")?; + tcp_send( + &mut event_stream, + &ControlRequest::Subscribe { + dataflow_id, + node_id: node_id.clone(), + }, + ) + .wrap_err("failed to send subscribe request to dora-daemon")?; + match tcp_receive(&mut event_stream) + .wrap_err("failed to receive subscribe reply from dora-daemon")? + { + dora_core::daemon_messages::ControlReply::Result(result) => result + .map_err(|e| eyre!(e)) + .wrap_err("failed to create subscription with dora-daemon")?, + other => bail!("unexpected subscribe reply: {other:?}"), + } + + let (tx, rx) = flume::bounded(1); + std::thread::spawn(move || loop { + let event: NodeEvent = match tcp_receive(&mut event_stream) { + Ok(event) => event, + Err(err) if err.kind() == ErrorKind::UnexpectedEof => break, + Err(err) => { + let err = eyre!(err).wrap_err("failed to receive incoming event"); + tracing::warn!("{err:?}"); + continue; + } + }; + let drop_token = match &event { + NodeEvent::Input { + data: Some(data), .. + } => Some(data.drop_token.clone()), + NodeEvent::Stop + | NodeEvent::InputClosed { .. } + | NodeEvent::Input { data: None, .. } => None, + }; + + let (drop_tx, drop_rx) = std::sync::mpsc::channel(); + match tx.send((event, drop_tx)) { + Ok(()) => {} + Err(_) => { + // receiving end of channel was closed + break; + } + } + + match drop_rx.recv_timeout(Duration::from_secs(30)) { + Ok(()) => panic!("Node API should not send anything on ACK channel"), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + tracing::warn!("timeout while waiting for input ACK"); + } + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {} // expected result + } + + if let Some(token) = drop_token { + let message = DropEvent { token }; + if let Err(err) = tcp_send(&mut event_stream, &message) { + tracing::warn!("failed to send drop token: {err}"); + break; + } + } + }); + + Ok(EventStream { receiver: rx }) + } + pub fn recv(&mut self) -> Option { - let (node_event, ack) = match self.receiver.recv() { + let (node_event, drop_sender) = match self.receiver.recv() { Ok(d) => d, Err(flume::RecvError::Disconnected) => return None, }; @@ -117,7 +222,10 @@ impl EventStream { Ok(mapped) => Event::Input { id, metadata, - data: mapped.map(|data| Data { data, _ack: ack }), + data: mapped.map(|data| Data { + data, + _drop: drop_sender, + }), }, Err(err) => Event::Error(format!("{err:?}")), } @@ -132,66 +240,6 @@ pub struct MessageSample { pub id: String, } -fn init_event_stream( - daemon_addr: SocketAddr, - dataflow_id: DataflowId, - node_id: &NodeId, -) -> eyre::Result { - let mut event_stream = - TcpStream::connect(daemon_addr).wrap_err("failed to connect to dora-daemon")?; - event_stream - .set_nodelay(true) - .wrap_err("failed to set TCP_NODELAY")?; - tcp_send( - &mut event_stream, - &ControlRequest::Subscribe { - dataflow_id, - node_id: node_id.clone(), - }, - ) - .wrap_err("failed to send subscribe request to dora-daemon")?; - match tcp_receive(&mut event_stream) - .wrap_err("failed to receive subscribe reply from dora-daemon")? - { - dora_core::daemon_messages::ControlReply::Result(result) => result - .map_err(|e| eyre!(e)) - .wrap_err("failed to create subscription with dora-daemon")?, - other => bail!("unexpected subscribe reply: {other:?}"), - } - - let (tx, rx) = flume::bounded(1); - std::thread::spawn(move || loop { - let event: NodeEvent = match tcp_receive(&mut event_stream) { - Ok(event) => event, - Err(err) if err.kind() == ErrorKind::UnexpectedEof => break, - Err(err) => { - let err = eyre!(err).wrap_err("failed to receive incoming event"); - tracing::warn!("{err:?}"); - continue; - } - }; - - let (ack_tx, ack_rx) = std::sync::mpsc::channel(); - match tx.send((event, ack_tx)) { - Ok(()) => {} - Err(_) => { - // receiving end of channel was closed - break; - } - } - - match ack_rx.recv_timeout(Duration::from_secs(30)) { - Ok(()) => panic!("Node API should not send anything on ACK channel"), - Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { - tracing::warn!("timeout while waiting for input ACK"); - } - Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {} // expected result - } - }); - - Ok(EventStream { receiver: rx }) -} - #[derive(Debug)] #[non_exhaustive] pub enum Event<'a> { @@ -209,7 +257,7 @@ pub enum Event<'a> { pub struct Data<'a> { data: MappedInputData<'a>, - _ack: std::sync::mpsc::Sender<()>, + _drop: std::sync::mpsc::Sender<()>, } impl std::ops::Deref for Data<'_> { @@ -252,35 +300,6 @@ impl std::ops::Deref for MappedInputData<'_> { } } -fn init_control_stream( - daemon_addr: SocketAddr, - dataflow_id: DataflowId, - node_id: &NodeId, -) -> eyre::Result { - let mut control_stream = - TcpStream::connect(daemon_addr).wrap_err("failed to connect to dora-daemon")?; - control_stream - .set_nodelay(true) - .wrap_err("failed to set TCP_NODELAY")?; - tcp_send( - &mut control_stream, - &ControlRequest::Register { - dataflow_id, - node_id: node_id.clone(), - }, - ) - .wrap_err("failed to send register request to dora-daemon")?; - match tcp_receive(&mut control_stream) - .wrap_err("failed to receive register reply from dora-daemon")? - { - dora_core::daemon_messages::ControlReply::Result(result) => result - .map_err(|e| eyre!(e)) - .wrap_err("failed to register node with dora-daemon")?, - other => bail!("unexpected register reply: {other:?}"), - } - Ok(control_stream) -} - fn tcp_send(connection: &mut TcpStream, request: &T) -> std::io::Result<()> { let serialized = serde_json::to_vec(request)?; diff --git a/binaries/daemon/src/listener.rs b/binaries/daemon/src/listener.rs index f3b0d3dc..ee75a4c5 100644 --- a/binaries/daemon/src/listener.rs +++ b/binaries/daemon/src/listener.rs @@ -2,7 +2,7 @@ use crate::{ tcp_utils::{tcp_receive, tcp_send}, DaemonNodeEvent, Event, }; -use dora_core::daemon_messages; +use dora_core::daemon_messages::{self, DropEvent}; use eyre::{eyre, Context}; use std::{io::ErrorKind, net::Ipv4Addr}; use tokio::{ @@ -136,16 +136,38 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende // enter subscribe loop after receiving a subscribe message if let Some(events) = enter_subscribe_loop { - subscribe_loop(connection, events).await; + subscribe_loop(connection, events, events_tx).await; break; // the subscribe loop only exits when the connection was closed } } } async fn subscribe_loop( - mut connection: TcpStream, + connection: TcpStream, events: flume::Receiver, + events_tx: mpsc::Sender, ) { + let (mut rx, mut tx) = connection.into_split(); + + tokio::spawn(async move { + loop { + let Ok(raw) = tcp_receive(&mut rx).await else { + break; + }; + + let event: DropEvent = match serde_json::from_slice(&raw) { + Ok(e) => e, + Err(err) => { + tracing::error!("Failed to parse incoming message: {err}"); + continue; + } + }; + if events_tx.send(Event::Drop(event)).await.is_err() { + break; + } + } + }); + while let Some(event) = events.stream().next().await { let message = match serde_json::to_vec(&event) { Ok(m) => m, @@ -155,7 +177,7 @@ async fn subscribe_loop( continue; } }; - match tcp_send(&mut connection, &message).await { + match tcp_send(&mut tx, &message).await { Ok(()) => {} Err(err) if err.kind() == ErrorKind::UnexpectedEof diff --git a/binaries/daemon/src/main.rs b/binaries/daemon/src/main.rs index 8e54867e..faedde5b 100644 --- a/binaries/daemon/src/main.rs +++ b/binaries/daemon/src/main.rs @@ -1,6 +1,9 @@ use dora_core::{ config::{DataId, InputMapping, NodeId}, - daemon_messages::{self, ControlReply, DaemonCoordinatorEvent, DataflowId, SpawnDataflowNodes}, + daemon_messages::{ + self, ControlReply, DaemonCoordinatorEvent, DataflowId, DropEvent, DropToken, + SpawnDataflowNodes, + }, topics::DORA_COORDINATOR_PORT_DEFAULT, }; use dora_message::uhlc::HLC; @@ -11,6 +14,7 @@ use shared_memory::{Shmem, ShmemConf}; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, net::{Ipv4Addr, SocketAddr}, + rc::Rc, time::Duration, }; use tokio::{ @@ -47,7 +51,7 @@ async fn run() -> eyre::Result<()> { struct Daemon { port: u16, uninit_shared_memory: HashMap, Shmem)>, - sent_out_shared_memory: HashMap, + sent_out_shared_memory: HashMap>, running: HashMap, @@ -117,6 +121,18 @@ impl Daemon { .await? } Event::Dora(event) => self.handle_dora_event(event).await?, + Event::Drop(DropEvent { token }) => { + match self.sent_out_shared_memory.remove(&token) { + Some(rc) => { + if let Ok(_shmem) = Rc::try_unwrap(rc) { + tracing::trace!( + "freeing shared memory after receiving last drop token" + ) + } + } + None => tracing::warn!("received unknown drop token {token:?}"), + } + } } } @@ -243,6 +259,8 @@ impl Daemon { .remove(&id) .ok_or_else(|| eyre!("invalid shared memory id"))?; + let memory = Rc::new(memory); + let dataflow = self .running .get_mut(&dataflow_id) @@ -259,17 +277,24 @@ impl Daemon { let mut closed = Vec::new(); for (receiver_id, input_id) in local_receivers { if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) { + let drop_token = DropToken::generate(); if channel .send_async(daemon_messages::NodeEvent::Input { id: input_id.clone(), metadata: metadata.clone(), - data: Some(unsafe { daemon_messages::InputData::new(id.clone()) }), + data: Some(daemon_messages::InputData { + shared_memory_id: id.clone(), + drop_token: drop_token.clone(), + }), }) .await .is_err() { closed.push(receiver_id); } + // keep shared memory ptr in order to free it once all subscribers are done + self.sent_out_shared_memory + .insert(drop_token, memory.clone()); } } for id in closed { @@ -279,9 +304,6 @@ impl Daemon { // TODO send `data` via network to all remove receivers let data = std::ptr::slice_from_raw_parts(memory.as_ptr(), memory.len()); - // keep shared memory ptr in order to free it once all subscribers are done - self.sent_out_shared_memory.insert(id, memory); - let _ = reply_sender.send(ControlReply::Result(Ok(()))); } DaemonNodeEvent::Stopped => { @@ -417,6 +439,7 @@ pub enum Event { }, Coordinator(DaemonCoordinatorEvent), Dora(DoraEvent), + Drop(DropEvent), } #[derive(Debug)] diff --git a/binaries/daemon/src/tcp_utils.rs b/binaries/daemon/src/tcp_utils.rs index 31f5e3b5..b6c31e30 100644 --- a/binaries/daemon/src/tcp_utils.rs +++ b/binaries/daemon/src/tcp_utils.rs @@ -1,16 +1,16 @@ -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::TcpStream, -}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -pub async fn tcp_send(connection: &mut TcpStream, message: &[u8]) -> std::io::Result<()> { +pub async fn tcp_send( + connection: &mut (impl AsyncWrite + Unpin), + message: &[u8], +) -> std::io::Result<()> { let len_raw = (message.len() as u64).to_le_bytes(); connection.write_all(&len_raw).await?; connection.write_all(message).await?; Ok(()) } -pub async fn tcp_receive(connection: &mut TcpStream) -> std::io::Result> { +pub async fn tcp_receive(connection: &mut (impl AsyncRead + Unpin)) -> std::io::Result> { let reply_len = { let mut raw = [0; 8]; connection.read_exact(&mut raw).await?; diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 7129ea9b..54e0b3fd 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -58,16 +58,27 @@ pub enum NodeEvent { } #[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct InputData { - pub shared_memory_id: SharedMemoryId, +pub struct DropEvent { + pub token: DropToken, } -impl InputData { - pub unsafe fn new(shared_memory_id: SharedMemoryId) -> Self { - Self { shared_memory_id } +#[derive( + Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +pub struct DropToken(Uuid); + +impl DropToken { + pub fn generate() -> Self { + Self(Uuid::new_v4()) } } +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct InputData { + pub shared_memory_id: SharedMemoryId, + pub drop_token: DropToken, +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] pub enum DaemonCoordinatorEvent { Spawn(SpawnDataflowNodes),