Browse Source

Wait for remaining drop tokens before breaking from event stream thread

There might still be some pending drop tokens after the receiving end of the event stream was closed. So we don't want to break from the receiver thread directly. Instead we keep it running until the control channel signals that it expects no more drop tokens by closing the `finished_drop_tokens` channel. This happens either when all required drop tokens were received, or because of a timeout.
tags/v0.2.3-rc
Philipp Oppermann 3 years ago
parent
commit
446fa7f0dc
Failed to extract signature
2 changed files with 16 additions and 3 deletions
  1. +12
    -3
      apis/rust/node/src/daemon_connection/event_stream.rs
  2. +4
    -0
      apis/rust/node/src/node.rs

+ 12
- 3
apis/rust/node/src/daemon_connection/event_stream.rs View File

@@ -176,9 +176,16 @@ fn event_stream_loop(
ack_channel: drop_tx,
}) {
Ok(()) => {}
Err(_) => {
// receiving end of channel was closed
break 'outer Ok(());
Err(send_error) => {
let event = send_error.into_inner();
tracing::debug!(
"event channel was closed already, could no forward `{event:?}`"
);
if finished_drop_tokens.is_disconnected() {
// both the event stream and the dora node were dropped
// -> break from the `event_stream_loop`
break 'outer Ok(());
}
}
}

@@ -220,6 +227,8 @@ fn event_stream_loop(
}
}
}

#[derive(Debug)]
enum EventItem {
NodeEvent {
event: NodeEvent,


+ 4
- 0
apis/rust/node/src/node.rs View File

@@ -236,6 +236,10 @@ impl Drop for DoraNode {
}
}

// close `finished_drop_tokens` to signal event stream thread that no
// more drop tokens are expected
self.finished_drop_tokens = flume::bounded(0).1;

tracing::info!("reporting node stop for node `{}`", self.id);
if let Err(err) = self.control_channel.report_stop() {
tracing::error!("{err:?}")


Loading…
Cancel
Save