From feca83e3093d1edd218d8c1925e82e953519d858 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 8 Mar 2023 10:19:47 +0100 Subject: [PATCH] Refactor: Split Rust node API into smaller submodules --- Cargo.lock | 1 + apis/rust/node/Cargo.toml | 1 + .../daemon_connection/communication/mod.rs | 60 +++ .../{ => communication}/tcp.rs | 0 .../src/daemon_connection/control_channel.rs | 117 +++++ .../src/daemon_connection/event_stream.rs | 174 +++++++ apis/rust/node/src/daemon_connection/mod.rs | 423 +----------------- apis/rust/node/src/event.rs | 73 +++ apis/rust/node/src/lib.rs | 4 +- apis/rust/node/src/node.rs | 2 +- 10 files changed, 445 insertions(+), 410 deletions(-) create mode 100644 apis/rust/node/src/daemon_connection/communication/mod.rs rename apis/rust/node/src/daemon_connection/{ => communication}/tcp.rs (100%) create mode 100644 apis/rust/node/src/daemon_connection/control_channel.rs create mode 100644 apis/rust/node/src/daemon_connection/event_stream.rs create mode 100644 apis/rust/node/src/event.rs diff --git a/Cargo.lock b/Cargo.lock index 3bb2d198..f9f101eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1070,6 +1070,7 @@ dependencies = [ "serde_json", "serde_yaml 0.8.23", "shared-memory-server", + "shared_memory", "thiserror", "tokio", "tracing", diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index 9d94c2a5..6dfe1520 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -23,6 +23,7 @@ flume = "0.10.14" uuid = { version = "1.1.2", features = ["v4"] } capnp = "0.14.11" bincode = "1.3.3" +shared_memory = "0.12.0" [dev-dependencies] tokio = { version = "1.24.2", features = ["rt"] } diff --git a/apis/rust/node/src/daemon_connection/communication/mod.rs b/apis/rust/node/src/daemon_connection/communication/mod.rs new file mode 100644 index 00000000..f154abdf --- /dev/null +++ b/apis/rust/node/src/daemon_connection/communication/mod.rs @@ -0,0 +1,60 @@ +use dora_core::{ + config::NodeId, + daemon_messages::{DaemonReply, DaemonRequest, DataflowId}, +}; +use eyre::{bail, eyre, Context}; +use shared_memory_server::{ShmemClient, ShmemConf}; +use std::{net::TcpStream, time::Duration}; + +mod tcp; + +pub enum DaemonChannel { + Shmem(ShmemClient), + Tcp(TcpStream), +} + +impl DaemonChannel { + #[tracing::instrument] + pub fn new_tcp(stream: TcpStream) -> eyre::Result { + stream.set_nodelay(true).context("failed to set nodelay")?; + Ok(DaemonChannel::Tcp(stream)) + } + + #[tracing::instrument] + pub unsafe fn new_shmem(daemon_control_region_id: &str) -> eyre::Result { + let daemon_events_region = ShmemConf::new() + .os_id(daemon_control_region_id) + .open() + .wrap_err("failed to connect to dora-daemon")?; + let channel = DaemonChannel::Shmem( + unsafe { ShmemClient::new(daemon_events_region, Some(Duration::from_secs(5))) } + .wrap_err("failed to create ShmemChannel")?, + ); + Ok(channel) + } + + pub fn register(&mut self, dataflow_id: DataflowId, node_id: NodeId) -> eyre::Result<()> { + let msg = DaemonRequest::Register { + dataflow_id, + node_id, + }; + let reply = self + .request(&msg) + .wrap_err("failed to send register request to dora-daemon")?; + + match reply { + dora_core::daemon_messages::DaemonReply::Result(result) => result + .map_err(|e| eyre!(e)) + .wrap_err("failed to register node with dora-daemon")?, + other => bail!("unexpected register reply: {other:?}"), + } + Ok(()) + } + + pub fn request(&mut self, request: &DaemonRequest) -> eyre::Result { + match self { + DaemonChannel::Shmem(client) => client.request(request), + DaemonChannel::Tcp(stream) => tcp::request(stream, request), + } + } +} diff --git a/apis/rust/node/src/daemon_connection/tcp.rs b/apis/rust/node/src/daemon_connection/communication/tcp.rs similarity index 100% rename from apis/rust/node/src/daemon_connection/tcp.rs rename to apis/rust/node/src/daemon_connection/communication/tcp.rs diff --git a/apis/rust/node/src/daemon_connection/control_channel.rs b/apis/rust/node/src/daemon_connection/control_channel.rs new file mode 100644 index 00000000..79194a9c --- /dev/null +++ b/apis/rust/node/src/daemon_connection/control_channel.rs @@ -0,0 +1,117 @@ +use super::{communication::DaemonChannel, EventStreamThreadHandle, MessageSample}; +use dora_core::{ + config::{DataId, NodeId}, + daemon_messages::{DaemonRequest, DataflowId}, + message::Metadata, +}; +use eyre::{bail, eyre, Context}; +use std::sync::Arc; + +pub(crate) struct ControlChannel { + channel: DaemonChannel, + _event_stream_thread_handle: Option>, +} + +impl ControlChannel { + #[tracing::instrument(skip(channel, event_stream_thread_handle))] + pub(crate) fn init( + dataflow_id: DataflowId, + node_id: &NodeId, + mut channel: DaemonChannel, + event_stream_thread_handle: Arc, + ) -> eyre::Result { + channel.register(dataflow_id, node_id.clone())?; + + Ok(Self { + channel, + _event_stream_thread_handle: Some(event_stream_thread_handle), + }) + } + + pub fn report_stop(&mut self) -> eyre::Result<()> { + let reply = self + .channel + .request(&DaemonRequest::Stopped) + .wrap_err("failed to report stopped to dora-daemon")?; + match reply { + dora_core::daemon_messages::DaemonReply::Result(result) => result + .map_err(|e| eyre!(e)) + .wrap_err("failed to report stop event to dora-daemon")?, + other => bail!("unexpected stopped reply: {other:?}"), + } + Ok(()) + } + + pub fn report_closed_outputs(&mut self, outputs: Vec) -> eyre::Result<()> { + let reply = self + .channel + .request(&DaemonRequest::CloseOutputs(outputs)) + .wrap_err("failed to report closed outputs to dora-daemon")?; + match reply { + dora_core::daemon_messages::DaemonReply::Result(result) => result + .map_err(|e| eyre!(e)) + .wrap_err("failed to receive closed outputs reply from dora-daemon")?, + other => bail!("unexpected closed outputs reply: {other:?}"), + } + Ok(()) + } + + pub fn prepare_message( + &mut self, + output_id: DataId, + metadata: Metadata<'static>, + data_len: usize, + ) -> eyre::Result { + let reply = self + .channel + .request(&DaemonRequest::PrepareOutputMessage { + output_id, + metadata, + data_len, + }) + .wrap_err("failed to send PrepareOutputMessage request to dora-daemon")?; + match reply { + dora_core::daemon_messages::DaemonReply::PreparedMessage { + shared_memory_id: id, + } => Ok(MessageSample { id }), + dora_core::daemon_messages::DaemonReply::Result(Err(err)) => { + Err(eyre!(err).wrap_err("failed to report stop event to dora-daemon")) + } + other => bail!("unexpected PrepareOutputMessage reply: {other:?}"), + } + } + + pub fn send_prepared_message(&mut self, sample: MessageSample) -> eyre::Result<()> { + let reply = self + .channel + .request(&DaemonRequest::SendPreparedMessage { id: sample.id }) + .wrap_err("failed to send SendOutMessage request to dora-daemon")?; + match reply { + dora_core::daemon_messages::DaemonReply::Result(result) => { + result.map_err(|err| eyre!(err)) + } + other => bail!("unexpected SendOutMessage reply: {other:?}"), + } + } + + pub fn send_message( + &mut self, + output_id: DataId, + metadata: Metadata<'static>, + data: Vec, + ) -> eyre::Result<()> { + let request = DaemonRequest::SendMessage { + output_id, + metadata, + data, + }; + let reply = self + .channel + .request(&request) + .wrap_err("failed to send SendMessage request to dora-daemon")?; + match reply { + dora_core::daemon_messages::DaemonReply::Empty => Ok(()), + other => bail!("unexpected SendMessage reply: {other:?}"), + } + } +} diff --git a/apis/rust/node/src/daemon_connection/event_stream.rs b/apis/rust/node/src/daemon_connection/event_stream.rs new file mode 100644 index 00000000..7214a1e6 --- /dev/null +++ b/apis/rust/node/src/daemon_connection/event_stream.rs @@ -0,0 +1,174 @@ +use dora_core::{ + config::NodeId, + daemon_messages::{DaemonReply, DaemonRequest, DataflowId, NodeEvent}, +}; +use eyre::{eyre, Context}; +use std::{sync::Arc, time::Duration}; + +use crate::{event::Data, Event, MappedInputData}; + +use super::{DaemonChannel, EventStreamThreadHandle}; + +pub struct EventStream { + receiver: flume::Receiver, + _thread_handle: Arc, +} + +impl EventStream { + pub(crate) fn init( + dataflow_id: DataflowId, + node_id: &NodeId, + mut channel: DaemonChannel, + ) -> eyre::Result<(Self, Arc)> { + channel.register(dataflow_id, node_id.clone())?; + + channel + .request(&DaemonRequest::Subscribe) + .map_err(|e| eyre!(e)) + .wrap_err("failed to create subscription with dora-daemon")?; + + let (tx, rx) = flume::bounded(0); + let mut drop_tokens = Vec::new(); + let node_id = node_id.clone(); + let join_handle = std::thread::spawn(move || { + let result = 'outer: loop { + let daemon_request = DaemonRequest::NextEvent { + drop_tokens: std::mem::take(&mut drop_tokens), + }; + let events = match channel.request(&daemon_request) { + Ok(DaemonReply::NextEvents(events)) if events.is_empty() => { + tracing::debug!("Event stream closed for node ID `{node_id}`"); + break Ok(()); + } + Ok(DaemonReply::NextEvents(events)) => events, + Ok(other) => { + let err = eyre!("unexpected control reply: {other:?}"); + tracing::warn!("{err:?}"); + continue; + } + Err(err) => { + let err = eyre!(err).wrap_err("failed to receive incoming event"); + tracing::warn!("{err:?}"); + continue; + } + }; + for event in events { + let drop_token = match &event { + NodeEvent::Input { + data: Some(data), .. + } => data.drop_token(), + NodeEvent::Stop + | NodeEvent::InputClosed { .. } + | NodeEvent::Input { data: None, .. } => None, + }; + + let (drop_tx, drop_rx) = std::sync::mpsc::channel(); + match tx.send(EventItem::NodeEvent { + event, + ack_channel: drop_tx, + }) { + Ok(()) => {} + Err(_) => { + // receiving end of channel was closed + break 'outer Ok(()); + } + } + + let timeout = Duration::from_secs(30); + match drop_rx.recv_timeout(timeout) { + Ok(()) => { + break 'outer Err(eyre!( + "Node API should not send anything on ACK channel" + )) + } + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + tracing::warn!("timeout: event was not dropped after {timeout:?}"); + } + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {} // expected result + } + + if let Some(token) = drop_token { + drop_tokens.push(token); + } + } + }; + if let Err(err) = result { + if let Err(flume::SendError(item)) = tx.send(EventItem::FatalError(err)) { + let err = match item { + EventItem::FatalError(err) => err, + _ => unreachable!(), + }; + tracing::error!("failed to report fatal EventStream error: {err:?}"); + } + } + }); + + let thread_handle = EventStreamThreadHandle::new(join_handle); + + Ok(( + EventStream { + receiver: rx, + _thread_handle: thread_handle.clone(), + }, + thread_handle, + )) + } + + pub fn recv(&mut self) -> Option { + let event = self.receiver.recv(); + self.recv_common(event) + } + + pub async fn recv_async(&mut self) -> Option { + let event = self.receiver.recv_async().await; + self.recv_common(event) + } + + fn recv_common(&mut self, event: Result) -> Option { + let event = match event { + Ok(event) => event, + Err(flume::RecvError::Disconnected) => { + tracing::info!("event channel disconnected"); + return None; + } + }; + let event = match event { + EventItem::NodeEvent { event, ack_channel } => match event { + NodeEvent::Stop => Event::Stop, + NodeEvent::InputClosed { id } => Event::InputClosed { id }, + NodeEvent::Input { id, metadata, data } => { + let data = data + .map(|data| match data { + dora_core::daemon_messages::InputData::Vec(d) => Ok(Data::Vec(d)), + dora_core::daemon_messages::InputData::SharedMemory(d) => unsafe { + MappedInputData::map(&d.shared_memory_id, d.len).map(|data| { + Data::SharedMemory { + data, + _drop: ack_channel, + } + }) + }, + }) + .transpose(); + match data { + Ok(data) => Event::Input { id, metadata, data }, + Err(err) => Event::Error(format!("{err:?}")), + } + } + }, + EventItem::FatalError(err) => { + Event::Error(format!("fatal event stream error: {err:?}")) + } + }; + + Some(event) + } +} + +enum EventItem { + NodeEvent { + event: NodeEvent, + ack_channel: std::sync::mpsc::Sender<()>, + }, + FatalError(eyre::Report), +} diff --git a/apis/rust/node/src/daemon_connection/mod.rs b/apis/rust/node/src/daemon_connection/mod.rs index dcc27f96..bfccb6e8 100644 --- a/apis/rust/node/src/daemon_connection/mod.rs +++ b/apis/rust/node/src/daemon_connection/mod.rs @@ -1,14 +1,18 @@ +use communication::DaemonChannel; use dora_core::{ - config::{DataId, NodeId}, - daemon_messages::{DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, NodeEvent}, - message::Metadata, + config::NodeId, + daemon_messages::{DaemonCommunication, DataflowId}, }; -use eyre::{bail, eyre, Context}; +use eyre::Context; use flume::RecvTimeoutError; -use shared_memory_server::{Shmem, ShmemClient, ShmemConf}; -use std::{marker::PhantomData, net::TcpStream, sync::Arc, time::Duration}; +use std::{net::TcpStream, sync::Arc, time::Duration}; -mod tcp; +pub(crate) use control_channel::ControlChannel; +pub use event_stream::EventStream; + +mod communication; +mod control_channel; +mod event_stream; pub(crate) struct DaemonConnection { pub control_channel: ControlChannel, @@ -43,14 +47,12 @@ impl DaemonConnection { } }; - let mut control_channel = ControlChannel::init(dataflow_id, node_id, control) - .wrap_err("failed to init control stream")?; - let (event_stream, event_stream_thread_handle) = EventStream::init(dataflow_id, node_id, events) .wrap_err("failed to init event stream")?; - - control_channel.event_stream_thread_handle = Some(event_stream_thread_handle); + let control_channel = + ControlChannel::init(dataflow_id, node_id, control, event_stream_thread_handle) + .wrap_err("failed to init control stream")?; Ok(Self { control_channel, @@ -59,406 +61,11 @@ impl DaemonConnection { } } -pub(crate) struct ControlChannel { - channel: DaemonChannel, - event_stream_thread_handle: Option>, -} - -impl ControlChannel { - #[tracing::instrument(skip(channel))] - fn init( - dataflow_id: DataflowId, - node_id: &NodeId, - mut channel: DaemonChannel, - ) -> eyre::Result { - register(dataflow_id, node_id.clone(), &mut channel)?; - - Ok(Self { - channel, - event_stream_thread_handle: None, - }) - } - - pub fn report_stop(&mut self) -> eyre::Result<()> { - let reply = self - .channel - .request(&DaemonRequest::Stopped) - .wrap_err("failed to report stopped to dora-daemon")?; - match reply { - dora_core::daemon_messages::DaemonReply::Result(result) => result - .map_err(|e| eyre!(e)) - .wrap_err("failed to report stop event to dora-daemon")?, - other => bail!("unexpected stopped reply: {other:?}"), - } - Ok(()) - } - - pub fn report_closed_outputs(&mut self, outputs: Vec) -> eyre::Result<()> { - let reply = self - .channel - .request(&DaemonRequest::CloseOutputs(outputs)) - .wrap_err("failed to report closed outputs to dora-daemon")?; - match reply { - dora_core::daemon_messages::DaemonReply::Result(result) => result - .map_err(|e| eyre!(e)) - .wrap_err("failed to receive closed outputs reply from dora-daemon")?, - other => bail!("unexpected closed outputs reply: {other:?}"), - } - Ok(()) - } - - pub fn prepare_message( - &mut self, - output_id: DataId, - metadata: Metadata<'static>, - data_len: usize, - ) -> eyre::Result { - let reply = self - .channel - .request(&DaemonRequest::PrepareOutputMessage { - output_id, - metadata, - data_len, - }) - .wrap_err("failed to send PrepareOutputMessage request to dora-daemon")?; - match reply { - dora_core::daemon_messages::DaemonReply::PreparedMessage { - shared_memory_id: id, - } => Ok(MessageSample { id }), - dora_core::daemon_messages::DaemonReply::Result(Err(err)) => { - Err(eyre!(err).wrap_err("failed to report stop event to dora-daemon")) - } - other => bail!("unexpected PrepareOutputMessage reply: {other:?}"), - } - } - - pub fn send_prepared_message(&mut self, sample: MessageSample) -> eyre::Result<()> { - let reply = self - .channel - .request(&DaemonRequest::SendPreparedMessage { id: sample.id }) - .wrap_err("failed to send SendOutMessage request to dora-daemon")?; - match reply { - dora_core::daemon_messages::DaemonReply::Result(result) => { - result.map_err(|err| eyre!(err)) - } - other => bail!("unexpected SendOutMessage reply: {other:?}"), - } - } - - pub fn send_message( - &mut self, - output_id: DataId, - metadata: Metadata<'static>, - data: Vec, - ) -> eyre::Result<()> { - let request = DaemonRequest::SendMessage { - output_id, - metadata, - data, - }; - let reply = self - .channel - .request(&request) - .wrap_err("failed to send SendMessage request to dora-daemon")?; - match reply { - dora_core::daemon_messages::DaemonReply::Empty => Ok(()), - other => bail!("unexpected SendMessage reply: {other:?}"), - } - } -} - -enum DaemonChannel { - Shmem(ShmemClient), - Tcp(TcpStream), -} - -impl DaemonChannel { - #[tracing::instrument] - fn new_tcp(stream: TcpStream) -> eyre::Result { - stream.set_nodelay(true).context("failed to set nodelay")?; - Ok(DaemonChannel::Tcp(stream)) - } - - #[tracing::instrument] - unsafe fn new_shmem(daemon_control_region_id: &str) -> eyre::Result { - let daemon_events_region = ShmemConf::new() - .os_id(daemon_control_region_id) - .open() - .wrap_err("failed to connect to dora-daemon")?; - let channel = DaemonChannel::Shmem( - unsafe { ShmemClient::new(daemon_events_region, Some(Duration::from_secs(5))) } - .wrap_err("failed to create ShmemChannel")?, - ); - Ok(channel) - } - - fn request(&mut self, request: &DaemonRequest) -> eyre::Result { - match self { - DaemonChannel::Shmem(client) => client.request(request), - DaemonChannel::Tcp(stream) => tcp::request(stream, request), - } - } -} - -fn register( - dataflow_id: DataflowId, - node_id: NodeId, - channel: &mut DaemonChannel, -) -> eyre::Result<()> { - let msg = DaemonRequest::Register { - dataflow_id, - node_id, - }; - let reply = channel - .request(&msg) - .wrap_err("failed to send register request to dora-daemon")?; - - match reply { - dora_core::daemon_messages::DaemonReply::Result(result) => result - .map_err(|e| eyre!(e)) - .wrap_err("failed to register node with dora-daemon")?, - other => bail!("unexpected register reply: {other:?}"), - } - Ok(()) -} - -enum EventItem { - NodeEvent { - event: NodeEvent, - ack_channel: std::sync::mpsc::Sender<()>, - }, - FatalError(eyre::Report), -} - -pub struct EventStream { - receiver: flume::Receiver, - _thread_handle: Arc, -} - -impl EventStream { - fn init( - dataflow_id: DataflowId, - node_id: &NodeId, - mut channel: DaemonChannel, - ) -> eyre::Result<(Self, Arc)> { - register(dataflow_id, node_id.clone(), &mut channel)?; - - channel - .request(&DaemonRequest::Subscribe) - .map_err(|e| eyre!(e)) - .wrap_err("failed to create subscription with dora-daemon")?; - - let (tx, rx) = flume::bounded(0); - let mut drop_tokens = Vec::new(); - let node_id = node_id.clone(); - let join_handle = std::thread::spawn(move || { - let result = 'outer: loop { - let daemon_request = DaemonRequest::NextEvent { - drop_tokens: std::mem::take(&mut drop_tokens), - }; - let events = match channel.request(&daemon_request) { - Ok(DaemonReply::NextEvents(events)) if events.is_empty() => { - tracing::debug!("Event stream closed for node ID `{node_id}`"); - break Ok(()); - } - Ok(DaemonReply::NextEvents(events)) => events, - Ok(other) => { - let err = eyre!("unexpected control reply: {other:?}"); - tracing::warn!("{err:?}"); - continue; - } - Err(err) => { - let err = eyre!(err).wrap_err("failed to receive incoming event"); - tracing::warn!("{err:?}"); - continue; - } - }; - for event in events { - let drop_token = match &event { - NodeEvent::Input { - data: Some(data), .. - } => data.drop_token(), - NodeEvent::Stop - | NodeEvent::InputClosed { .. } - | NodeEvent::Input { data: None, .. } => None, - }; - - let (drop_tx, drop_rx) = std::sync::mpsc::channel(); - match tx.send(EventItem::NodeEvent { - event, - ack_channel: drop_tx, - }) { - Ok(()) => {} - Err(_) => { - // receiving end of channel was closed - break 'outer Ok(()); - } - } - - let timeout = Duration::from_secs(30); - match drop_rx.recv_timeout(timeout) { - Ok(()) => { - break 'outer Err(eyre!( - "Node API should not send anything on ACK channel" - )) - } - Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { - tracing::warn!("timeout: event was not dropped after {timeout:?}"); - } - Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {} // expected result - } - - if let Some(token) = drop_token { - drop_tokens.push(token); - } - } - }; - if let Err(err) = result { - if let Err(flume::SendError(item)) = tx.send(EventItem::FatalError(err)) { - let err = match item { - EventItem::FatalError(err) => err, - _ => unreachable!(), - }; - tracing::error!("failed to report fatal EventStream error: {err:?}"); - } - } - }); - - let thread_handle = EventStreamThreadHandle::new(join_handle); - - Ok(( - EventStream { - receiver: rx, - _thread_handle: thread_handle.clone(), - }, - thread_handle, - )) - } - - pub fn recv(&mut self) -> Option { - let event = self.receiver.recv(); - self.recv_common(event) - } - - pub async fn recv_async(&mut self) -> Option { - let event = self.receiver.recv_async().await; - self.recv_common(event) - } - - fn recv_common(&mut self, event: Result) -> Option { - let event = match event { - Ok(event) => event, - Err(flume::RecvError::Disconnected) => { - tracing::info!("event channel disconnected"); - return None; - } - }; - let event = match event { - EventItem::NodeEvent { event, ack_channel } => match event { - NodeEvent::Stop => Event::Stop, - NodeEvent::InputClosed { id } => Event::InputClosed { id }, - NodeEvent::Input { id, metadata, data } => { - let data = data - .map(|data| match data { - dora_core::daemon_messages::InputData::Vec(d) => Ok(Data::Vec(d)), - dora_core::daemon_messages::InputData::SharedMemory(d) => unsafe { - MappedInputData::map(&d.shared_memory_id, d.len).map(|data| { - Data::SharedMemory { - data, - _drop: ack_channel, - } - }) - }, - }) - .transpose(); - match data { - Ok(data) => Event::Input { id, metadata, data }, - Err(err) => Event::Error(format!("{err:?}")), - } - } - }, - EventItem::FatalError(err) => { - Event::Error(format!("fatal event stream error: {err:?}")) - } - }; - - Some(event) - } -} - pub struct MessageSample { pub id: String, } -#[derive(Debug)] -#[non_exhaustive] -pub enum Event<'a> { - Stop, - Input { - id: DataId, - metadata: Metadata<'static>, - data: Option>, - }, - InputClosed { - id: DataId, - }, - Error(String), -} - -pub enum Data<'a> { - Vec(Vec), - SharedMemory { - data: MappedInputData<'a>, - _drop: std::sync::mpsc::Sender<()>, - }, -} - -impl std::ops::Deref for Data<'_> { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - match self { - Data::SharedMemory { data, .. } => data, - Data::Vec(data) => data, - } - } -} - -impl std::fmt::Debug for Data<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Data").finish_non_exhaustive() - } -} - -pub struct MappedInputData<'a> { - memory: Shmem, - len: usize, - _data: PhantomData<&'a [u8]>, -} - -impl MappedInputData<'_> { - unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result { - let memory = ShmemConf::new() - .os_id(shared_memory_id) - .open() - .wrap_err("failed to map shared memory input")?; - Ok(MappedInputData { - memory, - len, - _data: PhantomData, - }) - } -} - -impl std::ops::Deref for MappedInputData<'_> { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - unsafe { &self.memory.as_slice()[..self.len] } - } -} - -struct EventStreamThreadHandle(flume::Receiver>); +pub(crate) struct EventStreamThreadHandle(flume::Receiver>); impl EventStreamThreadHandle { fn new(join_handle: std::thread::JoinHandle<()>) -> Arc { let (tx, rx) = flume::bounded(1); diff --git a/apis/rust/node/src/event.rs b/apis/rust/node/src/event.rs new file mode 100644 index 00000000..96f5460e --- /dev/null +++ b/apis/rust/node/src/event.rs @@ -0,0 +1,73 @@ +use std::marker::PhantomData; + +use dora_core::{config::DataId, message::Metadata}; +use eyre::Context; +use shared_memory::{Shmem, ShmemConf}; + +#[derive(Debug)] +#[non_exhaustive] +pub enum Event<'a> { + Stop, + Input { + id: DataId, + metadata: Metadata<'static>, + data: Option>, + }, + InputClosed { + id: DataId, + }, + Error(String), +} + +pub enum Data<'a> { + Vec(Vec), + SharedMemory { + data: MappedInputData<'a>, + _drop: std::sync::mpsc::Sender<()>, + }, +} + +impl std::ops::Deref for Data<'_> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + match self { + Data::SharedMemory { data, .. } => data, + Data::Vec(data) => data, + } + } +} + +impl std::fmt::Debug for Data<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Data").finish_non_exhaustive() + } +} + +pub struct MappedInputData<'a> { + memory: Shmem, + len: usize, + _data: PhantomData<&'a [u8]>, +} + +impl MappedInputData<'_> { + pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result { + let memory = ShmemConf::new() + .os_id(shared_memory_id) + .open() + .wrap_err("failed to map shared memory input")?; + Ok(MappedInputData { + memory, + len, + _data: PhantomData, + }) + } +} + +impl std::ops::Deref for MappedInputData<'_> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + unsafe { &self.memory.as_slice()[..self.len] } + } +} diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index f2ccb567..53a1749d 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -1,8 +1,10 @@ -pub use daemon_connection::{Event, EventStream}; +pub use daemon_connection::EventStream; pub use dora_core; pub use dora_core::message::{uhlc, Metadata, MetadataParameters}; +pub use event::{Event, MappedInputData}; pub use flume::Receiver; pub use node::DoraNode; mod daemon_connection; +mod event; mod node; diff --git a/apis/rust/node/src/node.rs b/apis/rust/node/src/node.rs index 1f7988f5..ba3030db 100644 --- a/apis/rust/node/src/node.rs +++ b/apis/rust/node/src/node.rs @@ -7,7 +7,7 @@ use eyre::WrapErr; use shared_memory_server::ShmemConf; use crate::{ - daemon::{ControlChannel, DaemonConnection}, + daemon_connection::{ControlChannel, DaemonConnection}, EventStream, };