Browse Source

Add `allocate_data_sample` and `send_output_sample` methods to Rust node API

This provides an alternative way to send outputs with less lifetime restrictions than the closure-based interface.
tags/v0.2.3-rc
Philipp Oppermann 3 years ago
parent
commit
d7cd3708a7
Failed to extract signature
1 changed files with 85 additions and 22 deletions
  1. +85
    -22
      apis/rust/node/src/node/mod.rs

+ 85
- 22
apis/rust/node/src/node/mod.rs View File

@@ -84,6 +84,23 @@ impl DoraNode {
where
F: FnOnce(&mut [u8]),
{
let sample = if data_len > 0 {
let mut sample = self.allocate_data_sample(data_len)?;
data(&mut sample);
Some(sample)
} else {
None
};

self.send_output_sample(output_id, parameters, sample)
}

pub fn send_output_sample(
&mut self,
output_id: DataId,
parameters: MetadataParameters,
sample: Option<DataSample>,
) -> eyre::Result<()> {
self.handle_finished_drop_tokens()?;

if !self.node_config.outputs.contains(&output_id) {
@@ -91,28 +108,9 @@ impl DoraNode {
}
let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters.into_owned());

let (data, shmem) = if data_len >= ZERO_COPY_THRESHOLD {
// create shared memory region
let mut shared_memory = self.allocate_shared_memory(data_len)?;

// fill in the data
let raw = unsafe { shared_memory.as_slice_mut() };
data(&mut raw[..data_len]);

let drop_token = DropToken::generate();
let data = Data::SharedMemory {
shared_memory_id: shared_memory.get_os_id().to_owned(),
len: data_len,
drop_token,
};
(Some(data), Some((shared_memory, drop_token)))
} else if data_len == 0 {
data(&mut []);
(None, None)
} else {
let mut buffer = vec![0; data_len];
data(&mut buffer);
(Some(Data::Vec(buffer)), None)
let (data, shmem) = match sample {
Some(sample) => sample.finalize(),
None => (None, None),
};

self.control_channel
@@ -149,6 +147,22 @@ impl DoraNode {
&self.node_config
}

pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
let data = if data_len >= ZERO_COPY_THRESHOLD {
// create shared memory region
let shared_memory = self.allocate_shared_memory(data_len)?;

DataSampleInner::Shmem(shared_memory)
} else {
DataSampleInner::Vec(vec![0; data_len])
};

Ok(DataSample {
inner: data,
len: data_len,
})
}

fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
let cache_index = self
.cache
@@ -254,6 +268,55 @@ impl Drop for DoraNode {
}
}

pub struct DataSample {
inner: DataSampleInner,
len: usize,
}

impl DataSample {
fn finalize(self) -> (Option<Data>, Option<(ShmemHandle, DropToken)>) {
match self.inner {
DataSampleInner::Shmem(shared_memory) => {
let drop_token = DropToken::generate();
let data = Data::SharedMemory {
shared_memory_id: shared_memory.get_os_id().to_owned(),
len: self.len,
drop_token,
};
(Some(data), Some((shared_memory, drop_token)))
}
DataSampleInner::Vec(buffer) => (Some(Data::Vec(buffer)), None),
}
}
}

impl Deref for DataSample {
type Target = [u8];

fn deref(&self) -> &Self::Target {
let slice = match &self.inner {
DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() },
DataSampleInner::Vec(data) => data,
};
&slice[..self.len]
}
}

impl DerefMut for DataSample {
fn deref_mut(&mut self) -> &mut Self::Target {
let slice = match &mut self.inner {
DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() },
DataSampleInner::Vec(data) => data,
};
&mut slice[..self.len]
}
}

enum DataSampleInner {
Shmem(ShmemHandle),
Vec(Vec<u8>),
}

struct ShmemHandle(Box<Shmem>);

impl Deref for ShmemHandle {


Loading…
Cancel
Save