From 2b7d2508f4d2a892adb47720b411e2f57bdbbbcd Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Thu, 29 Dec 2022 15:05:50 +0100
Subject: [PATCH] Implement watchdog messages for detecting sudden disconnects of daemon

---
 binaries/coordinator/src/lib.rs       | 85 +++++++++++++++++++++++++--
 binaries/daemon/src/lib.rs            |  4 ++
 libraries/core/src/daemon_messages.rs |  4 ++
 3 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs
index b54d71f5..5716931f 100644
--- a/binaries/coordinator/src/lib.rs
+++ b/binaries/coordinator/src/lib.rs
@@ -19,6 +19,7 @@ use run::SpawnedDataflow;
 use std::{
     collections::{BTreeSet, HashMap},
     path::{Path, PathBuf},
+    time::Duration,
 };
 use tokio::net::TcpStream;
 use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
@@ -72,13 +73,27 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> {
             .wrap_err("failed to create control events")?,
     );
 
-    let mut events = (new_daemon_connections, daemon_events, control_events).merge();
+    // Tick once per second; each tick probes all connected daemons so that
+    // connections that vanished without a clean shutdown are detected.
+    let daemon_watchdog_interval =
+        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
+            .map(|_| Event::DaemonWatchdogInterval);
+
+    let mut events = (
+        new_daemon_connections,
+        daemon_events,
+        control_events,
+        daemon_watchdog_interval,
+    )
+        .merge();
 
     let mut running_dataflows: HashMap = HashMap::new();
     let mut daemon_connections: HashMap<_, TcpStream> = HashMap::new();
 
     while let Some(event) = events.next().await {
-        tracing::trace!("Handling event {event:?}");
+        if event.log() {
+            tracing::trace!("Handling event {event:?}");
+        }
         match event {
             Event::NewDaemonConnection(connection) => {
                 let events_tx = daemon_events_tx.clone();
@@ -277,6 +292,36 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> {
                     }
                     ControlEvent::Error(err) => tracing::error!("{err:?}"),
                 },
+            Event::DaemonWatchdogInterval => {
+                // Collect failed daemons first; the map cannot be modified while
+                // it is being iterated.
+                let mut disconnected = BTreeSet::new();
+                for (machine_id, connection) in &mut daemon_connections {
+                    // A daemon that does not acknowledge within 100ms counts as
+                    // disconnected. `and_then` flattens the timeout result and the
+                    // send/receive result into one.
+                    let result: eyre::Result<()> = tokio::time::timeout(
+                        Duration::from_millis(100),
+                        send_watchdog_message(connection),
+                    )
+                    .await
+                    .wrap_err("timeout")
+                    .and_then(|r| r)
+                    .wrap_err_with(|| {
+                        format!("daemon at `{machine_id}` did not react as expected to watchdog message")
+                    });
+                    if let Err(err) = result {
+                        tracing::warn!("{err:?}");
+                        disconnected.insert(machine_id.clone());
+                    }
+                }
+                if !disconnected.is_empty() {
+                    tracing::info!("Disconnecting daemons that failed watchdog: {disconnected:?}");
+                    for machine_id in disconnected {
+                        daemon_connections.remove(&machine_id);
+                    }
+                }
+            }
         }
     }
 
@@ -285,6 +330,31 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> {
     Ok(())
 }
 
+/// Sends a watchdog message to the daemon behind `connection` and waits for its reply.
+///
+/// # Errors
+///
+/// Fails if the message cannot be serialized or sent, if no reply can be received or
+/// deserialized, or if the reply is anything other than `WatchdogAck`.
+async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> {
+    let message = serde_json::to_vec(&DaemonCoordinatorEvent::Watchdog)
+        .wrap_err("failed to serialize watchdog message")?;
+
+    tcp_send(connection, &message)
+        .await
+        .wrap_err("failed to send watchdog message to daemon")?;
+    let reply_raw = tcp_receive(connection)
+        .await
+        .wrap_err("failed to receive watchdog reply from daemon")?;
+
+    match serde_json::from_slice(&reply_raw)
+        .wrap_err("failed to deserialize watchdog reply from daemon")?
+    {
+        DaemonCoordinatorReply::WatchdogAck => Ok(()),
+        _ => bail!("unexpected reply"),
+    }
+}
+
 struct RunningDataflow {
     name: Option,
     uuid: Uuid,
@@ -392,6 +462,17 @@ pub enum Event {
     Dataflow { uuid: Uuid, event: DataflowEvent },
     Control(ControlEvent),
     Daemon(DaemonEvent),
+    /// Periodic tick that triggers a watchdog check of all daemon connections.
+    DaemonWatchdogInterval,
+}
+
+impl Event {
+    /// Whether this event should be logged.
+    ///
+    /// Returns `false` only for the periodic watchdog tick.
+    pub fn log(&self) -> bool {
+        !matches!(self, Event::DaemonWatchdogInterval)
+    }
 }
 
 #[derive(Debug)]
diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs
index ec89b7a3..f6554cb4 100644
--- a/binaries/daemon/src/lib.rs
+++ b/binaries/daemon/src/lib.rs
@@ -251,6 +251,10 @@ impl Daemon {
                 let reply = DaemonCoordinatorReply::DestroyResult(Ok(()));
                 (reply, RunStatus::Exit)
             }
+            DaemonCoordinatorEvent::Watchdog => {
+                // Liveness probe: acknowledge and keep the daemon running.
+                (DaemonCoordinatorReply::WatchdogAck, RunStatus::Continue)
+            }
         }
     }
 
diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs
index 74f471ef..6842d71a 100644
--- a/libraries/core/src/daemon_messages.rs
+++ b/libraries/core/src/daemon_messages.rs
@@ -85,6 +85,8 @@ pub enum DaemonCoordinatorEvent {
     Spawn(SpawnDataflowNodes),
     StopDataflow { dataflow_id: DataflowId },
     Destroy,
+    /// Liveness probe; the daemon answers with `DaemonCoordinatorReply::WatchdogAck`.
+    Watchdog,
 }
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
@@ -92,6 +94,8 @@ pub enum DaemonCoordinatorReply {
     SpawnResult(Result<(), String>),
     StopResult(Result<(), String>),
     DestroyResult(Result<(), String>),
+    /// Reply to `DaemonCoordinatorEvent::Watchdog`.
+    WatchdogAck,
 }
 
 pub type DataflowId = Uuid;