Browse Source

Add watchdog for checking that coordinator is still reachable

Throw an error if a daemon cannot reach the coordinator anymore.

This is only an interim solution. In the future, we want to make the daemon more robust and ideally even allow restarts of the coordinator.
tags/v0.2.0-candidate
Philipp Oppermann 3 years ago
parent
commit
218106721d
Failed to extract signature
4 changed files with 51 additions and 5 deletions
  1. +13
    -2
      binaries/coordinator/src/listener.rs
  2. +4
    -2
      binaries/daemon/src/coordinator.rs
  3. +30
    -1
      binaries/daemon/src/lib.rs
  4. +4
    -0
      libraries/core/src/coordinator_messages.rs

+ 13
- 2
binaries/coordinator/src/listener.rs View File

@@ -1,7 +1,10 @@
use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
use crate::{
tcp_utils::{tcp_receive, tcp_send},
DaemonEvent, DataflowEvent, Event,
};
use dora_core::coordinator_messages;
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::Ipv4Addr};
use std::{io::ErrorKind, net::Ipv4Addr, time::Duration};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
@@ -66,6 +69,14 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende
break;
}
}
coordinator_messages::DaemonEvent::Watchdog => {
let reply = serde_json::to_vec(&coordinator_messages::WatchdogAck).unwrap();
_ = tokio::time::timeout(
Duration::from_millis(10),
tcp_send(&mut connection, &reply),
)
.await;
}
},
};
}


+ 4
- 2
binaries/daemon/src/coordinator.rs View File

@@ -98,7 +98,7 @@ pub async fn send_event(
addr: SocketAddr,
machine_id: String,
event: DaemonEvent,
) -> eyre::Result<()> {
) -> eyre::Result<TcpStream> {
let mut stream = TcpStream::connect(addr)
.await
.wrap_err("failed to connect to dora-coordinator")?;
@@ -108,5 +108,7 @@ pub async fn send_event(
let msg = serde_json::to_vec(&CoordinatorRequest::Event { machine_id, event })?;
tcp_send(&mut stream, &msg)
.await
.wrap_err("failed to send event to dora-coordinator")
.wrap_err("failed to send event to dora-coordinator")?;

Ok(stream)
}

+ 30
- 1
binaries/daemon/src/lib.rs View File

@@ -20,6 +20,7 @@ use std::{
rc::Rc,
time::Duration,
};
use tcp_utils::tcp_receive;
use tokio::{
fs,
net::TcpStream,
@@ -157,7 +158,17 @@ impl Daemon {
exit_when_done,
};
let dora_events = ReceiverStream::new(dora_events_rx).map(Event::Dora);
let events = (external_events, new_connections, dora_events).merge();
let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
Duration::from_secs(5),
))
.map(|_| Event::WatchdogInterval);
let events = (
external_events,
new_connections,
dora_events,
watchdog_interval,
)
.merge();
daemon.run_inner(events).await
}

@@ -212,6 +223,23 @@ impl Daemon {
None => tracing::warn!("received unknown drop token {token:?}"),
}
}
Event::WatchdogInterval => {
if let Some(addr) = self.coordinator_addr {
let mut connection = coordinator::send_event(
addr,
self.machine_id.clone(),
DaemonEvent::Watchdog,
)
.await
.wrap_err("lost connection to coordinator")?;
let reply_raw = tcp_receive(&mut connection)
.await
.wrap_err("lost connection to coordinator")?;
let _: dora_core::coordinator_messages::WatchdogAck =
serde_json::from_slice(&reply_raw)
.wrap_err("received unexpected watchdog reply from coordinator")?;
}
}
}
}

@@ -620,6 +648,7 @@ pub enum Event {
Coordinator(CoordinatorEvent),
Dora(DoraEvent),
Drop(DropEvent),
WatchdogInterval,
}

#[derive(Debug)]


+ 4
- 0
libraries/core/src/coordinator_messages.rs View File

@@ -19,6 +19,7 @@ pub enum DaemonEvent {
dataflow_id: DataflowId,
result: Result<(), String>,
},
Watchdog,
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
@@ -35,3 +36,6 @@ impl RegisterResult {
}
}
}

/// Acknowledgement that the coordinator sends back to a daemon in reply to a
/// `DaemonEvent::Watchdog` event.
///
/// The type carries no data; the daemon only checks that a reply arrives and
/// that it deserializes as this type.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct WatchdogAck;

Loading…
Cancel
Save