Browse Source

Merge pull request #278 from dora-rs/heartbeat

Replace watchdog by asynchronous heartbeat messages
tags/v0.2.3-rc6
Philipp Oppermann GitHub 2 years ago
parent
commit
ab8e40aeea
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 44 additions and 54 deletions
  1. +21
    -19
      binaries/coordinator/src/lib.rs
  2. +7
    -12
      binaries/coordinator/src/listener.rs
  3. +14
    -17
      binaries/daemon/src/lib.rs
  4. +1
    -4
      libraries/core/src/coordinator_messages.rs
  5. +1
    -2
      libraries/core/src/daemon_messages.rs

+ 21
- 19
binaries/coordinator/src/lib.rs View File

@@ -21,7 +21,7 @@ use std::{
collections::{BTreeSet, HashMap},
net::SocketAddr,
path::PathBuf,
time::Duration,
time::{Duration, Instant},
};
use tokio::{
net::{TcpListener, TcpStream},
@@ -138,9 +138,9 @@ async fn start_inner(
.await
.wrap_err("failed to create control events")?;

let daemon_watchdog_interval =
let daemon_heartbeat_interval =
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
.map(|_| Event::DaemonWatchdogInterval);
.map(|_| Event::DaemonHeartbeatInterval);

// events that should be aborted on `dora destroy`
let (abortable_events, abort_handle) = futures::stream::abortable(
@@ -148,7 +148,7 @@ async fn start_inner(
control_events,
new_daemon_connections,
external_events,
daemon_watchdog_interval,
daemon_heartbeat_interval,
)
.merge(),
);
@@ -203,6 +203,7 @@ async fn start_inner(
DaemonConnection {
stream: connection,
listen_socket,
last_heartbeat: Instant::now(),
},
);
if let Some(_previous) = previous {
@@ -447,9 +448,13 @@ async fn start_inner(
}
ControlEvent::Error(err) => tracing::error!("{err:?}"),
},
Event::DaemonWatchdogInterval => {
Event::DaemonHeartbeatInterval => {
let mut disconnected = BTreeSet::new();
for (machine_id, connection) in &mut daemon_connections {
if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
disconnected.insert(machine_id.clone());
continue;
}
let result: eyre::Result<()> =
tokio::time::timeout(Duration::from_millis(500), send_watchdog_message(&mut connection.stream))
.await
@@ -479,6 +484,11 @@ async fn start_inner(
)
.await?;
}
Event::DaemonHeartbeat { machine_id } => {
if let Some(connection) = daemon_connections.get_mut(&machine_id) {
connection.last_heartbeat = Instant::now();
}
}
}
}

@@ -490,6 +500,7 @@ async fn start_inner(
struct DaemonConnection {
stream: TcpStream,
listen_socket: SocketAddr,
last_heartbeat: Instant,
}

fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
@@ -530,21 +541,11 @@ async fn handle_destroy(
}

async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> {
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Watchdog).unwrap();
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Heartbeat).unwrap();

tcp_send(connection, &message)
.await
.wrap_err("failed to send watchdog message to daemon")?;
let reply_raw = tcp_receive(connection)
.await
.wrap_err("failed to receive stop reply from daemon")?;

match serde_json::from_slice(&reply_raw)
.wrap_err("failed to deserialize stop reply from daemon")?
{
DaemonCoordinatorReply::WatchdogAck => Ok(()),
other => bail!("unexpected reply after sending `watchdog`: {other:?}"),
}
.wrap_err("failed to send watchdog message to daemon")
}

#[allow(dead_code)] // Keeping the communication layer for later use.
@@ -777,10 +778,11 @@ async fn destroy_daemons(
pub enum Event {
NewDaemonConnection(TcpStream),
DaemonConnectError(eyre::Report),
DaemonHeartbeat { machine_id: String },
Dataflow { uuid: Uuid, event: DataflowEvent },
Control(ControlEvent),
Daemon(DaemonEvent),
DaemonWatchdogInterval,
DaemonHeartbeatInterval,
CtrlC,
}

@@ -789,7 +791,7 @@ impl Event {
#[allow(clippy::match_like_matches_macro)]
pub fn log(&self) -> bool {
match self {
Event::DaemonWatchdogInterval => false,
Event::DaemonHeartbeatInterval => false,
_ => true,
}
}


+ 7
- 12
binaries/coordinator/src/listener.rs View File

@@ -1,10 +1,7 @@
use crate::{
tcp_utils::{tcp_receive, tcp_send},
DaemonEvent, DataflowEvent, Event,
};
use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
use dora_core::coordinator_messages;
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::Ipv4Addr, time::Duration};
use std::{io::ErrorKind, net::Ipv4Addr};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
@@ -90,13 +87,11 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende
break;
}
}
coordinator_messages::DaemonEvent::Watchdog => {
let reply = serde_json::to_vec(&coordinator_messages::WatchdogAck).unwrap();
_ = tokio::time::timeout(
Duration::from_millis(10),
tcp_send(&mut connection, &reply),
)
.await;
coordinator_messages::DaemonEvent::Heartbeat => {
let event = Event::DaemonHeartbeat { machine_id };
if events_tx.send(event).await.is_err() {
break;
}
}
},
};


