diff --git a/apis/rust/node/src/daemon/mod.rs b/apis/rust/node/src/daemon/mod.rs index 4e43098b..cf2afbbc 100644 --- a/apis/rust/node/src/daemon/mod.rs +++ b/apis/rust/node/src/daemon/mod.rs @@ -151,19 +151,18 @@ impl ControlChannel { metadata: Metadata<'static>, data: Vec, ) -> eyre::Result<()> { + let request = DaemonRequest::SendMessage { + output_id, + metadata, + data, + }; let reply = self .channel - .request(&DaemonRequest::SendMessage { - output_id, - metadata, - data, - }) - .wrap_err("failed to send SendEmptyMessage request to dora-daemon")?; + .request(&request) + .wrap_err("failed to send SendMessage request to dora-daemon")?; match reply { - dora_core::daemon_messages::DaemonReply::Result(result) => { - result.map_err(|err| eyre!(err)) - } - other => bail!("unexpected SendEmptyMessage reply: {other:?}"), + dora_core::daemon_messages::DaemonReply::Empty => Ok(()), + other => bail!("unexpected SendMessage reply: {other:?}"), } } } diff --git a/apis/rust/node/src/daemon/tcp.rs b/apis/rust/node/src/daemon/tcp.rs index 17ed4351..14c1f0b8 100644 --- a/apis/rust/node/src/daemon/tcp.rs +++ b/apis/rust/node/src/daemon/tcp.rs @@ -7,8 +7,12 @@ use std::{ pub fn request(connection: &mut TcpStream, request: &DaemonRequest) -> eyre::Result { send_message(connection, request)?; - receive_reply(connection) - .and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly"))) + if request.expects_tcp_reply() { + receive_reply(connection) + .and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly"))) + } else { + Ok(DaemonReply::Empty) + } } fn send_message(connection: &mut TcpStream, message: &DaemonRequest) -> eyre::Result<()> { diff --git a/binaries/daemon/src/listener/mod.rs b/binaries/daemon/src/listener/mod.rs index 07e47989..f4c9ae6e 100644 --- a/binaries/daemon/src/listener/mod.rs +++ b/binaries/daemon/src/listener/mod.rs @@ -316,7 +316,10 @@ where .send_daemon_event(event) .await .map_err(|_| "failed to receive send_empty_message reply".to_owned()); - self.send_reply(DaemonReply::Result(result)) + if let Err(err) = result { + tracing::warn!("{err:?}"); + } + self.send_reply(DaemonReply::Empty) .await .wrap_err("failed to send SendEmptyMessage reply")?; } diff --git a/binaries/daemon/src/listener/tcp.rs b/binaries/daemon/src/listener/tcp.rs index 88c8aee7..d8de7ed5 100644 --- a/binaries/daemon/src/listener/tcp.rs +++ b/binaries/daemon/src/listener/tcp.rs @@ -75,6 +75,10 @@ impl super::Connection for TcpConnection { } async fn send_reply(&mut self, message: DaemonReply) -> eyre::Result<()> { + if matches!(message, DaemonReply::Empty) { + // don't send empty replies + return Ok(()); + } let serialized = bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?; tcp_send(&mut self.0, &serialized) diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 1d5747a1..1e78a49a 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -59,6 +59,16 @@ pub enum DaemonRequest { }, } +impl DaemonRequest { + pub fn expects_tcp_reply(&self) -> bool { + #[allow(clippy::match_like_matches_macro)] + match self { + DaemonRequest::SendMessage { .. } => false, + _ => true, + } + } +} + type SharedMemoryId = String; #[derive(Debug, serde::Serialize, serde::Deserialize)] @@ -66,6 +76,7 @@ pub enum DaemonReply { Result(Result<(), String>), PreparedMessage { shared_memory_id: SharedMemoryId }, NextEvents(Vec), + Empty, } #[derive(Debug, serde::Serialize, serde::Deserialize)]