Browse Source

Don't send replies for `SendMessage` requests when using TCP

Allows queueing of multiple messages on the TCP socket, which considerably improves throughput.
tags/v0.2.0-candidate
Philipp Oppermann 2 years ago
parent
commit
9bb57ee531
Failed to extract signature
5 changed files with 34 additions and 13 deletions
  1. +9
    -10
      apis/rust/node/src/daemon/mod.rs
  2. +6
    -2
      apis/rust/node/src/daemon/tcp.rs
  3. +4
    -1
      binaries/daemon/src/listener/mod.rs
  4. +4
    -0
      binaries/daemon/src/listener/tcp.rs
  5. +11
    -0
      libraries/core/src/daemon_messages.rs

+ 9
- 10
apis/rust/node/src/daemon/mod.rs View File

@@ -151,19 +151,18 @@ impl ControlChannel {
metadata: Metadata<'static>,
data: Vec<u8>,
) -> eyre::Result<()> {
let request = DaemonRequest::SendMessage {
output_id,
metadata,
data,
};
let reply = self
.channel
.request(&DaemonRequest::SendMessage {
output_id,
metadata,
data,
})
.wrap_err("failed to send SendEmptyMessage request to dora-daemon")?;
.request(&request)
.wrap_err("failed to send SendMessage request to dora-daemon")?;
match reply {
dora_core::daemon_messages::DaemonReply::Result(result) => {
result.map_err(|err| eyre!(err))
}
other => bail!("unexpected SendEmptyMessage reply: {other:?}"),
dora_core::daemon_messages::DaemonReply::Empty => Ok(()),
other => bail!("unexpected SendMessage reply: {other:?}"),
}
}
}


+ 6
- 2
apis/rust/node/src/daemon/tcp.rs View File

@@ -7,8 +7,12 @@ use std::{

pub fn request(connection: &mut TcpStream, request: &DaemonRequest) -> eyre::Result<DaemonReply> {
send_message(connection, request)?;
receive_reply(connection)
.and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly")))
if request.expects_tcp_reply() {
receive_reply(connection)
.and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly")))
} else {
Ok(DaemonReply::Empty)
}
}

fn send_message(connection: &mut TcpStream, message: &DaemonRequest) -> eyre::Result<()> {


+ 4
- 1
binaries/daemon/src/listener/mod.rs View File

@@ -316,7 +316,10 @@ where
.send_daemon_event(event)
.await
.map_err(|_| "failed to receive send_empty_message reply".to_owned());
self.send_reply(DaemonReply::Result(result))
if let Err(err) = result {
tracing::warn!("{err:?}");
}
self.send_reply(DaemonReply::Empty)
.await
.wrap_err("failed to send SendEmptyMessage reply")?;
}


+ 4
- 0
binaries/daemon/src/listener/tcp.rs View File

@@ -75,6 +75,10 @@ impl super::Connection for TcpConnection {
}

async fn send_reply(&mut self, message: DaemonReply) -> eyre::Result<()> {
if matches!(message, DaemonReply::Empty) {
// don't send empty replies
return Ok(());
}
let serialized =
bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?;
tcp_send(&mut self.0, &serialized)


+ 11
- 0
libraries/core/src/daemon_messages.rs View File

@@ -59,6 +59,16 @@ pub enum DaemonRequest {
},
}

impl DaemonRequest {
pub fn expects_tcp_reply(&self) -> bool {
#[allow(clippy::match_like_matches_macro)]
match self {
DaemonRequest::SendMessage { .. } => false,
_ => true,
}
}
}

type SharedMemoryId = String;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
@@ -66,6 +76,7 @@ pub enum DaemonReply {
Result(Result<(), String>),
PreparedMessage { shared_memory_id: SharedMemoryId },
NextEvents(Vec<NodeEvent>),
Empty,
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]


Loading…
Cancel
Save