+ 14
- 17
binaries/daemon/src/lib.rs View File

@@ -20,6 +20,7 @@ use inter_daemon::InterDaemonConnection;
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use std::env::temp_dir;
use std::time::Instant;
use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet, HashMap},
@@ -28,7 +29,7 @@ use std::{
path::{Path, PathBuf},
time::Duration,
};
use tcp_utils::{tcp_receive, tcp_send};
use tcp_utils::tcp_send;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
@@ -60,6 +61,7 @@ pub struct Daemon {
events_tx: mpsc::Sender<Event>,

coordinator_connection: Option<TcpStream>,
last_coordinator_heartbeat: Instant,
inter_daemon_connections: BTreeMap<String, InterDaemonConnection>,
machine_id: String,

@@ -186,6 +188,7 @@ impl Daemon {
running: HashMap::new(),
events_tx: dora_events_tx,
coordinator_connection,
last_coordinator_heartbeat: Instant::now(),
inter_daemon_connections: BTreeMap::new(),
machine_id,
exit_when_done,
@@ -196,7 +199,7 @@ impl Daemon {
let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
Duration::from_secs(5),
))
.map(|_| Event::WatchdogInterval);
.map(|_| Event::HeartbeatInterval);
let events = (external_events, dora_events, watchdog_interval).merge();
daemon.run_inner(events).await
}
@@ -230,22 +233,19 @@ impl Daemon {
RunStatus::Continue => {}
RunStatus::Exit => break,
},
Event::WatchdogInterval => {
Event::HeartbeatInterval => {
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::Watchdog,
event: DaemonEvent::Heartbeat,
})?;
tcp_send(connection, &msg)
.await
.wrap_err("failed to send watchdog message to dora-coordinator")?;

let reply_raw = tcp_receive(connection)
.await
.wrap_err("lost connection to coordinator")?;
let _: dora_core::coordinator_messages::WatchdogAck =
serde_json::from_slice(&reply_raw)
.wrap_err("received unexpected watchdog reply from coordinator")?;
if self.last_coordinator_heartbeat.elapsed() > Duration::from_secs(20) {
bail!("lost connection to coordinator")
}
}
}
Event::CtrlC => {
@@ -394,12 +394,9 @@ impl Daemon {
.map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
RunStatus::Exit
}
DaemonCoordinatorEvent::Watchdog => {
let _ = reply_tx
.send(Some(DaemonCoordinatorReply::WatchdogAck))
.map_err(|_| {
error!("could not send WatchdogAck reply from daemon to coordinator")
});
DaemonCoordinatorEvent::Heartbeat => {
self.last_coordinator_heartbeat = Instant::now();
let _ = reply_tx.send(None);
RunStatus::Continue
}
};
@@ -1293,7 +1290,7 @@ pub enum Event {
Coordinator(CoordinatorEvent),
Daemon(InterDaemonEvent),
Dora(DoraEvent),
WatchdogInterval,
HeartbeatInterval,
CtrlC,
}



+ 1
- 4
libraries/core/src/coordinator_messages.rs View File

@@ -25,7 +25,7 @@ pub enum DaemonEvent {
dataflow_id: DataflowId,
result: Result<(), String>,
},
Watchdog,
Heartbeat,
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
@@ -42,6 +42,3 @@ impl RegisterResult {
}
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct WatchdogAck;

+ 1
- 2
libraries/core/src/daemon_messages.rs View File

@@ -216,7 +216,7 @@ pub enum DaemonCoordinatorEvent {
node_id: NodeId,
},
Destroy,
Watchdog,
Heartbeat,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
@@ -240,7 +240,6 @@ pub enum DaemonCoordinatorReply {
ReloadResult(Result<(), String>),
StopResult(Result<(), String>),
DestroyResult(Result<(), String>),
WatchdogAck,
Logs(Result<Vec<u8>, String>),
}



Loading…
Cancel
Save