Browse Source

Fix: Don't join the event stream thread when dropping `DoraNode`, as the `EventStream` might be dropped later

It is not guaranteed that the `EventStream` is dropped before the `DoraNode`. If the `EventStream` is dropped later on the same thread, the `join` on the event stream thread in `DoraNode`'s `Drop` implementation leads to a deadlock.
tags/v0.2.0-candidate
Philipp Oppermann 2 years ago
parent
commit
2ba397c644
Failed to extract signature
2 changed files with 6 additions and 19 deletions
  1. +3
    -5
      apis/rust/node/src/daemon/mod.rs
  2. +3
    -14
      apis/rust/node/src/lib.rs

+ 3
- 5
apis/rust/node/src/daemon/mod.rs View File

@@ -12,7 +12,6 @@ mod tcp;
pub(crate) struct DaemonConnection {
pub control_channel: ControlChannel,
pub event_stream: EventStream,
- pub(crate) event_stream_thread: JoinHandle<()>,
}

impl DaemonConnection {
@@ -46,13 +45,12 @@ impl DaemonConnection {
let control_channel = ControlChannel::init(dataflow_id, node_id, control)
.wrap_err("failed to init control stream")?;

- let (event_stream, event_stream_thread) = EventStream::init(dataflow_id, node_id, events)
+ let event_stream = EventStream::init(dataflow_id, node_id, events)
.wrap_err("failed to init event stream")?;

Ok(Self {
control_channel,
event_stream,
- event_stream_thread,
})
}
}
@@ -226,7 +224,7 @@ impl EventStream {
dataflow_id: DataflowId,
node_id: &NodeId,
mut channel: DaemonChannel,
- ) -> eyre::Result<(Self, JoinHandle<()>)> {
+ ) -> eyre::Result<Self> {
register(dataflow_id, node_id.clone(), &mut channel)?;

channel
@@ -288,7 +286,7 @@ impl EventStream {
}
});

- Ok((EventStream { receiver: rx }, thread))
+ Ok(EventStream { receiver: rx })
}

pub fn recv(&mut self) -> Option<Event> {


+ 3
- 14
apis/rust/node/src/lib.rs View File

@@ -1,5 +1,3 @@
- use std::thread::JoinHandle;
-
use daemon::{ControlChannel, DaemonConnection};
pub use daemon::{Event, EventStream};
pub use dora_core;
@@ -19,7 +17,6 @@ pub struct DoraNode {
node_config: NodeRunConfig,
control_channel: ControlChannel,
hlc: uhlc::HLC,
- event_stream_thread: Option<JoinHandle<()>>,
}

impl DoraNode {
@@ -46,7 +43,6 @@ impl DoraNode {
let DaemonConnection {
control_channel,
event_stream,
- event_stream_thread,
} = DaemonConnection::init(dataflow_id, &node_id, &daemon_communication)
.wrap_err("failed to connect to dora-daemon")?;

@@ -55,7 +51,6 @@ impl DoraNode {
node_config: run_config,
control_channel,
hlc: uhlc::HLC::default(),
- event_stream_thread: Some(event_stream_thread),
};
Ok((node, event_stream))
}
@@ -128,15 +123,9 @@ impl DoraNode {
impl Drop for DoraNode {
#[tracing::instrument(skip(self), fields(self.id = %self.id))]
fn drop(&mut self) {
- match self.control_channel.report_stop() {
- Ok(()) => {
- if let Some(thread) = self.event_stream_thread.take() {
- if let Err(panic) = thread.join() {
- std::panic::resume_unwind(panic);
- }
- }
- }
- Err(err) => tracing::error!("{err:?}"),
+ tracing::info!("reporting node stop for node `{}`", self.id);
+ if let Err(err) = self.control_channel.report_stop() {
+ tracing::error!("{err:?}")
}
}
}


Loading…
Cancel
Save