|
|
@@ -84,6 +84,23 @@ impl DoraNode { |
|
|
where |
|
|
where |
|
|
F: FnOnce(&mut [u8]), |
|
|
F: FnOnce(&mut [u8]), |
|
|
{ |
|
|
{ |
|
|
|
|
|
let sample = if data_len > 0 { |
|
|
|
|
|
let mut sample = self.allocate_data_sample(data_len)?; |
|
|
|
|
|
data(&mut sample); |
|
|
|
|
|
Some(sample) |
|
|
|
|
|
} else { |
|
|
|
|
|
None |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
self.send_output_sample(output_id, parameters, sample) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub fn send_output_sample( |
|
|
|
|
|
&mut self, |
|
|
|
|
|
output_id: DataId, |
|
|
|
|
|
parameters: MetadataParameters, |
|
|
|
|
|
sample: Option<DataSample>, |
|
|
|
|
|
) -> eyre::Result<()> { |
|
|
self.handle_finished_drop_tokens()?; |
|
|
self.handle_finished_drop_tokens()?; |
|
|
|
|
|
|
|
|
if !self.node_config.outputs.contains(&output_id) { |
|
|
if !self.node_config.outputs.contains(&output_id) { |
|
|
@@ -91,28 +108,9 @@ impl DoraNode { |
|
|
} |
|
|
} |
|
|
let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters.into_owned()); |
|
|
let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters.into_owned()); |
|
|
|
|
|
|
|
|
let (data, shmem) = if data_len >= ZERO_COPY_THRESHOLD { |
|
|
|
|
|
// create shared memory region |
|
|
|
|
|
let mut shared_memory = self.allocate_shared_memory(data_len)?; |
|
|
|
|
|
|
|
|
|
|
|
// fill in the data |
|
|
|
|
|
let raw = unsafe { shared_memory.as_slice_mut() }; |
|
|
|
|
|
data(&mut raw[..data_len]); |
|
|
|
|
|
|
|
|
|
|
|
let drop_token = DropToken::generate(); |
|
|
|
|
|
let data = Data::SharedMemory { |
|
|
|
|
|
shared_memory_id: shared_memory.get_os_id().to_owned(), |
|
|
|
|
|
len: data_len, |
|
|
|
|
|
drop_token, |
|
|
|
|
|
}; |
|
|
|
|
|
(Some(data), Some((shared_memory, drop_token))) |
|
|
|
|
|
} else if data_len == 0 { |
|
|
|
|
|
data(&mut []); |
|
|
|
|
|
(None, None) |
|
|
|
|
|
} else { |
|
|
|
|
|
let mut buffer = vec![0; data_len]; |
|
|
|
|
|
data(&mut buffer); |
|
|
|
|
|
(Some(Data::Vec(buffer)), None) |
|
|
|
|
|
|
|
|
let (data, shmem) = match sample { |
|
|
|
|
|
Some(sample) => sample.finalize(), |
|
|
|
|
|
None => (None, None), |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
self.control_channel |
|
|
self.control_channel |
|
|
@@ -149,6 +147,22 @@ impl DoraNode { |
|
|
&self.node_config |
|
|
&self.node_config |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> { |
|
|
|
|
|
let data = if data_len >= ZERO_COPY_THRESHOLD { |
|
|
|
|
|
// create shared memory region |
|
|
|
|
|
let shared_memory = self.allocate_shared_memory(data_len)?; |
|
|
|
|
|
|
|
|
|
|
|
DataSampleInner::Shmem(shared_memory) |
|
|
|
|
|
} else { |
|
|
|
|
|
DataSampleInner::Vec(vec![0; data_len]) |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
Ok(DataSample { |
|
|
|
|
|
inner: data, |
|
|
|
|
|
len: data_len, |
|
|
|
|
|
}) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> { |
|
|
fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> { |
|
|
let cache_index = self |
|
|
let cache_index = self |
|
|
.cache |
|
|
.cache |
|
|
@@ -254,6 +268,55 @@ impl Drop for DoraNode { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub struct DataSample { |
|
|
|
|
|
inner: DataSampleInner, |
|
|
|
|
|
len: usize, |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl DataSample { |
|
|
|
|
|
fn finalize(self) -> (Option<Data>, Option<(ShmemHandle, DropToken)>) { |
|
|
|
|
|
match self.inner { |
|
|
|
|
|
DataSampleInner::Shmem(shared_memory) => { |
|
|
|
|
|
let drop_token = DropToken::generate(); |
|
|
|
|
|
let data = Data::SharedMemory { |
|
|
|
|
|
shared_memory_id: shared_memory.get_os_id().to_owned(), |
|
|
|
|
|
len: self.len, |
|
|
|
|
|
drop_token, |
|
|
|
|
|
}; |
|
|
|
|
|
(Some(data), Some((shared_memory, drop_token))) |
|
|
|
|
|
} |
|
|
|
|
|
DataSampleInner::Vec(buffer) => (Some(Data::Vec(buffer)), None), |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl Deref for DataSample { |
|
|
|
|
|
type Target = [u8]; |
|
|
|
|
|
|
|
|
|
|
|
fn deref(&self) -> &Self::Target { |
|
|
|
|
|
let slice = match &self.inner { |
|
|
|
|
|
DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() }, |
|
|
|
|
|
DataSampleInner::Vec(data) => data, |
|
|
|
|
|
}; |
|
|
|
|
|
&slice[..self.len] |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl DerefMut for DataSample { |
|
|
|
|
|
fn deref_mut(&mut self) -> &mut Self::Target { |
|
|
|
|
|
let slice = match &mut self.inner { |
|
|
|
|
|
DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() }, |
|
|
|
|
|
DataSampleInner::Vec(data) => data, |
|
|
|
|
|
}; |
|
|
|
|
|
&mut slice[..self.len] |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
enum DataSampleInner { |
|
|
|
|
|
Shmem(ShmemHandle), |
|
|
|
|
|
Vec(Vec<u8>), |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
struct ShmemHandle(Box<Shmem>); |
|
|
struct ShmemHandle(Box<Shmem>); |
|
|
|
|
|
|
|
|
impl Deref for ShmemHandle { |
|
|
impl Deref for ShmemHandle { |
|
|
|