From 132957ad34b98c1a4bcf10693d1aa3d6dcc9d1dd Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 28 Apr 2023 13:57:24 +0200 Subject: [PATCH 01/11] Fix: Don't ignore subscribe results --- apis/rust/node/src/event_stream/mod.rs | 10 +++++++++- apis/rust/node/src/node/drop_stream.rs | 12 ++++++++++-- libraries/core/src/daemon_messages.rs | 1 + 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 99f72bfe..c47a5674 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -59,11 +59,19 @@ impl EventStream { mut close_channel: DaemonChannel, ) -> eyre::Result { channel.register(dataflow_id, node_id.clone())?; - channel + let reply = channel .request(&DaemonRequest::Subscribe) .map_err(|e| eyre!(e)) .wrap_err("failed to create subscription with dora-daemon")?; + match reply { + daemon_messages::DaemonReply::Result(Ok(())) => {} + daemon_messages::DaemonReply::Result(Err(err)) => { + eyre::bail!("subscribe failed: {err}") + } + other => eyre::bail!("unexpected subscribe reply: {other:?}"), + } + close_channel.register(dataflow_id, node_id.clone())?; let (tx, rx) = flume::bounded(0); diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index c594a252..405b9d3e 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -4,7 +4,7 @@ use crate::daemon_connection::DaemonChannel; use dora_core::{ config::NodeId, daemon_messages::{ - DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent, + self, DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent, }, }; use eyre::{eyre, Context}; @@ -45,11 +45,19 @@ impl DropStream { ) -> eyre::Result { channel.register(dataflow_id, node_id.clone())?; - channel + let reply = channel .request(&DaemonRequest::SubscribeDrop) .map_err(|e| eyre!(e)) .wrap_err("failed to create subscription with dora-daemon")?; + match reply { + daemon_messages::DaemonReply::Result(Ok(())) => {} + daemon_messages::DaemonReply::Result(Err(err)) => { + eyre::bail!("drop subscribe failed: {err}") + } + other => eyre::bail!("unexpected drop subscribe reply: {other:?}"), + } + let (tx, rx) = flume::bounded(0); let node_id_cloned = node_id.clone(); diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 8035571c..d5f3575f 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -127,6 +127,7 @@ impl fmt::Debug for Data { type SharedMemoryId = String; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[must_use] pub enum DaemonReply { Result(Result<(), String>), PreparedMessage { shared_memory_id: SharedMemoryId }, From e20699de3fb5e6961ade322b93050e23c357bab6 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 28 Apr 2023 14:06:38 +0200 Subject: [PATCH 02/11] Handle node errors during initialization phase We synchronize the start of nodes (even across machines) to avoid missed messages. This led to deadlock issues when nodes exited before they initialized the connection to the dora daemon. The reason was that the other nodes were still waiting for the stopped node to become ready. This commit fixes this issue by properly handling node exits that occur before sending the subscribe message. If nodes exit, they are removed from the pending list and added to a new 'exited before init' list. Once all nodes have subscribed or exited, we answer the pending subscribe requests. If nodes exited before subscribing we send an error reply to the other nodes because a synchronized start is no longer possible. To make this logic work across different machines too, we add a `success` field to the `AllNodesReady` message that the daemon sends to the coordinator. The coordinator forwards this flag to other daemons so that they can act accordingly. --- binaries/coordinator/src/lib.rs | 10 +- binaries/coordinator/src/listener.rs | 10 +- binaries/daemon/src/lib.rs | 184 +++++++++++++-------- binaries/daemon/src/pending.rs | 104 ++++++++++++ libraries/core/src/coordinator_messages.rs | 1 + libraries/core/src/daemon_messages.rs | 1 + 6 files changed, 235 insertions(+), 75 deletions(-) create mode 100644 binaries/daemon/src/pending.rs diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 4a51dce5..73430d93 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -180,15 +180,20 @@ async fn start_inner( } }, Event::Dataflow { uuid, event } => match event { - DataflowEvent::ReadyOnMachine { machine_id } => { + DataflowEvent::ReadyOnMachine { + machine_id, + success, + } => { match running_dataflows.entry(uuid) { std::collections::hash_map::Entry::Occupied(mut entry) => { let dataflow = entry.get_mut(); dataflow.pending_machines.remove(&machine_id); + dataflow.init_success &= success; if dataflow.pending_machines.is_empty() { let message = serde_json::to_vec(&DaemonCoordinatorEvent::AllNodesReady { dataflow_id: uuid, + success: dataflow.init_success, }) .wrap_err("failed to serialize AllNodesReady message")?; @@ -498,6 +503,7 @@ struct RunningDataflow { machines: BTreeSet, /// IDs of machines that are waiting until all nodes are started. pending_machines: BTreeSet, + init_success: bool, } impl PartialEq for RunningDataflow { @@ -602,6 +608,7 @@ async fn start_dataflow( } else { BTreeSet::new() }, + init_success: true, machines, }) } @@ -665,6 +672,7 @@ pub enum DataflowEvent { }, ReadyOnMachine { machine_id: String, + success: bool, }, } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index da4ee624..8ed3b57f 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -60,10 +60,16 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende break; } coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event { - coordinator_messages::DaemonEvent::AllNodesReady { dataflow_id } => { + coordinator_messages::DaemonEvent::AllNodesReady { + dataflow_id, + success, + } => { let event = Event::Dataflow { uuid: dataflow_id, - event: DataflowEvent::ReadyOnMachine { machine_id }, + event: DataflowEvent::ReadyOnMachine { + machine_id, + success, + }, }; if events_tx.send(event).await.is_err() { break; diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index e43a676c..b1807df7 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -17,8 +17,8 @@ use eyre::{bail, eyre, Context, ContextCompat}; use futures::{future, stream, FutureExt, TryFutureExt}; use futures_concurrency::stream::Merge; use inter_daemon::InterDaemonConnection; +use pending::PendingNodes; use shared_memory_server::ShmemConf; -use std::collections::HashSet; use std::{ borrow::Cow, collections::{BTreeMap, BTreeSet, HashMap}, @@ -37,6 +37,7 @@ use uuid::Uuid; mod coordinator; mod inter_daemon; mod node_communication; +mod pending; mod spawn; mod tcp_utils; @@ -45,6 +46,8 @@ use dora_tracing::telemetry::serialize_context; #[cfg(feature = "telemetry")] use tracing_opentelemetry::OpenTelemetrySpanExt; +use crate::pending::DataflowStatus; + pub struct Daemon { running: HashMap, @@ -289,11 +292,20 @@ impl Daemon { DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}"))); (Some(reply), RunStatus::Continue) } - DaemonCoordinatorEvent::AllNodesReady { dataflow_id } => { + DaemonCoordinatorEvent::AllNodesReady { + dataflow_id, + success, + } => { match self.running.get_mut(&dataflow_id) { Some(dataflow) => { - tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); - dataflow.start(&self.events_tx).await?; + dataflow + .pending_nodes + .handle_external_all_nodes_ready(success) + .await; + if success { + tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); + dataflow.start(&self.events_tx).await?; + } } None => { tracing::warn!( @@ -463,7 +475,7 @@ impl Daemon { .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?; dataflow.running_nodes.insert(node_id); } else { - dataflow.external_nodes.insert(node.id.clone(), node); + dataflow.pending_nodes.set_external_nodes(true); } } @@ -481,39 +493,44 @@ impl Daemon { event_sender, reply_sender, } => { - let result = self - .subscribe(dataflow_id, node_id.clone(), event_sender) - .await; - let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { - format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`") - })?; - tracing::debug!("node `{node_id}` is ready"); + let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| { + format!("subscribe failed: no running dataflow with ID `{dataflow_id}`") + }); - dataflow - .subscribe_replies - .insert(node_id.clone(), (reply_sender, result)); - dataflow.pending_nodes.remove(&node_id); - if dataflow.pending_nodes.is_empty() { - if dataflow.external_nodes.is_empty() { - tracing::info!("all nodes are ready, starting dataflow `{dataflow_id}`"); - dataflow.start(&self.events_tx).await?; - } else { - tracing::info!( - "all local nodes are ready, waiting for remote nodes \ - for dataflow `{dataflow_id}`" - ); + match dataflow { + Err(err) => { + let _ = reply_sender.send(DaemonReply::Result(Err(err))); + } + Ok(dataflow) => { + tracing::debug!("node `{node_id}` is ready"); + Self::subscribe(dataflow, node_id.clone(), event_sender).await; - // dataflow is split across multiple daemons -> synchronize with dora-coordinator - let Some(connection) = &mut self.coordinator_connection else { - bail!("no coordinator connection to send AllNodesReady"); - }; - let msg = serde_json::to_vec(&CoordinatorRequest::Event { - machine_id: self.machine_id.clone(), - event: DaemonEvent::AllNodesReady { dataflow_id }, - })?; - tcp_send(connection, &msg) - .await - .wrap_err("failed to send AllNodesReady message to dora-coordinator")?; + let status = dataflow + .pending_nodes + .handle_node_subscription(node_id.clone(), reply_sender) + .await?; + match status { + DataflowStatus::AllNodesReady => { + tracing::info!( + "all nodes are ready, starting dataflow `{dataflow_id}`" + ); + dataflow.start(&self.events_tx).await?; + } + DataflowStatus::LocalNodesPending => {} + DataflowStatus::LocalNodesReady { success } => { + tracing::info!( + "all local nodes are ready, waiting for remote nodes" + ); + + Self::report_nodes_ready( + &mut self.coordinator_connection, + self.machine_id.clone(), + dataflow_id, + success, + ) + .await?; + } + } } } } @@ -551,8 +568,17 @@ impl Daemon { let _ = reply_sender.send(DaemonReply::Result(reply)); } DaemonNodeEvent::OutputsDone { reply_sender } => { - let _ = reply_sender.send(DaemonReply::Result(Ok(()))); - self.handle_outputs_done(dataflow_id, &node_id).await?; + let result = match self.running.get_mut(&dataflow_id) { + Some(dataflow) => { + Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, &node_id) + .await + }, + None => Err(eyre!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")), + }; + + let _ = reply_sender.send(DaemonReply::Result( + result.map_err(|err| format!("{err:?}")), + )); } DaemonNodeEvent::SendOut { output_id, @@ -602,6 +628,28 @@ impl Daemon { Ok(()) } + async fn report_nodes_ready( + coordinator_connection: &mut Option, + machine_id: String, + dataflow_id: Uuid, + success: bool, + ) -> Result<(), eyre::ErrReport> { + let Some(connection) = coordinator_connection else { + bail!("no coordinator connection to send AllNodesReady"); + }; + let msg = serde_json::to_vec(&CoordinatorRequest::Event { + machine_id: machine_id.clone(), + event: DaemonEvent::AllNodesReady { + dataflow_id, + success, + }, + })?; + tcp_send(connection, &msg) + .await + .wrap_err("failed to send AllNodesReady message to dora-coordinator")?; + Ok(()) + } + async fn send_reload( &mut self, dataflow_id: Uuid, @@ -670,15 +718,10 @@ impl Daemon { } async fn subscribe( - &mut self, - dataflow_id: Uuid, + dataflow: &mut RunningDataflow, node_id: NodeId, event_sender: UnboundedSender, - ) -> Result<(), String> { - let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| { - format!("subscribe failed: no running dataflow with ID `{dataflow_id}`") - })?; - + ) { // some inputs might have been closed already -> report those events let closed_inputs = dataflow .mappings @@ -709,22 +752,17 @@ impl Daemon { } dataflow.subscribe_channels.insert(node_id, event_sender); - - Ok(()) } - #[tracing::instrument(skip(self), level = "trace")] + #[tracing::instrument(skip(dataflow, inter_daemon_connections), fields(uuid = %dataflow.id), level = "trace")] async fn handle_outputs_done( - &mut self, - dataflow_id: Uuid, + dataflow: &mut RunningDataflow, + inter_daemon_connections: &mut BTreeMap, node_id: &NodeId, ) -> eyre::Result<()> { - let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { - format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`") - })?; send_input_closed_events( dataflow, - &mut self.inter_daemon_connections, + inter_daemon_connections, |OutputId(source_id, _)| source_id == node_id, ) .await?; @@ -732,13 +770,28 @@ impl Daemon { Ok(()) } - #[tracing::instrument(skip(self), level = "trace")] async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> { - self.handle_outputs_done(dataflow_id, node_id).await?; - let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`") })?; + + tracing::warn!("node `{node_id}` exited before initializing dora connection"); + match dataflow.pending_nodes.handle_node_stop(node_id).await { + DataflowStatus::AllNodesReady => {} + DataflowStatus::LocalNodesPending => {} + DataflowStatus::LocalNodesReady { success } => { + Self::report_nodes_ready( + &mut self.coordinator_connection, + self.machine_id.clone(), + dataflow_id, + success, + ) + .await?; + } + } + + Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, node_id).await?; + dataflow.running_nodes.remove(node_id); if dataflow.running_nodes.is_empty() { tracing::info!( @@ -1050,11 +1103,7 @@ fn close_input(dataflow: &mut RunningDataflow, receiver_id: &NodeId, input_id: & pub struct RunningDataflow { id: Uuid, /// Local nodes that are not started yet - pending_nodes: HashSet, - /// Used to synchronize node starts. - /// - /// Subscribe requests block the node until all other nodes are ready too. - subscribe_replies: HashMap, Result<(), String>)>, + pending_nodes: PendingNodes, subscribe_channels: HashMap>, drop_channels: HashMap>, @@ -1063,7 +1112,6 @@ pub struct RunningDataflow { open_inputs: BTreeMap>, running_nodes: BTreeSet, - external_nodes: BTreeMap, open_external_mappings: HashMap>>, pending_drop_tokens: HashMap, @@ -1082,15 +1130,13 @@ impl RunningDataflow { fn new(id: Uuid) -> RunningDataflow { Self { id, - pending_nodes: HashSet::new(), - subscribe_replies: HashMap::new(), + pending_nodes: PendingNodes::default(), subscribe_channels: HashMap::new(), drop_channels: HashMap::new(), mappings: HashMap::new(), timers: BTreeMap::new(), open_inputs: BTreeMap::new(), running_nodes: BTreeSet::new(), - external_nodes: BTreeMap::new(), open_external_mappings: HashMap::new(), pending_drop_tokens: HashMap::new(), _timer_handles: Vec::new(), @@ -1100,12 +1146,6 @@ impl RunningDataflow { } async fn start(&mut self, events_tx: &mpsc::Sender) -> eyre::Result<()> { - // answer all subscribe requests - let subscribe_replies = std::mem::take(&mut self.subscribe_replies); - for (reply_sender, subscribe_result) in subscribe_replies.into_values() { - let _ = reply_sender.send(DaemonReply::Result(subscribe_result)); - } - for interval in self.timers.keys().copied() { let events_tx = events_tx.clone(); let dataflow_id = self.id; diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs new file mode 100644 index 00000000..8f52c07b --- /dev/null +++ b/binaries/daemon/src/pending.rs @@ -0,0 +1,104 @@ +use std::collections::{HashMap, HashSet}; + +use dora_core::{config::NodeId, daemon_messages::DaemonReply}; +use tokio::sync::oneshot; + +#[derive(Default)] +pub struct PendingNodes { + /// The local nodes that are still waiting to start. + local_nodes: HashSet, + /// Whether there are external nodes for this dataflow. + external_nodes: bool, + + /// Used to synchronize node starts. + /// + /// Subscribe requests block the node until all other nodes are ready too. + waiting_subscribers: HashMap>, + /// List of nodes that finished before connecting to the dora daemon. + /// + /// If this list is non-empty, we should not start the dataflow at all. Instead, + /// we report an error to the other nodes. + exited_before_subscribe: HashSet, +} + +impl PendingNodes { + pub fn insert(&mut self, node_id: NodeId) { + self.local_nodes.insert(node_id); + } + + pub fn set_external_nodes(&mut self, value: bool) { + self.external_nodes = value; + } + + pub async fn handle_node_subscription( + &mut self, + node_id: NodeId, + reply_sender: oneshot::Sender, + ) -> eyre::Result { + self.waiting_subscribers + .insert(node_id.clone(), reply_sender); + + self.local_nodes.remove(&node_id); + if self.local_nodes.is_empty() { + if self.external_nodes { + Ok(DataflowStatus::LocalNodesReady { success: true }) + } else { + self.answer_subscribe_requests(None).await; + Ok(DataflowStatus::AllNodesReady) + } + } else { + Ok(DataflowStatus::LocalNodesPending) + } + } + + pub async fn handle_node_stop(&mut self, node_id: &NodeId) -> DataflowStatus { + self.exited_before_subscribe.insert(node_id.clone()); + + self.local_nodes.remove(node_id); + if self.local_nodes.is_empty() { + if self.external_nodes { + DataflowStatus::LocalNodesReady { success: false } + } else { + self.answer_subscribe_requests(None).await; + DataflowStatus::AllNodesReady + } + } else { + // continue waiting for other nodes + DataflowStatus::LocalNodesPending + } + } + + pub async fn handle_external_all_nodes_ready(&mut self, success: bool) { + let external_error = if success { + None + } else { + Some("some nodes failed to initalize on remote machines".to_string()) + }; + self.answer_subscribe_requests(external_error).await + } + + async fn answer_subscribe_requests(&mut self, external_error: Option) { + let result = if self.exited_before_subscribe.is_empty() { + match external_error { + Some(err) => Err(err), + None => Ok(()), + } + } else { + Err(format!( + "Nodes failed before subscribing: {:?}", + self.exited_before_subscribe + )) + }; + // answer all subscribe requests + let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); + for reply_sender in subscribe_replies.into_values() { + let _ = reply_sender.send(DaemonReply::Result(result.clone())); + } + } +} + +pub enum DataflowStatus { + AllNodesReady, + LocalNodesPending, + LocalNodesReady { success: bool }, +} diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index 21ae38a9..b1a6e550 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -19,6 +19,7 @@ pub enum CoordinatorRequest { pub enum DaemonEvent { AllNodesReady { dataflow_id: DataflowId, + success: bool, }, AllNodesFinished { dataflow_id: DataflowId, diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index d5f3575f..9a31a20b 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -201,6 +201,7 @@ pub enum DaemonCoordinatorEvent { Spawn(SpawnDataflowNodes), AllNodesReady { dataflow_id: DataflowId, + success: bool, }, StopDataflow { dataflow_id: DataflowId, From 9217d848ee65141b125fa2f86146ff5770f100dd Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 28 Apr 2023 14:38:17 +0200 Subject: [PATCH 03/11] Fix: Don't try to create two global tracing subscribers when using bundled runtime --- binaries/daemon/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/binaries/daemon/src/main.rs b/binaries/daemon/src/main.rs index c58db3cc..be189d0b 100644 --- a/binaries/daemon/src/main.rs +++ b/binaries/daemon/src/main.rs @@ -36,9 +36,6 @@ async fn main() -> eyre::Result<()> { } async fn run() -> eyre::Result<()> { - #[cfg(feature = "tracing")] - set_up_tracing("dora-daemon").wrap_err("failed to set up tracing subscriber")?; - let Args { run_dataflow, machine_id, @@ -50,6 +47,9 @@ async fn run() -> eyre::Result<()> { return tokio::task::block_in_place(dora_daemon::run_dora_runtime); } + #[cfg(feature = "tracing")] + set_up_tracing("dora-daemon").wrap_err("failed to set up tracing subscriber")?; + let ctrl_c_events = { let (ctrl_c_tx, ctrl_c_rx) = mpsc::channel(1); let mut ctrlc_sent = false; From 793e9a7c0f72aec70f525ec57ebd87d6fc9a7c4c Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 28 Apr 2023 15:17:58 +0200 Subject: [PATCH 04/11] Make watchdog messages less strict --- binaries/coordinator/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 73430d93..083a5cbf 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -99,7 +99,7 @@ async fn start_inner( .wrap_err("failed to create control events")?; let daemon_watchdog_interval = - tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1))) + tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3))) .map(|_| Event::DaemonWatchdogInterval); // events that should be aborted on `dora destroy` @@ -399,7 +399,7 @@ async fn start_inner( let mut disconnected = BTreeSet::new(); for (machine_id, connection) in &mut daemon_connections { let result: eyre::Result<()> = - tokio::time::timeout(Duration::from_millis(100), send_watchdog_message(&mut connection.stream)) + tokio::time::timeout(Duration::from_millis(500), send_watchdog_message(&mut connection.stream)) .await .wrap_err("timeout") .and_then(|r| r).wrap_err_with(|| From be92d9d43b366a9a34e86556bdc9c3cb8ab5af88 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 3 May 2023 14:58:09 +0200 Subject: [PATCH 05/11] Handle coordinator synchronization message internally in `pending` submodule --- binaries/daemon/src/lib.rs | 70 +++++--------------- binaries/daemon/src/pending.rs | 113 ++++++++++++++++++++++++--------- 2 files changed, 97 insertions(+), 86 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index b1807df7..e4204ecc 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -301,7 +301,7 @@ impl Daemon { dataflow .pending_nodes .handle_external_all_nodes_ready(success) - .await; + .await?; if success { tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); dataflow.start(&self.events_tx).await?; @@ -415,7 +415,7 @@ impl Daemon { nodes: Vec, daemon_communication_config: LocalCommunicationConfig, ) -> eyre::Result<()> { - let dataflow = RunningDataflow::new(dataflow_id); + let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone()); let dataflow = match self.running.entry(dataflow_id) { std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow), std::collections::hash_map::Entry::Occupied(_) => { @@ -507,7 +507,11 @@ impl Daemon { let status = dataflow .pending_nodes - .handle_node_subscription(node_id.clone(), reply_sender) + .handle_node_subscription( + node_id.clone(), + reply_sender, + &mut self.coordinator_connection, + ) .await?; match status { DataflowStatus::AllNodesReady => { @@ -516,20 +520,7 @@ impl Daemon { ); dataflow.start(&self.events_tx).await?; } - DataflowStatus::LocalNodesPending => {} - DataflowStatus::LocalNodesReady { success } => { - tracing::info!( - "all local nodes are ready, waiting for remote nodes" - ); - - Self::report_nodes_ready( - &mut self.coordinator_connection, - self.machine_id.clone(), - dataflow_id, - success, - ) - .await?; - } + DataflowStatus::Pending => {} } } } @@ -628,28 +619,6 @@ impl Daemon { Ok(()) } - async fn report_nodes_ready( - coordinator_connection: &mut Option, - machine_id: String, - dataflow_id: Uuid, - success: bool, - ) -> Result<(), eyre::ErrReport> { - let Some(connection) = coordinator_connection else { - bail!("no coordinator connection to send AllNodesReady"); - }; - let msg = serde_json::to_vec(&CoordinatorRequest::Event { - machine_id: machine_id.clone(), - event: DaemonEvent::AllNodesReady { - dataflow_id, - success, - }, - })?; - tcp_send(connection, &msg) - .await - .wrap_err("failed to send AllNodesReady message to dora-coordinator")?; - Ok(()) - } - async fn send_reload( &mut self, dataflow_id: Uuid, @@ -776,19 +745,10 @@ impl Daemon { })?; tracing::warn!("node `{node_id}` exited before initializing dora connection"); - match dataflow.pending_nodes.handle_node_stop(node_id).await { - DataflowStatus::AllNodesReady => {} - DataflowStatus::LocalNodesPending => {} - DataflowStatus::LocalNodesReady { success } => { - Self::report_nodes_ready( - &mut self.coordinator_connection, - self.machine_id.clone(), - dataflow_id, - success, - ) - .await?; - } - } + dataflow + .pending_nodes + .handle_node_stop(node_id, &mut self.coordinator_connection) + .await?; Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, node_id).await?; @@ -1127,10 +1087,10 @@ pub struct RunningDataflow { } impl RunningDataflow { - fn new(id: Uuid) -> RunningDataflow { + fn new(dataflow_id: Uuid, machine_id: String) -> RunningDataflow { Self { - id, - pending_nodes: PendingNodes::default(), + id: dataflow_id, + pending_nodes: PendingNodes::new(dataflow_id, machine_id), subscribe_channels: HashMap::new(), drop_channels: HashMap::new(), mappings: HashMap::new(), diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index 8f52c07b..2043ecc6 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -1,10 +1,19 @@ use std::collections::{HashMap, HashSet}; -use dora_core::{config::NodeId, daemon_messages::DaemonReply}; -use tokio::sync::oneshot; +use dora_core::{ + config::NodeId, + coordinator_messages::{CoordinatorRequest, DaemonEvent}, + daemon_messages::{DaemonReply, DataflowId}, +}; +use eyre::{bail, Context}; +use tokio::{net::TcpStream, sync::oneshot}; + +use crate::tcp_utils::tcp_send; -#[derive(Default)] pub struct PendingNodes { + dataflow_id: DataflowId, + machine_id: String, + /// The local nodes that are still waiting to start. local_nodes: HashSet, /// Whether there are external nodes for this dataflow. @@ -22,6 +31,17 @@ pub struct PendingNodes { } impl PendingNodes { + pub fn new(dataflow_id: DataflowId, machine_id: String) -> Self { + Self { + dataflow_id, + machine_id, + local_nodes: HashSet::new(), + external_nodes: false, + waiting_subscribers: HashMap::new(), + exited_before_subscribe: HashSet::new(), + } + } + pub fn insert(&mut self, node_id: NodeId) { self.local_nodes.insert(node_id); } @@ -34,47 +54,55 @@ impl PendingNodes { &mut self, node_id: NodeId, reply_sender: oneshot::Sender, + coordinator_connection: &mut Option, ) -> eyre::Result { self.waiting_subscribers .insert(node_id.clone(), reply_sender); - self.local_nodes.remove(&node_id); - if self.local_nodes.is_empty() { - if self.external_nodes { - Ok(DataflowStatus::LocalNodesReady { success: true }) - } else { - self.answer_subscribe_requests(None).await; - Ok(DataflowStatus::AllNodesReady) - } - } else { - Ok(DataflowStatus::LocalNodesPending) - } + + self.update_dataflow_status(coordinator_connection).await } - pub async fn handle_node_stop(&mut self, node_id: &NodeId) -> DataflowStatus { + pub async fn handle_node_stop( + &mut self, + node_id: &NodeId, + coordinator_connection: &mut Option, + ) -> eyre::Result { self.exited_before_subscribe.insert(node_id.clone()); - self.local_nodes.remove(node_id); - if self.local_nodes.is_empty() { - if self.external_nodes { - DataflowStatus::LocalNodesReady { success: false } - } else { - self.answer_subscribe_requests(None).await; - DataflowStatus::AllNodesReady - } - } else { - // continue waiting for other nodes - DataflowStatus::LocalNodesPending - } + + self.update_dataflow_status(coordinator_connection).await } - pub async fn handle_external_all_nodes_ready(&mut self, success: bool) { + pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> { + if !self.local_nodes.is_empty() { + bail!("received external `all_nodes_ready` event before local nodes were ready"); + } let external_error = if success { None } else { Some("some nodes failed to initalize on remote machines".to_string()) }; - self.answer_subscribe_requests(external_error).await + self.answer_subscribe_requests(external_error).await; + + Ok(()) + } + + async fn update_dataflow_status( + &mut self, + coordinator_connection: &mut Option, + ) -> eyre::Result { + if self.local_nodes.is_empty() { + if self.external_nodes { + self.report_nodes_ready(coordinator_connection).await?; + Ok(DataflowStatus::Pending) + } else { + self.answer_subscribe_requests(None).await; + Ok(DataflowStatus::AllNodesReady) + } + } else { + Ok(DataflowStatus::Pending) + } } async fn answer_subscribe_requests(&mut self, external_error: Option) { @@ -95,10 +123,33 @@ impl PendingNodes { let _ = reply_sender.send(DaemonReply::Result(result.clone())); } } + + async fn report_nodes_ready( + &self, + coordinator_connection: &mut Option, + ) -> eyre::Result<()> { + let Some(connection) = coordinator_connection else { + bail!("no coordinator connection to send AllNodesReady"); + }; + + let success = self.exited_before_subscribe.is_empty(); + tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes"); + + let msg = serde_json::to_vec(&CoordinatorRequest::Event { + machine_id: self.machine_id.clone(), + event: DaemonEvent::AllNodesReady { + dataflow_id: self.dataflow_id, + success, + }, + })?; + tcp_send(connection, &msg) + .await + .wrap_err("failed to send AllNodesReady message to dora-coordinator")?; + Ok(()) + } } pub enum DataflowStatus { AllNodesReady, - LocalNodesPending, - LocalNodesReady { success: bool }, + Pending, } From 043fe3cbbc86113bc15e5bdbf9b575e81fc09c8b Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 3 May 2023 14:58:45 +0200 Subject: [PATCH 06/11] Fix: Don't use blocking send operation in async coordinator function --- binaries/coordinator/src/control.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs index 51a8219c..c8987a9d 100644 --- a/binaries/coordinator/src/control.rs +++ b/binaries/coordinator/src/control.rs @@ -44,7 +44,7 @@ async fn listen( let incoming = match result { Ok(incoming) => incoming, Err(err) => { - let _ = tx.blocking_send(err.into()); + let _ = tx.send(err.into()).await; return; } }; From ac0c521837b33e96adf1665520386c8160a8ab19 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 3 May 2023 14:59:29 +0200 Subject: [PATCH 07/11] Fix: Only report init state to coordinator once --- binaries/daemon/src/pending.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index 2043ecc6..39d623c0 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -28,6 +28,9 @@ pub struct PendingNodes { /// If this list is non-empty, we should not start the dataflow at all. Instead, /// we report an error to the other nodes. exited_before_subscribe: HashSet, + + /// Whether the local init result was already reported to the coordinator. + reported_init_to_coordinator: bool, } impl PendingNodes { @@ -39,6 +42,7 @@ impl PendingNodes { external_nodes: false, waiting_subscribers: HashMap::new(), exited_before_subscribe: HashSet::new(), + reported_init_to_coordinator: false, } } @@ -94,7 +98,10 @@ impl PendingNodes { ) -> eyre::Result { if self.local_nodes.is_empty() { if self.external_nodes { - self.report_nodes_ready(coordinator_connection).await?; + if !self.reported_init_to_coordinator { + self.report_nodes_ready(coordinator_connection).await?; + self.reported_init_to_coordinator = true; + } Ok(DataflowStatus::Pending) } else { self.answer_subscribe_requests(None).await; From 026ddd915f4bbf1615748a058e9a780308dab8f0 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 3 May 2023 15:00:38 +0200 Subject: [PATCH 08/11] Fix: Only update dataflow status on exit events of pending nodes We don't want to consider exit events of nodes that sent a subscribe event already. --- binaries/daemon/src/lib.rs | 1 - binaries/daemon/src/pending.rs | 12 +++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index e4204ecc..53f1d754 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -744,7 +744,6 @@ impl Daemon { format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`") })?; - tracing::warn!("node `{node_id}` exited before initializing dora connection"); dataflow .pending_nodes .handle_node_stop(node_id, &mut self.coordinator_connection) diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index 39d623c0..b040211f 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -71,11 +71,13 @@ impl PendingNodes { &mut self, node_id: &NodeId, coordinator_connection: &mut Option, - ) -> eyre::Result { - self.exited_before_subscribe.insert(node_id.clone()); - self.local_nodes.remove(node_id); - - self.update_dataflow_status(coordinator_connection).await + ) -> eyre::Result<()> { + if self.local_nodes.remove(node_id) { + tracing::warn!("node `{node_id}` exited before initializing dora connection"); + self.exited_before_subscribe.insert(node_id.clone()); + self.update_dataflow_status(coordinator_connection).await?; + } + Ok(()) } pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> { From 4e72a8b2162bca6c471f17f4474e5dcdd6434368 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 4 May 2023 09:23:21 +0200 Subject: [PATCH 09/11] CI: Increase timeout for 'build CLI and binaries' step Apparently the step can take longer on macOS if no cache is found. --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 260aa5b4..372ccc55 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -82,7 +82,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: "Build cli and binaries" - timeout-minutes: 30 + timeout-minutes: 45 run: | cargo install --path binaries/coordinator cargo install --path binaries/daemon From 893ca100ece8c2c82786fd0eeda703edd5c36aba Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 4 May 2023 09:29:02 +0200 Subject: [PATCH 10/11] Improve subscribe error message when nodes failed before init --- binaries/daemon/src/pending.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index b040211f..85cbe56f 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -122,8 +122,11 @@ impl PendingNodes { } } else { Err(format!( - "Nodes failed before subscribing: {:?}", - self.exited_before_subscribe + "Some nodes exited before subscribing to dora: {:?}\n\n\ + This is typically happens when an initialization error occurs + in the node or operator. To check the output of the failed + nodes, run `dora logs {} `.", + self.exited_before_subscribe, self.dataflow_id )) }; // answer all subscribe requests From 7499d43c065e950e484b48af66661534cd1f472a Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 4 May 2023 13:56:58 +0200 Subject: [PATCH 11/11] Fix: Handle node spawn error like stop-before-subscribe --- binaries/daemon/src/lib.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 8f62be32..d04799f4 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -518,7 +518,7 @@ impl Daemon { dataflow.pending_nodes.insert(node.id.clone()); let node_id = node.id.clone(); - spawn::spawn_node( + match spawn::spawn_node( dataflow_id, &working_dir, node, @@ -526,8 +526,19 @@ impl Daemon { daemon_communication_config, ) .await - .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?; - dataflow.running_nodes.insert(node_id); + .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) + { + Ok(()) => { + dataflow.running_nodes.insert(node_id); + } + Err(err) => { + tracing::error!("{err:?}"); + dataflow + .pending_nodes + .handle_node_stop(&node_id, &mut self.coordinator_connection) + .await?; + } + } } else { dataflow.pending_nodes.set_external_nodes(true); }