From 4d13a29dead3bf8933838bc1115e5058ee659efb Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Wed, 1 Mar 2023 14:52:13 +0100
Subject: [PATCH] Improve error messages and fix clippy warnings in daemon

---
 binaries/daemon/src/lib.rs          |  2 +-
 binaries/daemon/src/listener/mod.rs | 32 +++++++++++++++++------------
 2 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index 244b0122..9a6f66bf 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -127,7 +127,7 @@ impl Daemon {
             }
         });
 
-        let (dataflow_errors, _) = future::try_join(run_result, spawn_result).await?;
+        let (dataflow_errors, ()) = future::try_join(run_result, spawn_result).await?;
 
         if dataflow_errors.is_empty() {
             Ok(())
diff --git a/binaries/daemon/src/listener/mod.rs b/binaries/daemon/src/listener/mod.rs
index 82a1f1d9..3bec44ef 100644
--- a/binaries/daemon/src/listener/mod.rs
+++ b/binaries/daemon/src/listener/mod.rs
@@ -237,13 +237,11 @@ where
                 .unwrap_or_else(|| panic!("no input event found in drop iteration {i}"));
 
             // remove that event
-            if let Some(event) = self.queue.remove(index) {
-                if let NodeEvent::Input {
-                    data: Some(data), ..
-                } = event
-                {
-                    drop_tokens.push(data.drop_token);
-                }
+            if let Some(NodeEvent::Input {
+                data: Some(data), ..
+            }) = self.queue.remove(index)
+            {
+                drop_tokens.push(data.drop_token);
             }
         }
         self.report_drop_tokens(drop_tokens).await?;
@@ -255,7 +253,9 @@ where
         match message {
             DaemonRequest::Register { .. } => {
                 let reply = DaemonReply::Result(Err("unexpected register message".into()));
-                self.send_reply(reply).await?;
+                self.send_reply(reply)
+                    .await
+                    .wrap_err("failed to send register reply")?;
             }
             DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped).await?,
             DaemonRequest::CloseOutputs(outputs) => {
@@ -281,7 +281,9 @@ where
                     .await
                     .wrap_err("failed to receive prepare output reply")?;
                 // tracing::debug!("prepare latency: {:?}", start.elapsed()?);
-                self.send_reply(reply).await?;
+                self.send_reply(reply)
+                    .await
+                    .wrap_err("failed to send PrepareOutputMessage reply")?;
             }
             DaemonRequest::SendPreparedMessage { id } => {
                 let (reply_sender, reply) = oneshot::channel();
@@ -290,7 +292,7 @@ where
                 self.send_reply(
                     reply
                         .await
-                        .wrap_err("failed to receive send output reply")?,
+                        .wrap_err("failed to receive SendPreparedMessage reply")?,
                 )
                 .await?;
             }
@@ -311,7 +313,9 @@ where
                     .send_daemon_event(event)
                     .await
                     .map_err(|_| "failed to receive send_empty_message reply".to_owned());
-                self.send_reply(DaemonReply::Result(result)).await?;
+                self.send_reply(DaemonReply::Result(result))
+                    .await
+                    .wrap_err("failed to send SendEmptyMessage reply")?;
             }
             DaemonRequest::Subscribe => {
                 let (tx, rx) = flume::bounded(100);
@@ -342,7 +346,9 @@ where
                     }
                 };
 
-                self.send_reply(reply).await?;
+                self.send_reply(reply)
+                    .await
+                    .wrap_err("failed to send NextEvent reply")?;
             }
         }
         Ok(())
@@ -385,7 +391,7 @@ where
         self.connection
             .send_reply(reply)
             .await
-            .wrap_err("failed to send reply to node")
+            .wrap_err_with(|| format!("failed to send reply to node `{}`", self.node_id))
     }
 
     async fn send_shared_memory_event(