diff --git a/apis/rust/node/src/daemon_connection/event_stream.rs b/apis/rust/node/src/daemon_connection/event_stream.rs index 62b8e576..0bbd531c 100644 --- a/apis/rust/node/src/daemon_connection/event_stream.rs +++ b/apis/rust/node/src/daemon_connection/event_stream.rs @@ -176,9 +176,16 @@ fn event_stream_loop( ack_channel: drop_tx, }) { Ok(()) => {} - Err(_) => { - // receiving end of channel was closed - break 'outer Ok(()); + Err(send_error) => { + let event = send_error.into_inner(); + tracing::debug!( + "event channel was closed already, could no forward `{event:?}`" + ); + if finished_drop_tokens.is_disconnected() { + // both the event stream and the dora node were dropped + // -> break from the `event_stream_loop` + break 'outer Ok(()); + } } } @@ -220,6 +227,8 @@ fn event_stream_loop( } } } + +#[derive(Debug)] enum EventItem { NodeEvent { event: NodeEvent, diff --git a/apis/rust/node/src/node.rs b/apis/rust/node/src/node.rs index 998c4e62..6e7ca9d7 100644 --- a/apis/rust/node/src/node.rs +++ b/apis/rust/node/src/node.rs @@ -236,6 +236,10 @@ impl Drop for DoraNode { } } + // close `finished_drop_tokens` to signal event stream thread that no + // more drop tokens are expected + self.finished_drop_tokens = flume::bounded(0).1; + tracing::info!("reporting node stop for node `{}`", self.id); if let Err(err) = self.control_channel.report_stop() { tracing::error!("{err:?}")