@@ -82,7 +82,7 @@ jobs:
       - uses: Swatinem/rust-cache@v2
       - name: "Build cli and binaries"
-        timeout-minutes: 30
+        timeout-minutes: 45
        run: |
           cargo install --path binaries/coordinator
           cargo install --path binaries/daemon
@@ -59,11 +59,19 @@ impl EventStream {
         mut close_channel: DaemonChannel,
     ) -> eyre::Result<Self> {
         channel.register(dataflow_id, node_id.clone())?;
-        channel
+        let reply = channel
             .request(&DaemonRequest::Subscribe)
             .map_err(|e| eyre!(e))
             .wrap_err("failed to create subscription with dora-daemon")?;
+        match reply {
+            daemon_messages::DaemonReply::Result(Ok(())) => {}
+            daemon_messages::DaemonReply::Result(Err(err)) => {
+                eyre::bail!("subscribe failed: {err}")
+            }
+            other => eyre::bail!("unexpected subscribe reply: {other:?}"),
+        }
         close_channel.register(dataflow_id, node_id.clone())?;
         let (tx, rx) = flume::bounded(0);
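Worth spelling out what this hunk fixes: the old code requested a subscription but discarded the daemon's reply, so a failed subscribe only surfaced later as a hang. A minimal standalone sketch of the reply-checking pattern (this `DaemonReply` is a reduced stand-in for dora's `daemon_messages::DaemonReply`, not the real type):

```rust
// Reduced stand-in for daemon_messages::DaemonReply.
#[derive(Debug)]
enum DaemonReply {
    Result(Result<(), String>),
    Other, // stand-in for the remaining variants of the real enum
}

// Validate the reply instead of silently dropping it.
fn check_subscribe_reply(reply: DaemonReply) -> Result<(), String> {
    match reply {
        DaemonReply::Result(Ok(())) => Ok(()),
        DaemonReply::Result(Err(err)) => Err(format!("subscribe failed: {err}")),
        other => Err(format!("unexpected subscribe reply: {other:?}")),
    }
}

fn main() {
    assert!(check_subscribe_reply(DaemonReply::Result(Ok(()))).is_ok());
    assert!(check_subscribe_reply(DaemonReply::Other).is_err());
}
```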
@@ -4,7 +4,7 @@ use crate::daemon_connection::DaemonChannel;
 use dora_core::{
     config::NodeId,
     daemon_messages::{
-        DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent,
+        self, DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent,
     },
 };
 use eyre::{eyre, Context};
@@ -45,11 +45,19 @@ impl DropStream {
     ) -> eyre::Result<Self> {
         channel.register(dataflow_id, node_id.clone())?;
-        channel
+        let reply = channel
             .request(&DaemonRequest::SubscribeDrop)
             .map_err(|e| eyre!(e))
             .wrap_err("failed to create subscription with dora-daemon")?;
+        match reply {
+            daemon_messages::DaemonReply::Result(Ok(())) => {}
+            daemon_messages::DaemonReply::Result(Err(err)) => {
+                eyre::bail!("drop subscribe failed: {err}")
+            }
+            other => eyre::bail!("unexpected drop subscribe reply: {other:?}"),
+        }
         let (tx, rx) = flume::bounded(0);
         let node_id_cloned = node_id.clone();
@@ -44,7 +44,7 @@ async fn listen(
         let incoming = match result {
             Ok(incoming) => incoming,
             Err(err) => {
-                let _ = tx.blocking_send(err.into());
+                let _ = tx.send(err.into()).await;
                 return;
             }
         };
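This one-line fix matters because `tokio::sync::mpsc::Sender::blocking_send` panics when called from inside an async context, which is exactly where this `listen` function runs; awaiting `send` is the non-blocking form. A small sketch contrasting the two call sites (assumes the `tokio` crate with the `full` feature):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(1);

    // Async context: await the send; blocking_send here would panic
    // inside the runtime.
    let tx_async = tx.clone();
    tokio::spawn(async move {
        let _ = tx_async.send("from async task".into()).await;
    });

    // Dedicated blocking thread: blocking_send is the right tool here.
    tokio::task::spawn_blocking(move || {
        let _ = tx.blocking_send("from blocking thread".into());
    });

    // Loop ends once both senders are dropped.
    while let Some(msg) = rx.recv().await {
        println!("{msg}");
    }
}
```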
@@ -21,7 +21,7 @@ use std::{
     collections::{BTreeSet, HashMap},
     net::SocketAddr,
     path::PathBuf,
-    time::Duration,
+    time::{Duration, Instant},
 };
 use tokio::{
     net::{TcpListener, TcpStream},
@@ -138,9 +138,9 @@ async fn start_inner(
         .await
         .wrap_err("failed to create control events")?;
-    let daemon_watchdog_interval =
-        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
-            .map(|_| Event::DaemonWatchdogInterval);
+    let daemon_heartbeat_interval =
+        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
+            .map(|_| Event::DaemonHeartbeatInterval);
     // events that should be aborted on `dora destroy`
     let (abortable_events, abort_handle) = futures::stream::abortable(
@@ -148,7 +148,7 @@ async fn start_inner(
             control_events,
             new_daemon_connections,
             external_events,
-            daemon_watchdog_interval,
+            daemon_heartbeat_interval,
         )
             .merge(),
     );
@@ -203,6 +203,7 @@ async fn start_inner(
                     DaemonConnection {
                         stream: connection,
                         listen_socket,
+                        last_heartbeat: Instant::now(),
                     },
                 );
                 if let Some(_previous) = previous {
@@ -221,15 +222,20 @@ async fn start_inner(
                 }
             },
             Event::Dataflow { uuid, event } => match event {
-                DataflowEvent::ReadyOnMachine { machine_id } => {
+                DataflowEvent::ReadyOnMachine {
+                    machine_id,
+                    success,
+                } => {
                     match running_dataflows.entry(uuid) {
                         std::collections::hash_map::Entry::Occupied(mut entry) => {
                             let dataflow = entry.get_mut();
                             dataflow.pending_machines.remove(&machine_id);
+                            dataflow.init_success &= success;
                             if dataflow.pending_machines.is_empty() {
                                 let message =
                                     serde_json::to_vec(&DaemonCoordinatorEvent::AllNodesReady {
                                         dataflow_id: uuid,
+                                        success: dataflow.init_success,
                                     })
                                     .wrap_err("failed to serialize AllNodesReady message")?;
@@ -442,11 +448,15 @@ async fn start_inner(
                 }
                 ControlEvent::Error(err) => tracing::error!("{err:?}"),
             },
-            Event::DaemonWatchdogInterval => {
+            Event::DaemonHeartbeatInterval => {
+                let mut disconnected = BTreeSet::new();
                 for (machine_id, connection) in &mut daemon_connections {
+                    if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
+                        disconnected.insert(machine_id.clone());
+                        continue;
+                    }
                     let result: eyre::Result<()> =
-                        tokio::time::timeout(Duration::from_millis(100), send_watchdog_message(&mut connection.stream))
+                        tokio::time::timeout(Duration::from_millis(500), send_watchdog_message(&mut connection.stream))
                             .await
                             .wrap_err("timeout")
                             .and_then(|r| r).wrap_err_with(||
@@ -474,6 +484,11 @@ async fn start_inner(
                     )
                     .await?;
                 }
+                Event::DaemonHeartbeat { machine_id } => {
+                    if let Some(connection) = daemon_connections.get_mut(&machine_id) {
+                        connection.last_heartbeat = Instant::now();
+                    }
+                }
             }
         }
@@ -485,6 +500,7 @@ async fn start_inner(
 struct DaemonConnection {
     stream: TcpStream,
     listen_socket: SocketAddr,
+    last_heartbeat: Instant,
 }

 fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
@@ -525,21 +541,11 @@ async fn handle_destroy(
 }

 async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> {
-    let message = serde_json::to_vec(&DaemonCoordinatorEvent::Watchdog).unwrap();
+    let message = serde_json::to_vec(&DaemonCoordinatorEvent::Heartbeat).unwrap();
     tcp_send(connection, &message)
         .await
-        .wrap_err("failed to send watchdog message to daemon")?;
-
-    let reply_raw = tcp_receive(connection)
-        .await
-        .wrap_err("failed to receive stop reply from daemon")?;
-    match serde_json::from_slice(&reply_raw)
-        .wrap_err("failed to deserialize stop reply from daemon")?
-    {
-        DaemonCoordinatorReply::WatchdogAck => Ok(()),
-        other => bail!("unexpected reply after sending `watchdog`: {other:?}"),
-    }
+        .wrap_err("failed to send watchdog message to daemon")
 }

 #[allow(dead_code)] // Keeping the communication layer for later use.
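Taken together, the coordinator hunks above (and the daemon-side changes further down) replace the synchronous watchdog round-trip (send `Watchdog`, block on `WatchdogAck`) with one-way heartbeats: each side stamps the time it last heard from its peer, and a timer tick checks the stamp's age (15 s threshold on the coordinator, 20 s on the daemon). A minimal standalone sketch of that bookkeeping, with an illustrative threshold:

```rust
use std::collections::HashMap;
use std::thread::sleep;
use std::time::{Duration, Instant};

// Record when each peer was last heard from; a periodic tick reports
// the peers that have gone silent for longer than `timeout`.
struct Liveness {
    last_heartbeat: HashMap<String, Instant>,
    timeout: Duration,
}

impl Liveness {
    fn record(&mut self, machine_id: &str) {
        self.last_heartbeat
            .insert(machine_id.to_owned(), Instant::now());
    }

    /// IDs of peers whose last heartbeat is older than `timeout`.
    fn disconnected(&self) -> Vec<String> {
        self.last_heartbeat
            .iter()
            .filter(|(_, seen)| seen.elapsed() > self.timeout)
            .map(|(id, _)| id.clone())
            .collect()
    }
}

fn main() {
    let mut liveness = Liveness {
        last_heartbeat: HashMap::new(),
        timeout: Duration::from_millis(50),
    };
    liveness.record("machine-a");
    sleep(Duration::from_millis(100));
    liveness.record("machine-b"); // still fresh
    assert_eq!(liveness.disconnected(), vec!["machine-a".to_owned()]);
}
```

The design gain is that neither side ever blocks its event loop waiting for an ack; a dead peer is detected by the absence of heartbeats rather than by a timed-out request.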
@@ -550,6 +556,7 @@ struct RunningDataflow {
     machines: BTreeSet<String>,
     /// IDs of machines that are waiting until all nodes are started.
     pending_machines: BTreeSet<String>,
+    init_success: bool,
     nodes: Vec<ResolvedNode>,
 }
@@ -732,6 +739,7 @@ async fn start_dataflow(
         } else {
             BTreeSet::new()
         },
+        init_success: true,
         machines,
         nodes,
     })
@@ -770,10 +778,11 @@ async fn destroy_daemons(
 pub enum Event {
     NewDaemonConnection(TcpStream),
     DaemonConnectError(eyre::Report),
+    DaemonHeartbeat { machine_id: String },
     Dataflow { uuid: Uuid, event: DataflowEvent },
     Control(ControlEvent),
     Daemon(DaemonEvent),
-    DaemonWatchdogInterval,
+    DaemonHeartbeatInterval,
     CtrlC,
 }
@@ -782,7 +791,7 @@ impl Event {
     #[allow(clippy::match_like_matches_macro)]
     pub fn log(&self) -> bool {
         match self {
-            Event::DaemonWatchdogInterval => false,
+            Event::DaemonHeartbeatInterval => false,
             _ => true,
         }
     }
@@ -796,6 +805,7 @@ pub enum DataflowEvent {
     },
     ReadyOnMachine {
         machine_id: String,
+        success: bool,
     },
 }
@@ -1,10 +1,7 @@
-use crate::{
-    tcp_utils::{tcp_receive, tcp_send},
-    DaemonEvent, DataflowEvent, Event,
-};
+use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
 use dora_core::coordinator_messages;
 use eyre::{eyre, Context};
-use std::{io::ErrorKind, net::Ipv4Addr, time::Duration};
+use std::{io::ErrorKind, net::Ipv4Addr};
 use tokio::{
     net::{TcpListener, TcpStream},
     sync::mpsc,
@@ -60,10 +57,16 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende
                 break;
             }
             coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event {
-                coordinator_messages::DaemonEvent::AllNodesReady { dataflow_id } => {
+                coordinator_messages::DaemonEvent::AllNodesReady {
+                    dataflow_id,
+                    success,
+                } => {
                     let event = Event::Dataflow {
                         uuid: dataflow_id,
-                        event: DataflowEvent::ReadyOnMachine { machine_id },
+                        event: DataflowEvent::ReadyOnMachine {
+                            machine_id,
+                            success,
+                        },
                     };
                     if events_tx.send(event).await.is_err() {
                         break;
@@ -84,13 +87,11 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende
                         break;
                     }
                 }
-                coordinator_messages::DaemonEvent::Watchdog => {
-                    let reply = serde_json::to_vec(&coordinator_messages::WatchdogAck).unwrap();
-                    _ = tokio::time::timeout(
-                        Duration::from_millis(10),
-                        tcp_send(&mut connection, &reply),
-                    )
-                    .await;
+                coordinator_messages::DaemonEvent::Heartbeat => {
+                    let event = Event::DaemonHeartbeat { machine_id };
+                    if events_tx.send(event).await.is_err() {
+                        break;
+                    }
                 }
             },
         };
@@ -17,9 +17,10 @@ use eyre::{bail, eyre, Context, ContextCompat};
 use futures::{future, stream, FutureExt, TryFutureExt};
 use futures_concurrency::stream::Merge;
 use inter_daemon::InterDaemonConnection;
+use pending::PendingNodes;
 use shared_memory_server::ShmemConf;
-use std::collections::HashSet;
 use std::env::temp_dir;
+use std::time::Instant;
 use std::{
     borrow::Cow,
     collections::{BTreeMap, BTreeSet, HashMap},
@@ -28,7 +29,7 @@ use std::{
     path::{Path, PathBuf},
     time::Duration,
 };
-use tcp_utils::{tcp_receive, tcp_send};
+use tcp_utils::tcp_send;
 use tokio::fs::File;
 use tokio::io::AsyncReadExt;
 use tokio::net::TcpStream;
@@ -43,6 +44,7 @@ mod coordinator;
 mod inter_daemon;
 mod log;
 mod node_communication;
+mod pending;
 mod spawn;
 mod tcp_utils;
@@ -51,12 +53,15 @@ use dora_tracing::telemetry::serialize_context;
 #[cfg(feature = "telemetry")]
 use tracing_opentelemetry::OpenTelemetrySpanExt;

+use crate::pending::DataflowStatus;
+
 pub struct Daemon {
     running: HashMap<DataflowId, RunningDataflow>,

     events_tx: mpsc::Sender<Event>,
     coordinator_connection: Option<TcpStream>,
+    last_coordinator_heartbeat: Instant,
     inter_daemon_connections: BTreeMap<String, InterDaemonConnection>,
     machine_id: String,
@@ -183,6 +188,7 @@ impl Daemon {
             running: HashMap::new(),
             events_tx: dora_events_tx,
             coordinator_connection,
+            last_coordinator_heartbeat: Instant::now(),
             inter_daemon_connections: BTreeMap::new(),
             machine_id,
             exit_when_done,
@@ -193,7 +199,7 @@ impl Daemon {
         let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
             Duration::from_secs(5),
         ))
-        .map(|_| Event::WatchdogInterval);
+        .map(|_| Event::HeartbeatInterval);
         let events = (external_events, dora_events, watchdog_interval).merge();
         daemon.run_inner(events).await
     }
@@ -227,22 +233,19 @@ impl Daemon {
                     RunStatus::Continue => {}
                     RunStatus::Exit => break,
                 },
-                Event::WatchdogInterval => {
+                Event::HeartbeatInterval => {
                     if let Some(connection) = &mut self.coordinator_connection {
                         let msg = serde_json::to_vec(&CoordinatorRequest::Event {
                             machine_id: self.machine_id.clone(),
-                            event: DaemonEvent::Watchdog,
+                            event: DaemonEvent::Heartbeat,
                         })?;
                         tcp_send(connection, &msg)
                             .await
                             .wrap_err("failed to send watchdog message to dora-coordinator")?;
-                        let reply_raw = tcp_receive(connection)
-                            .await
-                            .wrap_err("lost connection to coordinator")?;
-                        let _: dora_core::coordinator_messages::WatchdogAck =
-                            serde_json::from_slice(&reply_raw)
-                                .wrap_err("received unexpected watchdog reply from coordinator")?;
+                        if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
+                            bail!("lost connection to coordinator")
+                        }
                     }
                 }
                 Event::CtrlC => {
@@ -298,11 +301,20 @@ impl Daemon {
                 });
                 RunStatus::Continue
             }
-            DaemonCoordinatorEvent::AllNodesReady { dataflow_id } => {
+            DaemonCoordinatorEvent::AllNodesReady {
+                dataflow_id,
+                success,
+            } => {
                 match self.running.get_mut(&dataflow_id) {
                     Some(dataflow) => {
-                        tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
-                        dataflow.start(&self.events_tx).await?;
+                        dataflow
+                            .pending_nodes
+                            .handle_external_all_nodes_ready(success)
+                            .await?;
+                        if success {
+                            tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
+                            dataflow.start(&self.events_tx).await?;
+                        }
                     }
                     None => {
                         tracing::warn!(
@@ -382,12 +394,9 @@ impl Daemon {
                     .map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
                 RunStatus::Exit
             }
-            DaemonCoordinatorEvent::Watchdog => {
-                let _ = reply_tx
-                    .send(Some(DaemonCoordinatorReply::WatchdogAck))
-                    .map_err(|_| {
-                        error!("could not send WatchdogAck reply from daemon to coordinator")
-                    });
+            DaemonCoordinatorEvent::Heartbeat => {
+                self.last_coordinator_heartbeat = Instant::now();
+                let _ = reply_tx.send(None);
                 RunStatus::Continue
             }
         };
@@ -457,7 +466,7 @@ impl Daemon {
         nodes: Vec<ResolvedNode>,
         daemon_communication_config: LocalCommunicationConfig,
     ) -> eyre::Result<()> {
-        let dataflow = RunningDataflow::new(dataflow_id);
+        let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone());
         let dataflow = match self.running.entry(dataflow_id) {
             std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow),
             std::collections::hash_map::Entry::Occupied(_) => {
@@ -506,7 +515,7 @@ impl Daemon {
                 dataflow.pending_nodes.insert(node.id.clone());
                 let node_id = node.id.clone();
-                spawn::spawn_node(
+                match spawn::spawn_node(
                     dataflow_id,
                     &working_dir,
                     node,
@@ -514,10 +523,21 @@ impl Daemon {
                     daemon_communication_config,
                 )
                 .await
-                .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?;
-                dataflow.running_nodes.insert(node_id);
+                .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
+                {
+                    Ok(()) => {
+                        dataflow.running_nodes.insert(node_id);
+                    }
+                    Err(err) => {
+                        tracing::error!("{err:?}");
+                        dataflow
+                            .pending_nodes
+                            .handle_node_stop(&node_id, &mut self.coordinator_connection)
+                            .await?;
+                    }
+                }
             } else {
                 dataflow.external_nodes.insert(node.id.clone(), node);
+                dataflow.pending_nodes.set_external_nodes(true);
             }
         }
@@ -535,39 +555,35 @@ impl Daemon {
                 event_sender,
                 reply_sender,
             } => {
-                let result = self
-                    .subscribe(dataflow_id, node_id.clone(), event_sender)
-                    .await;
-                let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
-                    format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
-                })?;
-                tracing::debug!("node `{node_id}` is ready");
+                let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
+                    format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
+                });
-                dataflow
-                    .subscribe_replies
-                    .insert(node_id.clone(), (reply_sender, result));
-                dataflow.pending_nodes.remove(&node_id);
-                if dataflow.pending_nodes.is_empty() {
-                    if dataflow.external_nodes.is_empty() {
-                        tracing::info!("all nodes are ready, starting dataflow `{dataflow_id}`");
-                        dataflow.start(&self.events_tx).await?;
-                    } else {
-                        tracing::info!(
-                            "all local nodes are ready, waiting for remote nodes \
-                            for dataflow `{dataflow_id}`"
-                        );
+                match dataflow {
+                    Err(err) => {
+                        let _ = reply_sender.send(DaemonReply::Result(Err(err)));
+                    }
+                    Ok(dataflow) => {
+                        tracing::debug!("node `{node_id}` is ready");
+                        Self::subscribe(dataflow, node_id.clone(), event_sender).await;
-                        // dataflow is split across multiple daemons -> synchronize with dora-coordinator
-                        let Some(connection) = &mut self.coordinator_connection else {
-                            bail!("no coordinator connection to send AllNodesReady");
-                        };
-                        let msg = serde_json::to_vec(&CoordinatorRequest::Event {
-                            machine_id: self.machine_id.clone(),
-                            event: DaemonEvent::AllNodesReady { dataflow_id },
-                        })?;
-                        tcp_send(connection, &msg)
-                            .await
-                            .wrap_err("failed to send AllNodesReady message to dora-coordinator")?;
+                        let status = dataflow
+                            .pending_nodes
+                            .handle_node_subscription(
+                                node_id.clone(),
+                                reply_sender,
+                                &mut self.coordinator_connection,
+                            )
+                            .await?;
+                        match status {
+                            DataflowStatus::AllNodesReady => {
+                                tracing::info!(
+                                    "all nodes are ready, starting dataflow `{dataflow_id}`"
+                                );
+                                dataflow.start(&self.events_tx).await?;
+                            }
+                            DataflowStatus::Pending => {}
+                        }
+                    }
+                }
             }
@@ -605,8 +621,17 @@ impl Daemon {
                 let _ = reply_sender.send(DaemonReply::Result(reply));
             }
             DaemonNodeEvent::OutputsDone { reply_sender } => {
-                let _ = reply_sender.send(DaemonReply::Result(Ok(())));
-                self.handle_outputs_done(dataflow_id, &node_id).await?;
+                let result = match self.running.get_mut(&dataflow_id) {
+                    Some(dataflow) => {
+                        Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, &node_id)
+                            .await
+                    },
+                    None => Err(eyre!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")),
+                };
+                let _ = reply_sender.send(DaemonReply::Result(
+                    result.map_err(|err| format!("{err:?}")),
+                ));
             }
             DaemonNodeEvent::SendOut {
                 output_id,
@@ -724,15 +749,10 @@ impl Daemon {
     }

     async fn subscribe(
-        &mut self,
-        dataflow_id: Uuid,
+        dataflow: &mut RunningDataflow,
         node_id: NodeId,
         event_sender: UnboundedSender<daemon_messages::NodeEvent>,
-    ) -> Result<(), String> {
-        let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
-            format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
-        })?;
+    ) {
         // some inputs might have been closed already -> report those events
         let closed_inputs = dataflow
             .mappings
@@ -763,22 +783,17 @@ impl Daemon {
         }

         dataflow.subscribe_channels.insert(node_id, event_sender);
-
-        Ok(())
     }

-    #[tracing::instrument(skip(self), level = "trace")]
+    #[tracing::instrument(skip(dataflow, inter_daemon_connections), fields(uuid = %dataflow.id), level = "trace")]
     async fn handle_outputs_done(
-        &mut self,
-        dataflow_id: Uuid,
+        dataflow: &mut RunningDataflow,
+        inter_daemon_connections: &mut BTreeMap<String, InterDaemonConnection>,
         node_id: &NodeId,
     ) -> eyre::Result<()> {
-        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
-            format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
-        })?;
         send_input_closed_events(
             dataflow,
-            &mut self.inter_daemon_connections,
+            inter_daemon_connections,
             |OutputId(source_id, _)| source_id == node_id,
         )
         .await?;
@@ -786,13 +801,18 @@ impl Daemon {
         Ok(())
     }

     #[tracing::instrument(skip(self), level = "trace")]
     async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
-        self.handle_outputs_done(dataflow_id, node_id).await?;
         let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
             format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
         })?;
+        dataflow
+            .pending_nodes
+            .handle_node_stop(node_id, &mut self.coordinator_connection)
+            .await?;
+        Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, node_id).await?;
         dataflow.running_nodes.remove(node_id);
         if dataflow.running_nodes.is_empty() {
             tracing::info!(
@@ -1122,11 +1142,7 @@ fn close_input(dataflow: &mut RunningDataflow, receiver_id: &NodeId, input_id: &
 pub struct RunningDataflow {
     id: Uuid,
-    /// Local nodes that are not started yet
-    pending_nodes: HashSet<NodeId>,
-    /// Used to synchronize node starts.
-    ///
-    /// Subscribe requests block the node until all other nodes are ready too.
-    subscribe_replies: HashMap<NodeId, (oneshot::Sender<DaemonReply>, Result<(), String>)>,
+    pending_nodes: PendingNodes,
     subscribe_channels: HashMap<NodeId, UnboundedSender<daemon_messages::NodeEvent>>,
     drop_channels: HashMap<NodeId, UnboundedSender<daemon_messages::NodeDropEvent>>,
@@ -1135,7 +1151,6 @@ pub struct RunningDataflow {
     open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
     running_nodes: BTreeSet<NodeId>,

     external_nodes: BTreeMap<NodeId, ResolvedNode>,
-    open_external_mappings: HashMap<OutputId, BTreeMap<String, BTreeSet<InputId>>>,
     pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,
@@ -1151,18 +1166,16 @@ pub struct RunningDataflow {
 }

 impl RunningDataflow {
-    fn new(id: Uuid) -> RunningDataflow {
+    fn new(dataflow_id: Uuid, machine_id: String) -> RunningDataflow {
         Self {
-            id,
-            pending_nodes: HashSet::new(),
-            subscribe_replies: HashMap::new(),
+            id: dataflow_id,
+            pending_nodes: PendingNodes::new(dataflow_id, machine_id),
             subscribe_channels: HashMap::new(),
             drop_channels: HashMap::new(),
             mappings: HashMap::new(),
             timers: BTreeMap::new(),
             open_inputs: BTreeMap::new(),
             running_nodes: BTreeSet::new(),
             external_nodes: BTreeMap::new(),
-            open_external_mappings: HashMap::new(),
             pending_drop_tokens: HashMap::new(),
             _timer_handles: Vec::new(),
@@ -1172,12 +1185,6 @@ impl RunningDataflow {
     }

     async fn start(&mut self, events_tx: &mpsc::Sender<Event>) -> eyre::Result<()> {
-        // answer all subscribe requests
-        let subscribe_replies = std::mem::take(&mut self.subscribe_replies);
-        for (reply_sender, subscribe_result) in subscribe_replies.into_values() {
-            let _ = reply_sender.send(DaemonReply::Result(subscribe_result));
-        }
-
         for interval in self.timers.keys().copied() {
             let events_tx = events_tx.clone();
             let dataflow_id = self.id;
@@ -1283,7 +1290,7 @@ pub enum Event {
     Coordinator(CoordinatorEvent),
     Daemon(InterDaemonEvent),
     Dora(DoraEvent),
-    WatchdogInterval,
+    HeartbeatInterval,
     CtrlC,
 }
@@ -0,0 +1,167 @@
+use std::collections::{HashMap, HashSet};
+
+use dora_core::{
+    config::NodeId,
+    coordinator_messages::{CoordinatorRequest, DaemonEvent},
+    daemon_messages::{DaemonReply, DataflowId},
+};
+use eyre::{bail, Context};
+use tokio::{net::TcpStream, sync::oneshot};
+
+use crate::tcp_utils::tcp_send;
+
+pub struct PendingNodes {
+    dataflow_id: DataflowId,
+    machine_id: String,
+
+    /// The local nodes that are still waiting to start.
+    local_nodes: HashSet<NodeId>,
+    /// Whether there are external nodes for this dataflow.
+    external_nodes: bool,
+
+    /// Used to synchronize node starts.
+    ///
+    /// Subscribe requests block the node until all other nodes are ready too.
+    waiting_subscribers: HashMap<NodeId, oneshot::Sender<DaemonReply>>,
+    /// List of nodes that finished before connecting to the dora daemon.
+    ///
+    /// If this list is non-empty, we should not start the dataflow at all. Instead,
+    /// we report an error to the other nodes.
+    exited_before_subscribe: HashSet<NodeId>,
+
+    /// Whether the local init result was already reported to the coordinator.
+    reported_init_to_coordinator: bool,
+}
+
+impl PendingNodes {
+    pub fn new(dataflow_id: DataflowId, machine_id: String) -> Self {
+        Self {
+            dataflow_id,
+            machine_id,
+            local_nodes: HashSet::new(),
+            external_nodes: false,
+            waiting_subscribers: HashMap::new(),
+            exited_before_subscribe: HashSet::new(),
+            reported_init_to_coordinator: false,
+        }
+    }
+
+    pub fn insert(&mut self, node_id: NodeId) {
+        self.local_nodes.insert(node_id);
+    }
+
+    pub fn set_external_nodes(&mut self, value: bool) {
+        self.external_nodes = value;
+    }
+
+    pub async fn handle_node_subscription(
+        &mut self,
+        node_id: NodeId,
+        reply_sender: oneshot::Sender<DaemonReply>,
+        coordinator_connection: &mut Option<TcpStream>,
+    ) -> eyre::Result<DataflowStatus> {
+        self.waiting_subscribers
+            .insert(node_id.clone(), reply_sender);
+        self.local_nodes.remove(&node_id);
+
+        self.update_dataflow_status(coordinator_connection).await
+    }
+
+    pub async fn handle_node_stop(
+        &mut self,
+        node_id: &NodeId,
+        coordinator_connection: &mut Option<TcpStream>,
+    ) -> eyre::Result<()> {
+        if self.local_nodes.remove(node_id) {
+            tracing::warn!("node `{node_id}` exited before initializing dora connection");
+            self.exited_before_subscribe.insert(node_id.clone());
+            self.update_dataflow_status(coordinator_connection).await?;
+        }
+        Ok(())
+    }
+
+    pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> {
+        if !self.local_nodes.is_empty() {
+            bail!("received external `all_nodes_ready` event before local nodes were ready");
+        }
+
+        let external_error = if success {
+            None
+        } else {
+            Some("some nodes failed to initialize on remote machines".to_string())
+        };
+        self.answer_subscribe_requests(external_error).await;
+
+        Ok(())
+    }
+
+    async fn update_dataflow_status(
+        &mut self,
+        coordinator_connection: &mut Option<TcpStream>,
+    ) -> eyre::Result<DataflowStatus> {
+        if self.local_nodes.is_empty() {
+            if self.external_nodes {
+                if !self.reported_init_to_coordinator {
+                    self.report_nodes_ready(coordinator_connection).await?;
+                    self.reported_init_to_coordinator = true;
+                }
+                Ok(DataflowStatus::Pending)
+            } else {
+                self.answer_subscribe_requests(None).await;
+                Ok(DataflowStatus::AllNodesReady)
+            }
+        } else {
+            Ok(DataflowStatus::Pending)
+        }
+    }
+
+    async fn answer_subscribe_requests(&mut self, external_error: Option<String>) {
+        let result = if self.exited_before_subscribe.is_empty() {
+            match external_error {
+                Some(err) => Err(err),
+                None => Ok(()),
+            }
+        } else {
+            Err(format!(
+                "Some nodes exited before subscribing to dora: {:?}\n\n\
+                This typically happens when an initialization error occurs
+                in the node or operator. To check the output of the failed
+                nodes, run `dora logs {} <node_id>`.",
+                self.exited_before_subscribe, self.dataflow_id
+            ))
+        };
+        // answer all subscribe requests
+        let subscribe_replies = std::mem::take(&mut self.waiting_subscribers);
+        for reply_sender in subscribe_replies.into_values() {
+            let _ = reply_sender.send(DaemonReply::Result(result.clone()));
+        }
+    }
+
+    async fn report_nodes_ready(
+        &self,
+        coordinator_connection: &mut Option<TcpStream>,
+    ) -> eyre::Result<()> {
+        let Some(connection) = coordinator_connection else {
+            bail!("no coordinator connection to send AllNodesReady");
+        };
+
+        let success = self.exited_before_subscribe.is_empty();
+        tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes");
+
+        let msg = serde_json::to_vec(&CoordinatorRequest::Event {
+            machine_id: self.machine_id.clone(),
+            event: DaemonEvent::AllNodesReady {
+                dataflow_id: self.dataflow_id,
+                success,
+            },
+        })?;
+        tcp_send(connection, &msg)
+            .await
+            .wrap_err("failed to send AllNodesReady message to dora-coordinator")?;
+
+        Ok(())
+    }
+}
+
+pub enum DataflowStatus {
+    AllNodesReady,
+    Pending,
+}
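The new module is easier to follow with the I/O stripped away. Below is a hypothetical, reduced model of `PendingNodes` (no reply channels, no coordinator connection) that keeps only the state transitions deciding whether the dataflow may start; in the real type, `answer_subscribe_requests` then delivers the corresponding `Ok`/`Err` to every blocked subscribe call, which is exactly the reply the `EventStream` hunk at the top now checks:

```rust
use std::collections::HashSet;

// Hypothetical, reduced model of PendingNodes: I/O removed, only the
// transitions that decide whether the dataflow may start are kept.
#[derive(Default)]
struct PendingNodesModel {
    local_nodes: HashSet<String>,
    exited_before_subscribe: HashSet<String>,
}

impl PendingNodesModel {
    fn insert(&mut self, node: &str) {
        self.local_nodes.insert(node.to_owned());
    }

    /// A node connected and subscribed; returns true once no node is pending.
    fn handle_node_subscription(&mut self, node: &str) -> bool {
        self.local_nodes.remove(node);
        self.local_nodes.is_empty()
    }

    /// A node exited; if it never subscribed, the dataflow must not start.
    fn handle_node_stop(&mut self, node: &str) {
        if self.local_nodes.remove(node) {
            self.exited_before_subscribe.insert(node.to_owned());
        }
    }

    fn init_success(&self) -> bool {
        self.exited_before_subscribe.is_empty()
    }
}

fn main() {
    let mut pending = PendingNodesModel::default();
    pending.insert("camera");
    pending.insert("plot");

    // `camera` subscribes; `plot` crashes before ever connecting.
    assert!(!pending.handle_node_subscription("camera"));
    pending.handle_node_stop("plot");

    assert!(pending.local_nodes.is_empty()); // nothing pending anymore
    assert!(!pending.init_success()); // but the dataflow must not start
}
```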
@@ -19,12 +19,13 @@ pub enum CoordinatorRequest {
 pub enum DaemonEvent {
     AllNodesReady {
         dataflow_id: DataflowId,
+        success: bool,
     },
     AllNodesFinished {
         dataflow_id: DataflowId,
         result: Result<(), String>,
     },
-    Watchdog,
+    Heartbeat,
 }

 #[derive(Debug, serde::Serialize, serde::Deserialize)]
@@ -41,6 +42,3 @@ impl RegisterResult {
         }
     }
 }
-
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
-pub struct WatchdogAck;
@@ -127,6 +127,7 @@ impl fmt::Debug for Data {
 type SharedMemoryId = String;

 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[must_use]
 pub enum DaemonReply {
     Result(Result<(), String>),
     PreparedMessage { shared_memory_id: SharedMemoryId },
@@ -200,6 +201,7 @@ pub enum DaemonCoordinatorEvent {
     Spawn(SpawnDataflowNodes),
     AllNodesReady {
         dataflow_id: DataflowId,
+        success: bool,
     },
     StopDataflow {
         dataflow_id: DataflowId,
@@ -214,7 +216,7 @@ pub enum DaemonCoordinatorEvent {
         node_id: NodeId,
     },
     Destroy,
-    Watchdog,
+    Heartbeat,
 }

 #[derive(Debug, serde::Deserialize, serde::Serialize)]
@@ -238,7 +240,6 @@ pub enum DaemonCoordinatorReply {
     ReloadResult(Result<(), String>),
     StopResult(Result<(), String>),
     DestroyResult(Result<(), String>),
-    WatchdogAck,
     Logs(Result<Vec<u8>, String>),
 }
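The `#[must_use]` added to `DaemonReply` above backs the client-side fixes at the top of this diff: once replies must be consumed, silently dropping one (as the old `EventStream`/`DropStream` code did) becomes a compiler warning. A minimal sketch of the effect:

```rust
// With #[must_use] on the reply type, dropping a reply on the floor
// triggers the unused_must_use lint.
#[must_use]
enum DaemonReply {
    Result(Result<(), String>),
}

fn request() -> DaemonReply {
    DaemonReply::Result(Ok(()))
}

fn main() {
    request(); // warns: unused `DaemonReply` that must be used

    // Handling the reply silences the lint:
    let DaemonReply::Result(result) = request();
    assert!(result.is_ok());
}
```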