From 7d646509c2360528460e9a81ad8cbd0babd6563b Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 3 May 2023 17:36:44 +0200 Subject: [PATCH 1/2] Replace watchdog by asynchronous heartbeat messages The previous watchdog requests required waiting for a reply, which could slow down the system under load and also lead to false errors on slower systems (e.g. the CI). Because of this, this commit replaces the watchdog requests with asynchronous heartbeat messages, which don't require replies. Instead, we record the last heartbeat timestamp whenever we receive a heartbeat message. We then periodically check the time since the last received heartbeat message and consider the connection closed if too much time has passed. --- binaries/coordinator/src/lib.rs | 44 ++++++++++++---------- binaries/coordinator/src/listener.rs | 19 ++++------ binaries/daemon/src/lib.rs | 36 +++++++++--------- libraries/core/src/coordinator_messages.rs | 5 +-- libraries/core/src/daemon_messages.rs | 3 +- 5 files changed, 52 insertions(+), 55 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index c9ad5ed5..622a0d63 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -21,7 +21,7 @@ use std::{ collections::{BTreeSet, HashMap}, net::SocketAddr, path::PathBuf, - time::Duration, + time::{Duration, SystemTime}, }; use tokio::{ net::{TcpListener, TcpStream}, @@ -138,9 +138,9 @@ async fn start_inner( .await .wrap_err("failed to create control events")?; - let daemon_watchdog_interval = - tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1))) - .map(|_| Event::DaemonWatchdogInterval); + let daemon_heartbeat_interval = + tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3))) + .map(|_| Event::DaemonHeartbeatInterval); // events that should be aborted on `dora destroy` let (abortable_events, abort_handle) = futures::stream::abortable( @@ -148,7 +148,7 @@ async fn start_inner( control_events, new_daemon_connections, external_events, - daemon_watchdog_interval, + daemon_heartbeat_interval, ) .merge(), ); @@ -203,6 +203,7 @@ async fn start_inner( DaemonConnection { stream: connection, listen_socket, + last_heartbeat: SystemTime::now(), }, ); if let Some(_previous) = previous { @@ -442,9 +443,15 @@ async fn start_inner( } ControlEvent::Error(err) => tracing::error!("{err:?}"), }, - Event::DaemonWatchdogInterval => { + Event::DaemonHeartbeatInterval => { let mut disconnected = BTreeSet::new(); for (machine_id, connection) in &mut daemon_connections { + if connection.last_heartbeat.elapsed().unwrap_or_default() + > Duration::from_secs(15) + { + disconnected.insert(machine_id.clone()); + continue; + } let result: eyre::Result<()> = tokio::time::timeout(Duration::from_millis(100), send_watchdog_message(&mut connection.stream)) .await @@ -474,6 +481,11 @@ async fn start_inner( ) .await?; } + Event::DaemonHeartbeat { machine_id } => { + if let Some(connection) = daemon_connections.get_mut(&machine_id) { + connection.last_heartbeat = SystemTime::now(); + } + } } } @@ -485,6 +497,7 @@ async fn start_inner( struct DaemonConnection { stream: TcpStream, listen_socket: SocketAddr, + last_heartbeat: SystemTime, } fn set_up_ctrlc_handler() -> Result, eyre::ErrReport> { @@ -525,21 +538,11 @@ async fn handle_destroy( } async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> { - let message = serde_json::to_vec(&DaemonCoordinatorEvent::Watchdog).unwrap(); + let message = serde_json::to_vec(&DaemonCoordinatorEvent::Heartbeat).unwrap(); tcp_send(connection, &message) .await - .wrap_err("failed to send watchdog message to daemon")?; - let reply_raw = tcp_receive(connection) - .await - .wrap_err("failed to receive stop reply from daemon")?; - - match serde_json::from_slice(&reply_raw) - .wrap_err("failed to deserialize stop reply from daemon")? - { - DaemonCoordinatorReply::WatchdogAck => Ok(()), - other => bail!("unexpected reply after sending `watchdog`: {other:?}"), - } + .wrap_err("failed to send watchdog message to daemon") } #[allow(dead_code)] // Keeping the communication layer for later use. @@ -770,10 +773,11 @@ async fn destroy_daemons( pub enum Event { NewDaemonConnection(TcpStream), DaemonConnectError(eyre::Report), + DaemonHeartbeat { machine_id: String }, Dataflow { uuid: Uuid, event: DataflowEvent }, Control(ControlEvent), Daemon(DaemonEvent), - DaemonWatchdogInterval, + DaemonHeartbeatInterval, CtrlC, } @@ -782,7 +786,7 @@ impl Event { #[allow(clippy::match_like_matches_macro)] pub fn log(&self) -> bool { match self { - Event::DaemonWatchdogInterval => false, + Event::DaemonHeartbeatInterval => false, _ => true, } } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index da4ee624..f8561865 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -1,10 +1,7 @@ -use crate::{ - tcp_utils::{tcp_receive, tcp_send}, - DaemonEvent, DataflowEvent, Event, -}; +use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event}; use dora_core::coordinator_messages; use eyre::{eyre, Context}; -use std::{io::ErrorKind, net::Ipv4Addr, time::Duration}; +use std::{io::ErrorKind, net::Ipv4Addr}; use tokio::{ net::{TcpListener, TcpStream}, sync::mpsc, @@ -84,13 +81,11 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende break; } } - coordinator_messages::DaemonEvent::Watchdog => { - let reply = serde_json::to_vec(&coordinator_messages::WatchdogAck).unwrap(); - _ = tokio::time::timeout( - Duration::from_millis(10), - tcp_send(&mut connection, &reply), - ) - .await; + coordinator_messages::DaemonEvent::Heartbeat => { + let event = Event::DaemonHeartbeat { machine_id }; + if events_tx.send(event).await.is_err() { + break; + } } }, }; diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 1192086d..d71c6dc4 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -20,6 +20,7 @@ use inter_daemon::InterDaemonConnection; use shared_memory_server::ShmemConf; use std::collections::HashSet; use std::env::temp_dir; +use std::time::SystemTime; use std::{ borrow::Cow, collections::{BTreeMap, BTreeSet, HashMap}, @@ -28,7 +29,7 @@ use std::{ path::{Path, PathBuf}, time::Duration, }; -use tcp_utils::{tcp_receive, tcp_send}; +use tcp_utils::tcp_send; use tokio::fs::File; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; @@ -57,6 +58,7 @@ pub struct Daemon { events_tx: mpsc::Sender, coordinator_connection: Option, + last_coordinator_heartbeat: SystemTime, inter_daemon_connections: BTreeMap, machine_id: String, @@ -183,6 +185,7 @@ impl Daemon { running: HashMap::new(), events_tx: dora_events_tx, coordinator_connection, + last_coordinator_heartbeat: SystemTime::now(), inter_daemon_connections: BTreeMap::new(), machine_id, exit_when_done, @@ -193,7 +196,7 @@ impl Daemon { let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval( Duration::from_secs(5), )) - .map(|_| Event::WatchdogInterval); + .map(|_| Event::HeartbeatInterval); let events = (external_events, dora_events, watchdog_interval).merge(); daemon.run_inner(events).await } @@ -227,22 +230,24 @@ impl Daemon { RunStatus::Continue => {} RunStatus::Exit => break, }, - Event::WatchdogInterval => { + Event::HeartbeatInterval => { if let Some(connection) = &mut self.coordinator_connection { let msg = serde_json::to_vec(&CoordinatorRequest::Event { machine_id: self.machine_id.clone(), - event: DaemonEvent::Watchdog, + event: DaemonEvent::Heartbeat, })?; tcp_send(connection, &msg) .await .wrap_err("failed to send watchdog message to dora-coordinator")?; - let reply_raw = tcp_receive(connection) - .await - .wrap_err("lost connection to coordinator")?; - let _: dora_core::coordinator_messages::WatchdogAck = - serde_json::from_slice(&reply_raw) - .wrap_err("received unexpected watchdog reply from coordinator")?; + if self + .last_coordinator_heartbeat + .elapsed() + .unwrap_or_default() + > Duration::from_secs(20) + { + bail!("lost connection to coordinator") + } } } Event::CtrlC => { @@ -382,12 +387,9 @@ impl Daemon { .map_err(|_| error!("could not send destroy reply from daemon to coordinator")); RunStatus::Exit } - DaemonCoordinatorEvent::Watchdog => { - let _ = reply_tx - .send(Some(DaemonCoordinatorReply::WatchdogAck)) - .map_err(|_| { - error!("could not send WatchdogAck reply from daemon to coordinator") - }); + DaemonCoordinatorEvent::Heartbeat => { + self.last_coordinator_heartbeat = SystemTime::now(); + let _ = reply_tx.send(None); RunStatus::Continue } }; @@ -1283,7 +1285,7 @@ pub enum Event { Coordinator(CoordinatorEvent), Daemon(InterDaemonEvent), Dora(DoraEvent), - WatchdogInterval, + HeartbeatInterval, CtrlC, } diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index 21ae38a9..7505b098 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -24,7 +24,7 @@ pub enum DaemonEvent { dataflow_id: DataflowId, result: Result<(), String>, }, - Watchdog, + Heartbeat, } #[derive(Debug, serde::Serialize, serde::Deserialize)] @@ -41,6 +41,3 @@ impl RegisterResult { } } } - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct WatchdogAck; diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index e51c1586..66704e7a 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -214,7 +214,7 @@ pub enum DaemonCoordinatorEvent { node_id: NodeId, }, Destroy, - Watchdog, + Heartbeat, } #[derive(Debug, serde::Deserialize, serde::Serialize)] @@ -238,7 +238,6 @@ pub enum DaemonCoordinatorReply { ReloadResult(Result<(), String>), StopResult(Result<(), String>), DestroyResult(Result<(), String>), - WatchdogAck, Logs(Result, String>), } From 5edfc394a640d1be730bce030c4b85e7fa11f3f9 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 4 May 2023 09:17:05 +0200 Subject: [PATCH 2/2] Use monotonic `Instant` instead of `SystemTime` --- binaries/coordinator/src/lib.rs | 12 +++++------- binaries/daemon/src/lib.rs | 15 +++++---------- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 622a0d63..881eed14 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -21,7 +21,7 @@ use std::{ collections::{BTreeSet, HashMap}, net::SocketAddr, path::PathBuf, - time::{Duration, SystemTime}, + time::{Duration, Instant}, }; use tokio::{ net::{TcpListener, TcpStream}, @@ -203,7 +203,7 @@ async fn start_inner( DaemonConnection { stream: connection, listen_socket, - last_heartbeat: SystemTime::now(), + last_heartbeat: Instant::now(), }, ); if let Some(_previous) = previous { @@ -446,9 +446,7 @@ async fn start_inner( Event::DaemonHeartbeatInterval => { let mut disconnected = BTreeSet::new(); for (machine_id, connection) in &mut daemon_connections { - if connection.last_heartbeat.elapsed().unwrap_or_default() - > Duration::from_secs(15) - { + if connection.last_heartbeat.elapsed() > Duration::from_secs(15) { disconnected.insert(machine_id.clone()); continue; } @@ -483,7 +481,7 @@ async fn start_inner( } Event::DaemonHeartbeat { machine_id } => { if let Some(connection) = daemon_connections.get_mut(&machine_id) { - connection.last_heartbeat = SystemTime::now(); + connection.last_heartbeat = Instant::now(); } } } @@ -497,7 +495,7 @@ async fn start_inner( struct DaemonConnection { stream: TcpStream, listen_socket: SocketAddr, - last_heartbeat: SystemTime, + last_heartbeat: Instant, } fn set_up_ctrlc_handler() -> Result, eyre::ErrReport> { diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index d71c6dc4..bd4b0786 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -20,7 +20,7 @@ use inter_daemon::InterDaemonConnection; use shared_memory_server::ShmemConf; use std::collections::HashSet; use std::env::temp_dir; -use std::time::SystemTime; +use std::time::Instant; use std::{ borrow::Cow, collections::{BTreeMap, BTreeSet, HashMap}, @@ -58,7 +58,7 @@ pub struct Daemon { events_tx: mpsc::Sender, coordinator_connection: Option, - last_coordinator_heartbeat: SystemTime, + last_coordinator_heartbeat: Instant, inter_daemon_connections: BTreeMap, machine_id: String, @@ -185,7 +185,7 @@ impl Daemon { running: HashMap::new(), events_tx: dora_events_tx, coordinator_connection, - last_coordinator_heartbeat: SystemTime::now(), + last_coordinator_heartbeat: Instant::now(), inter_daemon_connections: BTreeMap::new(), machine_id, exit_when_done, @@ -240,12 +240,7 @@ impl Daemon { .await .wrap_err("failed to send watchdog message to dora-coordinator")?; - if self - .last_coordinator_heartbeat - .elapsed() - .unwrap_or_default() - > Duration::from_secs(20) - { + if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) { bail!("lost connection to coordinator") } } @@ -388,7 +383,7 @@ impl Daemon { RunStatus::Exit } DaemonCoordinatorEvent::Heartbeat => { - self.last_coordinator_heartbeat = SystemTime::now(); + self.last_coordinator_heartbeat = Instant::now(); let _ = reply_tx.send(None); RunStatus::Continue }