From 446fa7f0dcfbd1f01ac6d87deb9a042716c82037 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 5 Apr 2023 12:04:00 +0200 Subject: [PATCH] Wait for remaining drop tokens before breaking from event stream thread There might still be some pending drop tokens after the receiving end of the event stream was closed. So we don't want to break from the receiver thread directly. Instead we keep it running until the control channel signals that it expects no more drop tokens by closing the `finished_drop_tokens` channel. This happens either when all required drop tokens were received, or because of a timeout. --- .../node/src/daemon_connection/event_stream.rs | 15 ++++++++++++--- apis/rust/node/src/node.rs | 4 ++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/apis/rust/node/src/daemon_connection/event_stream.rs b/apis/rust/node/src/daemon_connection/event_stream.rs index 62b8e576..0bbd531c 100644 --- a/apis/rust/node/src/daemon_connection/event_stream.rs +++ b/apis/rust/node/src/daemon_connection/event_stream.rs @@ -176,9 +176,16 @@ fn event_stream_loop( ack_channel: drop_tx, }) { Ok(()) => {} - Err(_) => { - // receiving end of channel was closed - break 'outer Ok(()); + Err(send_error) => { + let event = send_error.into_inner(); + tracing::debug!( + "event channel was closed already, could no forward `{event:?}`" + ); + if finished_drop_tokens.is_disconnected() { + // both the event stream and the dora node were dropped + // -> break from the `event_stream_loop` + break 'outer Ok(()); + } } } @@ -220,6 +227,8 @@ fn event_stream_loop( } } } + +#[derive(Debug)] enum EventItem { NodeEvent { event: NodeEvent, diff --git a/apis/rust/node/src/node.rs b/apis/rust/node/src/node.rs index 998c4e62..6e7ca9d7 100644 --- a/apis/rust/node/src/node.rs +++ b/apis/rust/node/src/node.rs @@ -236,6 +236,10 @@ impl Drop for DoraNode { } } + // close `finished_drop_tokens` to signal event stream thread that no + // more drop tokens are expected + self.finished_drop_tokens = flume::bounded(0).1; + tracing::info!("reporting node stop for node `{}`", self.id); if let Err(err) = self.control_channel.report_stop() { tracing::error!("{err:?}")