diff --git a/apis/rust/node/src/daemon/mod.rs b/apis/rust/node/src/daemon/mod.rs index 799848f7..39acf0fa 100644 --- a/apis/rust/node/src/daemon/mod.rs +++ b/apis/rust/node/src/daemon/mod.rs @@ -12,7 +12,6 @@ mod tcp; pub(crate) struct DaemonConnection { pub control_channel: ControlChannel, pub event_stream: EventStream, - pub(crate) event_stream_thread: JoinHandle<()>, } impl DaemonConnection { @@ -46,13 +45,12 @@ impl DaemonConnection { let control_channel = ControlChannel::init(dataflow_id, node_id, control) .wrap_err("failed to init control stream")?; - let (event_stream, event_stream_thread) = EventStream::init(dataflow_id, node_id, events) + let event_stream = EventStream::init(dataflow_id, node_id, events) .wrap_err("failed to init event stream")?; Ok(Self { control_channel, event_stream, - event_stream_thread, }) } } @@ -226,7 +224,7 @@ impl EventStream { dataflow_id: DataflowId, node_id: &NodeId, mut channel: DaemonChannel, - ) -> eyre::Result<(Self, JoinHandle<()>)> { + ) -> eyre::Result { register(dataflow_id, node_id.clone(), &mut channel)?; channel @@ -288,7 +286,7 @@ impl EventStream { } }); - Ok((EventStream { receiver: rx }, thread)) + Ok(EventStream { receiver: rx }) } pub fn recv(&mut self) -> Option { diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 32a57095..4304215c 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -1,5 +1,3 @@ -use std::thread::JoinHandle; - use daemon::{ControlChannel, DaemonConnection}; pub use daemon::{Event, EventStream}; pub use dora_core; @@ -19,7 +17,6 @@ pub struct DoraNode { node_config: NodeRunConfig, control_channel: ControlChannel, hlc: uhlc::HLC, - event_stream_thread: Option>, } impl DoraNode { @@ -46,7 +43,6 @@ impl DoraNode { let DaemonConnection { control_channel, event_stream, - event_stream_thread, } = DaemonConnection::init(dataflow_id, &node_id, &daemon_communication) .wrap_err("failed to connect to dora-daemon")?; @@ -55,7 +51,6 @@ impl DoraNode { node_config: run_config, control_channel, hlc: uhlc::HLC::default(), - event_stream_thread: Some(event_stream_thread), }; Ok((node, event_stream)) } @@ -128,15 +123,9 @@ impl DoraNode { impl Drop for DoraNode { #[tracing::instrument(skip(self), fields(self.id = %self.id))] fn drop(&mut self) { - match self.control_channel.report_stop() { - Ok(()) => { - if let Some(thread) = self.event_stream_thread.take() { - if let Err(panic) = thread.join() { - std::panic::resume_unwind(panic); - } - } - } - Err(err) => tracing::error!("{err:?}"), + tracing::info!("reporting node stop for node `{}`", self.id); + if let Err(err) = self.control_channel.report_stop() { + tracing::error!("{err:?}") } } }