This commit migrates the allocation of shared memory samples from the daemon to the individual nodes, so that a node can prepare the memory itself when sending an output. This avoids the extra round trip to the daemon that was needed before (the prepare request and its reply). A second advantage is that shared memory outputs can now be queued on the TCP socket without waiting for replies from the daemon, as is already done for `Vec`-backed messages.
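To make the new flow concrete, here is a minimal sketch of the node-side send path after this change. It is illustrative only: `prepare_shared_memory_output` is a hypothetical helper, and the actual logic lives in `DoraNode::send_output` and `ControlChannel::send_message` in the diff below.

```rust
use dora_core::daemon_messages::{Data, DropToken};
use eyre::WrapErr;
use shared_memory::{Shmem, ShmemConf};

/// Allocate a shared memory sample on the node side and describe it as a
/// `Data::SharedMemory` message. The caller must keep the returned `Shmem`
/// mapped (the real code stores it in `sent_out_shared_memory`) until the
/// daemon reports the matching `NodeEvent::OutputDropped { drop_token }`.
fn prepare_shared_memory_output(
    data_len: usize,
    fill: impl FnOnce(&mut [u8]),
) -> eyre::Result<(Data, DropToken, Shmem)> {
    // The node allocates the region itself -- no `PrepareOutputMessage`
    // round trip to the daemon anymore.
    let mut region = ShmemConf::new()
        .size(data_len)
        .create()
        .wrap_err("failed to allocate shared memory")?;

    // Write the output data directly into the mapped region.
    let raw = unsafe { region.as_slice_mut() };
    fill(&mut raw[..data_len]);

    // The drop token lets the daemon tell us once all receivers have
    // released the sample, so the region can be reused or freed.
    let drop_token = DropToken::generate();
    let data = Data::SharedMemory {
        shared_memory_id: region.get_os_id().to_owned(),
        len: data_len,
        drop_token,
    };
    // A `DaemonRequest::SendMessage { output_id, metadata, data }` carrying
    // this value can be queued on the TCP socket without waiting for a reply,
    // just like a `Data::Vec(..)` message.
    Ok((data, drop_token, region))
}
```

On the daemon side, `send_out` maps the region by its OS id, forwards the input to all local receivers, and eventually reports the token back to the node through the event stream (the `finished_drop_tokens` channel), at which point the node may recycle the region via its cache.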
@@ -1,7 +1,7 @@
use super::{communication::DaemonChannel, EventStreamThreadHandle, MessageSample};
use super::{communication::DaemonChannel, EventStreamThreadHandle};
use dora_core::{
config::{DataId, NodeId},
daemon_messages::{DaemonRequest, DataflowId},
daemon_messages::{DaemonRequest, Data, DataflowId},
message::Metadata,
};
use eyre::{bail, eyre, Context};
@@ -56,49 +56,11 @@ impl ControlChannel {
Ok(())
}
pub fn prepare_message(
&mut self,
output_id: DataId,
metadata: Metadata<'static>,
data_len: usize,
) -> eyre::Result<MessageSample> {
let reply = self
.channel
.request(&DaemonRequest::PrepareOutputMessage {
output_id,
metadata,
data_len,
})
.wrap_err("failed to send PrepareOutputMessage request to dora-daemon")?;
match reply {
dora_core::daemon_messages::DaemonReply::PreparedMessage {
shared_memory_id: id,
} => Ok(MessageSample { id }),
dora_core::daemon_messages::DaemonReply::Result(Err(err)) => {
Err(eyre!(err).wrap_err("failed to report stop event to dora-daemon"))
}
other => bail!("unexpected PrepareOutputMessage reply: {other:?}"),
}
}
pub fn send_prepared_message(&mut self, sample: MessageSample) -> eyre::Result<()> {
let reply = self
.channel
.request(&DaemonRequest::SendPreparedMessage { id: sample.id })
.wrap_err("failed to send SendOutMessage request to dora-daemon")?;
match reply {
dora_core::daemon_messages::DaemonReply::Result(result) => {
result.map_err(|err| eyre!(err))
}
other => bail!("unexpected SendOutMessage reply: {other:?}"),
}
}
pub fn send_message(
&mut self,
output_id: DataId,
metadata: Metadata<'static>,
data: Vec<u8>,
data: Option<Data>,
) -> eyre::Result<()> {
let request = DaemonRequest::SendMessage {
output_id,
@@ -1,6 +1,6 @@
use dora_core::{
config::NodeId,
daemon_messages::{DaemonReply, DaemonRequest, DataflowId, NodeEvent},
daemon_messages::{self, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeEvent},
};
use eyre::{eyre, Context};
use std::{sync::Arc, time::Duration};
@@ -19,7 +19,11 @@ impl EventStream {
dataflow_id: DataflowId,
node_id: &NodeId,
mut channel: DaemonChannel,
) -> eyre::Result<(Self, Arc<EventStreamThreadHandle>)> {
) -> eyre::Result<(
Self,
Arc<EventStreamThreadHandle>,
flume::Receiver<DropToken>,
)> {
channel.register(dataflow_id, node_id.clone())?;
channel
@@ -30,7 +34,10 @@ impl EventStream {
let (tx, rx) = flume::bounded(0);
let mut drop_tokens = Vec::new();
let node_id = node_id.clone();
let (finished_drop_tokens, finished_drop_tokens_rx) = flume::unbounded();
let join_handle = std::thread::spawn(move || {
let mut tx = Some(tx);
let result = 'outer: loop {
let daemon_request = DaemonRequest::NextEvent {
drop_tokens: std::mem::take(&mut drop_tokens),
@@ -57,52 +64,80 @@ impl EventStream {
NodeEvent::Input {
data: Some(data), ..
} => data.drop_token(),
NodeEvent::Stop
| NodeEvent::InputClosed { .. }
| NodeEvent::Input { data: None, .. } => None,
NodeEvent::AllInputsClosed => {
// close the event stream
tx = None;
// skip this internal event
continue;
}
NodeEvent::OutputDropped { drop_token } => {
if let Err(flume::SendError(token)) =
finished_drop_tokens.send(*drop_token)
{
tracing::error!(
"failed to report drop_token `{token:?}` to dora node"
);
}
// skip this internal event
continue;
}
_ => None,
};
let (drop_tx, drop_rx) = std::sync::mpsc::channel();
match tx.send(EventItem::NodeEvent {
event,
ack_channel: drop_tx,
}) {
Ok(()) => {}
Err(_) => {
// receiving end of channel was closed
break 'outer Ok(());
if let Some(tx) = tx.as_ref() {
let (drop_tx, drop_rx) = std::sync::mpsc::channel();
match tx.send(EventItem::NodeEvent {
event,
ack_channel: drop_tx,
}) {
Ok(()) => {}
Err(_) => {
// receiving end of channel was closed
break 'outer Ok(());
}
}
}
let timeout = Duration::from_secs(30);
match drop_rx.recv_timeout(timeout) {
Ok(()) => {
break 'outer Err(eyre!(
"Node API should not send anything on ACK channel"
))
let timeout = Duration::from_secs(30);
match drop_rx.recv_timeout(timeout) {
Ok(()) => {
break 'outer Err(eyre!(
"Node API should not send anything on ACK channel"
))
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
tracing::warn!("timeout: event was not dropped after {timeout:?}");
if let Some(drop_token) = drop_token {
tracing::warn!("leaking drop token {drop_token:?}");
}
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
// the event was dropped -> add the drop token to the list
if let Some(token) = drop_token {
drop_tokens.push(token);
}
}
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
tracing::warn!("timeout: event was not dropped after {timeout:?}");
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {} // expected result
}
if let Some(token) = drop_token {
drop_tokens.push(token);
} else {
tracing::warn!(
"dropping event because event `tx` was already closed: `{event:?}`"
);
}
}
};
if let Err(err) = result {
if let Err(flume::SendError(item)) = tx.send(EventItem::FatalError(err)) {
let err = match item {
EventItem::FatalError(err) => err,
_ => unreachable!(),
};
tracing::error!("failed to report fatal EventStream error: {err:?}");
if let Some(tx) = tx.as_ref() {
if let Err(flume::SendError(item)) = tx.send(EventItem::FatalError(err)) {
let err = match item {
EventItem::FatalError(err) => err,
_ => unreachable!(),
};
tracing::error!("failed to report fatal EventStream error: {err:?}");
}
} else {
tracing::error!("received error event after `tx` was closed: {err:?}");
}
}
});
let thread_handle = EventStreamThreadHandle::new(join_handle);
Ok((
@@ -111,6 +146,7 @@ impl EventStream {
_thread_handle: thread_handle.clone(),
},
thread_handle,
finished_drop_tokens_rx,
))
}
@@ -137,24 +173,41 @@ impl EventStream {
NodeEvent::Stop => Event::Stop,
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let data = data
.map(|data| match data {
dora_core::daemon_messages::InputData::Vec(d) => Ok(Data::Vec(d)),
dora_core::daemon_messages::InputData::SharedMemory(d) => unsafe {
MappedInputData::map(&d.shared_memory_id, d.len).map(|data| {
Data::SharedMemory {
data,
_drop: ack_channel,
}
let data = match data {
None => Ok(None),
Some(daemon_messages::Data::Vec(v)) => Ok(Some(Data::Vec(v))),
Some(daemon_messages::Data::SharedMemory {
shared_memory_id,
len,
drop_token: _, // handled above
}) => unsafe {
MappedInputData::map(&shared_memory_id, len).map(|data| {
Some(Data::SharedMemory {
data,
_drop: ack_channel,
})
},
})
.transpose();
})
},
};
match data {
Ok(data) => Event::Input { id, metadata, data },
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => {
let err = eyre!(
"received `AllInputsClosed` event, which should be handled by background task"
);
tracing::error!("{err:?}");
Event::Error(err.wrap_err("internal error").to_string())
}
NodeEvent::OutputDropped { .. } => {
let err = eyre!(
"received OutputDrop event, which should be handled by background task"
);
tracing::error!("{err:?}");
Event::Error(err.wrap_err("internal error").to_string())
}
},
EventItem::FatalError(err) => {
Event::Error(format!("fatal event stream error: {err:?}"))
@@ -164,7 +217,6 @@ impl EventStream {
Some(event)
}
}
enum EventItem {
NodeEvent {
event: NodeEvent,
@@ -1,7 +1,7 @@
use communication::DaemonChannel;
use dora_core::{
config::NodeId,
daemon_messages::{DaemonCommunication, DataflowId},
daemon_messages::{DaemonCommunication, DataflowId, DropToken},
};
use eyre::Context;
use flume::RecvTimeoutError;
@@ -17,6 +17,7 @@ mod event_stream;
pub(crate) struct DaemonConnection {
pub control_channel: ControlChannel,
pub event_stream: EventStream,
pub finished_drop_tokens: flume::Receiver<DropToken>,
}
impl DaemonConnection {
@@ -47,7 +48,7 @@ impl DaemonConnection {
}
};
let (event_stream, event_stream_thread_handle) =
let (event_stream, event_stream_thread_handle, finished_drop_tokens) =
EventStream::init(dataflow_id, node_id, events)
.wrap_err("failed to init event stream")?;
let control_channel =
@@ -57,14 +58,11 @@ impl DaemonConnection {
Ok(Self {
control_channel,
event_stream,
finished_drop_tokens,
})
}
}
pub struct MessageSample {
pub id: String,
}
pub(crate) struct EventStreamThreadHandle(flume::Receiver<std::thread::Result<()>>);
impl EventStreamThreadHandle {
fn new(join_handle: std::thread::JoinHandle<()>) -> Arc<Self> {
@@ -1,10 +1,16 @@
use std::{
collections::{HashMap, VecDeque},
ops::{Deref, DerefMut},
time::Duration,
};
use dora_core::{
config::{DataId, NodeId, NodeRunConfig},
daemon_messages::NodeConfig,
daemon_messages::{Data, DropToken, NodeConfig},
message::{uhlc, Metadata, MetadataParameters},
};
use eyre::WrapErr;
use shared_memory_server::ShmemConf;
use eyre::{bail, WrapErr};
use shared_memory::{Shmem, ShmemConf};
use crate::{
daemon_connection::{ControlChannel, DaemonConnection},
@@ -21,6 +27,10 @@ pub struct DoraNode {
node_config: NodeRunConfig,
control_channel: ControlChannel,
hlc: uhlc::HLC,
sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
finished_drop_tokens: flume::Receiver<DropToken>,
cache: VecDeque<ShmemHandle>,
}
impl DoraNode {
@@ -47,6 +57,7 @@ impl DoraNode {
let DaemonConnection {
control_channel,
event_stream,
finished_drop_tokens,
} = DaemonConnection::init(dataflow_id, &node_id, &daemon_communication)
.wrap_err("failed to connect to dora-daemon")?;
@@ -55,6 +66,9 @@ impl DoraNode {
node_config: run_config,
control_channel,
hlc: uhlc::HLC::default(),
sent_out_shared_memory: HashMap::new(),
finished_drop_tokens,
cache: VecDeque::new(),
};
Ok((node, event_stream))
}
@@ -69,34 +83,44 @@ impl DoraNode {
where
F: FnOnce(&mut [u8]),
{
self.handle_finished_drop_tokens()?;
if !self.node_config.outputs.contains(&output_id) {
eyre::bail!("unknown output");
}
let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters.into_owned());
if data_len >= ZERO_COPY_THRESHOLD {
let sample = self
.control_channel
.prepare_message(output_id.clone(), metadata, data_len)
.wrap_err("failed to prepare sample for output message")?;
// map shared memory and fill in data
let mut shared_memory = ShmemConf::new()
.os_id(&sample.id)
.open()
.wrap_err("failed to open shared memory sample")?;
let (data, shmem) = if data_len >= ZERO_COPY_THRESHOLD {
// create shared memory region
let mut shared_memory = self.allocate_shared_memory(data_len)?;
// fill in the data
let raw = unsafe { shared_memory.as_slice_mut() };
data(&mut raw[..data_len]);
self.control_channel
.send_prepared_message(sample)
.wrap_err_with(|| format!("failed to send data for output {output_id}"))?;
let drop_token = DropToken::generate();
let data = Data::SharedMemory {
shared_memory_id: shared_memory.get_os_id().to_owned(),
len: data_len,
drop_token,
};
(Some(data), Some((shared_memory, drop_token)))
} else if data_len == 0 {
data(&mut []);
(None, None)
} else {
let mut buffer = vec![0; data_len];
data(&mut buffer);
self.control_channel
.send_message(output_id.clone(), metadata, buffer)
.wrap_err_with(|| format!("failed to send output {output_id}"))?;
(Some(Data::Vec(buffer)), None)
};
self.control_channel
.send_message(output_id.clone(), metadata, data)
.wrap_err_with(|| format!("failed to send output {output_id}"))?;
if let Some((shared_memory, drop_token)) = shmem {
self.sent_out_shared_memory
.insert(drop_token, shared_memory);
}
Ok(())
@@ -123,14 +147,109 @@ impl DoraNode {
pub fn node_config(&self) -> &NodeRunConfig {
&self.node_config
}
fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
let cache_index = self
.cache
.iter()
.enumerate()
.rev()
.filter(|(_, s)| s.len() >= data_len)
.min_by_key(|(_, s)| s.len())
.map(|(i, _)| i);
let memory = match cache_index {
Some(i) => {
// we know that this index exists, so we can safely unwrap here
self.cache.remove(i).unwrap()
}
None => ShmemHandle(Box::new(
ShmemConf::new()
.size(data_len)
.create()
.wrap_err("failed to allocate shared memory")?,
)),
};
assert!(memory.len() >= data_len);
Ok(memory)
}
fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
loop {
match self.finished_drop_tokens.try_recv() {
Ok(token) => match self.sent_out_shared_memory.remove(&token) {
Some(region) => self.add_to_cache(region),
None => tracing::warn!("received unknown finished drop token `{token:?}`"),
},
Err(flume::TryRecvError::Empty) => break,
Err(flume::TryRecvError::Disconnected) => {
bail!("event stream was closed before sending all expected drop tokens")
}
}
}
Ok(())
}
fn add_to_cache(&mut self, memory: ShmemHandle) {
const MAX_CACHE_SIZE: usize = 20;
self.cache.push_back(memory);
while self.cache.len() > MAX_CACHE_SIZE {
self.cache.pop_front();
}
}
}
impl Drop for DoraNode {
#[tracing::instrument(skip(self), fields(self.id = %self.id))]
fn drop(&mut self) {
if !self.sent_out_shared_memory.is_empty() {
tracing::info!(
"waiting for `{}` remaining drop tokens",
self.sent_out_shared_memory.len()
);
}
while !self.sent_out_shared_memory.is_empty() {
match self
.finished_drop_tokens
.recv_timeout(Duration::from_secs(10))
{
Ok(token) => {
self.sent_out_shared_memory.remove(&token);
}
Err(flume::RecvTimeoutError::Disconnected) => break,
Err(flume::RecvTimeoutError::Timeout) => {
tracing::warn!(
"timeout while waiting for drop tokens; \
leaking {} shared memory regions",
self.sent_out_shared_memory.len()
);
}
}
}
tracing::info!("reporting node stop for node `{}`", self.id);
if let Err(err) = self.control_channel.report_stop() {
tracing::error!("{err:?}")
}
}
}
struct ShmemHandle(Box<Shmem>);
impl Deref for ShmemHandle {
type Target = Shmem;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for ShmemHandle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
unsafe impl Send for ShmemHandle {}
unsafe impl Sync for ShmemHandle {}
@@ -1,4 +1,5 @@
use coordinator::CoordinatorEvent;
use dora_core::daemon_messages::Data;
use dora_core::message::uhlc::HLC;
use dora_core::{
config::{DataId, InputMapping, NodeId},
@@ -12,11 +13,11 @@ use dora_core::{
use eyre::{bail, eyre, Context, ContextCompat};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
use shared_mem_handler::SharedMemSample;
use shared_memory_server::ShmemConf;
use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet, HashMap},
fmt, io,
io,
net::SocketAddr,
path::{Path, PathBuf},
time::{Duration, Instant},
@@ -32,7 +33,6 @@ use uuid::Uuid;
mod coordinator;
mod listener;
mod shared_mem_handler;
mod spawn;
mod tcp_utils;
@@ -41,9 +41,6 @@ pub struct Daemon {
events_tx: mpsc::Sender<Event>,
shared_memory_handler: flume::Sender<shared_mem_handler::DaemonEvent>,
shared_memory_handler_node: flume::Sender<shared_mem_handler::NodeEvent>,
coordinator_addr: Option<SocketAddr>,
machine_id: String,
@@ -165,39 +162,22 @@ impl Daemon {
})
.wrap_err("failed to set ctrl-c handler")?;
let (shared_memory_handler, shared_memory_daemon_rx) = flume::unbounded();
let (shared_memory_handler_node, shared_memory_node_rx) = flume::bounded(10);
let daemon = Self {
running: HashMap::new(),
events_tx: dora_events_tx,
shared_memory_handler,
shared_memory_handler_node,
coordinator_addr,
machine_id,
exit_when_done,
dora_runtime_path,
dataflow_errors: Vec::new(),
};
let (shmem_events_tx, shmem_events_rx) = flume::bounded(5);
tokio::spawn(async {
let mut handler = shared_mem_handler::SharedMemHandler::new(shmem_events_tx);
handler
.run(shared_memory_node_rx, shared_memory_daemon_rx)
.await;
});
let dora_events = ReceiverStream::new(dora_events_rx);
let shmem_events = shmem_events_rx.into_stream().map(Event::ShmemHandler);
let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
Duration::from_secs(5),
))
.map(|_| Event::WatchdogInterval);
let events = (
external_events,
dora_events,
shmem_events,
watchdog_interval,
)
.merge();
let events = (external_events, dora_events, watchdog_interval).merge();
daemon.run_inner(events).await
}
@@ -223,16 +203,11 @@ impl Daemon {
dataflow_id: dataflow,
node_id,
event,
reply_sender,
} => {
self.handle_node_event(event, dataflow, node_id, reply_sender)
.await?
}
} => self.handle_node_event(event, dataflow, node_id).await?,
Event::Dora(event) => match self.handle_dora_event(event).await? {
RunStatus::Continue => {}
RunStatus::Exit => break,
},
Event::ShmemHandler(event) => self.handle_shmem_handler_event(event).await?,
Event::WatchdogInterval => {
if let Some(addr) = self.coordinator_addr {
let mut connection = coordinator::send_event(
@@ -339,7 +314,7 @@ impl Daemon {
InputMapping::User(mapping) => {
dataflow
.mappings
.entry((mapping.source, mapping.output))
.entry(OutputId(mapping.source, mapping.output))
.or_default()
.insert((node.id.clone(), input_id));
}
@@ -359,7 +334,6 @@ impl Daemon {
&working_dir,
node,
self.events_tx.clone(),
self.shared_memory_handler_node.clone(),
daemon_communication_config,
self.dora_runtime_path.as_deref(),
)
@@ -399,21 +373,26 @@ impl Daemon {
event: DaemonNodeEvent,
dataflow_id: DataflowId,
node_id: NodeId,
reply_sender: oneshot::Sender<DaemonReply>,
) -> eyre::Result<()> {
match event {
DaemonNodeEvent::Subscribe { event_sender } => {
DaemonNodeEvent::Subscribe {
event_sender,
reply_sender,
} => {
let result = self.subscribe(dataflow_id, node_id, event_sender).await;
let _ = reply_sender.send(DaemonReply::Result(result));
}
DaemonNodeEvent::CloseOutputs(outputs) => {
DaemonNodeEvent::CloseOutputs {
outputs,
reply_sender,
} => {
// notify downstream nodes
let inner = async {
let dataflow = self
.running
.get_mut(&dataflow_id)
.wrap_err_with(|| format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"))?;
send_input_closed_events(dataflow, |(source_id, output_id)| {
.running
.get_mut(&dataflow_id)
.wrap_err_with(|| format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"))?;
send_input_closed_events(dataflow, |OutputId(source_id, output_id)| {
source_id == &node_id && outputs.contains(output_id)
})
.await;
@@ -424,17 +403,124 @@ impl Daemon {
let _ = reply_sender.send(DaemonReply::Result(reply));
// TODO: notify remote nodes
}
DaemonNodeEvent::Stopped => {
DaemonNodeEvent::Stopped { reply_sender } => {
tracing::info!("Stopped: {dataflow_id}/{node_id}");
let _ = reply_sender.send(DaemonReply::Result(Ok(())));
self.handle_node_stop(dataflow_id, &node_id).await?;
}
DaemonNodeEvent::SendOut {
output_id,
metadata,
data,
} => {
self.send_out(dataflow_id, node_id, output_id, metadata, data)
.await?
}
DaemonNodeEvent::ReportDrop { tokens } => {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!(
"failed to get handle drop tokens: \
no running dataflow with ID `{dataflow_id}`"
)
})?;
for token in tokens {
match dataflow.pending_drop_tokens.get_mut(&token) {
Some(info) => {
if info.pending_nodes.remove(&node_id) {
dataflow.check_drop_token(token).await?;
} else {
tracing::warn!(
"node `{node_id}` is not pending for drop token `{token:?}`"
);
}
}
None => tracing::warn!("unknown drop token `{token:?}`"),
}
}
}
}
Ok(())
}
async fn send_out(
&mut self,
dataflow_id: Uuid,
node_id: NodeId,
output_id: DataId,
metadata: dora_core::message::Metadata<'static>,
data: Option<Data>,
) -> Result<(), eyre::ErrReport> {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("send out failed: no running dataflow with ID `{dataflow_id}`")
})?;
let empty_set = BTreeSet::new();
let output_id = OutputId(node_id, output_id);
let local_receivers = { dataflow.mappings.get(&output_id).unwrap_or(&empty_set) };
let OutputId(node_id, _) = output_id;
let mut closed = Vec::new();
for (receiver_id, input_id) in local_receivers {
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let item = daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: data.clone(),
};
let send_result = channel.send_async(item);
match timeout(Duration::from_millis(10), send_result).await {
Ok(Ok(())) => {
if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
dataflow
.pending_drop_tokens
.entry(token)
.or_insert_with(|| DropTokenInformation {
owner: node_id.clone(),
pending_nodes: Default::default(),
})
.pending_nodes
.insert(receiver_id.clone());
}
}
Ok(Err(_)) => {
closed.push(receiver_id);
}
Err(_) => {
tracing::warn!(
"dropping input event `{receiver_id}/{input_id}` (send timeout)"
);
}
}
}
}
for id in closed {
dataflow.subscribe_channels.remove(id);
}
let (data_bytes, drop_token) = match data {
None => (None, None),
Some(Data::SharedMemory {
shared_memory_id,
len,
drop_token,
}) => {
let memory = ShmemConf::new()
.os_id(shared_memory_id)
.open()
.wrap_err("failed to map shared memory output")?;
let data = Some(unsafe { memory.as_slice() }[..len].to_owned());
(data, Some(drop_token))
}
Some(Data::Vec(v)) => (Some(v), None),
};
if let Some(token) = drop_token {
dataflow.check_drop_token(token).await?;
}
let data_bytes = data_bytes;
Ok(())
}
async fn subscribe(
&mut self,
dataflow_id: Uuid,
@@ -466,6 +552,11 @@ impl Daemon {
})
.await;
}
if dataflow.open_inputs(&node_id).is_empty() {
let _ = event_sender
.send_async(daemon_messages::NodeEvent::AllInputsClosed)
.await;
}
// if a stop event was already sent for the dataflow, send it to
// the newly connected node too
@@ -475,11 +566,7 @@ impl Daemon {
.await;
}
if dataflow.stop_sent || dataflow.open_inputs(&node_id).is_empty() {
tracing::debug!("Received subscribe message for closed event stream");
} else {
dataflow.subscribe_channels.insert(node_id, event_sender);
}
dataflow.subscribe_channels.insert(node_id, event_sender);
Ok(())
}
@@ -493,7 +580,7 @@ impl Daemon {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
})?;
send_input_closed_events(dataflow, |(source_id, _)| source_id == node_id).await;
send_input_closed_events(dataflow, |OutputId(source_id, _)| source_id == node_id).await;
dataflow.running_nodes.remove(node_id);
if dataflow.running_nodes.is_empty() {
tracing::info!(
@@ -651,120 +738,6 @@ impl Daemon {
}
Ok(RunStatus::Continue)
}
async fn handle_shmem_handler_event(&mut self, event: ShmemHandlerEvent) -> eyre::Result<()> {
match event {
ShmemHandlerEvent::SendOut {
dataflow_id,
node_id,
output_id,
metadata,
data,
} => {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("send out failed: no running dataflow with ID `{dataflow_id}`")
})?;
tracing::trace!(
"Time between prepare and send out: {:?}",
metadata
.timestamp()
.get_time()
.to_system_time()
.elapsed()
.unwrap()
);
// figure out receivers from dataflow graph
let empty_set = BTreeSet::new();
let local_receivers = dataflow
.mappings
.get(&(node_id, output_id))
.unwrap_or(&empty_set);
// send shared memory ID to all local receivers
let mut closed = Vec::new();
let mut drop_tokens = Vec::new();
for (receiver_id, input_id) in local_receivers {
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let mut drop_token = None;
let item = daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: match &data {
Data::None => None,
Data::SharedMemory(data) => {
let token = DropToken::generate();
drop_token = Some(token);
Some(daemon_messages::InputData::SharedMemory(
daemon_messages::SharedMemoryInput {
shared_memory_id: data.get_os_id().to_owned(),
len: data.len(),
drop_token: token,
},
))
}
Data::Vec(data) => {
Some(daemon_messages::InputData::Vec(data.clone()))
}
},
};
let send_result = channel.send_async(item);
match timeout(Duration::from_millis(10), send_result).await {
Ok(Ok(())) => {
if let Some(token) = drop_token {
drop_tokens.push(token);
}
}
Ok(Err(_)) => {
closed.push(receiver_id);
}
Err(_) => {
tracing::warn!(
"dropping input event `{receiver_id}/{input_id}` (send timeout)"
);
}
}
}
}
for id in closed {
dataflow.subscribe_channels.remove(id);
}
let data_bytes = match data {
Data::SharedMemory(data) => {
let bytes = unsafe { data.as_slice() }.to_owned();
// report drop tokens to shared memory handler
let send_result = self
.shared_memory_handler
.send_async(shared_mem_handler::DaemonEvent::SentOut {
data: *data,
drop_tokens,
})
.await;
if let Err(err) =
send_result.wrap_err("shared mem handler crashed after send out")
{
tracing::error!("{err:?}");
}
bytes
}
Data::Vec(data) => data,
Data::None => Vec::new(),
};
// TODO send `data` via network to all remove receivers
}
ShmemHandlerEvent::HandlerError(err) => {
bail!(err.wrap_err("shared memory handler failed"))
}
}
Ok(())
}
}
fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, InputMapping> {
@@ -803,7 +776,7 @@ fn runtime_node_outputs(n: &dora_core::descriptor::RuntimeNode) -> BTreeSet<Data
async fn send_input_closed_events<F>(dataflow: &mut RunningDataflow, mut filter: F)
where
F: FnMut(&(NodeId, DataId)) -> bool,
F: FnMut(&OutputId) -> bool,
{
let downstream_nodes: BTreeSet<_> = dataflow
.mappings
@@ -812,19 +785,20 @@ where
.flat_map(|(_, v)| v)
.collect();
for (receiver_id, input_id) in downstream_nodes {
if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
open_inputs.remove(input_id);
}
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let _ = channel
.send_async(daemon_messages::NodeEvent::InputClosed {
id: input_id.clone(),
})
.await;
};
if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
open_inputs.remove(input_id);
if open_inputs.is_empty() {
// close the subscriber channel
dataflow.subscribe_channels.remove(receiver_id);
if dataflow.open_inputs(receiver_id).is_empty() {
let _ = channel
.send_async(daemon_messages::NodeEvent::AllInputsClosed)
.await;
}
}
}
@@ -837,6 +811,9 @@ pub struct RunningDataflow {
timers: BTreeMap<Duration, BTreeSet<InputId>>,
open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
running_nodes: BTreeSet<NodeId>,
pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,
/// Keep handles to all timer tasks of this dataflow to cancel them on drop.
_timer_handles: Vec<futures::future::RemoteHandle<()>>,
stop_sent: bool,
@@ -858,22 +835,59 @@ impl RunningDataflow {
fn open_inputs(&self, node_id: &NodeId) -> &BTreeSet<DataId> {
self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
}
async fn check_drop_token(&mut self, token: DropToken) -> eyre::Result<()> {
match self.pending_drop_tokens.entry(token) {
std::collections::hash_map::Entry::Occupied(entry) => {
if entry.get().pending_nodes.is_empty() {
let (drop_token, info) = entry.remove_entry();
let result = match self.subscribe_channels.get_mut(&info.owner) {
Some(channel) => channel
.send_async(daemon_messages::NodeEvent::OutputDropped { drop_token })
.await
.wrap_err("send failed"),
None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
};
if let Err(err) = result.wrap_err_with(|| {
format!(
"failed to report drop token `{drop_token:?}` to owner `{}`",
&info.owner
)
}) {
tracing::warn!("{err:?}");
}
}
}
std::collections::hash_map::Entry::Vacant(_) => {
tracing::warn!("check_drop_token called with already closed token")
}
}
Ok(())
}
}
type OutputId = (NodeId, DataId);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct OutputId(NodeId, DataId);
type InputId = (NodeId, DataId);
struct DropTokenInformation {
/// The node that created the associated drop token.
owner: NodeId,
/// Contains the set of pending nodes that still have access to the input
/// associated with a drop token.
pending_nodes: BTreeSet<NodeId>,
}
#[derive(Debug)]
pub enum Event {
Node {
dataflow_id: DataflowId,
node_id: NodeId,
event: DaemonNodeEvent,
reply_sender: oneshot::Sender<DaemonReply>,
},
Coordinator(CoordinatorEvent),
Dora(DoraEvent),
ShmemHandler(ShmemHandlerEvent),
WatchdogInterval,
CtrlC,
}
@@ -883,82 +897,28 @@ impl From<DoraEvent> for Event {
Event::Dora(event)
}
}
impl From<ShmemHandlerEvent> for Event {
fn from(event: ShmemHandlerEvent) -> Self {
Event::ShmemHandler(event)
}
}
#[derive(Debug)]
pub enum DaemonNodeEvent {
Stopped,
Stopped {
reply_sender: oneshot::Sender<DaemonReply>,
},
Subscribe {
event_sender: flume::Sender<daemon_messages::NodeEvent>,
reply_sender: oneshot::Sender<DaemonReply>,
},
CloseOutputs {
outputs: Vec<dora_core::config::DataId>,
reply_sender: oneshot::Sender<DaemonReply>,
},
CloseOutputs(Vec<dora_core::config::DataId>),
}
pub enum ShmemHandlerEvent {
SendOut {
dataflow_id: DataflowId,
node_id: NodeId,
output_id: DataId,
metadata: dora_core::message::Metadata<'static>,
data: Data,
data: Option<Data>,
},
ReportDrop {
tokens: Vec<DropToken>,
},
HandlerError(eyre::ErrReport),
}
impl fmt::Debug for ShmemHandlerEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::SendOut {
dataflow_id,
node_id,
output_id,
metadata,
data,
} => f
.debug_struct("SendOut")
.field("dataflow_id", dataflow_id)
.field("node_id", node_id)
.field("output_id", output_id)
.field("metadata", metadata)
.field(
"data",
match &data {
Data::None => &"None",
Data::SharedMemory(_) => &"SharedMemory(..)",
Data::Vec(_) => &"Vec(..)",
},
)
.finish(),
ShmemHandlerEvent::HandlerError(err) => {
f.debug_tuple("HandlerError").field(err).finish()
}
}
}
}
pub enum Data {
None,
SharedMemory(Box<SharedMemSample>),
Vec(Vec<u8>),
}
impl From<Option<Box<SharedMemSample>>> for Data {
fn from(data: Option<Box<SharedMemSample>>) -> Self {
match data {
Some(data) => Self::SharedMemory(data),
None => Self::None,
}
}
}
impl From<Vec<u8>> for Data {
fn from(data: Vec<u8>) -> Self {
Self::Vec(data)
}
}
#[derive(Debug)]
@@ -1008,8 +968,6 @@ impl From<Result<std::process::ExitStatus, io::Error>> for NodeExitStatus {
}
}
type MessageId = String;
#[must_use]
enum RunStatus {
Continue,
@@ -1,9 +1,9 @@
use crate::{shared_mem_handler, DaemonNodeEvent, Event};
use crate::{DaemonNodeEvent, Event};
use dora_core::{
config::NodeId,
daemon_messages::{
DaemonCommunication, DaemonCommunicationConfig, DaemonReply, DaemonRequest, DataflowId,
DropEvent, NodeEvent,
NodeEvent,
},
};
use eyre::{eyre, Context};
@@ -22,7 +22,6 @@ pub async fn spawn_listener_loop(
dataflow_id: &DataflowId,
node_id: &NodeId,
daemon_tx: &mpsc::Sender<Event>,
shmem_handler_tx: &flume::Sender<shared_mem_handler::NodeEvent>,
config: DaemonCommunicationConfig,
) -> eyre::Result<DaemonCommunication> {
match config {
@@ -42,9 +41,8 @@ pub async fn spawn_listener_loop(
let event_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
let shmem_handler_tx = shmem_handler_tx.clone();
tokio::spawn(async move {
tcp::listener_loop(socket, daemon_tx, shmem_handler_tx).await;
tcp::listener_loop(socket, daemon_tx).await;
tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
});
@@ -66,8 +64,7 @@ pub async fn spawn_listener_loop(
let server = unsafe { ShmemServer::new(daemon_control_region) }
.wrap_err("failed to create control server")?;
let daemon_tx = daemon_tx.clone();
let shmem_handler_tx = shmem_handler_tx.clone();
tokio::spawn(shmem::listener_loop(server, daemon_tx, shmem_handler_tx));
tokio::spawn(shmem::listener_loop(server, daemon_tx));
}
{
@@ -75,9 +72,8 @@ pub async fn spawn_listener_loop(
.wrap_err("failed to create events server")?;
let event_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
let shmem_handler_tx = shmem_handler_tx.clone();
tokio::task::spawn(async move {
shmem::listener_loop(server, daemon_tx, shmem_handler_tx).await;
shmem::listener_loop(server, daemon_tx).await;
tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
});
}
@@ -94,7 +90,6 @@ struct Listener<C> {
dataflow_id: DataflowId,
node_id: NodeId,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
subscribed_events: Option<flume::Receiver<NodeEvent>>,
queue: Vec<NodeEvent>,
max_queue_len: usize,
@@ -105,11 +100,7 @@ impl<C> Listener<C>
where
C: Connection,
{
pub(crate) async fn run(
mut connection: C,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
) {
pub(crate) async fn run(mut connection: C, daemon_tx: mpsc::Sender<Event>) {
// receive the first message
let message = match connection
.receive_message()
@@ -144,7 +135,6 @@ where
node_id,
connection,
daemon_tx,
shmem_handler_tx,
subscribed_events: None,
max_queue_len: 10, // TODO: make this configurable
queue: Vec::new(),
@@ -237,11 +227,8 @@ where
.unwrap_or_else(|| panic!("no input event found in drop iteration {i}"));
// remove that event
if let NodeEvent::Input {
data: Some(data), ..
} = self.queue.remove(index)
{
if let Some(drop_token) = data.drop_token() {
if let NodeEvent::Input { data, .. } = self.queue.remove(index) {
if let Some(drop_token) = data.as_ref().and_then(|d| d.drop_token()) {
drop_tokens.push(drop_token);
}
}
@@ -259,74 +246,45 @@ where
.await
.wrap_err("failed to send register reply")?;
}
DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped).await?,
DaemonRequest::CloseOutputs(outputs) => {
self.process_daemon_event(DaemonNodeEvent::CloseOutputs(outputs))
.await?
}
DaemonRequest::PrepareOutputMessage {
output_id,
metadata,
data_len,
} => {
DaemonRequest::Stopped => {
let (reply_sender, reply) = oneshot::channel();
let event = shared_mem_handler::NodeEvent::PrepareOutputMessage {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
output_id,
metadata,
data_len,
reply_sender,
};
self.send_shared_memory_event(event).await?;
let reply = reply
.await
.wrap_err("failed to receive prepare output reply")?;
// tracing::debug!("prepare latency: {:?}", start.elapsed()?);
self.send_reply(reply)
.await
.wrap_err("failed to send PrepareOutputMessage reply")?;
self.process_daemon_event(DaemonNodeEvent::Stopped { reply_sender }, Some(reply))
.await?
}
DaemonRequest::SendPreparedMessage { id } => {
DaemonRequest::CloseOutputs(outputs) => {
let (reply_sender, reply) = oneshot::channel();
let event = shared_mem_handler::NodeEvent::SendPreparedMessage { id, reply_sender };
self.send_shared_memory_event(event).await?;
self.send_reply(
reply
.await
.wrap_err("failed to receive SendPreparedMessage reply")?,
self.process_daemon_event(
DaemonNodeEvent::CloseOutputs {
outputs,
reply_sender,
},
Some(reply),
)
.await?;
.await?
}
DaemonRequest::SendMessage {
output_id,
metadata,
data,
} => {
// let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
// tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
let event = crate::Event::ShmemHandler(crate::ShmemHandlerEvent::SendOut {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
let event = crate::DaemonNodeEvent::SendOut {
output_id,
metadata,
data: data.into(),
});
let result = self
.send_daemon_event(event)
.await
.map_err(|_| "failed to receive send_empty_message reply".to_owned());
if let Err(err) = result {
tracing::warn!("{err:?}");
}
self.send_reply(DaemonReply::Empty)
.await
.wrap_err("failed to send SendEmptyMessage reply")?;
data,
};
self.process_daemon_event(event, None).await?;
}
DaemonRequest::Subscribe => {
let (tx, rx) = flume::bounded(100);
self.process_daemon_event(DaemonNodeEvent::Subscribe { event_sender: tx })
.await?;
let (reply_sender, reply) = oneshot::channel();
self.process_daemon_event(
DaemonNodeEvent::Subscribe {
event_sender: tx,
reply_sender,
},
Some(reply),
)
.await?;
self.subscribed_events = Some(rx);
}
DaemonRequest::NextEvent { drop_tokens } => {
@@ -364,30 +322,36 @@ where
drop_tokens: Vec<dora_core::daemon_messages::DropToken>,
) -> eyre::Result<()> {
if !drop_tokens.is_empty() {
let drop_event = shared_mem_handler::NodeEvent::Drop(DropEvent {
let drop_event = DaemonNodeEvent::ReportDrop {
tokens: drop_tokens,
});
self.send_shared_memory_event(drop_event).await?;
};
self.process_daemon_event(drop_event, None).await?;
}
Ok(())
}
async fn process_daemon_event(&mut self, event: DaemonNodeEvent) -> eyre::Result<()> {
async fn process_daemon_event(
&mut self,
event: DaemonNodeEvent,
reply: Option<oneshot::Receiver<DaemonReply>>,
) -> eyre::Result<()> {
// send NodeEvent to daemon main loop
let (reply_tx, reply) = oneshot::channel();
let event = Event::Node {
dataflow_id: self.dataflow_id,
node_id: self.node_id.clone(),
event,
reply_sender: reply_tx,
};
self.daemon_tx
.send(event)
.await
.map_err(|_| eyre!("failed to send event to daemon"))?;
let reply = reply
.await
.map_err(|_| eyre!("failed to receive reply from daemon"))?;
let reply = if let Some(reply) = reply {
reply
.await
.map_err(|_| eyre!("failed to receive reply from daemon"))?
} else {
DaemonReply::Empty
};
self.send_reply(reply).await?;
Ok(())
}
@@ -398,23 +362,6 @@ where
.await
.wrap_err_with(|| format!("failed to send reply to node `{}`", self.node_id))
}
async fn send_shared_memory_event(
&self,
event: shared_mem_handler::NodeEvent,
) -> eyre::Result<()> {
self.shmem_handler_tx
.send_async(event)
.await
.map_err(|_| eyre!("failed to send event to shared_mem_handler"))
}
async fn send_daemon_event(&self, event: crate::Event) -> eyre::Result<()> {
self.daemon_tx
.send(event)
.await
.map_err(|_| eyre!("failed to send event to daemon"))
}
}
#[async_trait::async_trait]
@@ -1,15 +1,14 @@
use super::Listener;
use crate::{shared_mem_handler, Event};
use crate::Event;
use dora_core::daemon_messages::{DaemonReply, DaemonRequest};
use eyre::eyre;
use shared_memory_server::ShmemServer;
use tokio::sync::{mpsc, oneshot};
#[tracing::instrument(skip(server, daemon_tx, shmem_handler_tx))]
#[tracing::instrument(skip(server, daemon_tx))]
pub async fn listener_loop(
mut server: ShmemServer<DaemonRequest, DaemonReply>,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
) {
let (tx, rx) = flume::bounded(0);
tokio::task::spawn_blocking(move || {
@@ -33,7 +32,7 @@ pub async fn listener_loop(
}
});
let connection = ShmemConnection(tx);
Listener::run(connection, daemon_tx, shmem_handler_tx).await
Listener::run(connection, daemon_tx).await
}
enum Operation {
@@ -2,7 +2,6 @@ use std::io::ErrorKind;
use super::Listener;
use crate::{
shared_mem_handler,
tcp_utils::{tcp_receive, tcp_send},
Event,
};
@@ -13,12 +12,8 @@ use tokio::{
sync::mpsc,
};
#[tracing::instrument(skip(listener, daemon_tx, shmem_handler_tx))]
pub async fn listener_loop(
listener: TcpListener,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
) {
#[tracing::instrument(skip(listener, daemon_tx))]
pub async fn listener_loop(listener: TcpListener, daemon_tx: mpsc::Sender<Event>) {
loop {
match listener
.accept()
@@ -29,27 +24,19 @@ pub async fn listener_loop(
tracing::info!("{err}");
}
Ok((connection, _)) => {
tokio::spawn(handle_connection_loop(
connection,
daemon_tx.clone(),
shmem_handler_tx.clone(),
));
tokio::spawn(handle_connection_loop(connection, daemon_tx.clone()));
}
}
}
}
#[tracing::instrument(skip(connection, daemon_tx, shmem_handler_tx))]
async fn handle_connection_loop(
connection: TcpStream,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
) {
#[tracing::instrument(skip(connection, daemon_tx))]
async fn handle_connection_loop(connection: TcpStream, daemon_tx: mpsc::Sender<Event>) {
if let Err(err) = connection.set_nodelay(true) {
tracing::warn!("failed to set nodelay for connection: {err}");
}
Listener::run(TcpConnection(connection), daemon_tx, shmem_handler_tx).await
Listener::run(TcpConnection(connection), daemon_tx).await
}
struct TcpConnection(TcpStream);
@@ -1,309 +0,0 @@
use core::fmt;
use std::{
collections::{HashMap, HashSet, VecDeque},
sync::Arc,
};
use dora_core::{
config::{DataId, NodeId},
daemon_messages::{DaemonReply, DataflowId, DropEvent, DropToken},
};
use eyre::{eyre, Context};
use flume::{Receiver, Sender};
use futures::StreamExt;
use futures_concurrency::stream::Merge;
use shared_memory_server::{Shmem, ShmemConf};
use tokio::sync::oneshot;
use uuid::Uuid;
use crate::MessageId;
pub struct SharedMemHandler {
events_tx: Sender<crate::ShmemHandlerEvent>,
prepared_messages: HashMap<String, PreparedMessage>,
sent_out_shared_memory: HashMap<DropToken, Arc<ShmemHandle>>,
dropped: HashSet<DropToken>,
cache: VecDeque<ShmemHandle>,
}
impl SharedMemHandler {
pub fn new(events_tx: Sender<crate::ShmemHandlerEvent>) -> Self {
Self {
events_tx,
prepared_messages: HashMap::new(),
sent_out_shared_memory: HashMap::new(),
dropped: HashSet::new(),
cache: VecDeque::new(),
}
}
pub async fn run(
&mut self,
node_events: Receiver<NodeEvent>,
daemon_events: Receiver<DaemonEvent>,
) {
if let Err(err) = self.run_inner(node_events, daemon_events).await {
if let Err(send_err) = self
.events_tx
.send_async(crate::ShmemHandlerEvent::HandlerError(err))
.await
{
tracing::error!("{send_err:?}");
}
}
}
pub async fn run_inner(
&mut self,
node_events: Receiver<NodeEvent>,
daemon_events: Receiver<DaemonEvent>,
) -> eyre::Result<()> {
let mut events = (
node_events.stream().map(Event::Node),
daemon_events.stream().map(Event::Daemon),
)
.merge();
while let Some(event) = events.next().await {
match event {
Event::Node(event) => self.handle_node_event(event).await?,
Event::Daemon(event) => self.handle_daemon_event(event).await?,
}
}
Ok(())
}
async fn handle_node_event(&mut self, event: NodeEvent) -> eyre::Result<()> {
match event {
NodeEvent::Drop(DropEvent { tokens }) => {
for token in tokens {
match self.sent_out_shared_memory.remove(&token) {
Some(arc) => {
if let Ok(shmem) = Arc::try_unwrap(arc) {
self.add_to_cache(shmem);
}
}
None => {
self.dropped.insert(token);
}
}
}
}
NodeEvent::PrepareOutputMessage {
dataflow_id,
node_id,
output_id,
metadata,
data_len,
reply_sender,
} => {
tracing::trace!(
"Time between construct and prepare: {:?}",
metadata
.timestamp()
.get_time()
.to_system_time()
.elapsed()
.unwrap()
);
let memory = if data_len > 0 {
let cache_index = self
.cache
.iter()
.enumerate()
.rev()
.filter(|(_, s)| s.size() >= data_len)
.min_by_key(|(_, s)| s.size())
.map(|(i, _)| i);
let memory = match cache_index {
Some(i) => {
// we know that this index exists, so we can safely unwrap here
self.cache.remove(i).unwrap()
}
None => ShmemHandle(Box::new(
ShmemConf::new()
.size(data_len)
.create()
.wrap_err("failed to allocate shared memory")?,
)),
};
assert!(memory.size() >= data_len);
Some(memory)
} else {
None
};
let id = memory
.as_ref()
.map(|m| m.0.get_os_id().to_owned())
.unwrap_or_else(|| Uuid::new_v4().to_string());
let message = PreparedMessage {
dataflow_id,
node_id,
output_id,
metadata,
data: memory.map(|m| (m, data_len)),
};
self.prepared_messages.insert(id.clone(), message);
let reply = DaemonReply::PreparedMessage {
shared_memory_id: id.clone(),
};
if reply_sender.send(reply).is_err() {
// free shared memory slice again
self.prepared_messages.remove(&id);
}
}
NodeEvent::SendPreparedMessage { id, reply_sender } => {
let message = self
.prepared_messages
.remove(&id)
.ok_or_else(|| eyre!("invalid shared memory id"))?;
let PreparedMessage {
dataflow_id,
node_id,
output_id,
metadata,
data,
} = message;
let data = data.map(|(m, len)| {
SharedMemSample {
shared_memory: m,
len,
}
.into()
});
let send_result = self
.events_tx
.send_async(crate::ShmemHandlerEvent::SendOut {
dataflow_id,
node_id,
output_id,
metadata,
data: data.into(),
})
.await;
let _ = reply_sender.send(DaemonReply::Result(
send_result.map_err(|_| "daemon is no longer running".into()),
));
}
}
Ok(())
}
async fn handle_daemon_event(&mut self, event: DaemonEvent) -> eyre::Result<()> {
match event {
DaemonEvent::SentOut { data, drop_tokens } => {
// keep shared memory alive until we received all drop tokens
let memory = Arc::new(data.shared_memory);
for drop_token in drop_tokens {
if self.dropped.remove(&drop_token) {
// this token was already dropped -> ignore
} else {
self.sent_out_shared_memory
.insert(drop_token, memory.clone());
}
}
if let Ok(memory) = Arc::try_unwrap(memory) {
self.add_to_cache(memory);
}
}
}
Ok(())
}
fn add_to_cache(&mut self, memory: ShmemHandle) {
const MAX_CACHE_SIZE: usize = 20;
self.cache.push_back(memory);
while self.cache.len() > MAX_CACHE_SIZE {
self.cache.pop_front();
}
}
}
pub struct SharedMemSample {
shared_memory: ShmemHandle,
len: usize,
}
impl SharedMemSample {
pub fn as_raw_slice(&self) -> *const [u8] {
std::ptr::slice_from_raw_parts(self.shared_memory.0.as_ptr(), self.len)
}
pub unsafe fn as_slice(&self) -> &[u8] {
unsafe { &*self.as_raw_slice() }
}
pub fn get_os_id(&self) -> &str {
self.shared_memory.0.get_os_id()
}
pub fn len(&self) -> usize {
self.len
}
}
#[derive(Debug)]
enum Event {
Node(NodeEvent),
Daemon(DaemonEvent),
}
#[derive(Debug)]
pub enum NodeEvent {
PrepareOutputMessage {
dataflow_id: DataflowId,
node_id: NodeId,
output_id: DataId,
metadata: dora_core::message::Metadata<'static>,
data_len: usize,
reply_sender: oneshot::Sender<DaemonReply>,
},
SendPreparedMessage {
id: MessageId,
reply_sender: oneshot::Sender<DaemonReply>,
},
Drop(DropEvent),
}
pub enum DaemonEvent {
SentOut {
data: SharedMemSample,
drop_tokens: Vec<DropToken>,
},
}
impl fmt::Debug for DaemonEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::SentOut {
data: _,
drop_tokens,
} => f
.debug_struct("SentOut")
.field("data", &"[..]")
.field("drop_tokens", drop_tokens)
.finish(),
}
}
}
struct PreparedMessage {
dataflow_id: DataflowId,
node_id: NodeId,
output_id: DataId,
metadata: dora_core::message::Metadata<'static>,
data: Option<(ShmemHandle, usize)>,
}
struct ShmemHandle(Box<Shmem>);
impl ShmemHandle {
fn size(&self) -> usize {
self.0.len()
}
}
unsafe impl Send for ShmemHandle {}
unsafe impl Sync for ShmemHandle {}
@@ -1,6 +1,6 @@
use crate::{
listener::spawn_listener_loop, runtime_node_inputs, runtime_node_outputs, shared_mem_handler,
DoraEvent, Event, NodeExitStatus,
listener::spawn_listener_loop, runtime_node_inputs, runtime_node_outputs, DoraEvent, Event,
NodeExitStatus,
};
use dora_core::{
config::NodeRunConfig,
@@ -19,21 +19,14 @@ pub async fn spawn_node(
working_dir: &Path,
node: ResolvedNode,
daemon_tx: mpsc::Sender<Event>,
shmem_handler_tx: flume::Sender<shared_mem_handler::NodeEvent>,
config: DaemonCommunicationConfig,
dora_runtime_path: Option<&Path>,
) -> eyre::Result<()> {
let node_id = node.id.clone();
tracing::debug!("Spawning node `{dataflow_id}/{node_id}`");
let daemon_communication = spawn_listener_loop(
&dataflow_id,
&node_id,
&daemon_tx,
&shmem_handler_tx,
config,
)
.await?;
let daemon_communication =
spawn_listener_loop(&dataflow_id, &node_id, &daemon_tx, config).await?;
let mut child = match node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, path::PathBuf};
use std::{fmt, net::SocketAddr, path::PathBuf};
use crate::{
config::{DataId, NodeId, NodeRunConfig},
@@ -39,18 +39,10 @@ pub enum DaemonRequest {
node_id: NodeId,
},
Subscribe,
PrepareOutputMessage {
output_id: DataId,
metadata: Metadata<'static>,
data_len: usize,
},
SendPreparedMessage {
id: SharedMemoryId,
},
SendMessage {
output_id: DataId,
metadata: Metadata<'static>,
data: Vec<u8>,
data: Option<Data>,
},
CloseOutputs(Vec<DataId>),
Stopped,
@@ -69,6 +61,46 @@ impl DaemonRequest {
}
}
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum Data {
Vec(Vec<u8>),
SharedMemory {
shared_memory_id: String,
len: usize,
drop_token: DropToken,
},
}
impl Data {
pub fn drop_token(&self) -> Option<DropToken> {
match self {
Data::Vec(_) => None,
Data::SharedMemory { drop_token, .. } => Some(*drop_token),
}
}
}
impl fmt::Debug for Data {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Vec(v) => f
.debug_struct("Vec")
.field("len", &v.len())
.finish_non_exhaustive(),
Self::SharedMemory {
shared_memory_id,
len,
drop_token,
} => f
.debug_struct("SharedMemory")
.field("shared_memory_id", shared_memory_id)
.field("len", len)
.field("drop_token", drop_token)
.finish(),
}
}
}
type SharedMemoryId = String;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
@@ -85,11 +117,15 @@ pub enum NodeEvent {
Input {
id: DataId,
metadata: Metadata<'static>,
data: Option<InputData>,
data: Option<Data>,
},
InputClosed {
id: DataId,
},
AllInputsClosed,
OutputDropped {
drop_token: DropToken,
},
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]