|
|
|
@@ -19,6 +19,7 @@ use run::SpawnedDataflow; |
|
|
|
use std::{ |
|
|
|
collections::{BTreeSet, HashMap}, |
|
|
|
path::{Path, PathBuf}, |
|
|
|
time::Duration, |
|
|
|
}; |
|
|
|
use tokio::net::TcpStream; |
|
|
|
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; |
|
|
|
@@ -72,13 +73,25 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { |
|
|
|
.wrap_err("failed to create control events")?, |
|
|
|
); |
|
|
|
|
|
|
|
let mut events = (new_daemon_connections, daemon_events, control_events).merge(); |
|
|
|
let daemon_watchdog_interval = |
|
|
|
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1))) |
|
|
|
.map(|_| Event::DaemonWatchdogInterval); |
|
|
|
|
|
|
|
let mut events = ( |
|
|
|
new_daemon_connections, |
|
|
|
daemon_events, |
|
|
|
control_events, |
|
|
|
daemon_watchdog_interval, |
|
|
|
) |
|
|
|
.merge(); |
|
|
|
|
|
|
|
let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new(); |
|
|
|
let mut daemon_connections: HashMap<_, TcpStream> = HashMap::new(); |
|
|
|
|
|
|
|
while let Some(event) = events.next().await { |
|
|
|
tracing::trace!("Handling event {event:?}"); |
|
|
|
if event.log() { |
|
|
|
tracing::trace!("Handling event {event:?}"); |
|
|
|
} |
|
|
|
match event { |
|
|
|
Event::NewDaemonConnection(connection) => { |
|
|
|
let events_tx = daemon_events_tx.clone(); |
|
|
|
@@ -277,6 +290,28 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { |
|
|
|
} |
|
|
|
ControlEvent::Error(err) => tracing::error!("{err:?}"), |
|
|
|
}, |
|
|
|
Event::DaemonWatchdogInterval => { |
|
|
|
let mut disconnected = BTreeSet::new(); |
|
|
|
for (machine_id, connection) in &mut daemon_connections { |
|
|
|
let result: eyre::Result<()> = |
|
|
|
tokio::time::timeout(Duration::from_millis(100), send_watchdog_message(connection)) |
|
|
|
.await |
|
|
|
.wrap_err("timeout") |
|
|
|
.and_then(|r| r).wrap_err_with(|| |
|
|
|
format!("daemon at `{machine_id}` did not react as expected to watchdog message"), |
|
|
|
); |
|
|
|
if let Err(err) = result { |
|
|
|
tracing::warn!("{err:?}"); |
|
|
|
disconnected.insert(machine_id.clone()); |
|
|
|
} |
|
|
|
} |
|
|
|
if !disconnected.is_empty() { |
|
|
|
tracing::info!("Disconnecting daemons that failed watchdog: {disconnected:?}"); |
|
|
|
for machine_id in disconnected { |
|
|
|
daemon_connections.remove(&machine_id); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@@ -285,6 +320,24 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> { |
|
|
|
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Watchdog).unwrap(); |
|
|
|
|
|
|
|
tcp_send(connection, &message) |
|
|
|
.await |
|
|
|
.wrap_err("failed to send watchdog message to daemon")?; |
|
|
|
let reply_raw = tcp_receive(connection) |
|
|
|
.await |
|
|
|
.wrap_err("failed to receive stop reply from daemon")?; |
|
|
|
|
|
|
|
match serde_json::from_slice(&reply_raw) |
|
|
|
.wrap_err("failed to deserialize stop reply from daemon")? |
|
|
|
{ |
|
|
|
DaemonCoordinatorReply::WatchdogAck => Ok(()), |
|
|
|
_ => bail!("unexpected reply"), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
struct RunningDataflow { |
|
|
|
name: Option<String>, |
|
|
|
uuid: Uuid, |
|
|
|
@@ -392,6 +445,17 @@ pub enum Event { |
|
|
|
Dataflow { uuid: Uuid, event: DataflowEvent }, |
|
|
|
Control(ControlEvent), |
|
|
|
Daemon(DaemonEvent), |
|
|
|
DaemonWatchdogInterval, |
|
|
|
} |
|
|
|
impl Event { |
|
|
|
/// Whether this event should be logged. |
|
|
|
#[allow(clippy::match_like_matches_macro)] |
|
|
|
pub fn log(&self) -> bool { |
|
|
|
match self { |
|
|
|
Event::DaemonWatchdogInterval => false, |
|
|
|
_ => true, |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
#[derive(Debug)] |
|
|
|
|