diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 7777e7a5..40758ad7 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -21,7 +21,7 @@ use std::{ collections::{BTreeSet, HashMap}, net::SocketAddr, path::PathBuf, - time::Duration, + time::{Duration, Instant}, }; use tokio::{ net::{TcpListener, TcpStream}, @@ -138,9 +138,9 @@ async fn start_inner( .await .wrap_err("failed to create control events")?; - let daemon_watchdog_interval = + let daemon_heartbeat_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3))) - .map(|_| Event::DaemonWatchdogInterval); + .map(|_| Event::DaemonHeartbeatInterval); // events that should be aborted on `dora destroy` let (abortable_events, abort_handle) = futures::stream::abortable( @@ -148,7 +148,7 @@ async fn start_inner( control_events, new_daemon_connections, external_events, - daemon_watchdog_interval, + daemon_heartbeat_interval, ) .merge(), ); @@ -203,6 +203,7 @@ async fn start_inner( DaemonConnection { stream: connection, listen_socket, + last_heartbeat: Instant::now(), }, ); if let Some(_previous) = previous { @@ -447,9 +448,13 @@ async fn start_inner( } ControlEvent::Error(err) => tracing::error!("{err:?}"), }, - Event::DaemonWatchdogInterval => { + Event::DaemonHeartbeatInterval => { let mut disconnected = BTreeSet::new(); for (machine_id, connection) in &mut daemon_connections { + if connection.last_heartbeat.elapsed() > Duration::from_secs(15) { + disconnected.insert(machine_id.clone()); + continue; + } let result: eyre::Result<()> = tokio::time::timeout(Duration::from_millis(500), send_watchdog_message(&mut connection.stream)) .await @@ -479,6 +484,11 @@ async fn start_inner( ) .await?; } + Event::DaemonHeartbeat { machine_id } => { + if let Some(connection) = daemon_connections.get_mut(&machine_id) { + connection.last_heartbeat = Instant::now(); + } + } } } @@ -490,6 +500,7 @@ async fn start_inner( struct DaemonConnection { stream: TcpStream, listen_socket: SocketAddr, + last_heartbeat: Instant, } fn set_up_ctrlc_handler() -> Result, eyre::ErrReport> { @@ -530,21 +541,11 @@ async fn handle_destroy( } async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> { - let message = serde_json::to_vec(&DaemonCoordinatorEvent::Watchdog).unwrap(); + let message = serde_json::to_vec(&DaemonCoordinatorEvent::Heartbeat).unwrap(); tcp_send(connection, &message) .await - .wrap_err("failed to send watchdog message to daemon")?; - let reply_raw = tcp_receive(connection) - .await - .wrap_err("failed to receive stop reply from daemon")?; - - match serde_json::from_slice(&reply_raw) - .wrap_err("failed to deserialize stop reply from daemon")? - { - DaemonCoordinatorReply::WatchdogAck => Ok(()), - other => bail!("unexpected reply after sending `watchdog`: {other:?}"), - } + .wrap_err("failed to send watchdog message to daemon") } #[allow(dead_code)] // Keeping the communication layer for later use. @@ -777,10 +778,11 @@ async fn destroy_daemons( pub enum Event { NewDaemonConnection(TcpStream), DaemonConnectError(eyre::Report), + DaemonHeartbeat { machine_id: String }, Dataflow { uuid: Uuid, event: DataflowEvent }, Control(ControlEvent), Daemon(DaemonEvent), - DaemonWatchdogInterval, + DaemonHeartbeatInterval, CtrlC, } @@ -789,7 +791,7 @@ impl Event { #[allow(clippy::match_like_matches_macro)] pub fn log(&self) -> bool { match self { - Event::DaemonWatchdogInterval => false, + Event::DaemonHeartbeatInterval => false, _ => true, } } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 8ed3b57f..6ea02894 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -1,10 +1,7 @@ -use crate::{ - tcp_utils::{tcp_receive, tcp_send}, - DaemonEvent, DataflowEvent, Event, -}; +use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event}; use dora_core::coordinator_messages; use eyre::{eyre, Context}; -use std::{io::ErrorKind, net::Ipv4Addr, time::Duration}; +use std::{io::ErrorKind, net::Ipv4Addr}; use tokio::{ net::{TcpListener, TcpStream}, sync::mpsc, @@ -90,13 +87,11 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende break; } } - coordinator_messages::DaemonEvent::Watchdog => { - let reply = serde_json::to_vec(&coordinator_messages::WatchdogAck).unwrap(); - _ = tokio::time::timeout( - Duration::from_millis(10), - tcp_send(&mut connection, &reply), - ) - .await; + coordinator_messages::DaemonEvent::Heartbeat => { + let event = Event::DaemonHeartbeat { machine_id }; + if events_tx.send(event).await.is_err() { + break; + } } }, }; diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index d04799f4..1f84ba5e 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -20,6 +20,7 @@ use inter_daemon::InterDaemonConnection; use pending::PendingNodes; use shared_memory_server::ShmemConf; use std::env::temp_dir; +use std::time::Instant; use std::{ borrow::Cow, collections::{BTreeMap, BTreeSet, HashMap}, @@ -28,7 +29,7 @@ use std::{ path::{Path, PathBuf}, time::Duration, }; -use tcp_utils::{tcp_receive, tcp_send}; +use tcp_utils::tcp_send; use tokio::fs::File; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; @@ -60,6 +61,7 @@ pub struct Daemon { events_tx: mpsc::Sender, coordinator_connection: Option, + last_coordinator_heartbeat: Instant, inter_daemon_connections: BTreeMap, machine_id: String, @@ -186,6 +188,7 @@ impl Daemon { running: HashMap::new(), events_tx: dora_events_tx, coordinator_connection, + last_coordinator_heartbeat: Instant::now(), inter_daemon_connections: BTreeMap::new(), machine_id, exit_when_done, @@ -196,7 +199,7 @@ impl Daemon { let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval( Duration::from_secs(5), )) - .map(|_| Event::WatchdogInterval); + .map(|_| Event::HeartbeatInterval); let events = (external_events, dora_events, watchdog_interval).merge(); daemon.run_inner(events).await } @@ -230,22 +233,19 @@ impl Daemon { RunStatus::Continue => {} RunStatus::Exit => break, }, - Event::WatchdogInterval => { + Event::HeartbeatInterval => { if let Some(connection) = &mut self.coordinator_connection { let msg = serde_json::to_vec(&CoordinatorRequest::Event { machine_id: self.machine_id.clone(), - event: DaemonEvent::Watchdog, + event: DaemonEvent::Heartbeat, })?; tcp_send(connection, &msg) .await .wrap_err("failed to send watchdog message to dora-coordinator")?; - let reply_raw = tcp_receive(connection) - .await - .wrap_err("lost connection to coordinator")?; - let _: dora_core::coordinator_messages::WatchdogAck = - serde_json::from_slice(&reply_raw) - .wrap_err("received unexpected watchdog reply from coordinator")?; + if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) { + bail!("lost connection to coordinator") + } } } Event::CtrlC => { @@ -394,12 +394,9 @@ impl Daemon { .map_err(|_| error!("could not send destroy reply from daemon to coordinator")); RunStatus::Exit } - DaemonCoordinatorEvent::Watchdog => { - let _ = reply_tx - .send(Some(DaemonCoordinatorReply::WatchdogAck)) - .map_err(|_| { - error!("could not send WatchdogAck reply from daemon to coordinator") - }); + DaemonCoordinatorEvent::Heartbeat => { + self.last_coordinator_heartbeat = Instant::now(); + let _ = reply_tx.send(None); RunStatus::Continue } }; @@ -1293,7 +1290,7 @@ pub enum Event { Coordinator(CoordinatorEvent), Daemon(InterDaemonEvent), Dora(DoraEvent), - WatchdogInterval, + HeartbeatInterval, CtrlC, } diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index b1a6e550..e83ee784 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -25,7 +25,7 @@ pub enum DaemonEvent { dataflow_id: DataflowId, result: Result<(), String>, }, - Watchdog, + Heartbeat, } #[derive(Debug, serde::Serialize, serde::Deserialize)] @@ -42,6 +42,3 @@ impl RegisterResult { } } } - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct WatchdogAck; diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index c376bf63..5929fb6a 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -216,7 +216,7 @@ pub enum DaemonCoordinatorEvent { node_id: NodeId, }, Destroy, - Watchdog, + Heartbeat, } #[derive(Debug, serde::Deserialize, serde::Serialize)] @@ -240,7 +240,6 @@ pub enum DaemonCoordinatorReply { ReloadResult(Result<(), String>), StopResult(Result<(), String>), DestroyResult(Result<(), String>), - WatchdogAck, Logs(Result, String>), }