From 132957ad34b98c1a4bcf10693d1aa3d6dcc9d1dd Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Fri, 28 Apr 2023 13:57:24 +0200
Subject: [PATCH 01/12] Fix: Don't ignore subscribe results

---
 apis/rust/node/src/event_stream/mod.rs | 10 +++++++++-
 apis/rust/node/src/node/drop_stream.rs | 12 ++++++++++--
 libraries/core/src/daemon_messages.rs  |  1 +
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs
index 99f72bfe..c47a5674 100644
--- a/apis/rust/node/src/event_stream/mod.rs
+++ b/apis/rust/node/src/event_stream/mod.rs
@@ -59,11 +59,19 @@ impl EventStream {
         mut close_channel: DaemonChannel,
     ) -> eyre::Result<Self> {
         channel.register(dataflow_id, node_id.clone())?;
-        channel
+        let reply = channel
            .request(&DaemonRequest::Subscribe)
            .map_err(|e| eyre!(e))
            .wrap_err("failed to create subscription with dora-daemon")?;
 
+        match reply {
+            daemon_messages::DaemonReply::Result(Ok(())) => {}
+            daemon_messages::DaemonReply::Result(Err(err)) => {
+                eyre::bail!("subscribe failed: {err}")
+            }
+            other => eyre::bail!("unexpected subscribe reply: {other:?}"),
+        }
+
         close_channel.register(dataflow_id, node_id.clone())?;
 
         let (tx, rx) = flume::bounded(0);
diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs
index c594a252..405b9d3e 100644
--- a/apis/rust/node/src/node/drop_stream.rs
+++ b/apis/rust/node/src/node/drop_stream.rs
@@ -4,7 +4,7 @@ use crate::daemon_connection::DaemonChannel;
 use dora_core::{
     config::NodeId,
     daemon_messages::{
-        DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent,
+        self, DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent,
     },
 };
 use eyre::{eyre, Context};
@@ -45,11 +45,19 @@ impl DropStream {
     ) -> eyre::Result<Self> {
         channel.register(dataflow_id, node_id.clone())?;
 
-        channel
+        let reply = channel
            .request(&DaemonRequest::SubscribeDrop)
            .map_err(|e| eyre!(e))
            .wrap_err("failed to create subscription with dora-daemon")?;
 
+        match reply {
+            daemon_messages::DaemonReply::Result(Ok(())) => {}
+            daemon_messages::DaemonReply::Result(Err(err)) => {
+                eyre::bail!("drop subscribe failed: {err}")
+            }
+            other => eyre::bail!("unexpected drop subscribe reply: {other:?}"),
+        }
+
         let (tx, rx) = flume::bounded(0);
 
         let node_id_cloned = node_id.clone();
diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs
index 8035571c..d5f3575f 100644
--- a/libraries/core/src/daemon_messages.rs
+++ b/libraries/core/src/daemon_messages.rs
@@ -127,6 +127,7 @@ impl fmt::Debug for Data {
 type SharedMemoryId = String;
 
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[must_use]
 pub enum DaemonReply {
     Result(Result<(), String>),
     PreparedMessage { shared_memory_id: SharedMemoryId },

From e20699de3fb5e6961ade322b93050e23c357bab6 Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Fri, 28 Apr 2023 14:06:38 +0200
Subject: [PATCH 02/12] Handle node errors during initialization phase

We synchronize the start of nodes (even across machines) to avoid missed
messages. This led to deadlock issues when nodes exited before they
initialized the connection to the dora daemon: the other nodes were still
waiting for the stopped node to become ready.

This commit fixes the issue by properly handling node exits that occur
before the subscribe message is sent. If nodes exit, they are removed from
the pending list and added to a new 'exited before init' list. Once all
nodes have subscribed or exited, we answer the pending subscribe requests.
If nodes exited before subscribing, we send an error reply to the other
nodes because a synchronized start is no longer possible.

To make this logic work across different machines too, we add a `success`
field to the `AllNodesReady` message that the daemon sends to the
coordinator. The coordinator forwards this flag to the other daemons so
that they can act accordingly.
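In essence, each subscribe request is parked until the last local node has
either subscribed or exited. A minimal sketch of this barrier idea
(simplified, with hypothetical names; the actual implementation is the
`PendingNodes` type added below):

    use std::collections::HashSet;
    use tokio::sync::oneshot;

    /// Simplified start barrier: subscribe replies are parked until every
    /// pending node has either subscribed or exited.
    struct StartBarrier {
        pending: HashSet<String>,
        parked: Vec<oneshot::Sender<Result<(), String>>>,
        exited: HashSet<String>,
    }

    impl StartBarrier {
        fn node_subscribed(&mut self, id: &str, reply: oneshot::Sender<Result<(), String>>) {
            self.pending.remove(id);
            self.parked.push(reply);
            self.release_if_done();
        }

        fn node_exited(&mut self, id: &str) {
            if self.pending.remove(id) {
                self.exited.insert(id.to_owned());
                self.release_if_done();
            }
        }

        fn release_if_done(&mut self) {
            if !self.pending.is_empty() {
                return; // still waiting for other local nodes
            }
            // A synchronized start is only possible if no node exited early.
            let result = if self.exited.is_empty() {
                Ok(())
            } else {
                Err(format!("nodes exited before subscribing: {:?}", self.exited))
            };
            for reply in self.parked.drain(..) {
                let _ = reply.send(result.clone());
            }
        }
    }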
---
 binaries/coordinator/src/lib.rs            |  10 +-
 binaries/coordinator/src/listener.rs       |  10 +-
 binaries/daemon/src/lib.rs                 | 184 +++++++++++++--------
 binaries/daemon/src/pending.rs             | 104 ++++++++++++
 libraries/core/src/coordinator_messages.rs |   1 +
 libraries/core/src/daemon_messages.rs      |   1 +
 6 files changed, 235 insertions(+), 75 deletions(-)
 create mode 100644 binaries/daemon/src/pending.rs

diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs
index 4a51dce5..73430d93 100644
--- a/binaries/coordinator/src/lib.rs
+++ b/binaries/coordinator/src/lib.rs
@@ -180,15 +180,20 @@ async fn start_inner(
                 }
             },
             Event::Dataflow { uuid, event } => match event {
-                DataflowEvent::ReadyOnMachine { machine_id } => {
+                DataflowEvent::ReadyOnMachine {
+                    machine_id,
+                    success,
+                } => {
                     match running_dataflows.entry(uuid) {
                         std::collections::hash_map::Entry::Occupied(mut entry) => {
                             let dataflow = entry.get_mut();
                             dataflow.pending_machines.remove(&machine_id);
+                            dataflow.init_success &= success;
                             if dataflow.pending_machines.is_empty() {
                                 let message =
                                     serde_json::to_vec(&DaemonCoordinatorEvent::AllNodesReady {
                                         dataflow_id: uuid,
+                                        success: dataflow.init_success,
                                     })
                                     .wrap_err("failed to serialize AllNodesReady message")?;
@@ -498,6 +503,7 @@ struct RunningDataflow {
     machines: BTreeSet<String>,
     /// IDs of machines that are waiting until all nodes are started.
+    pending_machines: BTreeSet<String>,
+    init_success: bool,
 }
 
 impl PartialEq for RunningDataflow {
@@ -602,6 +608,7 @@ async fn start_dataflow(
         } else {
             BTreeSet::new()
         },
+        init_success: true,
         machines,
     })
 }
@@ -665,6 +672,7 @@ pub enum DataflowEvent {
     },
     ReadyOnMachine {
         machine_id: String,
+        success: bool,
     },
 }

diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs
index da4ee624..8ed3b57f 100644
--- a/binaries/coordinator/src/listener.rs
+++ b/binaries/coordinator/src/listener.rs
@@ -60,10 +60,16 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende
                     break;
                 }
                 coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event {
-                    coordinator_messages::DaemonEvent::AllNodesReady { dataflow_id } => {
+                    coordinator_messages::DaemonEvent::AllNodesReady {
+                        dataflow_id,
+                        success,
+                    } => {
                         let event = Event::Dataflow {
                             uuid: dataflow_id,
-                            event: DataflowEvent::ReadyOnMachine { machine_id },
+                            event: DataflowEvent::ReadyOnMachine {
+                                machine_id,
+                                success,
+                            },
                         };
                         if events_tx.send(event).await.is_err() {
                             break;

diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index e43a676c..b1807df7 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -17,8 +17,8 @@ use eyre::{bail, eyre, Context, ContextCompat};
 use futures::{future, stream, FutureExt, TryFutureExt};
 use futures_concurrency::stream::Merge;
 use inter_daemon::InterDaemonConnection;
+use pending::PendingNodes;
 use shared_memory_server::ShmemConf;
-use std::collections::HashSet;
 use std::{
     borrow::Cow,
     collections::{BTreeMap, BTreeSet, HashMap},
@@ -37,6 +37,7 @@ use uuid::Uuid;
 mod coordinator;
 mod inter_daemon;
 mod node_communication;
+mod pending;
 mod spawn;
 mod tcp_utils;
 
@@ -45,6 +46,8 @@ use dora_tracing::telemetry::serialize_context;
 #[cfg(feature = "telemetry")]
 use tracing_opentelemetry::OpenTelemetrySpanExt;
 
+use crate::pending::DataflowStatus;
+
 pub struct Daemon {
     running: HashMap<DataflowId, RunningDataflow>,
 
@@ -289,11 +292,20 @@ impl Daemon {
                     DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}")));
                 (Some(reply), RunStatus::Continue)
             }
-            DaemonCoordinatorEvent::AllNodesReady { dataflow_id } => {
+            DaemonCoordinatorEvent::AllNodesReady {
+                dataflow_id,
+                success,
+            } => {
                 match self.running.get_mut(&dataflow_id) {
                     Some(dataflow) => {
-                        tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
-                        dataflow.start(&self.events_tx).await?;
+                        dataflow
+                            .pending_nodes
+                            .handle_external_all_nodes_ready(success)
+                            .await;
+                        if success {
+                            tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
+                            dataflow.start(&self.events_tx).await?;
+                        }
                     }
                     None => {
                         tracing::warn!(
@@ -463,7 +475,7 @@ impl Daemon {
                     .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?;
                 dataflow.running_nodes.insert(node_id);
             } else {
-                dataflow.external_nodes.insert(node.id.clone(), node);
+                dataflow.pending_nodes.set_external_nodes(true);
             }
         }
 
@@ -481,39 +493,44 @@ impl Daemon {
                 event_sender,
                 reply_sender,
             } => {
-                let result = self
-                    .subscribe(dataflow_id, node_id.clone(), event_sender)
-                    .await;
-                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
-                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
-                })?;
-                tracing::debug!("node `{node_id}` is ready");
+                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
+                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
+                });
 
-                dataflow
-                    .subscribe_replies
-                    .insert(node_id.clone(), (reply_sender, result));
-                dataflow.pending_nodes.remove(&node_id);
-                if dataflow.pending_nodes.is_empty() {
-                    if dataflow.external_nodes.is_empty() {
-                        tracing::info!("all nodes are ready, starting dataflow `{dataflow_id}`");
-                        dataflow.start(&self.events_tx).await?;
-                    } else {
-                        tracing::info!(
-                            "all local nodes are ready, waiting for remote nodes \
-                            for dataflow `{dataflow_id}`"
-                        );
+                match dataflow {
+                    Err(err) => {
+                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
+                    }
+                    Ok(dataflow) => {
+                        tracing::debug!("node `{node_id}` is ready");
+                        Self::subscribe(dataflow, node_id.clone(), event_sender).await;
 
-                        // dataflow is split across multiple daemons -> synchronize with dora-coordinator
-                        let Some(connection) = &mut self.coordinator_connection else {
-                            bail!("no coordinator connection to send AllNodesReady");
-                        };
-                        let msg = serde_json::to_vec(&CoordinatorRequest::Event {
-                            machine_id: self.machine_id.clone(),
-                            event: DaemonEvent::AllNodesReady { dataflow_id },
-                        })?;
-                        tcp_send(connection, &msg)
-                            .await
-                            .wrap_err("failed to send AllNodesReady message to dora-coordinator")?;
+                        let status = dataflow
+                            .pending_nodes
+                            .handle_node_subscription(node_id.clone(), reply_sender)
+                            .await?;
+                        match status {
+                            DataflowStatus::AllNodesReady => {
+                                tracing::info!(
+                                    "all nodes are ready, starting dataflow `{dataflow_id}`"
+                                );
+                                dataflow.start(&self.events_tx).await?;
+                            }
+                            DataflowStatus::LocalNodesPending => {}
+                            DataflowStatus::LocalNodesReady { success } => {
+                                tracing::info!(
+                                    "all local nodes are ready, waiting for remote nodes"
+                                );
+
+                                Self::report_nodes_ready(
+                                    &mut self.coordinator_connection,
+                                    self.machine_id.clone(),
+                                    dataflow_id,
+                                    success,
+                                )
+                                .await?;
+                            }
+                        }
+                    }
+                }
@@ -551,8 +568,17 @@ impl Daemon {
                 let _ = reply_sender.send(DaemonReply::Result(reply));
             }
             DaemonNodeEvent::OutputsDone { reply_sender } => {
-                let _ = reply_sender.send(DaemonReply::Result(Ok(())));
-                self.handle_outputs_done(dataflow_id, &node_id).await?;
+                let result = match self.running.get_mut(&dataflow_id) {
+                    Some(dataflow) => {
+                        Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, &node_id)
+                            .await
+                    },
+                    None => Err(eyre!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")),
+                };
+
+                let _ = reply_sender.send(DaemonReply::Result(
+                    result.map_err(|err| format!("{err:?}")),
+                ));
             }
             DaemonNodeEvent::SendOut {
                 output_id,
@@ -602,6 +628,28 @@ impl Daemon {
         Ok(())
     }
 
+    async fn report_nodes_ready(
+        coordinator_connection: &mut Option<TcpStream>,
+        machine_id: String,
+        dataflow_id: Uuid,
+        success: bool,
+    ) -> Result<(), eyre::ErrReport> {
+        let Some(connection) = coordinator_connection else {
+            bail!("no coordinator connection to send AllNodesReady");
+        };
+        let msg = serde_json::to_vec(&CoordinatorRequest::Event {
+            machine_id: machine_id.clone(),
+            event: DaemonEvent::AllNodesReady {
+                dataflow_id,
+                success,
+            },
+        })?;
+        tcp_send(connection, &msg)
+            .await
+            .wrap_err("failed to send AllNodesReady message to dora-coordinator")?;
+        Ok(())
+    }
+
     async fn send_reload(
         &mut self,
         dataflow_id: Uuid,
@@ -670,15 +718,10 @@ impl Daemon {
     }
 
     async fn subscribe(
-        &mut self,
-        dataflow_id: Uuid,
+        dataflow: &mut RunningDataflow,
         node_id: NodeId,
         event_sender: UnboundedSender<daemon_messages::NodeEvent>,
-    ) -> Result<(), String> {
-        let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
-            format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
-        })?;
-
+    ) {
         // some inputs might have been closed already -> report those events
         let closed_inputs = dataflow
             .mappings
@@ -709,22 +752,17 @@ impl Daemon {
         }
 
         dataflow.subscribe_channels.insert(node_id, event_sender);
-
-        Ok(())
     }
 
-    #[tracing::instrument(skip(self), level = "trace")]
+    #[tracing::instrument(skip(dataflow, inter_daemon_connections), fields(uuid = %dataflow.id), level = "trace")]
     async fn handle_outputs_done(
-        &mut self,
-        dataflow_id: Uuid,
+        dataflow: &mut RunningDataflow,
+        inter_daemon_connections: &mut BTreeMap<String, InterDaemonConnection>,
         node_id: &NodeId,
     ) -> eyre::Result<()> {
-        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
-            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
-        })?;
         send_input_closed_events(
             dataflow,
-            &mut self.inter_daemon_connections,
+            inter_daemon_connections,
             |OutputId(source_id, _)| source_id == node_id,
         )
         .await?;
 
         Ok(())
     }
 
-    #[tracing::instrument(skip(self), level = "trace")]
     async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
-        self.handle_outputs_done(dataflow_id, node_id).await?;
-
         let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
             format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
         })?;
+
+        tracing::warn!("node `{node_id}` exited before initializing dora connection");
+        match dataflow.pending_nodes.handle_node_stop(node_id).await {
+            DataflowStatus::AllNodesReady => {}
+            DataflowStatus::LocalNodesPending => {}
+            DataflowStatus::LocalNodesReady { success } => {
+                Self::report_nodes_ready(
+                    &mut self.coordinator_connection,
+                    self.machine_id.clone(),
+                    dataflow_id,
+                    success,
+                )
+                .await?;
+            }
+        }
+
+        Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, node_id).await?;
+
         dataflow.running_nodes.remove(node_id);
         if dataflow.running_nodes.is_empty() {
             tracing::info!(
@@ -1050,11 +1103,7 @@ fn close_input(dataflow: &mut RunningDataflow, receiver_id: &NodeId, input_id: &
 pub struct RunningDataflow {
     id: Uuid,
     /// Local nodes that are not started yet
-    pending_nodes: HashSet<NodeId>,
-    /// Used to synchronize node starts.
-    ///
-    /// Subscribe requests block the node until all other nodes are ready too.
-    subscribe_replies: HashMap<NodeId, (oneshot::Sender<DaemonReply>, Result<(), String>)>,
+    pending_nodes: PendingNodes,
 
     subscribe_channels: HashMap<NodeId, UnboundedSender<daemon_messages::NodeEvent>>,
     drop_channels: HashMap<NodeId, UnboundedSender<NodeDropEvent>>,
@@ -1063,7 +1112,6 @@ pub struct RunningDataflow {
     open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
     running_nodes: BTreeSet<NodeId>,
 
-    external_nodes: BTreeMap<NodeId, ResolvedNode>,
     open_external_mappings: HashMap<OutputId, BTreeMap<String, BTreeSet<DataId>>>,
 
     pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,
@@ -1082,15 +1130,13 @@ impl RunningDataflow {
     fn new(id: Uuid) -> RunningDataflow {
         Self {
             id,
-            pending_nodes: HashSet::new(),
-            subscribe_replies: HashMap::new(),
+            pending_nodes: PendingNodes::default(),
             subscribe_channels: HashMap::new(),
             drop_channels: HashMap::new(),
             mappings: HashMap::new(),
             timers: BTreeMap::new(),
             open_inputs: BTreeMap::new(),
             running_nodes: BTreeSet::new(),
-            external_nodes: BTreeMap::new(),
             open_external_mappings: HashMap::new(),
             pending_drop_tokens: HashMap::new(),
             _timer_handles: Vec::new(),
@@ -1100,12 +1146,6 @@ impl RunningDataflow {
     }
 
     async fn start(&mut self, events_tx: &mpsc::Sender<Event>) -> eyre::Result<()> {
-        // answer all subscribe requests
-        let subscribe_replies = std::mem::take(&mut self.subscribe_replies);
-        for (reply_sender, subscribe_result) in subscribe_replies.into_values() {
-            let _ = reply_sender.send(DaemonReply::Result(subscribe_result));
-        }
-
         for interval in self.timers.keys().copied() {
             let events_tx = events_tx.clone();
             let dataflow_id = self.id;

diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs
new file mode 100644
index 00000000..8f52c07b
--- /dev/null
+++ b/binaries/daemon/src/pending.rs
@@ -0,0 +1,104 @@
+use std::collections::{HashMap, HashSet};
+
+use dora_core::{config::NodeId, daemon_messages::DaemonReply};
+use tokio::sync::oneshot;
+
+#[derive(Default)]
+pub struct PendingNodes {
+    /// The local nodes that are still waiting to start.
+    local_nodes: HashSet<NodeId>,
+    /// Whether there are external nodes for this dataflow.
+    external_nodes: bool,
+
+    /// Used to synchronize node starts.
+    ///
+    /// Subscribe requests block the node until all other nodes are ready too.
+    waiting_subscribers: HashMap<NodeId, oneshot::Sender<DaemonReply>>,
+    /// List of nodes that finished before connecting to the dora daemon.
+    ///
+    /// If this list is non-empty, we should not start the dataflow at all. Instead,
+    /// we report an error to the other nodes.
+    exited_before_subscribe: HashSet<NodeId>,
+}
+
+impl PendingNodes {
+    pub fn insert(&mut self, node_id: NodeId) {
+        self.local_nodes.insert(node_id);
+    }
+
+    pub fn set_external_nodes(&mut self, value: bool) {
+        self.external_nodes = value;
+    }
+
+    pub async fn handle_node_subscription(
+        &mut self,
+        node_id: NodeId,
+        reply_sender: oneshot::Sender<DaemonReply>,
+    ) -> eyre::Result<DataflowStatus> {
+        self.waiting_subscribers
+            .insert(node_id.clone(), reply_sender);
+
+        self.local_nodes.remove(&node_id);
+        if self.local_nodes.is_empty() {
+            if self.external_nodes {
+                Ok(DataflowStatus::LocalNodesReady { success: true })
+            } else {
+                self.answer_subscribe_requests(None).await;
+                Ok(DataflowStatus::AllNodesReady)
+            }
+        } else {
+            Ok(DataflowStatus::LocalNodesPending)
+        }
+    }
+
+    pub async fn handle_node_stop(&mut self, node_id: &NodeId) -> DataflowStatus {
+        self.exited_before_subscribe.insert(node_id.clone());
+
+        self.local_nodes.remove(node_id);
+        if self.local_nodes.is_empty() {
+            if self.external_nodes {
+                DataflowStatus::LocalNodesReady { success: false }
+            } else {
+                self.answer_subscribe_requests(None).await;
+                DataflowStatus::AllNodesReady
+            }
+        } else {
+            // continue waiting for other nodes
+            DataflowStatus::LocalNodesPending
+        }
+    }
+
+    pub async fn handle_external_all_nodes_ready(&mut self, success: bool) {
+        let external_error = if success {
+            None
+        } else {
+            Some("some nodes failed to initialize on remote machines".to_string())
+        };
+        self.answer_subscribe_requests(external_error).await
+    }
+
+    async fn answer_subscribe_requests(&mut self, external_error: Option<String>) {
+        let result = if self.exited_before_subscribe.is_empty() {
+            match external_error {
+                Some(err) => Err(err),
+                None => Ok(()),
+            }
+        } else {
+            Err(format!(
+                "Nodes failed before subscribing: {:?}",
+                self.exited_before_subscribe
+            ))
+        };
+        // answer all subscribe requests
+        let subscribe_replies = std::mem::take(&mut self.waiting_subscribers);
+        for reply_sender in subscribe_replies.into_values() {
+            let _ = reply_sender.send(DaemonReply::Result(result.clone()));
+        }
+    }
+}
+
+pub enum DataflowStatus {
+    AllNodesReady,
+    LocalNodesPending,
+    LocalNodesReady { success: bool },
+}

diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs
index 21ae38a9..b1a6e550 100644
--- a/libraries/core/src/coordinator_messages.rs
+++ b/libraries/core/src/coordinator_messages.rs
@@ -19,6 +19,7 @@ pub enum CoordinatorRequest {
 pub enum DaemonEvent {
     AllNodesReady {
         dataflow_id: DataflowId,
+        success: bool,
     },
     AllNodesFinished {
         dataflow_id: DataflowId,

diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs
index d5f3575f..9a31a20b 100644
--- a/libraries/core/src/daemon_messages.rs
+++ b/libraries/core/src/daemon_messages.rs
@@ -201,6 +201,7 @@ pub enum DaemonCoordinatorEvent {
     Spawn(SpawnDataflowNodes),
     AllNodesReady {
         dataflow_id: DataflowId,
+        success: bool,
     },
     StopDataflow {
         dataflow_id: DataflowId,

From 793e9a7c0f72aec70f525ec57ebd87d6fc9a7c4c Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Fri, 28 Apr 2023 15:17:58 +0200
Subject: [PATCH 03/12] Make watchdog messages less strict

---
 binaries/coordinator/src/lib.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs
index 73430d93..083a5cbf 100644
--- a/binaries/coordinator/src/lib.rs
+++ b/binaries/coordinator/src/lib.rs
@@ -99,7 +99,7 @@ async fn start_inner(
         .wrap_err("failed to create control events")?;
 
     let daemon_watchdog_interval =
-        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
+        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
             .map(|_| Event::DaemonWatchdogInterval);
 
     // events that should be aborted on `dora destroy`
@@ -399,7 +399,7 @@ async fn start_inner(
             let mut disconnected = BTreeSet::new();
             for (machine_id, connection) in &mut daemon_connections {
                 let result: eyre::Result<()> =
-                    tokio::time::timeout(Duration::from_millis(100), send_watchdog_message(&mut connection.stream))
+                    tokio::time::timeout(Duration::from_millis(500), send_watchdog_message(&mut connection.stream))
                         .await
                         .wrap_err("timeout")
                        .and_then(|r| r).wrap_err_with(||

From be92d9d43b366a9a34e86556bdc9c3cb8ab5af88 Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Wed, 3 May 2023 14:58:09 +0200
Subject: [PATCH 04/12] Handle coordinator synchronization message internally
 in `pending` submodule

---
 binaries/daemon/src/lib.rs     |  70 +++++---------------
 binaries/daemon/src/pending.rs | 113 ++++++++++++++++++++++++---------
 2 files changed, 97 insertions(+), 86 deletions(-)

diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index b1807df7..e4204ecc 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -301,7 +301,7 @@ impl Daemon {
                         dataflow
                             .pending_nodes
                             .handle_external_all_nodes_ready(success)
-                            .await;
+                            .await?;
                         if success {
                             tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
                             dataflow.start(&self.events_tx).await?;
@@ -415,7 +415,7 @@ impl Daemon {
         nodes: Vec<ResolvedNode>,
         daemon_communication_config: LocalCommunicationConfig,
     ) -> eyre::Result<()> {
-        let dataflow = RunningDataflow::new(dataflow_id);
+        let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone());
         let dataflow = match self.running.entry(dataflow_id) {
             std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow),
             std::collections::hash_map::Entry::Occupied(_) => {
@@ -507,7 +507,11 @@ impl Daemon {
 
                         let status = dataflow
                             .pending_nodes
-                            .handle_node_subscription(node_id.clone(), reply_sender)
+                            .handle_node_subscription(
+                                node_id.clone(),
+                                reply_sender,
+                                &mut self.coordinator_connection,
+                            )
                             .await?;
                         match status {
                             DataflowStatus::AllNodesReady => {
@@ -516,20 +520,7 @@ impl Daemon {
                                 );
                                 dataflow.start(&self.events_tx).await?;
                             }
-                            DataflowStatus::LocalNodesPending => {}
-                            DataflowStatus::LocalNodesReady { success } => {
-                                tracing::info!(
-                                    "all local nodes are ready, waiting for remote nodes"
-                                );
-
-                                Self::report_nodes_ready(
-                                    &mut self.coordinator_connection,
-                                    self.machine_id.clone(),
-                                    dataflow_id,
-                                    success,
-                                )
-                                .await?;
-                            }
+                            DataflowStatus::Pending => {}
                         }
                     }
                 }
@@ -628,28 +619,6 @@ impl Daemon {
         Ok(())
     }
 
-    async fn report_nodes_ready(
-        coordinator_connection: &mut Option<TcpStream>,
-        machine_id: String,
-        dataflow_id: Uuid,
-        success: bool,
-    ) -> Result<(), eyre::ErrReport> {
-        let Some(connection) = coordinator_connection else {
-            bail!("no coordinator connection to send AllNodesReady");
-        };
-        let msg = serde_json::to_vec(&CoordinatorRequest::Event {
-            machine_id: machine_id.clone(),
-            event: DaemonEvent::AllNodesReady {
-                dataflow_id,
-                success,
-            },
-        })?;
-        tcp_send(connection, &msg)
-            .await
-            .wrap_err("failed to send AllNodesReady message to dora-coordinator")?;
-        Ok(())
-    }
-
     async fn send_reload(
         &mut self,
         dataflow_id: Uuid,
@@ -776,19 +745,10 @@ impl Daemon {
         })?;
 
         tracing::warn!("node `{node_id}` exited before initializing dora connection");
-        match dataflow.pending_nodes.handle_node_stop(node_id).await {
-            DataflowStatus::AllNodesReady => {}
-            DataflowStatus::LocalNodesPending => {}
-            DataflowStatus::LocalNodesReady { success } => {
-                Self::report_nodes_ready(
-                    &mut self.coordinator_connection,
-                    self.machine_id.clone(),
-                    dataflow_id,
-                    success,
-                )
-                .await?;
-            }
-        }
+        dataflow
+            .pending_nodes
+            .handle_node_stop(node_id, &mut self.coordinator_connection)
+            .await?;
 
         Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, node_id).await?;
 
@@ -1127,10 +1087,10 @@ pub struct RunningDataflow {
 }
 
 impl RunningDataflow {
-    fn new(id: Uuid) -> RunningDataflow {
+    fn new(dataflow_id: Uuid, machine_id: String) -> RunningDataflow {
         Self {
-            id,
-            pending_nodes: PendingNodes::default(),
+            id: dataflow_id,
+            pending_nodes: PendingNodes::new(dataflow_id, machine_id),
             subscribe_channels: HashMap::new(),
             drop_channels: HashMap::new(),
             mappings: HashMap::new(),

diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs
index 8f52c07b..2043ecc6 100644
--- a/binaries/daemon/src/pending.rs
+++ b/binaries/daemon/src/pending.rs
@@ -1,10 +1,19 @@
 use std::collections::{HashMap, HashSet};
 
-use dora_core::{config::NodeId, daemon_messages::DaemonReply};
-use tokio::sync::oneshot;
+use dora_core::{
+    config::NodeId,
+    coordinator_messages::{CoordinatorRequest, DaemonEvent},
+    daemon_messages::{DaemonReply, DataflowId},
+};
+use eyre::{bail, Context};
+use tokio::{net::TcpStream, sync::oneshot};
+
+use crate::tcp_utils::tcp_send;
 
-#[derive(Default)]
 pub struct PendingNodes {
+    dataflow_id: DataflowId,
+    machine_id: String,
+
     /// The local nodes that are still waiting to start.
     local_nodes: HashSet<NodeId>,
     /// Whether there are external nodes for this dataflow.
@@ -22,6 +31,17 @@ pub struct PendingNodes {
 }
 
 impl PendingNodes {
+    pub fn new(dataflow_id: DataflowId, machine_id: String) -> Self {
+        Self {
+            dataflow_id,
+            machine_id,
+            local_nodes: HashSet::new(),
+            external_nodes: false,
+            waiting_subscribers: HashMap::new(),
+            exited_before_subscribe: HashSet::new(),
+        }
+    }
+
     pub fn insert(&mut self, node_id: NodeId) {
         self.local_nodes.insert(node_id);
     }
@@ -34,47 +54,55 @@ impl PendingNodes {
         &mut self,
         node_id: NodeId,
         reply_sender: oneshot::Sender<DaemonReply>,
+        coordinator_connection: &mut Option<TcpStream>,
     ) -> eyre::Result<DataflowStatus> {
         self.waiting_subscribers
             .insert(node_id.clone(), reply_sender);
-
         self.local_nodes.remove(&node_id);
-        if self.local_nodes.is_empty() {
-            if self.external_nodes {
-                Ok(DataflowStatus::LocalNodesReady { success: true })
-            } else {
-                self.answer_subscribe_requests(None).await;
-                Ok(DataflowStatus::AllNodesReady)
-            }
-        } else {
-            Ok(DataflowStatus::LocalNodesPending)
-        }
+
+        self.update_dataflow_status(coordinator_connection).await
     }
 
-    pub async fn handle_node_stop(&mut self, node_id: &NodeId) -> DataflowStatus {
+    pub async fn handle_node_stop(
+        &mut self,
+        node_id: &NodeId,
+        coordinator_connection: &mut Option<TcpStream>,
+    ) -> eyre::Result<DataflowStatus> {
         self.exited_before_subscribe.insert(node_id.clone());
-
         self.local_nodes.remove(node_id);
-        if self.local_nodes.is_empty() {
-            if self.external_nodes {
-                DataflowStatus::LocalNodesReady { success: false }
-            } else {
-                self.answer_subscribe_requests(None).await;
-                DataflowStatus::AllNodesReady
-            }
-        } else {
-            // continue waiting for other nodes
-            DataflowStatus::LocalNodesPending
-        }
+
+        self.update_dataflow_status(coordinator_connection).await
     }
 
-    pub async fn handle_external_all_nodes_ready(&mut self, success: bool) {
+    pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> {
+        if !self.local_nodes.is_empty() {
+            bail!("received external `all_nodes_ready` event before local nodes were ready");
+        }
         let external_error = if success {
             None
         } else {
             Some("some nodes failed to initialize on remote machines".to_string())
         };
-        self.answer_subscribe_requests(external_error).await
+        self.answer_subscribe_requests(external_error).await;
+
+        Ok(())
+    }
+
+    async fn update_dataflow_status(
+        &mut self,
+        coordinator_connection: &mut Option<TcpStream>,
+    ) -> eyre::Result<DataflowStatus> {
+        if self.local_nodes.is_empty() {
+            if self.external_nodes {
+                self.report_nodes_ready(coordinator_connection).await?;
+                Ok(DataflowStatus::Pending)
+            } else {
+                self.answer_subscribe_requests(None).await;
+                Ok(DataflowStatus::AllNodesReady)
+            }
+        } else {
+            Ok(DataflowStatus::Pending)
+        }
     }
 
     async fn answer_subscribe_requests(&mut self, external_error: Option<String>) {
@@ -95,10 +123,33 @@ impl PendingNodes {
             let _ = reply_sender.send(DaemonReply::Result(result.clone()));
         }
     }
+
+    async fn report_nodes_ready(
+        &self,
+        coordinator_connection: &mut Option<TcpStream>,
+    ) -> eyre::Result<()> {
+        let Some(connection) = coordinator_connection else {
+            bail!("no coordinator connection to send AllNodesReady");
+        };
+
+        let success = self.exited_before_subscribe.is_empty();
+        tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes");
+
+        let msg = serde_json::to_vec(&CoordinatorRequest::Event {
+            machine_id: self.machine_id.clone(),
+            event: DaemonEvent::AllNodesReady {
+                dataflow_id: self.dataflow_id,
+                success,
+            },
+        })?;
+        tcp_send(connection, &msg)
+            .await
+            .wrap_err("failed to send AllNodesReady message to dora-coordinator")?;
+        Ok(())
+    }
 }
 
 pub enum DataflowStatus {
     AllNodesReady,
-    LocalNodesPending,
-    LocalNodesReady { success: bool },
+    Pending,
 }

From 043fe3cbbc86113bc15e5bdbf9b575e81fc09c8b Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Wed, 3 May 2023 14:58:45 +0200
Subject: [PATCH 05/12] Fix: Don't use blocking send operation in async
 coordinator function

---
 binaries/coordinator/src/control.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs
index 51a8219c..c8987a9d 100644
--- a/binaries/coordinator/src/control.rs
+++ b/binaries/coordinator/src/control.rs
@@ -44,7 +44,7 @@ async fn listen(
         let incoming = match result {
             Ok(incoming) => incoming,
             Err(err) => {
-                let _ = tx.blocking_send(err.into());
+                let _ = tx.send(err.into()).await;
                 return;
             }
         };

From ac0c521837b33e96adf1665520386c8160a8ab19 Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Wed, 3 May 2023 14:59:29 +0200
Subject: [PATCH 06/12] Fix: Only report init state to coordinator once

---
 binaries/daemon/src/pending.rs | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs
index 2043ecc6..39d623c0 100644
--- a/binaries/daemon/src/pending.rs
+++ b/binaries/daemon/src/pending.rs
@@ -28,6 +28,9 @@ pub struct PendingNodes {
     /// If this list is non-empty, we should not start the dataflow at all. Instead,
     /// we report an error to the other nodes.
     exited_before_subscribe: HashSet<NodeId>,
+
+    /// Whether the local init result was already reported to the coordinator.
+    reported_init_to_coordinator: bool,
 }
 
 impl PendingNodes {
@@ -39,6 +42,7 @@ impl PendingNodes {
             external_nodes: false,
             waiting_subscribers: HashMap::new(),
             exited_before_subscribe: HashSet::new(),
+            reported_init_to_coordinator: false,
         }
     }
 
@@ -94,7 +98,10 @@ impl PendingNodes {
     ) -> eyre::Result<DataflowStatus> {
         if self.local_nodes.is_empty() {
             if self.external_nodes {
-                self.report_nodes_ready(coordinator_connection).await?;
+                if !self.reported_init_to_coordinator {
+                    self.report_nodes_ready(coordinator_connection).await?;
+                    self.reported_init_to_coordinator = true;
+                }
                 Ok(DataflowStatus::Pending)
             } else {
                 self.answer_subscribe_requests(None).await;

From 026ddd915f4bbf1615748a058e9a780308dab8f0 Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Wed, 3 May 2023 15:00:38 +0200
Subject: [PATCH 07/12] Fix: Only update dataflow status on exit events of
 pending nodes

We don't want to consider exit events of nodes that already sent a
subscribe event.

---
 binaries/daemon/src/lib.rs     |  1 -
 binaries/daemon/src/pending.rs | 12 +++++++-----
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index e4204ecc..53f1d754 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -744,7 +744,6 @@ impl Daemon {
             format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
         })?;
 
-        tracing::warn!("node `{node_id}` exited before initializing dora connection");
         dataflow
             .pending_nodes
             .handle_node_stop(node_id, &mut self.coordinator_connection)
diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs
index 39d623c0..b040211f 100644
--- a/binaries/daemon/src/pending.rs
+++ b/binaries/daemon/src/pending.rs
@@ -71,11 +71,13 @@ impl PendingNodes {
         &mut self,
         node_id: &NodeId,
         coordinator_connection: &mut Option<TcpStream>,
-    ) -> eyre::Result<DataflowStatus> {
-        self.exited_before_subscribe.insert(node_id.clone());
-        self.local_nodes.remove(node_id);
-
-        self.update_dataflow_status(coordinator_connection).await
+    ) -> eyre::Result<()> {
+        if self.local_nodes.remove(node_id) {
+            tracing::warn!("node `{node_id}` exited before initializing dora connection");
+            self.exited_before_subscribe.insert(node_id.clone());
+            self.update_dataflow_status(coordinator_connection).await?;
+        }
+        Ok(())
     }
 
     pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> {

From 7d646509c2360528460e9a81ad8cbd0babd6563b Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Wed, 3 May 2023 17:36:44 +0200
Subject: [PATCH 08/12] Replace watchdog with asynchronous heartbeat messages

The previous watchdog requests required waiting for a reply, which could
slow down the system under load and also lead to false errors on slower
systems (e.g. the CI). Because of this, this commit replaces the watchdog
requests with asynchronous heartbeat messages, which don't require replies.

Instead, we record the last heartbeat timestamp whenever we receive a
heartbeat message. We then periodically check the time since the last
received heartbeat message and consider the connection closed if too much
time has passed.
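For illustration, a minimal sketch of this pattern (hypothetical names and
timeout values; the real bookkeeping lives in the coordinator and daemon
changes below, and a later patch in this series switches the timestamp to
a monotonic `Instant`):

    use std::time::{Duration, Instant};

    /// Tracks liveness of one peer based on received heartbeat messages.
    struct PeerLiveness {
        last_heartbeat: Instant,
    }

    impl PeerLiveness {
        /// Called whenever a heartbeat message arrives; no reply is sent.
        fn on_heartbeat(&mut self) {
            self.last_heartbeat = Instant::now();
        }

        /// Called periodically from an interval timer.
        fn is_connected(&self, timeout: Duration) -> bool {
            self.last_heartbeat.elapsed() <= timeout
        }
    }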
---
 binaries/coordinator/src/lib.rs            | 44 ++++++++++++----------
 binaries/coordinator/src/listener.rs       | 19 ++++------
 binaries/daemon/src/lib.rs                 | 36 +++++++++---------
 libraries/core/src/coordinator_messages.rs |  5 +--
 libraries/core/src/daemon_messages.rs      |  3 +-
 5 files changed, 52 insertions(+), 55 deletions(-)

diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs
index c9ad5ed5..622a0d63 100644
--- a/binaries/coordinator/src/lib.rs
+++ b/binaries/coordinator/src/lib.rs
@@ -21,7 +21,7 @@ use std::{
     collections::{BTreeSet, HashMap},
     net::SocketAddr,
     path::PathBuf,
-    time::Duration,
+    time::{Duration, SystemTime},
 };
 use tokio::{
     net::{TcpListener, TcpStream},
@@ -138,9 +138,9 @@ async fn start_inner(
         .await
         .wrap_err("failed to create control events")?;
 
-    let daemon_watchdog_interval =
-        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
-            .map(|_| Event::DaemonWatchdogInterval);
+    let daemon_heartbeat_interval =
+        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
+            .map(|_| Event::DaemonHeartbeatInterval);
 
     // events that should be aborted on `dora destroy`
     let (abortable_events, abort_handle) = futures::stream::abortable(
@@ -148,7 +148,7 @@ async fn start_inner(
             control_events,
             new_daemon_connections,
             external_events,
-            daemon_watchdog_interval,
+            daemon_heartbeat_interval,
         )
         .merge(),
     );
@@ -203,6 +203,7 @@ async fn start_inner(
                     DaemonConnection {
                         stream: connection,
                         listen_socket,
+                        last_heartbeat: SystemTime::now(),
                     },
                 );
                 if let Some(_previous) = previous {
@@ -442,9 +443,15 @@ async fn start_inner(
                 }
                 ControlEvent::Error(err) => tracing::error!("{err:?}"),
             },
-            Event::DaemonWatchdogInterval => {
+            Event::DaemonHeartbeatInterval => {
                 let mut disconnected = BTreeSet::new();
                 for (machine_id, connection) in &mut daemon_connections {
+                    if connection.last_heartbeat.elapsed().unwrap_or_default()
+                        > Duration::from_secs(15)
+                    {
+                        disconnected.insert(machine_id.clone());
+                        continue;
+                    }
                     let result: eyre::Result<()> =
                         tokio::time::timeout(Duration::from_millis(100), send_watchdog_message(&mut connection.stream))
                             .await
                             .wrap_err("timeout")
                            .and_then(|r| r).wrap_err_with(||
@@ -474,6 +481,11 @@ async fn start_inner(
                 )
                 .await?;
             }
+            Event::DaemonHeartbeat { machine_id } => {
+                if let Some(connection) = daemon_connections.get_mut(&machine_id) {
+                    connection.last_heartbeat = SystemTime::now();
+                }
+            }
         }
     }
 
@@ -485,6 +497,7 @@ async fn start_inner(
 struct DaemonConnection {
     stream: TcpStream,
     listen_socket: SocketAddr,
+    last_heartbeat: SystemTime,
 }
 
 fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
@@ -525,21 +538,11 @@ async fn handle_destroy(
 }
 
 async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> {
-    let message = serde_json::to_vec(&DaemonCoordinatorEvent::Watchdog).unwrap();
+    let message = serde_json::to_vec(&DaemonCoordinatorEvent::Heartbeat).unwrap();
 
     tcp_send(connection, &message)
         .await
-        .wrap_err("failed to send watchdog message to daemon")?;
-    let reply_raw = tcp_receive(connection)
-        .await
-        .wrap_err("failed to receive stop reply from daemon")?;
-
-    match serde_json::from_slice(&reply_raw)
-        .wrap_err("failed to deserialize stop reply from daemon")?
-    {
-        DaemonCoordinatorReply::WatchdogAck => Ok(()),
-        other => bail!("unexpected reply after sending `watchdog`: {other:?}"),
-    }
+        .wrap_err("failed to send watchdog message to daemon")
 }
 
 #[allow(dead_code)] // Keeping the communication layer for later use.
@@ -770,10 +773,11 @@ async fn destroy_daemons(
 pub enum Event {
     NewDaemonConnection(TcpStream),
     DaemonConnectError(eyre::Report),
+    DaemonHeartbeat { machine_id: String },
     Dataflow { uuid: Uuid, event: DataflowEvent },
     Control(ControlEvent),
     Daemon(DaemonEvent),
-    DaemonWatchdogInterval,
+    DaemonHeartbeatInterval,
     CtrlC,
 }
 
@@ -782,7 +786,7 @@ impl Event {
     #[allow(clippy::match_like_matches_macro)]
     pub fn log(&self) -> bool {
         match self {
-            Event::DaemonWatchdogInterval => false,
+            Event::DaemonHeartbeatInterval => false,
             _ => true,
         }
     }
diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs
index da4ee624..f8561865 100644
--- a/binaries/coordinator/src/listener.rs
+++ b/binaries/coordinator/src/listener.rs
@@ -1,10 +1,7 @@
-use crate::{
-    tcp_utils::{tcp_receive, tcp_send},
-    DaemonEvent, DataflowEvent, Event,
-};
+use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
 use dora_core::coordinator_messages;
 use eyre::{eyre, Context};
-use std::{io::ErrorKind, net::Ipv4Addr, time::Duration};
+use std::{io::ErrorKind, net::Ipv4Addr};
 use tokio::{
     net::{TcpListener, TcpStream},
     sync::mpsc,
@@ -84,13 +81,11 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende
                         break;
                     }
                 }
-                coordinator_messages::DaemonEvent::Watchdog => {
-                    let reply = serde_json::to_vec(&coordinator_messages::WatchdogAck).unwrap();
-                    _ = tokio::time::timeout(
-                        Duration::from_millis(10),
-                        tcp_send(&mut connection, &reply),
-                    )
-                    .await;
+                coordinator_messages::DaemonEvent::Heartbeat => {
+                    let event = Event::DaemonHeartbeat { machine_id };
+                    if events_tx.send(event).await.is_err() {
+                        break;
+                    }
                 }
             },
         };
diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index 1192086d..d71c6dc4 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -20,6 +20,7 @@ use inter_daemon::InterDaemonConnection;
 use shared_memory_server::ShmemConf;
 use std::collections::HashSet;
 use std::env::temp_dir;
+use std::time::SystemTime;
 use std::{
     borrow::Cow,
     collections::{BTreeMap, BTreeSet, HashMap},
@@ -28,7 +29,7 @@ use std::{
     path::{Path, PathBuf},
     time::Duration,
 };
-use tcp_utils::{tcp_receive, tcp_send};
+use tcp_utils::tcp_send;
 use tokio::fs::File;
 use tokio::io::AsyncReadExt;
 use tokio::net::TcpStream;
@@ -57,6 +58,7 @@ pub struct Daemon {
     events_tx: mpsc::Sender<Event>,
 
     coordinator_connection: Option<TcpStream>,
+    last_coordinator_heartbeat: SystemTime,
     inter_daemon_connections: BTreeMap<String, InterDaemonConnection>,
 
     machine_id: String,
@@ -183,6 +185,7 @@ impl Daemon {
             running: HashMap::new(),
             events_tx: dora_events_tx,
             coordinator_connection,
+            last_coordinator_heartbeat: SystemTime::now(),
             inter_daemon_connections: BTreeMap::new(),
             machine_id,
             exit_when_done,
@@ -193,7 +196,7 @@ impl Daemon {
         let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
             Duration::from_secs(5),
         ))
-        .map(|_| Event::WatchdogInterval);
+        .map(|_| Event::HeartbeatInterval);
         let events = (external_events, dora_events, watchdog_interval).merge();
         daemon.run_inner(events).await
     }
@@ -227,22 +230,24 @@ impl Daemon {
                     RunStatus::Continue => {}
                     RunStatus::Exit => break,
                 },
-                Event::WatchdogInterval => {
+                Event::HeartbeatInterval => {
                     if let Some(connection) = &mut self.coordinator_connection {
                         let msg = serde_json::to_vec(&CoordinatorRequest::Event {
                             machine_id: self.machine_id.clone(),
-                            event: DaemonEvent::Watchdog,
+                            event: DaemonEvent::Heartbeat,
                         })?;
                         tcp_send(connection, &msg)
                             .await
                             .wrap_err("failed to send watchdog message to dora-coordinator")?;
 
-                        let reply_raw = tcp_receive(connection)
-                            .await
-                            .wrap_err("lost connection to coordinator")?;
-                        let _: dora_core::coordinator_messages::WatchdogAck =
-                            serde_json::from_slice(&reply_raw)
-                                .wrap_err("received unexpected watchdog reply from coordinator")?;
+                        if self
+                            .last_coordinator_heartbeat
+                            .elapsed()
+                            .unwrap_or_default()
+                            > Duration::from_secs(20)
+                        {
+                            bail!("lost connection to coordinator")
+                        }
                     }
                 }
                 Event::CtrlC => {
@@ -382,12 +387,9 @@ impl Daemon {
                     .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
                 RunStatus::Exit
             }
-            DaemonCoordinatorEvent::Watchdog => {
-                let _ = reply_tx
-                    .send(Some(DaemonCoordinatorReply::WatchdogAck))
-                    .map_err(|_| {
-                        error!("could not send WatchdogAck reply from daemon to coordinator")
-                    });
+            DaemonCoordinatorEvent::Heartbeat => {
+                self.last_coordinator_heartbeat = SystemTime::now();
+                let _ = reply_tx.send(None);
                 RunStatus::Continue
             }
         };
@@ -1283,7 +1285,7 @@ pub enum Event {
     Coordinator(CoordinatorEvent),
     Daemon(InterDaemonEvent),
     Dora(DoraEvent),
-    WatchdogInterval,
+    HeartbeatInterval,
     CtrlC,
 }
diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs
index 21ae38a9..7505b098 100644
--- a/libraries/core/src/coordinator_messages.rs
+++ b/libraries/core/src/coordinator_messages.rs
@@ -24,7 +24,7 @@ pub enum DaemonEvent {
         dataflow_id: DataflowId,
         result: Result<(), String>,
     },
-    Watchdog,
+    Heartbeat,
 }
 
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
@@ -41,6 +41,3 @@ impl RegisterResult {
         }
     }
 }
-
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
-pub struct WatchdogAck;
diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs
index e51c1586..66704e7a 100644
--- a/libraries/core/src/daemon_messages.rs
+++ b/libraries/core/src/daemon_messages.rs
@@ -214,7 +214,7 @@ pub enum DaemonCoordinatorEvent {
         node_id: NodeId,
     },
     Destroy,
-    Watchdog,
+    Heartbeat,
 }
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
@@ -238,7 +238,6 @@ pub enum DaemonCoordinatorReply {
     ReloadResult(Result<(), String>),
     StopResult(Result<(), String>),
     DestroyResult(Result<(), String>),
-    WatchdogAck,
     Logs(Result<Vec<u8>, String>),
 }

From 5edfc394a640d1be730bce030c4b85e7fa11f3f9 Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Thu, 4 May 2023 09:17:05 +0200
Subject: [PATCH 09/12] Use monotonic `Instant` instead of `SystemTime`

---
 binaries/coordinator/src/lib.rs | 12 +++++-------
 binaries/daemon/src/lib.rs      | 15 +++++----------
 2 files changed, 10 insertions(+), 17 deletions(-)

diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs
index 622a0d63..881eed14 100644
--- a/binaries/coordinator/src/lib.rs
+++ b/binaries/coordinator/src/lib.rs
@@ -21,7 +21,7 @@ use std::{
     collections::{BTreeSet, HashMap},
     net::SocketAddr,
     path::PathBuf,
-    time::{Duration, SystemTime},
+    time::{Duration, Instant},
 };
 use tokio::{
     net::{TcpListener, TcpStream},
@@ -203,7 +203,7 @@ async fn start_inner(
                     DaemonConnection {
                         stream: connection,
                         listen_socket,
-                        last_heartbeat: SystemTime::now(),
+                        last_heartbeat: Instant::now(),
                     },
                 );
                 if let Some(_previous) = previous {
@@ -446,9 +446,7 @@ async fn start_inner(
             Event::DaemonHeartbeatInterval => {
                 let mut disconnected = BTreeSet::new();
                 for (machine_id, connection) in &mut daemon_connections {
-                    if connection.last_heartbeat.elapsed().unwrap_or_default()
-                        > Duration::from_secs(15)
-                    {
+                    if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
                         disconnected.insert(machine_id.clone());
                         continue;
                     }
@@ -483,7 +481,7 @@
@@ -483,7 +481,7 @@ async fn start_inner(
             }
             Event::DaemonHeartbeat { machine_id } => {
                 if let Some(connection) = daemon_connections.get_mut(&machine_id) {
-                    connection.last_heartbeat = SystemTime::now();
+                    connection.last_heartbeat = Instant::now();
                 }
             }
         }
@@ -497,7 +495,7 @@ async fn start_inner(
 struct DaemonConnection {
     stream: TcpStream,
     listen_socket: SocketAddr,
-    last_heartbeat: SystemTime,
+    last_heartbeat: Instant,
 }
 
 fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index d71c6dc4..bd4b0786 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -20,7 +20,7 @@ use inter_daemon::InterDaemonConnection;
 use shared_memory_server::ShmemConf;
 use std::collections::HashSet;
 use std::env::temp_dir;
-use std::time::SystemTime;
+use std::time::Instant;
 use std::{
     borrow::Cow,
     collections::{BTreeMap, BTreeSet, HashMap},
@@ -58,7 +58,7 @@ pub struct Daemon {
     events_tx: mpsc::Sender<Event>,
 
     coordinator_connection: Option<TcpStream>,
-    last_coordinator_heartbeat: SystemTime,
+    last_coordinator_heartbeat: Instant,
     inter_daemon_connections: BTreeMap<String, InterDaemonConnection>,
 
     machine_id: String,
@@ -185,7 +185,7 @@ impl Daemon {
             running: HashMap::new(),
             events_tx: dora_events_tx,
             coordinator_connection,
-            last_coordinator_heartbeat: SystemTime::now(),
+            last_coordinator_heartbeat: Instant::now(),
             inter_daemon_connections: BTreeMap::new(),
             machine_id,
             exit_when_done,
@@ -240,12 +240,7 @@ impl Daemon {
                             .await
                             .wrap_err("failed to send watchdog message to dora-coordinator")?;
 
-                        if self
-                            .last_coordinator_heartbeat
-                            .elapsed()
-                            .unwrap_or_default()
-                            > Duration::from_secs(20)
-                        {
+                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
                             bail!("lost connection to coordinator")
                         }
                     }
@@ -388,7 +383,7 @@ impl Daemon {
                 RunStatus::Exit
             }
             DaemonCoordinatorEvent::Heartbeat => {
-                self.last_coordinator_heartbeat = SystemTime::now();
+                self.last_coordinator_heartbeat = Instant::now();
                 let _ = reply_tx.send(None);
                 RunStatus::Continue
             }

From 4e72a8b2162bca6c471f17f4474e5dcdd6434368 Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Thu, 4 May 2023 09:23:21 +0200
Subject: [PATCH 10/12] CI: Increase timeout for 'build CLI and binaries' step

Apparently the step can take longer on macOS if no cache is found.

---
 .github/workflows/ci.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 260aa5b4..372ccc55 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -82,7 +82,7 @@ jobs:
       - uses: Swatinem/rust-cache@v2
 
       - name: "Build cli and binaries"
-        timeout-minutes: 30
+        timeout-minutes: 45
        run: |
          cargo install --path binaries/coordinator
          cargo install --path binaries/daemon

From 893ca100ece8c2c82786fd0eeda703edd5c36aba Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Thu, 4 May 2023 09:29:02 +0200
Subject: [PATCH 11/12] Improve subscribe error message when nodes failed
 before init

---
 binaries/daemon/src/pending.rs | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs
index b040211f..85cbe56f 100644
--- a/binaries/daemon/src/pending.rs
+++ b/binaries/daemon/src/pending.rs
@@ -122,8 +122,11 @@ impl PendingNodes {
             }
         } else {
             Err(format!(
-                "Nodes failed before subscribing: {:?}",
-                self.exited_before_subscribe
+                "Some nodes exited before subscribing to dora: {:?}\n\n\
+                This typically happens when an initialization error occurs
+                in the node or operator. To check the output of the failed
+                nodes, run `dora logs {} <node_id>`.",
+                self.exited_before_subscribe, self.dataflow_id
             ))
         };
         // answer all subscribe requests

From 7499d43c065e950e484b48af66661534cd1f472a Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Thu, 4 May 2023 13:56:58 +0200
Subject: [PATCH 12/12] Fix: Handle node spawn error like stop-before-subscribe

---
 binaries/daemon/src/lib.rs | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index 8f62be32..d04799f4 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -518,7 +518,7 @@ impl Daemon {
                 dataflow.pending_nodes.insert(node.id.clone());
                 let node_id = node.id.clone();
-                spawn::spawn_node(
+                match spawn::spawn_node(
                     dataflow_id,
                     &working_dir,
                     node,
@@ -526,8 +526,19 @@ impl Daemon {
                     daemon_communication_config,
                 )
                 .await
-                .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?;
-                dataflow.running_nodes.insert(node_id);
+                .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
+                {
+                    Ok(()) => {
+                        dataflow.running_nodes.insert(node_id);
+                    }
+                    Err(err) => {
+                        tracing::error!("{err:?}");
+                        dataflow
+                            .pending_nodes
+                            .handle_node_stop(&node_id, &mut self.coordinator_connection)
+                            .await?;
+                    }
+                }
             } else {
                 dataflow.pending_nodes.set_external_nodes(true);