Browse Source

Send small messages directly without shared memory

We currently need an additional request to prepare zero-copy shared memory messages. For small messages, it might be faster to avoid this extra round-trip and send the message directly over TCP. This commit implements support for this. Messages smaller than a threshold (currently set to 4096 bytes) are sent via TCP, while larger messages still use shared memory.

This step also enables future optimizations such as queueing output messages in order to improve the throughput.
tags/v0.2.0-candidate
Philipp Oppermann 2 years ago
parent
commit
00421e2bcd
Failed to extract signature
6 changed files with 138 additions and 55 deletions
  1. +29
    -18
      apis/rust/node/src/daemon/mod.rs
  2. +6
    -3
      apis/rust/node/src/lib.rs
  3. +77
    -27
      binaries/daemon/src/lib.rs
  4. +6
    -3
      binaries/daemon/src/listener/mod.rs
  5. +1
    -1
      binaries/daemon/src/shared_mem_handler.rs
  6. +19
    -3
      libraries/core/src/daemon_messages.rs

+ 29
- 18
apis/rust/node/src/daemon/mod.rs View File

@@ -145,16 +145,18 @@ impl ControlChannel {
}
}

pub fn send_empty_message(
pub fn send_message(
&mut self,
output_id: DataId,
metadata: Metadata<'static>,
data: Vec<u8>,
) -> eyre::Result<()> {
let reply = self
.channel
.request(&DaemonRequest::SendEmptyMessage {
.request(&DaemonRequest::SendMessage {
output_id,
metadata,
data,
})
.wrap_err("failed to send SendEmptyMessage request to dora-daemon")?;
match reply {
@@ -275,7 +277,7 @@ impl EventStream {
let drop_token = match &event {
NodeEvent::Input {
data: Some(data), ..
} => Some(data.drop_token.clone()),
} => data.drop_token(),
NodeEvent::Stop
| NodeEvent::InputClosed { .. }
| NodeEvent::Input { data: None, .. } => None,
@@ -350,18 +352,21 @@ impl EventStream {
NodeEvent::Stop => Event::Stop,
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let mapped = data
.map(|d| unsafe { MappedInputData::map(&d.shared_memory_id, d.len) })
let data = data
.map(|data| match data {
dora_core::daemon_messages::InputData::Vec(d) => Ok(Data::Vec(d)),
dora_core::daemon_messages::InputData::SharedMemory(d) => unsafe {
MappedInputData::map(&d.shared_memory_id, d.len).map(|data| {
Data::SharedMemory {
data,
_drop: ack_channel,
}
})
},
})
.transpose();
match mapped {
Ok(mapped) => Event::Input {
id,
metadata,
data: mapped.map(|data| Data {
data,
_drop: ack_channel,
}),
},
match data {
Ok(data) => Event::Input { id, metadata, data },
Err(err) => Event::Error(format!("{err:?}")),
}
}
@@ -394,16 +399,22 @@ pub enum Event<'a> {
Error(String),
}

pub struct Data<'a> {
data: MappedInputData<'a>,
_drop: std::sync::mpsc::Sender<()>,
/// Payload of an input event as exposed to the node.
///
/// Dereferences to the raw message bytes (`[u8]`) for either variant.
pub enum Data<'a> {
/// Message bytes held in an owned vector.
Vec(Vec<u8>),
/// Message bytes backed by a mapped shared memory input.
SharedMemory {
/// The mapped input data that the bytes are read from.
data: MappedInputData<'a>,
// NOTE(review): never read; presumably held only so that dropping this
// value closes the channel and notifies the receiving side — confirm.
_drop: std::sync::mpsc::Sender<()>,
},
}

impl std::ops::Deref for Data<'_> {
type Target = [u8];

fn deref(&self) -> &Self::Target {
&self.data
match self {
Data::SharedMemory { data, .. } => data,
Data::Vec(data) => data,
}
}
}



+ 6
- 3
apis/rust/node/src/lib.rs View File

@@ -12,6 +12,8 @@ use shared_memory_server::ShmemConf;

mod daemon;

/// Size threshold (in bytes) that selects how an output message is sent.
///
/// Outputs with `data_len >= ZERO_COPY_THRESHOLD` use the prepared-message
/// (shared memory) path; smaller outputs are written into a `Vec<u8>` buffer
/// and sent directly through the control channel.
const ZERO_COPY_THRESHOLD: usize = 4096;

pub struct DoraNode {
id: NodeId,
node_config: NodeRunConfig,
@@ -70,7 +72,7 @@ impl DoraNode {
}
let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters.into_owned());

if data_len > 0 {
if data_len >= ZERO_COPY_THRESHOLD {
let sample = self
.control_channel
.prepare_message(output_id.clone(), metadata, data_len)
@@ -88,9 +90,10 @@ impl DoraNode {
.send_prepared_message(sample)
.wrap_err_with(|| format!("failed to send data for output {output_id}"))?;
} else {
data(&mut []);
let mut buffer = vec![0; data_len];
data(&mut buffer);
self.control_channel
.send_empty_message(output_id.clone(), metadata)
.send_message(output_id.clone(), metadata, buffer)
.wrap_err_with(|| format!("failed to send output {output_id}"))?;
}



+ 77
- 27
binaries/daemon/src/lib.rs View File

@@ -687,20 +687,35 @@ impl Daemon {
let mut drop_tokens = Vec::new();
for (receiver_id, input_id) in local_receivers {
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let drop_token = DropToken::generate();
let send_result = channel.send_async(daemon_messages::NodeEvent::Input {
let mut drop_token = None;
let item = daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: data.as_ref().map(|data| daemon_messages::InputData {
shared_memory_id: data.get_os_id().to_owned(),
len: data.len(),
drop_token: drop_token.clone(),
}),
});
data: match &data {
Data::None => None,
Data::SharedMemory(data) => {
let token = DropToken::generate();
drop_token = Some(token);
Some(daemon_messages::InputData::SharedMemory(
daemon_messages::SharedMemoryInput {
shared_memory_id: data.get_os_id().to_owned(),
len: data.len(),
drop_token: token,
},
))
}
Data::Vec(data) => {
Some(daemon_messages::InputData::Vec(data.clone()))
}
},
};
let send_result = channel.send_async(item);

match timeout(Duration::from_millis(10), send_result).await {
Ok(Ok(())) => {
drop_tokens.push(drop_token);
if let Some(token) = drop_token {
drop_tokens.push(token);
}
}
Ok(Err(_)) => {
closed.push(receiver_id);
@@ -716,25 +731,32 @@ impl Daemon {
for id in closed {
dataflow.subscribe_channels.remove(id);
}
let data_bytes = data.as_ref().map(|d| unsafe { d.as_slice() }.to_owned());

// report drop tokens to shared memory handler
if let Some(data) = data {
if let Err(err) = self
.shared_memory_handler
.send_async(shared_mem_handler::DaemonEvent::SentOut {
data: *data,
drop_tokens,
})
.await
.wrap_err("shared mem handler crashed after send out")
{
tracing::error!("{err:?}");

let data_bytes = match data {
Data::SharedMemory(data) => {
let bytes = unsafe { data.as_slice() }.to_owned();

// report drop tokens to shared memory handler
let send_result = self
.shared_memory_handler
.send_async(shared_mem_handler::DaemonEvent::SentOut {
data: *data,
drop_tokens,
})
.await;
if let Err(err) =
send_result.wrap_err("shared mem handler crashed after send out")
{
tracing::error!("{err:?}");
}

bytes
}
}
Data::Vec(data) => data,
Data::None => Vec::new(),
};

// TODO send `data` via network to all remote receivers
if let Some(data) = data_bytes {}
}
ShmemHandlerEvent::HandlerError(err) => {
bail!(err.wrap_err("shared memory handler failed"))
@@ -882,7 +904,7 @@ pub enum ShmemHandlerEvent {
node_id: NodeId,
output_id: DataId,
metadata: dora_core::message::Metadata<'static>,
data: Option<Box<SharedMemSample>>,
data: Data,
},
HandlerError(eyre::ErrReport),
}
@@ -902,7 +924,14 @@ impl fmt::Debug for ShmemHandlerEvent {
.field("node_id", node_id)
.field("output_id", output_id)
.field("metadata", metadata)
.field("data", &data.as_ref().map(|_| "Some(..)").unwrap_or("None"))
.field(
"data",
match &data {
Data::None => &"None",
Data::SharedMemory(_) => &"SharedMemory(..)",
Data::Vec(_) => &"Vec(..)",
},
)
.finish(),
ShmemHandlerEvent::HandlerError(err) => {
f.debug_tuple("HandlerError").field(err).finish()
@@ -911,6 +940,27 @@ impl fmt::Debug for ShmemHandlerEvent {
}
}

/// Payload of an output message as handled inside the daemon.
pub enum Data {
/// The message carries no data.
None,
/// The data is stored in a shared memory sample.
SharedMemory(Box<SharedMemSample>),
/// The data bytes are held directly in a vector.
Vec(Vec<u8>),
}

impl From<Option<Box<SharedMemSample>>> for Data {
fn from(data: Option<Box<SharedMemSample>>) -> Self {
match data {
Some(data) => Self::SharedMemory(data),
None => Self::None,
}
}
}

impl From<Vec<u8>> for Data {
fn from(data: Vec<u8>) -> Self {
Self::Vec(data)
}
}

#[derive(Debug)]
pub enum DoraEvent {
Timer {


+ 6
- 3
binaries/daemon/src/listener/mod.rs View File

@@ -241,7 +241,9 @@ where
data: Some(data), ..
}) = self.queue.remove(index)
{
drop_tokens.push(data.drop_token);
if let Some(drop_token) = data.drop_token() {
drop_tokens.push(drop_token);
}
}
}
self.report_drop_tokens(drop_tokens).await?;
@@ -296,9 +298,10 @@ where
)
.await?;
}
DaemonRequest::SendEmptyMessage {
DaemonRequest::SendMessage {
output_id,
metadata,
data,
} => {
// let elapsed = metadata.timestamp().get_time().to_system_time().elapsed()?;
// tracing::debug!("listener SendEmptyMessage: {elapsed:?}");
@@ -307,7 +310,7 @@ where
node_id: self.node_id.clone(),
output_id,
metadata,
data: None,
data: data.into(),
});
let result = self
.send_daemon_event(event)


+ 1
- 1
binaries/daemon/src/shared_mem_handler.rs View File

@@ -181,7 +181,7 @@ impl SharedMemHandler {
node_id,
output_id,
metadata,
data,
data: data.into(),
})
.await;
let _ = reply_sender.send(DaemonReply::Result(


+ 19
- 3
libraries/core/src/daemon_messages.rs View File

@@ -47,9 +47,10 @@ pub enum DaemonRequest {
SendPreparedMessage {
id: SharedMemoryId,
},
SendEmptyMessage {
SendMessage {
output_id: DataId,
metadata: Metadata<'static>,
data: Vec<u8>,
},
CloseOutputs(Vec<DataId>),
Stopped,
@@ -87,7 +88,7 @@ pub struct DropEvent {
}

#[derive(
Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize,
)]
pub struct DropToken(Uuid);

@@ -98,7 +99,22 @@ impl DropToken {
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct InputData {
/// Data attached to an input event.
pub enum InputData {
/// The data is located in shared memory and described by a `SharedMemoryInput`.
SharedMemory(SharedMemoryInput),
/// The data bytes are carried inline in the message.
Vec(Vec<u8>),
}

impl InputData {
pub fn drop_token(&self) -> Option<DropToken> {
match self {
InputData::SharedMemory(data) => Some(data.drop_token),
InputData::Vec(_) => None,
}
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SharedMemoryInput {
pub shared_memory_id: SharedMemoryId,
pub len: usize,
pub drop_token: DropToken,


Loading…
Cancel
Save