From d7cd3708a716c86fb7860d7a99cb1aa65322afb8 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Sat, 15 Apr 2023 13:02:52 +0200 Subject: [PATCH] Add `allocate_data_sample` and `send_output_sample` methods to Rust node API This provides an alternative way to send outputs with less lifetime restrictions than the closure-based interface. --- apis/rust/node/src/node/mod.rs | 107 ++++++++++++++++++++++++++------- 1 file changed, 85 insertions(+), 22 deletions(-) diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 86093ea6..7da86be4 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -84,6 +84,23 @@ impl DoraNode { where F: FnOnce(&mut [u8]), { + let sample = if data_len > 0 { + let mut sample = self.allocate_data_sample(data_len)?; + data(&mut sample); + Some(sample) + } else { + None + }; + + self.send_output_sample(output_id, parameters, sample) + } + + pub fn send_output_sample( + &mut self, + output_id: DataId, + parameters: MetadataParameters, + sample: Option, + ) -> eyre::Result<()> { self.handle_finished_drop_tokens()?; if !self.node_config.outputs.contains(&output_id) { @@ -91,28 +108,9 @@ impl DoraNode { } let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters.into_owned()); - let (data, shmem) = if data_len >= ZERO_COPY_THRESHOLD { - // create shared memory region - let mut shared_memory = self.allocate_shared_memory(data_len)?; - - // fill in the data - let raw = unsafe { shared_memory.as_slice_mut() }; - data(&mut raw[..data_len]); - - let drop_token = DropToken::generate(); - let data = Data::SharedMemory { - shared_memory_id: shared_memory.get_os_id().to_owned(), - len: data_len, - drop_token, - }; - (Some(data), Some((shared_memory, drop_token))) - } else if data_len == 0 { - data(&mut []); - (None, None) - } else { - let mut buffer = vec![0; data_len]; - data(&mut buffer); - (Some(Data::Vec(buffer)), None) + let (data, shmem) = match sample { + Some(sample) => sample.finalize(), + None => (None, None), }; self.control_channel @@ -149,6 +147,22 @@ impl DoraNode { &self.node_config } + pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result { + let data = if data_len >= ZERO_COPY_THRESHOLD { + // create shared memory region + let shared_memory = self.allocate_shared_memory(data_len)?; + + DataSampleInner::Shmem(shared_memory) + } else { + DataSampleInner::Vec(vec![0; data_len]) + }; + + Ok(DataSample { + inner: data, + len: data_len, + }) + } + fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result { let cache_index = self .cache @@ -254,6 +268,55 @@ impl Drop for DoraNode { } } +pub struct DataSample { + inner: DataSampleInner, + len: usize, +} + +impl DataSample { + fn finalize(self) -> (Option, Option<(ShmemHandle, DropToken)>) { + match self.inner { + DataSampleInner::Shmem(shared_memory) => { + let drop_token = DropToken::generate(); + let data = Data::SharedMemory { + shared_memory_id: shared_memory.get_os_id().to_owned(), + len: self.len, + drop_token, + }; + (Some(data), Some((shared_memory, drop_token))) + } + DataSampleInner::Vec(buffer) => (Some(Data::Vec(buffer)), None), + } + } +} + +impl Deref for DataSample { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + let slice = match &self.inner { + DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() }, + DataSampleInner::Vec(data) => data, + }; + &slice[..self.len] + } +} + +impl DerefMut for DataSample { + fn deref_mut(&mut self) -> &mut Self::Target { + let slice = match &mut self.inner { + DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() }, + DataSampleInner::Vec(data) => data, + }; + &mut slice[..self.len] + } +} + +enum DataSampleInner { + Shmem(ShmemHandle), + Vec(Vec), +} + struct ShmemHandle(Box); impl Deref for ShmemHandle {