Use timestamps for coordinator messages and inter-daemon events

tags/v0.2.4-rc
Philipp Oppermann 2 years ago
parent commit 5139c85a87
7 changed files with 184 additions and 75 deletions
  1. binaries/coordinator/src/lib.rs (+85, -30)
  2. binaries/coordinator/src/listener.rs (+13, -5)
  3. binaries/coordinator/src/run/mod.rs (+10, -3)
  4. binaries/daemon/src/coordinator.rs (+15, -6)
  5. binaries/daemon/src/inter_daemon.rs (+1, -1)
  6. binaries/daemon/src/lib.rs (+40, -21)
  7. binaries/daemon/src/pending.rs (+20, -9)
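
Every message touched by this commit is wrapped in a Timestamped struct that carries a hybrid logical clock (HLC) timestamp alongside the payload. A minimal sketch of such a wrapper, assuming dora defines it in dora_core::daemon_messages with serde support enabled for uhlc::Timestamp (the field names inner and timestamp match their usage in the diffs below):

use dora_core::message::uhlc;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Timestamped<T> {
    pub inner: T,
    pub timestamp: uhlc::Timestamp,
}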

binaries/coordinator/src/lib.rs (+85, -30)

@@ -6,8 +6,9 @@ pub use control::ControlEvent;
use dora_core::{
config::{NodeId, OperatorId},
coordinator_messages::RegisterResult,
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply},
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::{self, HLC},
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_DEFAULT,
@@ -21,6 +22,7 @@ use std::{
collections::{BTreeSet, HashMap},
net::SocketAddr,
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
@@ -124,6 +126,8 @@ async fn start_inner(
tasks: &FuturesUnordered<JoinHandle<()>>,
external_events: impl Stream<Item = Event> + Unpin,
) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
c.map(Event::NewDaemonConnection)
.wrap_err("failed to open connection")
@@ -168,7 +172,11 @@ async fn start_inner(
connection.set_nodelay(true)?;
let events_tx = daemon_events_tx.clone();
if let Some(events_tx) = events_tx {
let task = tokio::spawn(listener::handle_connection(connection, events_tx));
let task = tokio::spawn(listener::handle_connection(
connection,
events_tx,
clock.clone(),
));
tasks.push(task);
} else {
tracing::warn!(
@@ -195,8 +203,12 @@ async fn start_inner(
not compatible with coordinator v{coordinator_version}"
))
};
let reply = Timestamped {
inner: reply,
timestamp: clock.new_timestamp(),
};
let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await;
match (reply, send_result) {
match (reply.inner, send_result) {
(RegisterResult::Ok, Ok(())) => {
let previous = daemon_connections.insert(
machine_id.clone(),
@@ -232,12 +244,14 @@ async fn start_inner(
dataflow.pending_machines.remove(&machine_id);
dataflow.init_success &= success;
if dataflow.pending_machines.is_empty() {
let message =
serde_json::to_vec(&DaemonCoordinatorEvent::AllNodesReady {
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::AllNodesReady {
dataflow_id: uuid,
success: dataflow.init_success,
})
.wrap_err("failed to serialize AllNodesReady message")?;
},
timestamp: clock.new_timestamp(),
})
.wrap_err("failed to serialize AllNodesReady message")?;

// notify all machines that run parts of the dataflow
for machine_id in &dataflow.machines {
@@ -320,6 +334,7 @@ async fn start_inner(
local_working_dir,
name,
&mut daemon_connections,
&clock,
)
.await?;
Ok(dataflow)
@@ -353,6 +368,7 @@ async fn start_inner(
node_id,
operator_id,
&mut daemon_connections,
clock.new_timestamp(),
)
.await?;
Result::<_, eyre::Report>::Ok(())
@@ -369,6 +385,7 @@ async fn start_inner(
&running_dataflows,
dataflow_uuid,
&mut daemon_connections,
clock.new_timestamp(),
)
.await?;
Result::<_, eyre::Report>::Ok(())
@@ -384,6 +401,7 @@ async fn start_inner(
&running_dataflows,
dataflow_uuid,
&mut daemon_connections,
clock.new_timestamp(),
)
.await?;
Result::<_, eyre::Report>::Ok(dataflow_uuid)
@@ -406,9 +424,10 @@ async fn start_inner(
dataflow_uuid,
node.into(),
&mut daemon_connections,
clock.new_timestamp(),
)
.await
.map(|logs| ControlRequestReply::Logs(logs))
.map(ControlRequestReply::Logs)
}
ControlRequest::Destroy => {
tracing::info!("Received destroy command");
@@ -418,6 +437,7 @@ async fn start_inner(
&mut daemon_connections,
&abort_handle,
&mut daemon_events_tx,
&clock,
)
.await
.map(|()| ControlRequestReply::DestroyOk)
@@ -457,13 +477,16 @@ async fn start_inner(
disconnected.insert(machine_id.clone());
continue;
}
let result: eyre::Result<()> =
tokio::time::timeout(Duration::from_millis(500), send_watchdog_message(&mut connection.stream))
.await
.wrap_err("timeout")
.and_then(|r| r).wrap_err_with(||
format!("daemon at `{machine_id}` did not react as expected to watchdog message"),
);
let result: eyre::Result<()> = tokio::time::timeout(
Duration::from_millis(500),
send_heartbeat_message(&mut connection.stream, clock.new_timestamp()),
)
.await
.wrap_err("timeout")
.and_then(|r| r)
.wrap_err_with(|| {
format!("failed to send heartbeat message to daemon at `{machine_id}`")
});
if let Err(err) = result {
tracing::warn!("{err:?}");
disconnected.insert(machine_id.clone());
@@ -483,6 +506,7 @@ async fn start_inner(
&mut daemon_connections,
&abort_handle,
&mut daemon_events_tx,
&clock,
)
.await?;
}
@@ -532,22 +556,36 @@ async fn handle_destroy(
daemon_connections: &mut HashMap<String, DaemonConnection>,
abortable_events: &futures::stream::AbortHandle,
daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
clock: &HLC,
) -> Result<(), eyre::ErrReport> {
abortable_events.abort();
for &uuid in running_dataflows.keys() {
stop_dataflow(running_dataflows, uuid, daemon_connections).await?;
stop_dataflow(
running_dataflows,
uuid,
daemon_connections,
clock.new_timestamp(),
)
.await?;
}
destroy_daemons(daemon_connections).await?;
destroy_daemons(daemon_connections, clock.new_timestamp()).await?;
*daemon_events_tx = None;
Ok(())
}

async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> {
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Heartbeat).unwrap();
async fn send_heartbeat_message(
connection: &mut TcpStream,
timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Heartbeat,
timestamp,
})
.unwrap();

tcp_send(connection, &message)
.await
.wrap_err("failed to send watchdog message to daemon")
.wrap_err("failed to send heartbeat message to daemon")
}

#[allow(dead_code)] // Keeping the communication layer for later use.
@@ -588,11 +626,15 @@ async fn stop_dataflow(
running_dataflows: &HashMap<Uuid, RunningDataflow>,
uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
let Some(dataflow) = running_dataflows.get(&uuid) else {
bail!("No running dataflow found with UUID `{uuid}`")
};
let message = serde_json::to_vec(&DaemonCoordinatorEvent::StopDataflow { dataflow_id: uuid })?;
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::StopDataflow { dataflow_id: uuid },
timestamp,
})?;

for machine_id in &dataflow.machines {
let daemon_connection = daemon_connections
@@ -626,14 +668,18 @@ async fn reload_dataflow(
node_id: NodeId,
operator_id: Option<OperatorId>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
bail!("No running dataflow found with UUID `{dataflow_id}`")
};
let message = serde_json::to_vec(&DaemonCoordinatorEvent::ReloadDataflow {
dataflow_id,
node_id,
operator_id,
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::ReloadDataflow {
dataflow_id,
node_id,
operator_id,
},
timestamp,
})?;

for machine_id in &dataflow.machines {
@@ -668,6 +714,7 @@ async fn retrieve_logs(
dataflow_id: Uuid,
node_id: NodeId,
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
) -> eyre::Result<Vec<u8>> {
let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
dataflow.nodes.clone()
@@ -677,9 +724,12 @@ async fn retrieve_logs(
bail!("No dataflow found with UUID `{dataflow_id}`")
};

let message = serde_json::to_vec(&DaemonCoordinatorEvent::Logs {
dataflow_id,
node_id: node_id.clone(),
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Logs {
dataflow_id,
node_id: node_id.clone(),
},
timestamp,
})?;

let machine_ids: Vec<String> = nodes
@@ -727,12 +777,13 @@ async fn start_dataflow(
working_dir: PathBuf,
name: Option<String>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
clock: &HLC,
) -> eyre::Result<RunningDataflow> {
let SpawnedDataflow {
uuid,
machines,
nodes,
} = spawn_dataflow(dataflow, working_dir, daemon_connections).await?;
} = spawn_dataflow(dataflow, working_dir, daemon_connections, clock).await?;
Ok(RunningDataflow {
uuid,
name,
@@ -749,8 +800,12 @@ async fn start_dataflow(

async fn destroy_daemons(
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Destroy)?;
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Destroy,
timestamp,
})?;

for (machine_id, mut daemon_connection) in daemon_connections.drain() {
tcp_send(&mut daemon_connection.stream, &message)


binaries/coordinator/src/listener.rs (+13, -5)

@@ -1,7 +1,7 @@
use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
use dora_core::coordinator_messages;
use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC};
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::Ipv4Addr};
use std::{io::ErrorKind, net::Ipv4Addr, sync::Arc};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
@@ -18,7 +18,11 @@ pub async fn create_listener(port: u16) -> eyre::Result<TcpListener> {
Ok(socket)
}

pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sender<Event>) {
pub async fn handle_connection(
mut connection: TcpStream,
events_tx: mpsc::Sender<Event>,
clock: Arc<HLC>,
) {
loop {
// receive the next message and parse it
let raw = match tcp_receive(&mut connection).await {
@@ -31,7 +35,7 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende
continue;
}
};
let message: coordinator_messages::CoordinatorRequest =
let message: Timestamped<coordinator_messages::CoordinatorRequest> =
match serde_json::from_slice(&raw).wrap_err("failed to deserialize node message") {
Ok(e) => e,
Err(err) => {
@@ -40,8 +44,12 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende
}
};

if let Err(err) = clock.update_with_timestamp(&message.timestamp) {
tracing::warn!("failed to update coordinator clock: {err}");
}

// handle the message and translate it to a DaemonEvent
match message {
match message.inner {
coordinator_messages::CoordinatorRequest::Register {
machine_id,
dora_version,


binaries/coordinator/src/run/mod.rs (+10, -3)

@@ -4,8 +4,11 @@ use crate::{
};

use dora_core::{
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, SpawnDataflowNodes},
daemon_messages::{
DaemonCoordinatorEvent, DaemonCoordinatorReply, SpawnDataflowNodes, Timestamped,
},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::HLC,
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use std::{
@@ -14,11 +17,12 @@ use std::{
};
use uuid::Uuid;

#[tracing::instrument(skip(daemon_connections))]
#[tracing::instrument(skip(daemon_connections, clock))]
pub(super) async fn spawn_dataflow(
dataflow: Descriptor,
working_dir: PathBuf,
daemon_connections: &mut HashMap<String, DaemonConnection>,
clock: &HLC,
) -> eyre::Result<SpawnedDataflow> {
dataflow.check(&working_dir)?;

@@ -43,7 +47,10 @@ pub(super) async fn spawn_dataflow(
machine_listen_ports,
dataflow_descriptor: dataflow,
};
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Spawn(spawn_command))?;
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Spawn(spawn_command),
timestamp: clock.new_timestamp(),
})?;

for machine in &machines {
tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`");


binaries/daemon/src/coordinator.rs (+15, -6)

@@ -5,6 +5,7 @@ use crate::{
use dora_core::{
coordinator_messages::{CoordinatorRequest, RegisterResult},
daemon_messages::{DaemonCoordinatorReply, Timestamped},
message::uhlc::HLC,
};
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::SocketAddr};
@@ -24,6 +25,7 @@ pub async fn register(
addr: SocketAddr,
machine_id: String,
listen_socket: SocketAddr,
clock: &HLC,
) -> eyre::Result<impl Stream<Item = Timestamped<CoordinatorEvent>>> {
let mut stream = TcpStream::connect(addr)
.await
@@ -31,10 +33,13 @@ pub async fn register(
stream
.set_nodelay(true)
.wrap_err("failed to set TCP_NODELAY")?;
let register = serde_json::to_vec(&CoordinatorRequest::Register {
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
machine_id,
listen_socket,
let register = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Register {
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
machine_id,
listen_socket,
},
timestamp: clock.new_timestamp(),
})?;
tcp_send(&mut stream, &register)
.await
@@ -42,9 +47,13 @@ pub async fn register(
let reply_raw = tcp_receive(&mut stream)
.await
.wrap_err("failed to register reply from dora-coordinator")?;
let result: RegisterResult = serde_json::from_slice(&reply_raw)
let result: Timestamped<RegisterResult> = serde_json::from_slice(&reply_raw)
.wrap_err("failed to deserialize dora-coordinator reply")?;
result.to_result()?;
result.inner.to_result()?;
if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
tracing::warn!("failed to update timestamp after register: {err}");
}

tracing::info!("Connected to dora-coordinator at {:?}", addr);

let (tx, rx) = mpsc::channel(1);


binaries/daemon/src/inter_daemon.rs (+1, -1)

@@ -46,7 +46,7 @@ impl InterDaemonConnection {
pub async fn send_inter_daemon_event(
target_machines: &[String],
inter_daemon_connections: &mut BTreeMap<String, InterDaemonConnection>,
event: &InterDaemonEvent,
event: &Timestamped<InterDaemonEvent>,
) -> eyre::Result<()> {
let message = bincode::serialize(event).wrap_err("failed to serialize InterDaemonEvent")?;
for target_machine in target_machines {


binaries/daemon/src/lib.rs (+40, -21)

@@ -80,6 +80,8 @@ impl Daemon {
machine_id: String,
external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

// spawn listen loop
let (events_tx, events_rx) = flume::bounded(10);
let listen_socket =
@@ -91,7 +93,7 @@ impl Daemon {

// connect to the coordinator
let coordinator_events =
coordinator::register(coordinator_addr, machine_id.clone(), listen_socket)
coordinator::register(coordinator_addr, machine_id.clone(), listen_socket, &clock)
.await
.wrap_err("failed to connect to dora-coordinator")?
.map(
@@ -109,7 +111,7 @@ impl Daemon {
Some(coordinator_addr),
machine_id,
None,
Default::default(),
clock,
)
.await
.map(|_| ())
@@ -268,9 +270,12 @@ impl Daemon {
},
Event::HeartbeatInterval => {
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::Heartbeat,
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::Heartbeat,
},
timestamp: self.clock.new_timestamp(),
})?;
tcp_send(connection, &msg)
.await
@@ -567,7 +572,11 @@ impl Daemon {
tracing::error!("{err:?}");
dataflow
.pending_nodes
.handle_node_stop(&node_id, &mut self.coordinator_connection)
.handle_node_stop(
&node_id,
&mut self.coordinator_connection,
&self.clock,
)
.await?;
}
}
@@ -608,6 +617,7 @@ impl Daemon {
node_id.clone(),
reply_sender,
&mut self.coordinator_connection,
&self.clock,
)
.await?;
match status {
@@ -769,12 +779,15 @@ impl Daemon {
.map(|m| m.keys().cloned().collect())
.unwrap_or_default();
if !remote_receivers.is_empty() {
let event = InterDaemonEvent::Output {
dataflow_id,
node_id: output_id.0,
output_id: output_id.1,
metadata,
data: data_bytes,
let event = Timestamped {
inner: InterDaemonEvent::Output {
dataflow_id,
node_id: output_id.0,
output_id: output_id.1,
metadata,
data: data_bytes,
},
timestamp: self.clock.new_timestamp(),
};
inter_daemon::send_inter_daemon_event(
&remote_receivers,
@@ -859,7 +872,7 @@ impl Daemon {

dataflow
.pending_nodes
.handle_node_stop(node_id, &mut self.coordinator_connection)
.handle_node_stop(node_id, &mut self.coordinator_connection, &self.clock)
.await?;

Self::handle_outputs_done(
@@ -877,12 +890,15 @@ impl Daemon {
self.machine_id
);
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesFinished {
dataflow_id,
result: Ok(()),
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesFinished {
dataflow_id,
result: Ok(()),
},
},
timestamp: self.clock.new_timestamp(),
})?;
tcp_send(connection, &msg)
.await
@@ -1173,9 +1189,12 @@ where
}
if !external_node_inputs.is_empty() {
for (target_machine, inputs) in external_node_inputs {
let event = InterDaemonEvent::InputsClosed {
dataflow_id: dataflow.id,
inputs,
let event = Timestamped {
inner: InterDaemonEvent::InputsClosed {
dataflow_id: dataflow.id,
inputs,
},
timestamp: clock.new_timestamp(),
};
inter_daemon::send_inter_daemon_event(
&[target_machine],


binaries/daemon/src/pending.rs (+20, -9)

@@ -3,7 +3,8 @@ use std::collections::{HashMap, HashSet};
use dora_core::{
config::NodeId,
coordinator_messages::{CoordinatorRequest, DaemonEvent},
daemon_messages::{DaemonReply, DataflowId},
daemon_messages::{DaemonReply, DataflowId, Timestamped},
message::uhlc::{Timestamp, HLC},
};
use eyre::{bail, Context};
use tokio::{net::TcpStream, sync::oneshot};
@@ -59,23 +60,27 @@ impl PendingNodes {
node_id: NodeId,
reply_sender: oneshot::Sender<DaemonReply>,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
) -> eyre::Result<DataflowStatus> {
self.waiting_subscribers
.insert(node_id.clone(), reply_sender);
self.local_nodes.remove(&node_id);

self.update_dataflow_status(coordinator_connection).await
self.update_dataflow_status(coordinator_connection, clock)
.await
}

pub async fn handle_node_stop(
&mut self,
node_id: &NodeId,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
) -> eyre::Result<()> {
if self.local_nodes.remove(node_id) {
tracing::warn!("node `{node_id}` exited before initializing dora connection");
self.exited_before_subscribe.insert(node_id.clone());
self.update_dataflow_status(coordinator_connection).await?;
self.update_dataflow_status(coordinator_connection, clock)
.await?;
}
Ok(())
}
@@ -97,11 +102,13 @@ impl PendingNodes {
async fn update_dataflow_status(
&mut self,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
) -> eyre::Result<DataflowStatus> {
if self.local_nodes.is_empty() {
if self.external_nodes {
if !self.reported_init_to_coordinator {
self.report_nodes_ready(coordinator_connection).await?;
self.report_nodes_ready(coordinator_connection, clock.new_timestamp())
.await?;
self.reported_init_to_coordinator = true;
}
Ok(DataflowStatus::Pending)
@@ -139,6 +146,7 @@ impl PendingNodes {
async fn report_nodes_ready(
&self,
coordinator_connection: &mut Option<TcpStream>,
timestamp: Timestamp,
) -> eyre::Result<()> {
let Some(connection) = coordinator_connection else {
bail!("no coordinator connection to send AllNodesReady");
@@ -147,12 +155,15 @@ impl PendingNodes {
let success = self.exited_before_subscribe.is_empty();
tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes");

let msg = serde_json::to_vec(&CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesReady {
dataflow_id: self.dataflow_id,
success,
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesReady {
dataflow_id: self.dataflow_id,
success,
},
},
timestamp,
})?;
tcp_send(connection, &msg)
.await

