diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 260aa5b4..372ccc55 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -82,7 +82,7 @@ jobs:
       - uses: Swatinem/rust-cache@v2
       - name: "Build cli and binaries"
-        timeout-minutes: 30
+        timeout-minutes: 45
         run: |
           cargo install --path binaries/coordinator
           cargo install --path binaries/daemon
diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs
index 99f72bfe..c47a5674 100644
--- a/apis/rust/node/src/event_stream/mod.rs
+++ b/apis/rust/node/src/event_stream/mod.rs
@@ -59,11 +59,19 @@ impl EventStream {
         mut close_channel: DaemonChannel,
     ) -> eyre::Result<Self> {
         channel.register(dataflow_id, node_id.clone())?;
-        channel
+        let reply = channel
             .request(&DaemonRequest::Subscribe)
             .map_err(|e| eyre!(e))
             .wrap_err("failed to create subscription with dora-daemon")?;

+        match reply {
+            daemon_messages::DaemonReply::Result(Ok(())) => {}
+            daemon_messages::DaemonReply::Result(Err(err)) => {
+                eyre::bail!("subscribe failed: {err}")
+            }
+            other => eyre::bail!("unexpected subscribe reply: {other:?}"),
+        }
+
         close_channel.register(dataflow_id, node_id.clone())?;

         let (tx, rx) = flume::bounded(0);
diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs
index c594a252..405b9d3e 100644
--- a/apis/rust/node/src/node/drop_stream.rs
+++ b/apis/rust/node/src/node/drop_stream.rs
@@ -4,7 +4,7 @@ use crate::daemon_connection::DaemonChannel;
 use dora_core::{
     config::NodeId,
     daemon_messages::{
-        DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent,
+        self, DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent,
     },
 };
 use eyre::{eyre, Context};
@@ -45,11 +45,19 @@ impl DropStream {
     ) -> eyre::Result<Self> {
         channel.register(dataflow_id, node_id.clone())?;

-        channel
+        let reply = channel
             .request(&DaemonRequest::SubscribeDrop)
             .map_err(|e| eyre!(e))
             .wrap_err("failed to create subscription with dora-daemon")?;

+        match reply {
+            daemon_messages::DaemonReply::Result(Ok(())) => {}
+            daemon_messages::DaemonReply::Result(Err(err)) => {
+                eyre::bail!("drop subscribe failed: {err}")
+            }
+            other => eyre::bail!("unexpected drop subscribe reply: {other:?}"),
+        }
+
         let (tx, rx) = flume::bounded(0);
         let node_id_cloned = node_id.clone();
diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs
index 51a8219c..c8987a9d 100644
--- a/binaries/coordinator/src/control.rs
+++ b/binaries/coordinator/src/control.rs
@@ -44,7 +44,7 @@ async fn listen(
         let incoming = match result {
             Ok(incoming) => incoming,
             Err(err) => {
-                let _ = tx.blocking_send(err.into());
+                let _ = tx.send(err.into()).await;
                 return;
             }
         };
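// ---------------------------------------------------------------------------
// Note on the two subscribe changes above: both `EventStream::init` and
// `DropStream::init` previously discarded the daemon's reply, so a failed
// subscription only surfaced later as a hang or missing events. Checking the
// reply turns it into an immediate, descriptive error. A minimal,
// self-contained sketch of the pattern (the `Reply` enum below is a stand-in,
// not the real `daemon_messages::DaemonReply`):

#[derive(Debug)]
enum Reply {
    Result(Result<(), String>),
    Other, // stand-in for the remaining reply variants
}

fn check_subscribe_reply(reply: Reply) -> Result<(), String> {
    match reply {
        Reply::Result(Ok(())) => Ok(()),
        Reply::Result(Err(err)) => Err(format!("subscribe failed: {err}")),
        other => Err(format!("unexpected subscribe reply: {other:?}")),
    }
}

fn main() {
    assert!(check_subscribe_reply(Reply::Result(Ok(()))).is_ok());
    assert!(check_subscribe_reply(Reply::Other).is_err());
}
// ---------------------------------------------------------------------------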
diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs
index c9ad5ed5..40758ad7 100644
--- a/binaries/coordinator/src/lib.rs
+++ b/binaries/coordinator/src/lib.rs
@@ -21,7 +21,7 @@ use std::{
     collections::{BTreeSet, HashMap},
     net::SocketAddr,
     path::PathBuf,
-    time::Duration,
+    time::{Duration, Instant},
 };
 use tokio::{
     net::{TcpListener, TcpStream},
@@ -138,9 +138,9 @@ async fn start_inner(
         .await
         .wrap_err("failed to create control events")?;

-    let daemon_watchdog_interval =
-        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
-            .map(|_| Event::DaemonWatchdogInterval);
+    let daemon_heartbeat_interval =
+        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
+            .map(|_| Event::DaemonHeartbeatInterval);

     // events that should be aborted on `dora destroy`
     let (abortable_events, abort_handle) = futures::stream::abortable(
@@ -148,7 +148,7 @@ async fn start_inner(
         control_events,
         new_daemon_connections,
         external_events,
-        daemon_watchdog_interval,
+        daemon_heartbeat_interval,
     )
         .merge(),
     );
@@ -203,6 +203,7 @@ async fn start_inner(
                     DaemonConnection {
                         stream: connection,
                         listen_socket,
+                        last_heartbeat: Instant::now(),
                     },
                 );
                 if let Some(_previous) = previous {
@@ -221,15 +222,20 @@ async fn start_inner(
                 }
             },
             Event::Dataflow { uuid, event } => match event {
-                DataflowEvent::ReadyOnMachine { machine_id } => {
+                DataflowEvent::ReadyOnMachine {
+                    machine_id,
+                    success,
+                } => {
                     match running_dataflows.entry(uuid) {
                         std::collections::hash_map::Entry::Occupied(mut entry) => {
                             let dataflow = entry.get_mut();
                             dataflow.pending_machines.remove(&machine_id);
+                            dataflow.init_success &= success;
                             if dataflow.pending_machines.is_empty() {
                                 let message = serde_json::to_vec(&DaemonCoordinatorEvent::AllNodesReady {
                                     dataflow_id: uuid,
+                                    success: dataflow.init_success,
                                 })
                                 .wrap_err("failed to serialize AllNodesReady message")?;
@@ -442,11 +448,15 @@ async fn start_inner(
                 }
                 ControlEvent::Error(err) => tracing::error!("{err:?}"),
             },
-            Event::DaemonWatchdogInterval => {
+            Event::DaemonHeartbeatInterval => {
                 let mut disconnected = BTreeSet::new();
                 for (machine_id, connection) in &mut daemon_connections {
+                    if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
+                        disconnected.insert(machine_id.clone());
+                        continue;
+                    }
                     let result: eyre::Result<()> =
-                        tokio::time::timeout(Duration::from_millis(100), send_watchdog_message(&mut connection.stream))
+                        tokio::time::timeout(Duration::from_millis(500), send_watchdog_message(&mut connection.stream))
                             .await
                             .wrap_err("timeout")
                             .and_then(|r| r).wrap_err_with(||
@@ -474,6 +484,11 @@ async fn start_inner(
                 )
                 .await?;
             }
+            Event::DaemonHeartbeat { machine_id } => {
+                if let Some(connection) = daemon_connections.get_mut(&machine_id) {
+                    connection.last_heartbeat = Instant::now();
+                }
+            }
         }
     }
@@ -485,6 +500,7 @@ async fn start_inner(
 struct DaemonConnection {
     stream: TcpStream,
     listen_socket: SocketAddr,
+    last_heartbeat: Instant,
 }

 fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
@@ -525,21 +541,11 @@ async fn handle_destroy(
 }

 async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> {
-    let message = serde_json::to_vec(&DaemonCoordinatorEvent::Watchdog).unwrap();
+    let message = serde_json::to_vec(&DaemonCoordinatorEvent::Heartbeat).unwrap();

     tcp_send(connection, &message)
         .await
-        .wrap_err("failed to send watchdog message to daemon")?;
-    let reply_raw = tcp_receive(connection)
-        .await
-        .wrap_err("failed to receive stop reply from daemon")?;
-
-    match serde_json::from_slice(&reply_raw)
-        .wrap_err("failed to deserialize stop reply from daemon")?
-    {
-        DaemonCoordinatorReply::WatchdogAck => Ok(()),
-        other => bail!("unexpected reply after sending `watchdog`: {other:?}"),
-    }
+        .wrap_err("failed to send watchdog message to daemon")
 }

 #[allow(dead_code)] // Keeping the communication layer for later use.
@@ -550,6 +556,7 @@ struct RunningDataflow {
     machines: BTreeSet<String>,
     /// IDs of machines that are waiting until all nodes are started.
     pending_machines: BTreeSet<String>,
+    init_success: bool,
     nodes: Vec<ResolvedNode>,
 }
@@ -732,6 +739,7 @@ async fn start_dataflow(
         } else {
             BTreeSet::new()
         },
+        init_success: true,
         machines,
         nodes,
     })
@@ -770,10 +778,11 @@ async fn destroy_daemons(
 pub enum Event {
     NewDaemonConnection(TcpStream),
     DaemonConnectError(eyre::Report),
+    DaemonHeartbeat { machine_id: String },
     Dataflow { uuid: Uuid, event: DataflowEvent },
     Control(ControlEvent),
     Daemon(DaemonEvent),
-    DaemonWatchdogInterval,
+    DaemonHeartbeatInterval,
     CtrlC,
 }

@@ -782,7 +791,7 @@ impl Event {
     #[allow(clippy::match_like_matches_macro)]
     pub fn log(&self) -> bool {
         match self {
-            Event::DaemonWatchdogInterval => false,
+            Event::DaemonHeartbeatInterval => false,
             _ => true,
         }
     }
@@ -796,6 +805,7 @@ pub enum DataflowEvent {
     },
     ReadyOnMachine {
         machine_id: String,
+        success: bool,
     },
 }
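// ---------------------------------------------------------------------------
// Note on the coordinator changes above: the synchronous watchdog
// request/reply is replaced by one-way heartbeats plus a per-connection
// `last_heartbeat: Instant`; a daemon is considered disconnected once no
// heartbeat arrived for 15 seconds. A minimal sketch of that liveness check,
// using plain std types and no real sockets:

use std::collections::HashMap;
use std::time::{Duration, Instant};

struct DaemonConnection {
    last_heartbeat: Instant,
}

fn stale_machines(connections: &HashMap<String, DaemonConnection>) -> Vec<String> {
    connections
        .iter()
        .filter(|(_, c)| c.last_heartbeat.elapsed() > Duration::from_secs(15))
        .map(|(id, _)| id.clone())
        .collect()
}

fn main() {
    let mut connections = HashMap::new();
    connections.insert(
        "machine-a".to_string(),
        DaemonConnection { last_heartbeat: Instant::now() },
    );
    // a fresh heartbeat keeps the connection out of the stale set
    assert!(stale_machines(&connections).is_empty());
}
// ---------------------------------------------------------------------------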
diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs
index da4ee624..6ea02894 100644
--- a/binaries/coordinator/src/listener.rs
+++ b/binaries/coordinator/src/listener.rs
@@ -1,10 +1,7 @@
-use crate::{
-    tcp_utils::{tcp_receive, tcp_send},
-    DaemonEvent, DataflowEvent, Event,
-};
+use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
 use dora_core::coordinator_messages;
 use eyre::{eyre, Context};
-use std::{io::ErrorKind, net::Ipv4Addr, time::Duration};
+use std::{io::ErrorKind, net::Ipv4Addr};
 use tokio::{
     net::{TcpListener, TcpStream},
     sync::mpsc,
@@ -60,10 +57,16 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sender<Event>) {
                 break;
             }
             coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event {
-                coordinator_messages::DaemonEvent::AllNodesReady { dataflow_id } => {
+                coordinator_messages::DaemonEvent::AllNodesReady {
+                    dataflow_id,
+                    success,
+                } => {
                     let event = Event::Dataflow {
                         uuid: dataflow_id,
-                        event: DataflowEvent::ReadyOnMachine { machine_id },
+                        event: DataflowEvent::ReadyOnMachine {
+                            machine_id,
+                            success,
+                        },
                     };
                     if events_tx.send(event).await.is_err() {
                         break;
                     }
@@ -84,13 +87,11 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sender<Event>) {
                         break;
                     }
                 }
-                coordinator_messages::DaemonEvent::Watchdog => {
-                    let reply = serde_json::to_vec(&coordinator_messages::WatchdogAck).unwrap();
-                    _ = tokio::time::timeout(
-                        Duration::from_millis(10),
-                        tcp_send(&mut connection, &reply),
-                    )
-                    .await;
+                coordinator_messages::DaemonEvent::Heartbeat => {
+                    let event = Event::DaemonHeartbeat { machine_id };
+                    if events_tx.send(event).await.is_err() {
+                        break;
+                    }
                 }
             },
         };
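// ---------------------------------------------------------------------------
// Note on the listener change above: heartbeats no longer get a `WatchdogAck`
// reply over TCP; the handler just forwards an event into the coordinator's
// main loop, which refreshes the timestamp. A minimal sketch of that
// fire-and-forget forwarding (assumes a tokio runtime with the `sync` and
// `macros` features; `Event` here is a stand-in for the coordinator's enum):

use tokio::sync::mpsc;

#[derive(Debug)]
enum Event {
    DaemonHeartbeat { machine_id: String },
}

#[tokio::main]
async fn main() {
    let (events_tx, mut events_rx) = mpsc::channel(16);

    // listener side: forward the heartbeat, never wait for a reply
    events_tx
        .send(Event::DaemonHeartbeat { machine_id: "machine-a".into() })
        .await
        .unwrap();

    // main-loop side: refresh the liveness timestamp for that machine
    if let Some(Event::DaemonHeartbeat { machine_id }) = events_rx.recv().await {
        println!("heartbeat from {machine_id}");
    }
}
// ---------------------------------------------------------------------------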
"telemetry")] use tracing_opentelemetry::OpenTelemetrySpanExt; +use crate::pending::DataflowStatus; + pub struct Daemon { running: HashMap, events_tx: mpsc::Sender, coordinator_connection: Option, + last_coordinator_heartbeat: Instant, inter_daemon_connections: BTreeMap, machine_id: String, @@ -183,6 +188,7 @@ impl Daemon { running: HashMap::new(), events_tx: dora_events_tx, coordinator_connection, + last_coordinator_heartbeat: Instant::now(), inter_daemon_connections: BTreeMap::new(), machine_id, exit_when_done, @@ -193,7 +199,7 @@ impl Daemon { let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval( Duration::from_secs(5), )) - .map(|_| Event::WatchdogInterval); + .map(|_| Event::HeartbeatInterval); let events = (external_events, dora_events, watchdog_interval).merge(); daemon.run_inner(events).await } @@ -227,22 +233,19 @@ impl Daemon { RunStatus::Continue => {} RunStatus::Exit => break, }, - Event::WatchdogInterval => { + Event::HeartbeatInterval => { if let Some(connection) = &mut self.coordinator_connection { let msg = serde_json::to_vec(&CoordinatorRequest::Event { machine_id: self.machine_id.clone(), - event: DaemonEvent::Watchdog, + event: DaemonEvent::Heartbeat, })?; tcp_send(connection, &msg) .await .wrap_err("failed to send watchdog message to dora-coordinator")?; - let reply_raw = tcp_receive(connection) - .await - .wrap_err("lost connection to coordinator")?; - let _: dora_core::coordinator_messages::WatchdogAck = - serde_json::from_slice(&reply_raw) - .wrap_err("received unexpected watchdog reply from coordinator")?; + if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) { + bail!("lost connection to coordinator") + } } } Event::CtrlC => { @@ -298,11 +301,20 @@ impl Daemon { }); RunStatus::Continue } - DaemonCoordinatorEvent::AllNodesReady { dataflow_id } => { + DaemonCoordinatorEvent::AllNodesReady { + dataflow_id, + success, + } => { match self.running.get_mut(&dataflow_id) { Some(dataflow) => { - tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); - dataflow.start(&self.events_tx).await?; + dataflow + .pending_nodes + .handle_external_all_nodes_ready(success) + .await?; + if success { + tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); + dataflow.start(&self.events_tx).await?; + } } None => { tracing::warn!( @@ -382,12 +394,9 @@ impl Daemon { .map_err(|_| error!("could not send destroy reply from daemon to coordinator")); RunStatus::Exit } - DaemonCoordinatorEvent::Watchdog => { - let _ = reply_tx - .send(Some(DaemonCoordinatorReply::WatchdogAck)) - .map_err(|_| { - error!("could not send WatchdogAck reply from daemon to coordinator") - }); + DaemonCoordinatorEvent::Heartbeat => { + self.last_coordinator_heartbeat = Instant::now(); + let _ = reply_tx.send(None); RunStatus::Continue } }; @@ -457,7 +466,7 @@ impl Daemon { nodes: Vec, daemon_communication_config: LocalCommunicationConfig, ) -> eyre::Result<()> { - let dataflow = RunningDataflow::new(dataflow_id); + let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone()); let dataflow = match self.running.entry(dataflow_id) { std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow), std::collections::hash_map::Entry::Occupied(_) => { @@ -506,7 +515,7 @@ impl Daemon { dataflow.pending_nodes.insert(node.id.clone()); let node_id = node.id.clone(); - spawn::spawn_node( + match spawn::spawn_node( dataflow_id, &working_dir, node, @@ -514,10 +523,21 
                 daemon_communication_config,
             )
             .await
-            .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?;
-            dataflow.running_nodes.insert(node_id);
+            .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
+            {
+                Ok(()) => {
+                    dataflow.running_nodes.insert(node_id);
+                }
+                Err(err) => {
+                    tracing::error!("{err:?}");
+                    dataflow
+                        .pending_nodes
+                        .handle_node_stop(&node_id, &mut self.coordinator_connection)
+                        .await?;
+                }
+            }
         } else {
-            dataflow.external_nodes.insert(node.id.clone(), node);
+            dataflow.pending_nodes.set_external_nodes(true);
         }
     }
@@ -535,39 +555,35 @@ impl Daemon {
                 event_sender,
                 reply_sender,
             } => {
-                let result = self
-                    .subscribe(dataflow_id, node_id.clone(), event_sender)
-                    .await;
-                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
-                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
-                })?;
-                tracing::debug!("node `{node_id}` is ready");
+                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
+                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
+                });

-                dataflow
-                    .subscribe_replies
-                    .insert(node_id.clone(), (reply_sender, result));
-                dataflow.pending_nodes.remove(&node_id);
-                if dataflow.pending_nodes.is_empty() {
-                    if dataflow.external_nodes.is_empty() {
-                        tracing::info!("all nodes are ready, starting dataflow `{dataflow_id}`");
-                        dataflow.start(&self.events_tx).await?;
-                    } else {
-                        tracing::info!(
-                            "all local nodes are ready, waiting for remote nodes \
-                            for dataflow `{dataflow_id}`"
-                        );
+                match dataflow {
+                    Err(err) => {
+                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
+                    }
+                    Ok(dataflow) => {
+                        tracing::debug!("node `{node_id}` is ready");
+                        Self::subscribe(dataflow, node_id.clone(), event_sender).await;

-                        // dataflow is split across multiple daemons -> synchronize with dora-coordinator
-                        let Some(connection) = &mut self.coordinator_connection else {
-                            bail!("no coordinator connection to send AllNodesReady");
-                        };
-                        let msg = serde_json::to_vec(&CoordinatorRequest::Event {
-                            machine_id: self.machine_id.clone(),
-                            event: DaemonEvent::AllNodesReady { dataflow_id },
-                        })?;
-                        tcp_send(connection, &msg)
-                            .await
-                            .wrap_err("failed to send AllNodesReady message to dora-coordinator")?;
+                        let status = dataflow
+                            .pending_nodes
+                            .handle_node_subscription(
+                                node_id.clone(),
+                                reply_sender,
+                                &mut self.coordinator_connection,
+                            )
+                            .await?;
+                        match status {
+                            DataflowStatus::AllNodesReady => {
+                                tracing::info!(
+                                    "all nodes are ready, starting dataflow `{dataflow_id}`"
+                                );
+                                dataflow.start(&self.events_tx).await?;
+                            }
+                            DataflowStatus::Pending => {}
+                        }
                     }
                 }
             }
@@ -605,8 +621,17 @@ impl Daemon {
                 let _ = reply_sender.send(DaemonReply::Result(reply));
             }
             DaemonNodeEvent::OutputsDone { reply_sender } => {
-                let _ = reply_sender.send(DaemonReply::Result(Ok(())));
-                self.handle_outputs_done(dataflow_id, &node_id).await?;
+                let result = match self.running.get_mut(&dataflow_id) {
+                    Some(dataflow) => {
+                        Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, &node_id)
+                            .await
+                    },
+                    None => Err(eyre!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")),
+                };
+
+                let _ = reply_sender.send(DaemonReply::Result(
+                    result.map_err(|err| format!("{err:?}")),
+                ));
             }
             DaemonNodeEvent::SendOut {
                 output_id,
@@ -724,15 +749,10 @@ impl Daemon {
     }

     async fn subscribe(
-        &mut self,
-        dataflow_id: Uuid,
+        dataflow: &mut RunningDataflow,
         node_id: NodeId,
         event_sender: UnboundedSender<daemon_messages::NodeEvent>,
-    ) -> Result<(), String> {
-        let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
-            format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
-        })?;
-
+    ) {
         // some inputs might have been closed already -> report those events
         let closed_inputs = dataflow
             .mappings
@@ -763,22 +783,17 @@ impl Daemon {
         }

         dataflow.subscribe_channels.insert(node_id, event_sender);
-
-        Ok(())
     }

-    #[tracing::instrument(skip(self), level = "trace")]
+    #[tracing::instrument(skip(dataflow, inter_daemon_connections), fields(uuid = %dataflow.id), level = "trace")]
     async fn handle_outputs_done(
-        &mut self,
-        dataflow_id: Uuid,
+        dataflow: &mut RunningDataflow,
+        inter_daemon_connections: &mut BTreeMap<String, InterDaemonConnection>,
         node_id: &NodeId,
     ) -> eyre::Result<()> {
-        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
-            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
-        })?;
         send_input_closed_events(
             dataflow,
-            &mut self.inter_daemon_connections,
+            inter_daemon_connections,
             |OutputId(source_id, _)| source_id == node_id,
         )
         .await?;
@@ -786,13 +801,18 @@ impl Daemon {
         Ok(())
     }

-    #[tracing::instrument(skip(self), level = "trace")]
     async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
-        self.handle_outputs_done(dataflow_id, node_id).await?;
-
         let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
             format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
         })?;
+
+        dataflow
+            .pending_nodes
+            .handle_node_stop(node_id, &mut self.coordinator_connection)
+            .await?;
+
+        Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, node_id).await?;
+
         dataflow.running_nodes.remove(node_id);
         if dataflow.running_nodes.is_empty() {
             tracing::info!(
@@ -1122,11 +1142,7 @@ fn close_input(dataflow: &mut RunningDataflow, receiver_id: &NodeId, input_id: &DataId) {
 pub struct RunningDataflow {
     id: Uuid,
     /// Local nodes that are not started yet
-    pending_nodes: HashSet<NodeId>,
-    /// Used to synchronize node starts.
-    ///
-    /// Subscribe requests block the node until all other nodes are ready too.
-    subscribe_replies: HashMap<NodeId, (oneshot::Sender<DaemonReply>, Result<(), String>)>,
+    pending_nodes: PendingNodes,

     subscribe_channels: HashMap<NodeId, UnboundedSender<daemon_messages::NodeEvent>>,
     drop_channels: HashMap<NodeId, UnboundedSender<NodeDropEvent>>,
@@ -1135,7 +1151,6 @@ pub struct RunningDataflow {
     open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
     running_nodes: BTreeSet<NodeId>,
-    external_nodes: BTreeMap<NodeId, ResolvedNode>,
     open_external_mappings: HashMap<OutputId, BTreeMap<String, BTreeSet<InputId>>>,

     pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,
@@ -1151,18 +1166,16 @@ pub struct RunningDataflow {
 }

 impl RunningDataflow {
-    fn new(id: Uuid) -> RunningDataflow {
+    fn new(dataflow_id: Uuid, machine_id: String) -> RunningDataflow {
         Self {
-            id,
-            pending_nodes: HashSet::new(),
-            subscribe_replies: HashMap::new(),
+            id: dataflow_id,
+            pending_nodes: PendingNodes::new(dataflow_id, machine_id),
             subscribe_channels: HashMap::new(),
             drop_channels: HashMap::new(),
             mappings: HashMap::new(),
             timers: BTreeMap::new(),
             open_inputs: BTreeMap::new(),
             running_nodes: BTreeSet::new(),
-            external_nodes: BTreeMap::new(),
             open_external_mappings: HashMap::new(),
             pending_drop_tokens: HashMap::new(),
             _timer_handles: Vec::new(),
@@ -1172,12 +1185,6 @@ impl RunningDataflow {
     }

     async fn start(&mut self, events_tx: &mpsc::Sender<Event>) -> eyre::Result<()> {
-        // answer all subscribe requests
-        let subscribe_replies = std::mem::take(&mut self.subscribe_replies);
-        for (reply_sender, subscribe_result) in subscribe_replies.into_values() {
-            let _ = reply_sender.send(DaemonReply::Result(subscribe_result));
-        }
-
         for interval in self.timers.keys().copied() {
             let events_tx = events_tx.clone();
             let dataflow_id = self.id;
@@ -1283,7 +1290,7 @@ pub enum Event {
     Coordinator(CoordinatorEvent),
     Daemon(InterDaemonEvent),
     Dora(DoraEvent),
-    WatchdogInterval,
+    HeartbeatInterval,
     CtrlC,
 }
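// ---------------------------------------------------------------------------
// Note on the daemon refactoring above: `subscribe` and `handle_outputs_done`
// become associated functions that borrow a `RunningDataflow` plus exactly
// the other fields they need, instead of `&mut self`. This sidesteps the
// double-borrow that would occur while an entry of `self.running` is already
// mutably borrowed. A minimal sketch of the pattern (hypothetical types, not
// the real daemon structs):

use std::collections::HashMap;

struct Dataflow {
    events_handled: u32,
}

struct Daemon {
    running: HashMap<u32, Dataflow>,
    log: Vec<String>,
}

impl Daemon {
    // Associated function with disjoint borrows instead of `&mut self`.
    fn handle(dataflow: &mut Dataflow, log: &mut Vec<String>) {
        dataflow.events_handled += 1;
        log.push(format!("handled {} events", dataflow.events_handled));
    }

    fn run(&mut self) {
        if let Some(dataflow) = self.running.get_mut(&0) {
            // `self.log` can still be borrowed here because `handle` does not
            // take `&mut self`; field borrows of `self` are disjoint.
            Self::handle(dataflow, &mut self.log);
        }
    }
}

fn main() {
    let mut daemon = Daemon {
        running: HashMap::from([(0, Dataflow { events_handled: 0 })]),
        log: Vec::new(),
    };
    daemon.run();
    assert_eq!(daemon.log.len(), 1);
}
// ---------------------------------------------------------------------------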
diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs
new file mode 100644
index 00000000..85cbe56f
--- /dev/null
+++ b/binaries/daemon/src/pending.rs
@@ -0,0 +1,167 @@
+use std::collections::{HashMap, HashSet};
+
+use dora_core::{
+    config::NodeId,
+    coordinator_messages::{CoordinatorRequest, DaemonEvent},
+    daemon_messages::{DaemonReply, DataflowId},
+};
+use eyre::{bail, Context};
+use tokio::{net::TcpStream, sync::oneshot};
+
+use crate::tcp_utils::tcp_send;
+
+pub struct PendingNodes {
+    dataflow_id: DataflowId,
+    machine_id: String,
+
+    /// The local nodes that are still waiting to start.
+    local_nodes: HashSet<NodeId>,
+    /// Whether there are external nodes for this dataflow.
+    external_nodes: bool,
+
+    /// Used to synchronize node starts.
+    ///
+    /// Subscribe requests block the node until all other nodes are ready too.
+    waiting_subscribers: HashMap<NodeId, oneshot::Sender<DaemonReply>>,
+    /// List of nodes that finished before connecting to the dora daemon.
+    ///
+    /// If this list is non-empty, we should not start the dataflow at all. Instead,
+    /// we report an error to the other nodes.
+    exited_before_subscribe: HashSet<NodeId>,
+
+    /// Whether the local init result was already reported to the coordinator.
+    reported_init_to_coordinator: bool,
+}
+
+impl PendingNodes {
+    pub fn new(dataflow_id: DataflowId, machine_id: String) -> Self {
+        Self {
+            dataflow_id,
+            machine_id,
+            local_nodes: HashSet::new(),
+            external_nodes: false,
+            waiting_subscribers: HashMap::new(),
+            exited_before_subscribe: HashSet::new(),
+            reported_init_to_coordinator: false,
+        }
+    }
+
+    pub fn insert(&mut self, node_id: NodeId) {
+        self.local_nodes.insert(node_id);
+    }
+
+    pub fn set_external_nodes(&mut self, value: bool) {
+        self.external_nodes = value;
+    }
+
+    pub async fn handle_node_subscription(
+        &mut self,
+        node_id: NodeId,
+        reply_sender: oneshot::Sender<DaemonReply>,
+        coordinator_connection: &mut Option<TcpStream>,
+    ) -> eyre::Result<DataflowStatus> {
+        self.waiting_subscribers
+            .insert(node_id.clone(), reply_sender);
+        self.local_nodes.remove(&node_id);
+
+        self.update_dataflow_status(coordinator_connection).await
+    }
+
+    pub async fn handle_node_stop(
+        &mut self,
+        node_id: &NodeId,
+        coordinator_connection: &mut Option<TcpStream>,
+    ) -> eyre::Result<()> {
+        if self.local_nodes.remove(node_id) {
+            tracing::warn!("node `{node_id}` exited before initializing dora connection");
+            self.exited_before_subscribe.insert(node_id.clone());
+            self.update_dataflow_status(coordinator_connection).await?;
+        }
+        Ok(())
+    }
+
+    pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> {
+        if !self.local_nodes.is_empty() {
+            bail!("received external `all_nodes_ready` event before local nodes were ready");
+        }
+
+        let external_error = if success {
+            None
+        } else {
+            Some("some nodes failed to initialize on remote machines".to_string())
+        };
+        self.answer_subscribe_requests(external_error).await;
+
+        Ok(())
+    }
+
+    async fn update_dataflow_status(
+        &mut self,
+        coordinator_connection: &mut Option<TcpStream>,
+    ) -> eyre::Result<DataflowStatus> {
+        if self.local_nodes.is_empty() {
+            if self.external_nodes {
+                if !self.reported_init_to_coordinator {
+                    self.report_nodes_ready(coordinator_connection).await?;
+                    self.reported_init_to_coordinator = true;
+                }
+                Ok(DataflowStatus::Pending)
+            } else {
+                self.answer_subscribe_requests(None).await;
+                Ok(DataflowStatus::AllNodesReady)
+            }
+        } else {
+            Ok(DataflowStatus::Pending)
+        }
+    }
+
+    async fn answer_subscribe_requests(&mut self, external_error: Option<String>) {
+        let result = if self.exited_before_subscribe.is_empty() {
+            match external_error {
+                Some(err) => Err(err),
+                None => Ok(()),
+            }
+        } else {
+            Err(format!(
+                "Some nodes exited before subscribing to dora: {:?}\n\n\
+                This typically happens when an initialization error occurs
To check the output of the failed + nodes, run `dora logs {} `.", + self.exited_before_subscribe, self.dataflow_id + )) + }; + // answer all subscribe requests + let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); + for reply_sender in subscribe_replies.into_values() { + let _ = reply_sender.send(DaemonReply::Result(result.clone())); + } + } + + async fn report_nodes_ready( + &self, + coordinator_connection: &mut Option, + ) -> eyre::Result<()> { + let Some(connection) = coordinator_connection else { + bail!("no coordinator connection to send AllNodesReady"); + }; + + let success = self.exited_before_subscribe.is_empty(); + tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes"); + + let msg = serde_json::to_vec(&CoordinatorRequest::Event { + machine_id: self.machine_id.clone(), + event: DaemonEvent::AllNodesReady { + dataflow_id: self.dataflow_id, + success, + }, + })?; + tcp_send(connection, &msg) + .await + .wrap_err("failed to send AllNodesReady message to dora-coordinator")?; + Ok(()) + } +} + +pub enum DataflowStatus { + AllNodesReady, + Pending, +} diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index 21ae38a9..e83ee784 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -19,12 +19,13 @@ pub enum CoordinatorRequest { pub enum DaemonEvent { AllNodesReady { dataflow_id: DataflowId, + success: bool, }, AllNodesFinished { dataflow_id: DataflowId, result: Result<(), String>, }, - Watchdog, + Heartbeat, } #[derive(Debug, serde::Serialize, serde::Deserialize)] @@ -41,6 +42,3 @@ impl RegisterResult { } } } - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct WatchdogAck; diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index e51c1586..5929fb6a 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -127,6 +127,7 @@ impl fmt::Debug for Data { type SharedMemoryId = String; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[must_use] pub enum DaemonReply { Result(Result<(), String>), PreparedMessage { shared_memory_id: SharedMemoryId }, @@ -200,6 +201,7 @@ pub enum DaemonCoordinatorEvent { Spawn(SpawnDataflowNodes), AllNodesReady { dataflow_id: DataflowId, + success: bool, }, StopDataflow { dataflow_id: DataflowId, @@ -214,7 +216,7 @@ pub enum DaemonCoordinatorEvent { node_id: NodeId, }, Destroy, - Watchdog, + Heartbeat, } #[derive(Debug, serde::Deserialize, serde::Serialize)] @@ -238,7 +240,6 @@ pub enum DaemonCoordinatorReply { ReloadResult(Result<(), String>), StopResult(Result<(), String>), DestroyResult(Result<(), String>), - WatchdogAck, Logs(Result, String>), }