Browse Source

Merge pull request #302 from dora-rs/event-timestamps

Add timestamps generated by hybrid logical clocks to all sent events
tags/v0.2.4-rc
Haixuan Xavier Tao GitHub 2 years ago
parent
commit
caf0bd3bea
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 640 additions and 263 deletions
  1. +17
    -8
      apis/rust/node/src/daemon_connection/mod.rs
  2. +10
    -4
      apis/rust/node/src/daemon_connection/tcp.rs
  3. +25
    -8
      apis/rust/node/src/event_stream/mod.rs
  4. +49
    -20
      apis/rust/node/src/event_stream/thread.rs
  5. +24
    -10
      apis/rust/node/src/node/control_channel.rs
  6. +37
    -16
      apis/rust/node/src/node/drop_stream.rs
  7. +16
    -9
      apis/rust/node/src/node/mod.rs
  8. +81
    -31
      binaries/coordinator/src/lib.rs
  9. +13
    -5
      binaries/coordinator/src/listener.rs
  10. +10
    -3
      binaries/coordinator/src/run/mod.rs
  11. +28
    -9
      binaries/daemon/src/coordinator.rs
  12. +11
    -6
      binaries/daemon/src/inter_daemon.rs
  13. +200
    -81
      binaries/daemon/src/lib.rs
  14. +9
    -2
      binaries/daemon/src/main.rs
  15. +44
    -17
      binaries/daemon/src/node_communication/mod.rs
  16. +10
    -8
      binaries/daemon/src/node_communication/shmem.rs
  17. +12
    -8
      binaries/daemon/src/node_communication/tcp.rs
  18. +20
    -9
      binaries/daemon/src/pending.rs
  19. +13
    -3
      binaries/daemon/src/spawn.rs
  20. +2
    -3
      examples/c-dataflow/sink.c
  21. +9
    -3
      libraries/core/src/daemon_messages.rs

+ 17
- 8
apis/rust/node/src/daemon_connection/mod.rs View File

@@ -1,6 +1,7 @@
use dora_core::{
config::NodeId,
daemon_messages::{DaemonReply, DaemonRequest, DataflowId},
daemon_messages::{DaemonReply, DaemonRequest, DataflowId, Timestamped},
message::uhlc::Timestamp,
};
use eyre::{bail, eyre, Context};
use shared_memory_server::{ShmemClient, ShmemConf};
@@ -12,7 +13,7 @@ use std::{
mod tcp;

pub enum DaemonChannel {
Shmem(ShmemClient<DaemonRequest, DaemonReply>),
Shmem(ShmemClient<Timestamped<DaemonRequest>, DaemonReply>),
Tcp(TcpStream),
}

@@ -37,11 +38,19 @@ impl DaemonChannel {
Ok(channel)
}

pub fn register(&mut self, dataflow_id: DataflowId, node_id: NodeId) -> eyre::Result<()> {
let msg = DaemonRequest::Register {
dataflow_id,
node_id,
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
pub fn register(
&mut self,
dataflow_id: DataflowId,
node_id: NodeId,
timestamp: Timestamp,
) -> eyre::Result<()> {
let msg = Timestamped {
inner: DaemonRequest::Register {
dataflow_id,
node_id,
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
},
timestamp,
};
let reply = self
.request(&msg)
@@ -56,7 +65,7 @@ impl DaemonChannel {
Ok(())
}

pub fn request(&mut self, request: &DaemonRequest) -> eyre::Result<DaemonReply> {
pub fn request(&mut self, request: &Timestamped<DaemonRequest>) -> eyre::Result<DaemonReply> {
match self {
DaemonChannel::Shmem(client) => client.request(request),
DaemonChannel::Tcp(stream) => tcp::request(stream, request),


+ 10
- 4
apis/rust/node/src/daemon_connection/tcp.rs View File

@@ -1,13 +1,16 @@
use dora_core::daemon_messages::{DaemonReply, DaemonRequest};
use dora_core::daemon_messages::{DaemonReply, DaemonRequest, Timestamped};
use eyre::{eyre, Context};
use std::{
io::{Read, Write},
net::TcpStream,
};

pub fn request(connection: &mut TcpStream, request: &DaemonRequest) -> eyre::Result<DaemonReply> {
pub fn request(
connection: &mut TcpStream,
request: &Timestamped<DaemonRequest>,
) -> eyre::Result<DaemonReply> {
send_message(connection, request)?;
if request.expects_tcp_reply() {
if request.inner.expects_tcp_reply() {
receive_reply(connection)
.and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly")))
} else {
@@ -15,7 +18,10 @@ pub fn request(connection: &mut TcpStream, request: &DaemonRequest) -> eyre::Res
}
}

fn send_message(connection: &mut TcpStream, message: &DaemonRequest) -> eyre::Result<()> {
fn send_message(
connection: &mut TcpStream,
message: &Timestamped<DaemonRequest>,
) -> eyre::Result<()> {
let serialized = bincode::serialize(&message).wrap_err("failed to serialize DaemonRequest")?;
tcp_send(connection, &serialized).wrap_err("failed to send DaemonRequest")?;
Ok(())


+ 25
- 8
apis/rust/node/src/event_stream/mod.rs View File

@@ -1,10 +1,15 @@
use std::sync::Arc;

pub use event::{Data, Event, MappedInputData};

use self::thread::{EventItem, EventStreamThreadHandle};
use crate::daemon_connection::DaemonChannel;
use dora_core::{
config::NodeId,
daemon_messages::{self, DaemonCommunication, DaemonRequest, DataflowId, NodeEvent},
daemon_messages::{
self, DaemonCommunication, DaemonRequest, DataflowId, NodeEvent, Timestamped,
},
message::uhlc,
};
use eyre::{eyre, Context};

@@ -16,14 +21,16 @@ pub struct EventStream {
receiver: flume::Receiver<EventItem>,
_thread_handle: EventStreamThreadHandle,
close_channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
}

impl EventStream {
#[tracing::instrument(level = "trace")]
#[tracing::instrument(level = "trace", skip(clock))]
pub(crate) fn init(
dataflow_id: DataflowId,
node_id: &NodeId,
daemon_communication: &DaemonCommunication,
clock: Arc<uhlc::HLC>,
) -> eyre::Result<Self> {
let channel = match daemon_communication {
DaemonCommunication::Shmem {
@@ -49,7 +56,7 @@ impl EventStream {
})?,
};

Self::init_on_channel(dataflow_id, node_id, channel, close_channel)
Self::init_on_channel(dataflow_id, node_id, channel, close_channel, clock)
}

pub(crate) fn init_on_channel(
@@ -57,10 +64,14 @@ impl EventStream {
node_id: &NodeId,
mut channel: DaemonChannel,
mut close_channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
) -> eyre::Result<Self> {
channel.register(dataflow_id, node_id.clone())?;
channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
let reply = channel
.request(&DaemonRequest::Subscribe)
.request(&Timestamped {
inner: DaemonRequest::Subscribe,
timestamp: clock.new_timestamp(),
})
.map_err(|e| eyre!(e))
.wrap_err("failed to create subscription with dora-daemon")?;

@@ -72,16 +83,17 @@ impl EventStream {
other => eyre::bail!("unexpected subscribe reply: {other:?}"),
}

close_channel.register(dataflow_id, node_id.clone())?;
close_channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;

let (tx, rx) = flume::bounded(0);
let thread_handle = thread::init(node_id.clone(), tx, channel)?;
let thread_handle = thread::init(node_id.clone(), tx, channel, clock.clone())?;

Ok(EventStream {
node_id: node_id.clone(),
receiver: rx,
_thread_handle: thread_handle,
close_channel,
clock,
})
}

@@ -140,6 +152,7 @@ impl EventStream {
Event::Error(err.wrap_err("internal error").to_string())
}
},

EventItem::FatalError(err) => {
Event::Error(format!("fatal event stream error: {err:?}"))
}
@@ -152,9 +165,13 @@ impl EventStream {
impl Drop for EventStream {
#[tracing::instrument(skip(self), fields(%self.node_id))]
fn drop(&mut self) {
let request = Timestamped {
inner: DaemonRequest::EventStreamDropped,
timestamp: self.clock.new_timestamp(),
};
let result = self
.close_channel
.request(&DaemonRequest::EventStreamDropped)
.request(&request)
.map_err(|e| eyre!(e))
.wrap_err("failed to signal event stream closure to dora-daemon")
.and_then(|r| match r {


+ 49
- 20
apis/rust/node/src/event_stream/thread.rs View File

@@ -1,10 +1,14 @@
use dora_core::{
config::NodeId,
daemon_messages::{DaemonReply, DaemonRequest, DropToken, NodeEvent},
daemon_messages::{DaemonReply, DaemonRequest, DropToken, NodeEvent, Timestamped},
message::uhlc::{self, Timestamp},
};
use eyre::{eyre, Context};
use flume::RecvTimeoutError;
use std::time::{Duration, Instant};
use std::{
sync::Arc,
time::{Duration, Instant},
};

use crate::daemon_connection::DaemonChannel;

@@ -12,9 +16,10 @@ pub fn init(
node_id: NodeId,
tx: flume::Sender<EventItem>,
channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
) -> eyre::Result<EventStreamThreadHandle> {
let node_id_cloned = node_id.clone();
let join_handle = std::thread::spawn(|| event_stream_loop(node_id_cloned, tx, channel));
let join_handle = std::thread::spawn(|| event_stream_loop(node_id_cloned, tx, channel, clock));
Ok(EventStreamThreadHandle::new(node_id, join_handle))
}

@@ -68,8 +73,13 @@ impl Drop for EventStreamThreadHandle {
}
}

#[tracing::instrument(skip(tx, channel))]
fn event_stream_loop(node_id: NodeId, tx: flume::Sender<EventItem>, mut channel: DaemonChannel) {
#[tracing::instrument(skip(tx, channel, clock))]
fn event_stream_loop(
node_id: NodeId,
tx: flume::Sender<EventItem>,
mut channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
) {
let mut tx = Some(tx);
let mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)> = Vec::new();
let mut drop_tokens = Vec::new();
@@ -79,15 +89,21 @@ fn event_stream_loop(node_id: NodeId, tx: flume::Sender<EventItem>, mut channel:
break 'outer Err(err);
}

let daemon_request = DaemonRequest::NextEvent {
drop_tokens: std::mem::take(&mut drop_tokens),
let daemon_request = Timestamped {
inner: DaemonRequest::NextEvent {
drop_tokens: std::mem::take(&mut drop_tokens),
},
timestamp: clock.new_timestamp(),
};
let events = match channel.request(&daemon_request) {
Ok(DaemonReply::NextEvents(events)) if events.is_empty() => {
tracing::trace!("event stream closed for node `{node_id}`");
break Ok(());
Ok(DaemonReply::NextEvents(events)) => {
if events.is_empty() {
tracing::trace!("event stream closed for node `{node_id}`");
break Ok(());
} else {
events
}
}
Ok(DaemonReply::NextEvents(events)) => events,
Ok(other) => {
let err = eyre!("unexpected control reply: {other:?}");
tracing::warn!("{err:?}");
@@ -99,8 +115,11 @@ fn event_stream_loop(node_id: NodeId, tx: flume::Sender<EventItem>, mut channel:
continue;
}
};
for event in events {
let drop_token = match &event {
for Timestamped { inner, timestamp } in events {
if let Err(err) = clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
let drop_token = match &inner {
NodeEvent::Input {
data: Some(data), ..
} => data.drop_token(),
@@ -116,7 +135,7 @@ fn event_stream_loop(node_id: NodeId, tx: flume::Sender<EventItem>, mut channel:
if let Some(tx) = tx.as_ref() {
let (drop_tx, drop_rx) = flume::bounded(0);
match tx.send(EventItem::NodeEvent {
event,
event: inner,
ack_channel: drop_tx,
}) {
Ok(()) => {}
@@ -134,7 +153,7 @@ fn event_stream_loop(node_id: NodeId, tx: flume::Sender<EventItem>, mut channel:
pending_drop_tokens.push((token, drop_rx, Instant::now(), 1));
}
} else {
tracing::warn!("dropping event because event `tx` was already closed: `{event:?}`");
tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`");
}
}
};
@@ -152,8 +171,13 @@ fn event_stream_loop(node_id: NodeId, tx: flume::Sender<EventItem>, mut channel:
}
}

if let Err(err) = report_remaining_drop_tokens(channel, drop_tokens, pending_drop_tokens)
.context("failed to report remaining drop tokens")
if let Err(err) = report_remaining_drop_tokens(
channel,
drop_tokens,
pending_drop_tokens,
clock.new_timestamp(),
)
.context("failed to report remaining drop tokens")
{
tracing::warn!("{err:?}");
}
@@ -188,9 +212,10 @@ fn report_remaining_drop_tokens(
mut channel: DaemonChannel,
mut drop_tokens: Vec<DropToken>,
mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)>,
timestamp: Timestamp,
) -> eyre::Result<()> {
while !(pending_drop_tokens.is_empty() && drop_tokens.is_empty()) {
report_drop_tokens(&mut drop_tokens, &mut channel)?;
report_drop_tokens(&mut drop_tokens, &mut channel, timestamp)?;

let mut still_pending = Vec::new();
for (token, rx, since, _) in pending_drop_tokens.drain(..) {
@@ -225,12 +250,16 @@ fn report_remaining_drop_tokens(
fn report_drop_tokens(
drop_tokens: &mut Vec<DropToken>,
channel: &mut DaemonChannel,
timestamp: Timestamp,
) -> Result<(), eyre::ErrReport> {
if drop_tokens.is_empty() {
return Ok(());
}
let daemon_request = DaemonRequest::ReportDropTokens {
drop_tokens: std::mem::take(drop_tokens),
let daemon_request = Timestamped {
inner: DaemonRequest::ReportDropTokens {
drop_tokens: std::mem::take(drop_tokens),
},
timestamp,
};
match channel.request(&daemon_request)? {
dora_core::daemon_messages::DaemonReply::Empty => Ok(()),


+ 24
- 10
apis/rust/node/src/node/control_channel.rs View File

@@ -1,21 +1,25 @@
use std::sync::Arc;

use crate::daemon_connection::DaemonChannel;
use dora_core::{
config::{DataId, NodeId},
daemon_messages::{DaemonCommunication, DaemonRequest, Data, DataflowId},
message::Metadata,
daemon_messages::{DaemonCommunication, DaemonRequest, Data, DataflowId, Timestamped},
message::{uhlc::HLC, Metadata},
};
use eyre::{bail, eyre, Context};

pub(crate) struct ControlChannel {
channel: DaemonChannel,
clock: Arc<HLC>,
}

impl ControlChannel {
#[tracing::instrument(level = "trace")]
#[tracing::instrument(level = "trace", skip(clock))]
pub(crate) fn init(
dataflow_id: DataflowId,
node_id: &NodeId,
daemon_communication: &DaemonCommunication,
clock: Arc<HLC>,
) -> eyre::Result<Self> {
let channel = match daemon_communication {
DaemonCommunication::Shmem {
@@ -27,24 +31,28 @@ impl ControlChannel {
.wrap_err("failed to connect control channel")?,
};

Self::init_on_channel(dataflow_id, node_id, channel)
Self::init_on_channel(dataflow_id, node_id, channel, clock)
}

#[tracing::instrument(skip(channel), level = "trace")]
#[tracing::instrument(skip(channel, clock), level = "trace")]
pub fn init_on_channel(
dataflow_id: DataflowId,
node_id: &NodeId,
mut channel: DaemonChannel,
clock: Arc<HLC>,
) -> eyre::Result<Self> {
channel.register(dataflow_id, node_id.clone())?;
channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;

Ok(Self { channel })
Ok(Self { channel, clock })
}

pub fn report_outputs_done(&mut self) -> eyre::Result<()> {
let reply = self
.channel
.request(&DaemonRequest::OutputsDone)
.request(&Timestamped {
inner: DaemonRequest::OutputsDone,
timestamp: self.clock.new_timestamp(),
})
.wrap_err("failed to report outputs done to dora-daemon")?;
match reply {
dora_core::daemon_messages::DaemonReply::Result(result) => result
@@ -58,7 +66,10 @@ impl ControlChannel {
pub fn report_closed_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> {
let reply = self
.channel
.request(&DaemonRequest::CloseOutputs(outputs))
.request(&Timestamped {
inner: DaemonRequest::CloseOutputs(outputs),
timestamp: self.clock.new_timestamp(),
})
.wrap_err("failed to report closed outputs to dora-daemon")?;
match reply {
dora_core::daemon_messages::DaemonReply::Result(result) => result
@@ -82,7 +93,10 @@ impl ControlChannel {
};
let reply = self
.channel
.request(&request)
.request(&Timestamped {
inner: request,
timestamp: self.clock.new_timestamp(),
})
.wrap_err("failed to send SendMessage request to dora-daemon")?;
match reply {
dora_core::daemon_messages::DaemonReply::Empty => Ok(()),


+ 37
- 16
apis/rust/node/src/node/drop_stream.rs View File

@@ -1,11 +1,13 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use crate::daemon_connection::DaemonChannel;
use dora_core::{
config::NodeId,
daemon_messages::{
self, DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent,
self, DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken,
NodeDropEvent, Timestamped,
},
message::uhlc,
};
use eyre::{eyre, Context};
use flume::RecvTimeoutError;
@@ -16,11 +18,12 @@ pub struct DropStream {
}

impl DropStream {
#[tracing::instrument(level = "trace")]
#[tracing::instrument(level = "trace", skip(hlc))]
pub(crate) fn init(
dataflow_id: DataflowId,
node_id: &NodeId,
daemon_communication: &DaemonCommunication,
hlc: Arc<uhlc::HLC>,
) -> eyre::Result<Self> {
let channel = match daemon_communication {
DaemonCommunication::Shmem {
@@ -35,18 +38,22 @@ impl DropStream {
.wrap_err_with(|| format!("failed to connect drop stream for node `{node_id}`"))?,
};

Self::init_on_channel(dataflow_id, node_id, channel)
Self::init_on_channel(dataflow_id, node_id, channel, hlc)
}

pub fn init_on_channel(
dataflow_id: DataflowId,
node_id: &NodeId,
mut channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
) -> eyre::Result<Self> {
channel.register(dataflow_id, node_id.clone())?;
channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;

let reply = channel
.request(&DaemonRequest::SubscribeDrop)
.request(&Timestamped {
inner: DaemonRequest::SubscribeDrop,
timestamp: clock.new_timestamp(),
})
.map_err(|e| eyre!(e))
.wrap_err("failed to create subscription with dora-daemon")?;

@@ -61,7 +68,7 @@ impl DropStream {
let (tx, rx) = flume::bounded(0);
let node_id_cloned = node_id.clone();

let handle = std::thread::spawn(|| drop_stream_loop(node_id_cloned, tx, channel));
let handle = std::thread::spawn(|| drop_stream_loop(node_id_cloned, tx, channel, clock));

Ok(Self {
receiver: rx,
@@ -78,16 +85,27 @@ impl std::ops::Deref for DropStream {
}
}

#[tracing::instrument(skip(tx, channel))]
fn drop_stream_loop(node_id: NodeId, tx: flume::Sender<DropToken>, mut channel: DaemonChannel) {
#[tracing::instrument(skip(tx, channel, clock))]
fn drop_stream_loop(
node_id: NodeId,
tx: flume::Sender<DropToken>,
mut channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
) {
'outer: loop {
let daemon_request = DaemonRequest::NextFinishedDropTokens;
let daemon_request = Timestamped {
inner: DaemonRequest::NextFinishedDropTokens,
timestamp: clock.new_timestamp(),
};
let events = match channel.request(&daemon_request) {
Ok(DaemonReply::NextDropEvents(events)) if events.is_empty() => {
tracing::trace!("drop stream closed for node `{node_id}`");
break;
Ok(DaemonReply::NextDropEvents(events)) => {
if events.is_empty() {
tracing::trace!("drop stream closed for node `{node_id}`");
break;
} else {
events
}
}
Ok(DaemonReply::NextDropEvents(events)) => events,
Ok(other) => {
let err = eyre!("unexpected drop reply: {other:?}");
tracing::warn!("{err:?}");
@@ -99,8 +117,11 @@ fn drop_stream_loop(node_id: NodeId, tx: flume::Sender<DropToken>, mut channel:
continue;
}
};
for event in events {
match event {
for Timestamped { inner, timestamp } in events {
if let Err(err) = clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
match inner {
NodeDropEvent::OutputDropped { drop_token } => {
if tx.send(drop_token).is_err() {
tracing::warn!(


+ 16
- 9
apis/rust/node/src/node/mod.rs View File

@@ -12,6 +12,7 @@ use shared_memory::{Shmem, ShmemConf};
use std::{
collections::{HashMap, VecDeque},
ops::{Deref, DerefMut},
sync::Arc,
time::Duration,
};

@@ -27,7 +28,7 @@ pub struct DoraNode {
id: NodeId,
node_config: NodeRunConfig,
control_channel: ControlChannel,
hlc: uhlc::HLC,
clock: Arc<uhlc::HLC>,

sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
drop_stream: DropStream,
@@ -67,18 +68,23 @@ impl DoraNode {
dataflow_descriptor,
} = node_config;

let event_stream = EventStream::init(dataflow_id, &node_id, &daemon_communication)
.wrap_err("failed to init event stream")?;
let drop_stream = DropStream::init(dataflow_id, &node_id, &daemon_communication)
.wrap_err("failed to init drop stream")?;
let control_channel = ControlChannel::init(dataflow_id, &node_id, &daemon_communication)
.wrap_err("failed to init control channel")?;
let clock = Arc::new(uhlc::HLC::default());

let event_stream =
EventStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
.wrap_err("failed to init event stream")?;
let drop_stream =
DropStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
.wrap_err("failed to init drop stream")?;
let control_channel =
ControlChannel::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
.wrap_err("failed to init control channel")?;

let node = Self {
id: node_id,
node_config: run_config,
control_channel,
hlc: uhlc::HLC::default(),
clock,
sent_out_shared_memory: HashMap::new(),
drop_stream,
cache: VecDeque::new(),
@@ -138,7 +144,8 @@ impl DoraNode {
if !self.node_config.outputs.contains(&output_id) {
eyre::bail!("unknown output");
}
let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters.into_owned());
let metadata =
Metadata::from_parameters(self.clock.new_timestamp(), parameters.into_owned());

let (data, shmem) = match sample {
Some(sample) => sample.finalize(),


+ 81
- 31
binaries/coordinator/src/lib.rs View File

@@ -6,8 +6,9 @@ pub use control::ControlEvent;
use dora_core::{
config::{NodeId, OperatorId},
coordinator_messages::RegisterResult,
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply},
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::{self, HLC},
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_DEFAULT,
@@ -21,6 +22,7 @@ use std::{
collections::{BTreeMap, BTreeSet, HashMap},
net::SocketAddr,
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
@@ -119,6 +121,8 @@ async fn start_inner(
tasks: &FuturesUnordered<JoinHandle<()>>,
external_events: impl Stream<Item = Event> + Unpin,
) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
c.map(Event::NewDaemonConnection)
.wrap_err("failed to open connection")
@@ -164,7 +168,11 @@ async fn start_inner(
connection.set_nodelay(true)?;
let events_tx = daemon_events_tx.clone();
if let Some(events_tx) = events_tx {
let task = tokio::spawn(listener::handle_connection(connection, events_tx));
let task = tokio::spawn(listener::handle_connection(
connection,
events_tx,
clock.clone(),
));
tasks.push(task);
} else {
tracing::warn!(
@@ -191,8 +199,12 @@ async fn start_inner(
not compatible with coordinator v{coordinator_version}"
))
};
let reply = Timestamped {
inner: reply,
timestamp: clock.new_timestamp(),
};
let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await;
match (reply, send_result) {
match (reply.inner, send_result) {
(RegisterResult::Ok, Ok(())) => {
let previous = daemon_connections.insert(
machine_id.clone(),
@@ -228,12 +240,14 @@ async fn start_inner(
dataflow.pending_machines.remove(&machine_id);
dataflow.init_success &= success;
if dataflow.pending_machines.is_empty() {
let message =
serde_json::to_vec(&DaemonCoordinatorEvent::AllNodesReady {
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::AllNodesReady {
dataflow_id: uuid,
success: dataflow.init_success,
})
.wrap_err("failed to serialize AllNodesReady message")?;
},
timestamp: clock.new_timestamp(),
})
.wrap_err("failed to serialize AllNodesReady message")?;

// notify all machines that run parts of the dataflow
for machine_id in &dataflow.machines {
@@ -327,6 +341,7 @@ async fn start_inner(
local_working_dir,
name,
&mut daemon_connections,
&clock,
)
.await?;
Ok(dataflow)
@@ -365,6 +380,7 @@ async fn start_inner(
node_id,
operator_id,
&mut daemon_connections,
clock.new_timestamp(),
)
.await?;
Result::<_, eyre::Report>::Ok(())
@@ -384,6 +400,7 @@ async fn start_inner(
dataflow_uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
)
.await?;
}
@@ -396,6 +413,7 @@ async fn start_inner(
uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
)
.await?
}
@@ -419,9 +437,10 @@ async fn start_inner(
dataflow_uuid,
node.into(),
&mut daemon_connections,
clock.new_timestamp(),
)
.await
.map(|logs| ControlRequestReply::Logs(logs));
.map(ControlRequestReply::Logs);
let _ = reply_sender.send(reply);
}
ControlRequest::Destroy => {
@@ -432,6 +451,7 @@ async fn start_inner(
&mut daemon_connections,
&abort_handle,
&mut daemon_events_tx,
&clock,
)
.await
.map(|()| ControlRequestReply::DestroyOk);
@@ -474,13 +494,16 @@ async fn start_inner(
disconnected.insert(machine_id.clone());
continue;
}
let result: eyre::Result<()> =
tokio::time::timeout(Duration::from_millis(500), send_watchdog_message(&mut connection.stream))
.await
.wrap_err("timeout")
.and_then(|r| r).wrap_err_with(||
format!("daemon at `{machine_id}` did not react as expected to watchdog message"),
);
let result: eyre::Result<()> = tokio::time::timeout(
Duration::from_millis(500),
send_heartbeat_message(&mut connection.stream, clock.new_timestamp()),
)
.await
.wrap_err("timeout")
.and_then(|r| r)
.wrap_err_with(|| {
format!("failed to send heartbeat message to daemon at `{machine_id}`")
});
if let Err(err) = result {
tracing::warn!("{err:?}");
disconnected.insert(machine_id.clone());
@@ -500,6 +523,7 @@ async fn start_inner(
&mut daemon_connections,
&abort_handle,
&mut daemon_events_tx,
&clock,
)
.await?;
}
@@ -522,6 +546,7 @@ async fn stop_dataflow_by_uuid(
dataflow_uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
reply_sender: tokio::sync::oneshot::Sender<Result<ControlRequestReply, eyre::ErrReport>>,
timestamp: uhlc::Timestamp,
) -> Result<(), eyre::ErrReport> {
let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
if let Some(result) = dataflow_results.get(&dataflow_uuid) {
@@ -535,7 +560,7 @@ async fn stop_dataflow_by_uuid(
bail!("no known dataflow found with UUID `{dataflow_uuid}`")
};
let stop = async {
stop_dataflow(dataflow, dataflow_uuid, daemon_connections).await?;
stop_dataflow(dataflow, dataflow_uuid, daemon_connections, timestamp).await?;
Result::<_, eyre::Report>::Ok(())
};
match stop.await {
@@ -603,22 +628,30 @@ async fn handle_destroy(
daemon_connections: &mut HashMap<String, DaemonConnection>,
abortable_events: &futures::stream::AbortHandle,
daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
clock: &HLC,
) -> Result<(), eyre::ErrReport> {
abortable_events.abort();
for (&uuid, dataflow) in running_dataflows {
stop_dataflow(dataflow, uuid, daemon_connections).await?;
stop_dataflow(dataflow, uuid, daemon_connections, clock.new_timestamp()).await?;
}
destroy_daemons(daemon_connections).await?;
destroy_daemons(daemon_connections, clock.new_timestamp()).await?;
*daemon_events_tx = None;
Ok(())
}

async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> {
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Heartbeat).unwrap();
async fn send_heartbeat_message(
connection: &mut TcpStream,
timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Heartbeat,
timestamp,
})
.unwrap();

tcp_send(connection, &message)
.await
.wrap_err("failed to send watchdog message to daemon")
.wrap_err("failed to send heartbeat message to daemon")
}

struct RunningDataflow {
@@ -660,8 +693,12 @@ async fn stop_dataflow(
dataflow: &RunningDataflow,
uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
let message = serde_json::to_vec(&DaemonCoordinatorEvent::StopDataflow { dataflow_id: uuid })?;
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::StopDataflow { dataflow_id: uuid },
timestamp,
})?;

for machine_id in &dataflow.machines {
let daemon_connection = daemon_connections
@@ -695,14 +732,18 @@ async fn reload_dataflow(
node_id: NodeId,
operator_id: Option<OperatorId>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
bail!("No running dataflow found with UUID `{dataflow_id}`")
};
let message = serde_json::to_vec(&DaemonCoordinatorEvent::ReloadDataflow {
dataflow_id,
node_id,
operator_id,
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::ReloadDataflow {
dataflow_id,
node_id,
operator_id,
},
timestamp,
})?;

for machine_id in &dataflow.machines {
@@ -737,6 +778,7 @@ async fn retrieve_logs(
dataflow_id: Uuid,
node_id: NodeId,
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
) -> eyre::Result<Vec<u8>> {
let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
dataflow.nodes.clone()
@@ -746,9 +788,12 @@ async fn retrieve_logs(
bail!("No dataflow found with UUID `{dataflow_id}`")
};

let message = serde_json::to_vec(&DaemonCoordinatorEvent::Logs {
dataflow_id,
node_id: node_id.clone(),
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Logs {
dataflow_id,
node_id: node_id.clone(),
},
timestamp,
})?;

let machine_ids: Vec<String> = nodes
@@ -796,12 +841,13 @@ async fn start_dataflow(
working_dir: PathBuf,
name: Option<String>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
clock: &HLC,
) -> eyre::Result<RunningDataflow> {
let SpawnedDataflow {
uuid,
machines,
nodes,
} = spawn_dataflow(dataflow, working_dir, daemon_connections).await?;
} = spawn_dataflow(dataflow, working_dir, daemon_connections, clock).await?;
Ok(RunningDataflow {
uuid,
name,
@@ -819,8 +865,12 @@ async fn start_dataflow(

async fn destroy_daemons(
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Destroy)?;
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Destroy,
timestamp,
})?;

for (machine_id, mut daemon_connection) in daemon_connections.drain() {
tcp_send(&mut daemon_connection.stream, &message)


+ 13
- 5
binaries/coordinator/src/listener.rs View File

@@ -1,7 +1,7 @@
use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
use dora_core::coordinator_messages;
use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC};
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::Ipv4Addr};
use std::{io::ErrorKind, net::Ipv4Addr, sync::Arc};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
@@ -18,7 +18,11 @@ pub async fn create_listener(port: u16) -> eyre::Result<TcpListener> {
Ok(socket)
}

pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sender<Event>) {
pub async fn handle_connection(
mut connection: TcpStream,
events_tx: mpsc::Sender<Event>,
clock: Arc<HLC>,
) {
loop {
// receive the next message and parse it
let raw = match tcp_receive(&mut connection).await {
@@ -31,7 +35,7 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende
continue;
}
};
let message: coordinator_messages::CoordinatorRequest =
let message: Timestamped<coordinator_messages::CoordinatorRequest> =
match serde_json::from_slice(&raw).wrap_err("failed to deserialize node message") {
Ok(e) => e,
Err(err) => {
@@ -40,8 +44,12 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende
}
};

if let Err(err) = clock.update_with_timestamp(&message.timestamp) {
tracing::warn!("failed to update coordinator clock: {err}");
}

// handle the message and translate it to a DaemonEvent
match message {
match message.inner {
coordinator_messages::CoordinatorRequest::Register {
machine_id,
dora_version,


+ 10
- 3
binaries/coordinator/src/run/mod.rs View File

@@ -4,8 +4,11 @@ use crate::{
};

use dora_core::{
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, SpawnDataflowNodes},
daemon_messages::{
DaemonCoordinatorEvent, DaemonCoordinatorReply, SpawnDataflowNodes, Timestamped,
},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::HLC,
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use std::{
@@ -14,11 +17,12 @@ use std::{
};
use uuid::Uuid;

#[tracing::instrument(skip(daemon_connections))]
#[tracing::instrument(skip(daemon_connections, clock))]
pub(super) async fn spawn_dataflow(
dataflow: Descriptor,
working_dir: PathBuf,
daemon_connections: &mut HashMap<String, DaemonConnection>,
clock: &HLC,
) -> eyre::Result<SpawnedDataflow> {
dataflow.check(&working_dir)?;

@@ -43,7 +47,10 @@ pub(super) async fn spawn_dataflow(
machine_listen_ports,
dataflow_descriptor: dataflow,
};
let message = serde_json::to_vec(&DaemonCoordinatorEvent::Spawn(spawn_command))?;
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Spawn(spawn_command),
timestamp: clock.new_timestamp(),
})?;

for machine in &machines {
tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`");


+ 28
- 9
binaries/daemon/src/coordinator.rs View File

@@ -4,7 +4,8 @@ use crate::{
};
use dora_core::{
coordinator_messages::{CoordinatorRequest, RegisterResult},
daemon_messages::DaemonCoordinatorReply,
daemon_messages::{DaemonCoordinatorReply, Timestamped},
message::uhlc::HLC,
};
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::SocketAddr};
@@ -24,17 +25,21 @@ pub async fn register(
addr: SocketAddr,
machine_id: String,
listen_socket: SocketAddr,
) -> eyre::Result<impl Stream<Item = CoordinatorEvent>> {
clock: &HLC,
) -> eyre::Result<impl Stream<Item = Timestamped<CoordinatorEvent>>> {
let mut stream = TcpStream::connect(addr)
.await
.wrap_err("failed to connect to dora-coordinator")?;
stream
.set_nodelay(true)
.wrap_err("failed to set TCP_NODELAY")?;
let register = serde_json::to_vec(&CoordinatorRequest::Register {
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
machine_id,
listen_socket,
let register = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Register {
dora_version: env!("CARGO_PKG_VERSION").to_owned(),
machine_id,
listen_socket,
},
timestamp: clock.new_timestamp(),
})?;
tcp_send(&mut stream, &register)
.await
@@ -42,9 +47,13 @@ pub async fn register(
let reply_raw = tcp_receive(&mut stream)
.await
.wrap_err("failed to register reply from dora-coordinator")?;
let result: RegisterResult = serde_json::from_slice(&reply_raw)
let result: Timestamped<RegisterResult> = serde_json::from_slice(&reply_raw)
.wrap_err("failed to deserialize dora-coordinator reply")?;
result.to_result()?;
result.inner.to_result()?;
if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
tracing::warn!("failed to update timestamp after register: {err}");
}

tracing::info!("Connected to dora-coordinator at {:?}", addr);

let (tx, rx) = mpsc::channel(1);
@@ -67,8 +76,18 @@ pub async fn register(
continue;
}
};
let Timestamped {
inner: event,
timestamp,
} = event;
let (reply_tx, reply_rx) = oneshot::channel();
match tx.send(CoordinatorEvent { event, reply_tx }).await {
match tx
.send(Timestamped {
inner: CoordinatorEvent { event, reply_tx },
timestamp,
})
.await
{
Ok(()) => {}
Err(_) => {
// receiving end of channel was closed


+ 11
- 6
binaries/daemon/src/inter_daemon.rs View File

@@ -1,5 +1,5 @@
use crate::tcp_utils::{tcp_receive, tcp_send};
use dora_core::daemon_messages::InterDaemonEvent;
use dora_core::daemon_messages::{InterDaemonEvent, Timestamped};
use eyre::{Context, ContextCompat};
use std::{
collections::BTreeMap,
@@ -46,7 +46,7 @@ impl InterDaemonConnection {
pub async fn send_inter_daemon_event(
target_machines: &[String],
inter_daemon_connections: &mut BTreeMap<String, InterDaemonConnection>,
event: &InterDaemonEvent,
event: &Timestamped<InterDaemonEvent>,
) -> eyre::Result<()> {
let message = bincode::serialize(event).wrap_err("failed to serialize InterDaemonEvent")?;
for target_machine in target_machines {
@@ -66,7 +66,7 @@ pub async fn send_inter_daemon_event(

pub async fn spawn_listener_loop(
machine_id: String,
events_tx: flume::Sender<InterDaemonEvent>,
events_tx: flume::Sender<Timestamped<InterDaemonEvent>>,
) -> eyre::Result<SocketAddr> {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
let socket = match TcpListener::bind((localhost, 0)).await {
@@ -87,7 +87,10 @@ pub async fn spawn_listener_loop(
Ok(socket_addr)
}

async fn listener_loop(listener: TcpListener, events_tx: flume::Sender<InterDaemonEvent>) {
async fn listener_loop(
listener: TcpListener,
events_tx: flume::Sender<Timestamped<InterDaemonEvent>>,
) {
loop {
match listener
.accept()
@@ -106,7 +109,7 @@ async fn listener_loop(listener: TcpListener, events_tx: flume::Sender<InterDaem

async fn handle_connection_loop(
mut connection: TcpStream,
events_tx: flume::Sender<InterDaemonEvent>,
events_tx: flume::Sender<Timestamped<InterDaemonEvent>>,
) {
if let Err(err) = connection.set_nodelay(true) {
tracing::warn!("failed to set nodelay for connection: {err}");
@@ -128,7 +131,9 @@ async fn handle_connection_loop(
}
}

async fn receive_message(connection: &mut TcpStream) -> eyre::Result<Option<InterDaemonEvent>> {
async fn receive_message(
connection: &mut TcpStream,
) -> eyre::Result<Option<Timestamped<InterDaemonEvent>>> {
let raw = match tcp_receive(connection).await {
Ok(raw) => raw,
Err(err) => match err.kind() {


+ 200
- 81
binaries/daemon/src/lib.rs View File

@@ -1,8 +1,8 @@
use coordinator::CoordinatorEvent;
use dora_core::config::{Input, OperatorId};
use dora_core::coordinator_messages::CoordinatorRequest;
use dora_core::daemon_messages::{Data, InterDaemonEvent};
use dora_core::message::uhlc::HLC;
use dora_core::daemon_messages::{Data, InterDaemonEvent, Timestamped};
use dora_core::message::uhlc::{self, HLC};
use dora_core::message::MetadataParameters;
use dora_core::{
config::{DataId, InputMapping, NodeId},
@@ -20,6 +20,7 @@ use inter_daemon::InterDaemonConnection;
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use std::env::temp_dir;
use std::sync::Arc;
use std::time::Instant;
use std::{
borrow::Cow,
@@ -58,7 +59,7 @@ use crate::pending::DataflowStatus;
pub struct Daemon {
running: HashMap<DataflowId, RunningDataflow>,

events_tx: mpsc::Sender<Event>,
events_tx: mpsc::Sender<Timestamped<Event>>,

coordinator_connection: Option<TcpStream>,
last_coordinator_heartbeat: Instant,
@@ -69,32 +70,48 @@ pub struct Daemon {
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
/// used to record dataflow results when `exit_when_done` is used
dataflow_errors: BTreeMap<Uuid, BTreeMap<NodeId, eyre::Report>>,

clock: Arc<uhlc::HLC>,
}

impl Daemon {
pub async fn run(
coordinator_addr: SocketAddr,
machine_id: String,
external_events: impl Stream<Item = Event> + Unpin,
external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

// spawn listen loop
let (events_tx, events_rx) = flume::bounded(10);
let listen_socket =
inter_daemon::spawn_listener_loop(machine_id.clone(), events_tx).await?;
let daemon_events = events_rx.into_stream().map(Event::Daemon);
let daemon_events = events_rx.into_stream().map(|e| Timestamped {
inner: Event::Daemon(e.inner),
timestamp: e.timestamp,
});

// connect to the coordinator
let coordinator_events =
coordinator::register(coordinator_addr, machine_id.clone(), listen_socket)
coordinator::register(coordinator_addr, machine_id.clone(), listen_socket, &clock)
.await
.wrap_err("failed to connect to dora-coordinator")?
.map(Event::Coordinator);
.map(
|Timestamped {
inner: event,
timestamp,
}| Timestamped {
inner: Event::Coordinator(event),
timestamp,
},
);

Self::run_general(
(coordinator_events, external_events, daemon_events).merge(),
Some(coordinator_addr),
machine_id,
None,
clock,
)
.await
.map(|_| ())
@@ -120,23 +137,30 @@ impl Daemon {
dataflow_descriptor: descriptor,
};

let clock = Arc::new(HLC::default());

let exit_when_done = spawn_command
.nodes
.iter()
.map(|n| (spawn_command.dataflow_id, n.id.clone()))
.collect();
let (reply_tx, reply_rx) = oneshot::channel();
let timestamp = clock.new_timestamp();
let coordinator_events = stream::once(async move {
Event::Coordinator(CoordinatorEvent {
event: DaemonCoordinatorEvent::Spawn(spawn_command),
reply_tx,
})
Timestamped {
inner: Event::Coordinator(CoordinatorEvent {
event: DaemonCoordinatorEvent::Spawn(spawn_command),
reply_tx,
}),
timestamp,
}
});
let run_result = Self::run_general(
Box::pin(coordinator_events),
None,
"".into(),
Some(exit_when_done),
clock,
);

let spawn_result = reply_rx
@@ -167,10 +191,11 @@ impl Daemon {
}

async fn run_general(
external_events: impl Stream<Item = Event> + Unpin,
external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
coordinator_addr: Option<SocketAddr>,
machine_id: String,
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
clock: Arc<HLC>,
) -> eyre::Result<BTreeMap<Uuid, BTreeMap<NodeId, eyre::Report>>> {
let coordinator_connection = match coordinator_addr {
Some(addr) => {
@@ -195,13 +220,18 @@ impl Daemon {
machine_id,
exit_when_done,
dataflow_errors: BTreeMap::new(),
clock,
};

let dora_events = ReceiverStream::new(dora_events_rx);
let watchdog_clock = daemon.clock.clone();
let watchdog_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
Duration::from_secs(5),
))
.map(|_| Event::HeartbeatInterval);
.map(|_| Timestamped {
inner: Event::HeartbeatInterval,
timestamp: watchdog_clock.new_timestamp(),
});
let events = (external_events, dora_events, watchdog_interval).merge();
daemon.run_inner(events).await
}
@@ -209,12 +239,17 @@ impl Daemon {
#[tracing::instrument(skip(incoming_events, self), fields(%self.machine_id))]
async fn run_inner(
mut self,
incoming_events: impl Stream<Item = Event> + Unpin,
incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
) -> eyre::Result<BTreeMap<Uuid, BTreeMap<NodeId, eyre::Report>>> {
let mut events = incoming_events;

while let Some(event) = events.next().await {
match event {
let Timestamped { inner, timestamp } = event;
if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC with incoming event timestamp: {err}");
}

match inner {
Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
let status = self.handle_coordinator_event(event, reply_tx).await?;

@@ -237,9 +272,12 @@ impl Daemon {
},
Event::HeartbeatInterval => {
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::Heartbeat,
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::Heartbeat,
},
timestamp: self.clock.new_timestamp(),
})?;
tcp_send(connection, &msg)
.await
@@ -252,7 +290,7 @@ impl Daemon {
}
Event::CtrlC => {
for dataflow in self.running.values_mut() {
dataflow.stop_all().await;
dataflow.stop_all(&self.clock).await;
}
}
}
@@ -315,7 +353,7 @@ impl Daemon {
.await?;
if success {
tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
dataflow.start(&self.events_tx).await?;
dataflow.start(&self.events_tx, &self.clock).await?;
}
}
None => {
@@ -377,7 +415,7 @@ impl Daemon {
.running
.get_mut(&dataflow_id)
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
dataflow.stop_all().await;
dataflow.stop_all(&self.clock).await;
Result::<(), eyre::Report>::Ok(())
};
let reply = DaemonCoordinatorReply::StopResult(
@@ -424,6 +462,7 @@ impl Daemon {
dataflow,
&metadata,
data.map(Data::Vec),
&self.clock,
)
.await?;
Result::<_, eyre::Report>::Ok(())
@@ -446,7 +485,7 @@ impl Daemon {
format!("send out failed: no running dataflow with ID `{dataflow_id}`")
})?;
for (receiver_id, input_id) in &inputs {
close_input(dataflow, receiver_id, input_id);
close_input(dataflow, receiver_id, input_id, &self.clock);
}
Result::<(), eyre::Report>::Ok(())
};
@@ -523,6 +562,7 @@ impl Daemon {
node,
self.events_tx.clone(),
dataflow_descriptor.clone(),
self.clock.clone(),
)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
@@ -534,7 +574,11 @@ impl Daemon {
tracing::error!("{err:?}");
dataflow
.pending_nodes
.handle_node_stop(&node_id, &mut self.coordinator_connection)
.handle_node_stop(
&node_id,
&mut self.coordinator_connection,
&self.clock,
)
.await?;
}
}
@@ -567,7 +611,7 @@ impl Daemon {
}
Ok(dataflow) => {
tracing::debug!("node `{node_id}` is ready");
Self::subscribe(dataflow, node_id.clone(), event_sender).await;
Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

let status = dataflow
.pending_nodes
@@ -575,6 +619,7 @@ impl Daemon {
node_id.clone(),
reply_sender,
&mut self.coordinator_connection,
&self.clock,
)
.await?;
match status {
@@ -582,7 +627,7 @@ impl Daemon {
tracing::info!(
"all nodes are ready, starting dataflow `{dataflow_id}`"
);
dataflow.start(&self.events_tx).await?;
dataflow.start(&self.events_tx, &self.clock).await?;
}
DataflowStatus::Pending => {}
}
@@ -615,6 +660,7 @@ impl Daemon {
|OutputId(source_id, output_id)| {
source_id == &node_id && outputs.contains(output_id)
},
&self.clock,
)
.await
};
@@ -625,7 +671,7 @@ impl Daemon {
DaemonNodeEvent::OutputsDone { reply_sender } => {
let result = match self.running.get_mut(&dataflow_id) {
Some(dataflow) => {
Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, &node_id)
Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, &node_id, &self.clock)
.await
},
None => Err(eyre!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")),
@@ -655,7 +701,7 @@ impl Daemon {
match dataflow.pending_drop_tokens.get_mut(&token) {
Some(info) => {
if info.pending_nodes.remove(&node_id) {
dataflow.check_drop_token(token).await?;
dataflow.check_drop_token(token, &self.clock).await?;
} else {
tracing::warn!(
"node `{node_id}` is not pending for drop token `{token:?}`"
@@ -693,8 +739,11 @@ impl Daemon {
format!("Reload failed: no running dataflow with ID `{dataflow_id}`")
})?;
if let Some(channel) = dataflow.subscribe_channels.get(&node_id) {
let item = daemon_messages::NodeEvent::Reload { operator_id };
match channel.send(item) {
match send_with_timestamp(
channel,
daemon_messages::NodeEvent::Reload { operator_id },
&self.clock,
) {
Ok(()) => {}
Err(_) => {
dataflow.subscribe_channels.remove(&node_id);
@@ -721,6 +770,7 @@ impl Daemon {
dataflow,
&metadata,
data,
&self.clock,
)
.await?;

@@ -731,12 +781,15 @@ impl Daemon {
.map(|m| m.keys().cloned().collect())
.unwrap_or_default();
if !remote_receivers.is_empty() {
let event = InterDaemonEvent::Output {
dataflow_id,
node_id: output_id.0,
output_id: output_id.1,
metadata,
data: data_bytes,
let event = Timestamped {
inner: InterDaemonEvent::Output {
dataflow_id,
node_id: output_id.0,
output_id: output_id.1,
metadata,
data: data_bytes,
},
timestamp: self.clock.new_timestamp(),
};
inter_daemon::send_inter_daemon_event(
&remote_receivers,
@@ -753,7 +806,8 @@ impl Daemon {
async fn subscribe(
dataflow: &mut RunningDataflow,
node_id: NodeId,
event_sender: UnboundedSender<daemon_messages::NodeEvent>,
event_sender: UnboundedSender<Timestamped<daemon_messages::NodeEvent>>,
clock: &HLC,
) {
// some inputs might have been closed already -> report those events
let closed_inputs = dataflow
@@ -770,33 +824,43 @@ impl Daemon {
.unwrap_or(true)
});
for input_id in closed_inputs {
let _ = event_sender.send(daemon_messages::NodeEvent::InputClosed {
id: input_id.clone(),
});
let _ = send_with_timestamp(
&event_sender,
daemon_messages::NodeEvent::InputClosed {
id: input_id.clone(),
},
clock,
);
}
if dataflow.open_inputs(&node_id).is_empty() {
let _ = event_sender.send(daemon_messages::NodeEvent::AllInputsClosed);
let _ = send_with_timestamp(
&event_sender,
daemon_messages::NodeEvent::AllInputsClosed,
clock,
);
}

// if a stop event was already sent for the dataflow, send it to
// the newly connected node too
if dataflow.stop_sent {
let _ = event_sender.send(daemon_messages::NodeEvent::Stop);
let _ = send_with_timestamp(&event_sender, daemon_messages::NodeEvent::Stop, clock);
}

dataflow.subscribe_channels.insert(node_id, event_sender);
}

#[tracing::instrument(skip(dataflow, inter_daemon_connections), fields(uuid = %dataflow.id), level = "trace")]
#[tracing::instrument(skip(dataflow, inter_daemon_connections, clock), fields(uuid = %dataflow.id), level = "trace")]
async fn handle_outputs_done(
dataflow: &mut RunningDataflow,
inter_daemon_connections: &mut BTreeMap<String, InterDaemonConnection>,
node_id: &NodeId,
clock: &HLC,
) -> eyre::Result<()> {
send_input_closed_events(
dataflow,
inter_daemon_connections,
|OutputId(source_id, _)| source_id == node_id,
clock,
)
.await?;
dataflow.drop_channels.remove(node_id);
@@ -810,10 +874,16 @@ impl Daemon {

dataflow
.pending_nodes
.handle_node_stop(node_id, &mut self.coordinator_connection)
.handle_node_stop(node_id, &mut self.coordinator_connection, &self.clock)
.await?;

Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, node_id).await?;
Self::handle_outputs_done(
dataflow,
&mut self.inter_daemon_connections,
node_id,
&self.clock,
)
.await?;

dataflow.running_nodes.remove(node_id);
if dataflow.running_nodes.is_empty() {
@@ -833,12 +903,15 @@ impl Daemon {
self.machine_id
);
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesFinished {
dataflow_id,
result,
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesFinished {
dataflow_id,
result,
},
},
timestamp: self.clock.new_timestamp(),
})?;
tcp_send(connection, &msg)
.await
@@ -871,11 +944,15 @@ impl Daemon {
continue;
};

let send_result = channel.send(daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: None,
});
let send_result = send_with_timestamp(
channel,
daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: None,
},
&self.clock,
);
match send_result {
Ok(()) => {}
Err(_) => {
@@ -994,7 +1071,9 @@ async fn send_output_to_local_receivers(
dataflow: &mut RunningDataflow,
metadata: &dora_core::message::Metadata<'static>,
data: Option<Data>,
clock: &HLC,
) -> Result<Option<Vec<u8>>, eyre::ErrReport> {
let timestamp = metadata.timestamp();
let empty_set = BTreeSet::new();
let output_id = OutputId(node_id, output_id);
let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
@@ -1007,7 +1086,10 @@ async fn send_output_to_local_receivers(
metadata: metadata.clone(),
data: data.clone(),
};
match channel.send(item) {
match channel.send(Timestamped {
inner: item,
timestamp,
}) {
Ok(()) => {
if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
dataflow
@@ -1056,7 +1138,7 @@ async fn send_output_to_local_receivers(
pending_nodes: Default::default(),
});
// check if all local subscribers are finished with the token
dataflow.check_drop_token(token).await?;
dataflow.check_drop_token(token, clock).await?;
}
Ok(data_bytes)
}
@@ -1099,6 +1181,7 @@ async fn send_input_closed_events<F>(
dataflow: &mut RunningDataflow,
inter_daemon_connections: &mut BTreeMap<String, InterDaemonConnection>,
mut filter: F,
clock: &HLC,
) -> eyre::Result<()>
where
F: FnMut(&OutputId) -> bool,
@@ -1111,7 +1194,7 @@ where
.cloned()
.collect();
for (receiver_id, input_id) in &local_node_inputs {
close_input(dataflow, receiver_id, input_id);
close_input(dataflow, receiver_id, input_id, clock);
}

let mut external_node_inputs = BTreeMap::new();
@@ -1122,9 +1205,12 @@ where
}
if !external_node_inputs.is_empty() {
for (target_machine, inputs) in external_node_inputs {
let event = InterDaemonEvent::InputsClosed {
dataflow_id: dataflow.id,
inputs,
let event = Timestamped {
inner: InterDaemonEvent::InputsClosed {
dataflow_id: dataflow.id,
inputs,
},
timestamp: clock.new_timestamp(),
};
inter_daemon::send_inter_daemon_event(
&[target_machine],
@@ -1138,19 +1224,29 @@ where
Ok(())
}

fn close_input(dataflow: &mut RunningDataflow, receiver_id: &NodeId, input_id: &DataId) {
fn close_input(
dataflow: &mut RunningDataflow,
receiver_id: &NodeId,
input_id: &DataId,
clock: &HLC,
) {
if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) {
if !open_inputs.remove(input_id) {
return;
}
}
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let _ = channel.send(daemon_messages::NodeEvent::InputClosed {
id: input_id.clone(),
});
let _ = send_with_timestamp(
channel,
daemon_messages::NodeEvent::InputClosed {
id: input_id.clone(),
},
clock,
);

if dataflow.open_inputs(receiver_id).is_empty() {
let _ = channel.send(daemon_messages::NodeEvent::AllInputsClosed);
let _ =
send_with_timestamp(channel, daemon_messages::NodeEvent::AllInputsClosed, clock);
}
}
}
@@ -1160,8 +1256,8 @@ pub struct RunningDataflow {
/// Local nodes that are not started yet
pending_nodes: PendingNodes,

subscribe_channels: HashMap<NodeId, UnboundedSender<daemon_messages::NodeEvent>>,
drop_channels: HashMap<NodeId, UnboundedSender<daemon_messages::NodeDropEvent>>,
subscribe_channels: HashMap<NodeId, UnboundedSender<Timestamped<daemon_messages::NodeEvent>>>,
drop_channels: HashMap<NodeId, UnboundedSender<Timestamped<daemon_messages::NodeDropEvent>>>,
mappings: HashMap<OutputId, BTreeSet<InputId>>,
timers: BTreeMap<Duration, BTreeSet<InputId>>,
open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
@@ -1200,10 +1296,15 @@ impl RunningDataflow {
}
}

async fn start(&mut self, events_tx: &mpsc::Sender<Event>) -> eyre::Result<()> {
async fn start(
&mut self,
events_tx: &mpsc::Sender<Timestamped<Event>>,
clock: &Arc<HLC>,
) -> eyre::Result<()> {
for interval in self.timers.keys().copied() {
let events_tx = events_tx.clone();
let dataflow_id = self.id;
let clock = clock.clone();
let task = async move {
let mut interval_stream = tokio::time::interval(interval);
let hlc = HLC::default();
@@ -1225,12 +1326,16 @@ impl RunningDataflow {
},
);

let event = DoraEvent::Timer {
dataflow_id,
interval,
metadata,
let event = Timestamped {
inner: DoraEvent::Timer {
dataflow_id,
interval,
metadata,
}
.into(),
timestamp: clock.new_timestamp(),
};
if events_tx.send(event.into()).await.is_err() {
if events_tx.send(event).await.is_err() {
break;
}
}
@@ -1243,9 +1348,9 @@ impl RunningDataflow {
Ok(())
}

async fn stop_all(&mut self) {
async fn stop_all(&mut self, clock: &HLC) {
for (_node_id, channel) in self.subscribe_channels.drain() {
let _ = channel.send(daemon_messages::NodeEvent::Stop);
let _ = send_with_timestamp(&channel, daemon_messages::NodeEvent::Stop, clock);
}
self.stop_sent = true;
}
@@ -1254,15 +1359,18 @@ impl RunningDataflow {
self.open_inputs.get(node_id).unwrap_or(&self.empty_set)
}

async fn check_drop_token(&mut self, token: DropToken) -> eyre::Result<()> {
async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> {
match self.pending_drop_tokens.entry(token) {
std::collections::hash_map::Entry::Occupied(entry) => {
if entry.get().pending_nodes.is_empty() {
let (drop_token, info) = entry.remove_entry();
let result = match self.drop_channels.get_mut(&info.owner) {
Some(channel) => channel
.send(daemon_messages::NodeDropEvent::OutputDropped { drop_token })
.wrap_err("send failed"),
Some(channel) => send_with_timestamp(
channel,
daemon_messages::NodeDropEvent::OutputDropped { drop_token },
clock,
)
.wrap_err("send failed"),
None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)),
};
if let Err(err) = result.wrap_err_with(|| {
@@ -1322,11 +1430,11 @@ pub enum DaemonNodeEvent {
reply_sender: oneshot::Sender<DaemonReply>,
},
Subscribe {
event_sender: UnboundedSender<daemon_messages::NodeEvent>,
event_sender: UnboundedSender<Timestamped<daemon_messages::NodeEvent>>,
reply_sender: oneshot::Sender<DaemonReply>,
},
SubscribeDrop {
event_sender: UnboundedSender<daemon_messages::NodeDropEvent>,
event_sender: UnboundedSender<Timestamped<daemon_messages::NodeDropEvent>>,
reply_sender: oneshot::Sender<DaemonReply>,
},
CloseOutputs {
@@ -1398,3 +1506,14 @@ enum RunStatus {
Continue,
Exit,
}

fn send_with_timestamp<T>(
sender: &UnboundedSender<Timestamped<T>>,
event: T,
clock: &HLC,
) -> Result<(), mpsc::error::SendError<Timestamped<T>>> {
sender.send(Timestamped {
inner: event,
timestamp: clock.new_timestamp(),
})
}

+ 9
- 2
binaries/daemon/src/main.rs View File

@@ -1,4 +1,6 @@
use dora_core::topics::DORA_COORDINATOR_PORT_DEFAULT;
use dora_core::{
daemon_messages::Timestamped, message::uhlc::HLC, topics::DORA_COORDINATOR_PORT_DEFAULT,
};
use dora_daemon::{Daemon, Event};
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
@@ -54,12 +56,17 @@ async fn run() -> eyre::Result<()> {
let (ctrl_c_tx, ctrl_c_rx) = mpsc::channel(1);
let mut ctrlc_sent = false;
ctrlc::set_handler(move || {
let clock = HLC::default();
if ctrlc_sent {
tracing::warn!("received second ctrlc signal -> aborting immediately");
std::process::abort();
} else {
tracing::info!("received ctrlc signal");
if ctrl_c_tx.blocking_send(Event::CtrlC).is_err() {
let event = Timestamped {
inner: Event::CtrlC,
timestamp: clock.new_timestamp(),
};
if ctrl_c_tx.blocking_send(event).is_err() {
tracing::error!("failed to report ctrl-c event to dora-daemon");
}
ctrlc_sent = true;


+ 44
- 17
binaries/daemon/src/node_communication/mod.rs View File

@@ -3,7 +3,9 @@ use dora_core::{
config::{DataId, LocalCommunicationConfig, NodeId},
daemon_messages::{
DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, NodeDropEvent, NodeEvent,
Timestamped,
},
message::uhlc,
};
use eyre::{eyre, Context};
use futures::{future, task, Future};
@@ -12,6 +14,7 @@ use std::{
collections::{BTreeMap, VecDeque},
mem,
net::Ipv4Addr,
sync::Arc,
task::Poll,
};
use tokio::{
@@ -29,9 +32,10 @@ pub mod tcp;
pub async fn spawn_listener_loop(
dataflow_id: &DataflowId,
node_id: &NodeId,
daemon_tx: &mpsc::Sender<Event>,
daemon_tx: &mpsc::Sender<Timestamped<Event>>,
config: LocalCommunicationConfig,
queue_sizes: BTreeMap<DataId, usize>,
clock: Arc<uhlc::HLC>,
) -> eyre::Result<DaemonCommunication> {
match config {
LocalCommunicationConfig::Tcp => {
@@ -51,7 +55,7 @@ pub async fn spawn_listener_loop(
let event_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
tokio::spawn(async move {
tcp::listener_loop(socket, daemon_tx, queue_sizes).await;
tcp::listener_loop(socket, daemon_tx, queue_sizes, clock).await;
tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
});

@@ -84,7 +88,8 @@ pub async fn spawn_listener_loop(
.wrap_err("failed to create control server")?;
let daemon_tx = daemon_tx.clone();
let queue_sizes = queue_sizes.clone();
tokio::spawn(shmem::listener_loop(server, daemon_tx, queue_sizes));
let clock = clock.clone();
tokio::spawn(shmem::listener_loop(server, daemon_tx, queue_sizes, clock));
}

{
@@ -93,8 +98,9 @@ pub async fn spawn_listener_loop(
let event_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
let queue_sizes = queue_sizes.clone();
let clock = clock.clone();
tokio::task::spawn(async move {
shmem::listener_loop(server, daemon_tx, queue_sizes).await;
shmem::listener_loop(server, daemon_tx, queue_sizes, clock).await;
tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
});
}
@@ -105,8 +111,9 @@ pub async fn spawn_listener_loop(
let drop_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
let queue_sizes = queue_sizes.clone();
let clock = clock.clone();
tokio::task::spawn(async move {
shmem::listener_loop(server, daemon_tx, queue_sizes).await;
shmem::listener_loop(server, daemon_tx, queue_sizes, clock).await;
tracing::debug!("drop listener loop finished for `{drop_loop_node_id}`");
});
}
@@ -116,8 +123,9 @@ pub async fn spawn_listener_loop(
.wrap_err("failed to create events close server")?;
let drop_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
let clock = clock.clone();
tokio::task::spawn(async move {
shmem::listener_loop(server, daemon_tx, queue_sizes).await;
shmem::listener_loop(server, daemon_tx, queue_sizes, clock).await;
tracing::debug!(
"events close listener loop finished for `{drop_loop_node_id}`"
);
@@ -137,18 +145,20 @@ pub async fn spawn_listener_loop(
struct Listener {
dataflow_id: DataflowId,
node_id: NodeId,
daemon_tx: mpsc::Sender<Event>,
subscribed_events: Option<UnboundedReceiver<NodeEvent>>,
subscribed_drop_events: Option<UnboundedReceiver<NodeDropEvent>>,
queue: VecDeque<Box<Option<NodeEvent>>>,
daemon_tx: mpsc::Sender<Timestamped<Event>>,
subscribed_events: Option<UnboundedReceiver<Timestamped<NodeEvent>>>,
subscribed_drop_events: Option<UnboundedReceiver<Timestamped<NodeDropEvent>>>,
queue: VecDeque<Box<Option<Timestamped<NodeEvent>>>>,
queue_sizes: BTreeMap<DataId, usize>,
clock: Arc<uhlc::HLC>,
}

impl Listener {
pub(crate) async fn run<C: Connection>(
mut connection: C,
daemon_tx: mpsc::Sender<Event>,
daemon_tx: mpsc::Sender<Timestamped<Event>>,
queue_sizes: BTreeMap<DataId, usize>,
hlc: Arc<uhlc::HLC>,
) {
// receive the first message
let message = match connection
@@ -167,7 +177,11 @@ impl Listener {
}
};

match message {
if let Err(err) = hlc.update_with_timestamp(&message.timestamp) {
tracing::warn!("failed to update HLC: {err}");
}

match message.inner {
DaemonRequest::Register {
dataflow_id,
node_id,
@@ -196,6 +210,7 @@ impl Listener {
subscribed_drop_events: None,
queue_sizes,
queue: VecDeque::new(),
clock: hlc.clone(),
};
match listener
.run_inner(connection)
@@ -284,7 +299,7 @@ impl Listener {

// iterate over queued events, newest first
for event in self.queue.iter_mut().rev() {
let Some(NodeEvent::Input { id, data, .. }) = event.as_mut() else {
let Some(Timestamped { inner: NodeEvent::Input { id, data, .. }, ..}) = event.as_mut() else {
continue;
};
match queue_size_remaining.get_mut(id) {
@@ -314,10 +329,14 @@ impl Listener {
#[tracing::instrument(skip(self, connection), fields(%self.dataflow_id, %self.node_id), level = "trace")]
async fn handle_message<C: Connection>(
&mut self,
message: DaemonRequest,
message: Timestamped<DaemonRequest>,
connection: &mut C,
) -> eyre::Result<()> {
match message {
let timestamp = message.timestamp;
if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
match message.inner {
DaemonRequest::Register { .. } => {
let reply = DaemonReply::Result(Err("unexpected register message".into()));
self.send_reply(reply, connection)
@@ -464,6 +483,10 @@ impl Listener {
tokens: drop_tokens,
},
};
let event = Timestamped {
inner: event,
timestamp: self.clock.new_timestamp(),
};
self.daemon_tx
.send(event)
.await
@@ -484,6 +507,10 @@ impl Listener {
node_id: self.node_id.clone(),
event,
};
let event = Timestamped {
inner: event,
timestamp: self.clock.new_timestamp(),
};
self.daemon_tx
.send(event)
.await
@@ -515,7 +542,7 @@ impl Listener {
/// This is similar to `self.subscribed_events.recv()`. The difference is that the future
/// does not return `None` when the channel is closed and instead stays pending forever.
/// This behavior can be useful when waiting for multiple event sources at once.
fn next_event(&mut self) -> impl Future<Output = NodeEvent> + Unpin + '_ {
fn next_event(&mut self) -> impl Future<Output = Timestamped<NodeEvent>> + Unpin + '_ {
let poll = |cx: &mut task::Context<'_>| {
if let Some(events) = &mut self.subscribed_events {
match events.poll_recv(cx) {
@@ -532,6 +559,6 @@ impl Listener {

#[async_trait::async_trait]
trait Connection {
async fn receive_message(&mut self) -> eyre::Result<Option<DaemonRequest>>;
async fn receive_message(&mut self) -> eyre::Result<Option<Timestamped<DaemonRequest>>>;
async fn send_reply(&mut self, message: DaemonReply) -> eyre::Result<()>;
}

+ 10
- 8
binaries/daemon/src/node_communication/shmem.rs View File

@@ -1,20 +1,22 @@
use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};

use super::{Connection, Listener};
use crate::Event;
use dora_core::{
config::DataId,
daemon_messages::{DaemonReply, DaemonRequest},
daemon_messages::{DaemonReply, DaemonRequest, Timestamped},
message::uhlc::HLC,
};
use eyre::eyre;
use shared_memory_server::ShmemServer;
use tokio::sync::{mpsc, oneshot};

#[tracing::instrument(skip(server, daemon_tx), level = "trace")]
#[tracing::instrument(skip(server, daemon_tx, clock), level = "trace")]
pub async fn listener_loop(
mut server: ShmemServer<DaemonRequest, DaemonReply>,
daemon_tx: mpsc::Sender<Event>,
mut server: ShmemServer<Timestamped<DaemonRequest>, DaemonReply>,
daemon_tx: mpsc::Sender<Timestamped<Event>>,
queue_sizes: BTreeMap<DataId, usize>,
clock: Arc<HLC>,
) {
let (tx, rx) = flume::bounded(0);
tokio::task::spawn_blocking(move || {
@@ -38,11 +40,11 @@ pub async fn listener_loop(
}
});
let connection = ShmemConnection(tx);
Listener::run(connection, daemon_tx, queue_sizes).await
Listener::run(connection, daemon_tx, queue_sizes, clock).await
}

enum Operation {
Receive(oneshot::Sender<eyre::Result<Option<DaemonRequest>>>),
Receive(oneshot::Sender<eyre::Result<Option<Timestamped<DaemonRequest>>>>),
Send {
message: DaemonReply,
result_sender: oneshot::Sender<eyre::Result<()>>,
@@ -53,7 +55,7 @@ struct ShmemConnection(flume::Sender<Operation>);

#[async_trait::async_trait]
impl Connection for ShmemConnection {
async fn receive_message(&mut self) -> eyre::Result<Option<DaemonRequest>> {
async fn receive_message(&mut self) -> eyre::Result<Option<Timestamped<DaemonRequest>>> {
let (tx, rx) = oneshot::channel();
self.0
.send_async(Operation::Receive(tx))


+ 12
- 8
binaries/daemon/src/node_communication/tcp.rs View File

@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, io::ErrorKind};
use std::{collections::BTreeMap, io::ErrorKind, sync::Arc};

use super::{Connection, Listener};
use crate::{
@@ -7,7 +7,8 @@ use crate::{
};
use dora_core::{
config::DataId,
daemon_messages::{DaemonReply, DaemonRequest},
daemon_messages::{DaemonReply, DaemonRequest, Timestamped},
message::uhlc::HLC,
};
use eyre::Context;
use tokio::{
@@ -15,11 +16,12 @@ use tokio::{
sync::mpsc,
};

#[tracing::instrument(skip(listener, daemon_tx), level = "trace")]
#[tracing::instrument(skip(listener, daemon_tx, clock), level = "trace")]
pub async fn listener_loop(
listener: TcpListener,
daemon_tx: mpsc::Sender<Event>,
daemon_tx: mpsc::Sender<Timestamped<Event>>,
queue_sizes: BTreeMap<DataId, usize>,
clock: Arc<HLC>,
) {
loop {
match listener
@@ -35,30 +37,32 @@ pub async fn listener_loop(
connection,
daemon_tx.clone(),
queue_sizes.clone(),
clock.clone(),
));
}
}
}
}

/// Serves a single node's TCP connection until the peer disconnects.
///
/// Disables Nagle's algorithm (best-effort; a failure is only logged) and then
/// delegates all request handling to [`Listener::run`]. The `clock` is the
/// daemon's hybrid logical clock, used to timestamp events forwarded on
/// `daemon_tx`.
#[tracing::instrument(skip(connection, daemon_tx, clock), level = "trace")]
async fn handle_connection_loop(
    connection: TcpStream,
    daemon_tx: mpsc::Sender<Timestamped<Event>>,
    queue_sizes: BTreeMap<DataId, usize>,
    clock: Arc<HLC>,
) {
    // Low-latency small messages matter more than throughput here.
    if let Err(err) = connection.set_nodelay(true) {
        tracing::warn!("failed to set nodelay for connection: {err}");
    }

    Listener::run(TcpConnection(connection), daemon_tx, queue_sizes, clock).await
}

struct TcpConnection(TcpStream);

#[async_trait::async_trait]
impl Connection for TcpConnection {
async fn receive_message(&mut self) -> eyre::Result<Option<DaemonRequest>> {
async fn receive_message(&mut self) -> eyre::Result<Option<Timestamped<DaemonRequest>>> {
let raw = match tcp_receive(&mut self.0).await {
Ok(raw) => raw,
Err(err) => match err.kind() {


+ 20
- 9
binaries/daemon/src/pending.rs View File

@@ -3,7 +3,8 @@ use std::collections::{HashMap, HashSet};
use dora_core::{
config::NodeId,
coordinator_messages::{CoordinatorRequest, DaemonEvent},
daemon_messages::{DaemonReply, DataflowId},
daemon_messages::{DaemonReply, DataflowId, Timestamped},
message::uhlc::{Timestamp, HLC},
};
use eyre::{bail, Context};
use tokio::{net::TcpStream, sync::oneshot};
@@ -59,23 +60,27 @@ impl PendingNodes {
node_id: NodeId,
reply_sender: oneshot::Sender<DaemonReply>,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
) -> eyre::Result<DataflowStatus> {
self.waiting_subscribers
.insert(node_id.clone(), reply_sender);
self.local_nodes.remove(&node_id);

self.update_dataflow_status(coordinator_connection).await
self.update_dataflow_status(coordinator_connection, clock)
.await
}

pub async fn handle_node_stop(
&mut self,
node_id: &NodeId,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
) -> eyre::Result<()> {
if self.local_nodes.remove(node_id) {
tracing::warn!("node `{node_id}` exited before initializing dora connection");
self.exited_before_subscribe.insert(node_id.clone());
self.update_dataflow_status(coordinator_connection).await?;
self.update_dataflow_status(coordinator_connection, clock)
.await?;
}
Ok(())
}
@@ -97,11 +102,13 @@ impl PendingNodes {
async fn update_dataflow_status(
&mut self,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
) -> eyre::Result<DataflowStatus> {
if self.local_nodes.is_empty() {
if self.external_nodes {
if !self.reported_init_to_coordinator {
self.report_nodes_ready(coordinator_connection).await?;
self.report_nodes_ready(coordinator_connection, clock.new_timestamp())
.await?;
self.reported_init_to_coordinator = true;
}
Ok(DataflowStatus::Pending)
@@ -139,6 +146,7 @@ impl PendingNodes {
async fn report_nodes_ready(
&self,
coordinator_connection: &mut Option<TcpStream>,
timestamp: Timestamp,
) -> eyre::Result<()> {
let Some(connection) = coordinator_connection else {
bail!("no coordinator connection to send AllNodesReady");
@@ -147,12 +155,15 @@ impl PendingNodes {
let success = self.exited_before_subscribe.is_empty();
tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes");

let msg = serde_json::to_vec(&CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesReady {
dataflow_id: self.dataflow_id,
success,
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesReady {
dataflow_id: self.dataflow_id,
success,
},
},
timestamp,
})?;
tcp_send(connection, &msg)
.await


+ 13
- 3
binaries/daemon/src/spawn.rs View File

@@ -4,10 +4,11 @@ use crate::{
};
use dora_core::{
config::NodeRunConfig,
daemon_messages::{DataflowId, NodeConfig, RuntimeConfig},
daemon_messages::{DataflowId, NodeConfig, RuntimeConfig, Timestamped},
descriptor::{
resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE,
},
message::uhlc::HLC,
};
use dora_download::download_file;
use eyre::WrapErr;
@@ -15,6 +16,7 @@ use std::{
env::{consts::EXE_EXTENSION, temp_dir},
path::Path,
process::Stdio,
sync::Arc,
};
use tokio::{
fs::File,
@@ -23,12 +25,14 @@ use tokio::{
};
use tracing::{debug, error};

/// clock is required for generating timestamps when dropping messages early because queue is full
pub async fn spawn_node(
dataflow_id: DataflowId,
working_dir: &Path,
node: ResolvedNode,
daemon_tx: mpsc::Sender<Event>,
daemon_tx: mpsc::Sender<Timestamped<Event>>,
dataflow_descriptor: Descriptor,
clock: Arc<HLC>,
) -> eyre::Result<()> {
let node_id = node.id.clone();
tracing::debug!("Spawning node `{dataflow_id}/{node_id}`");
@@ -43,6 +47,7 @@ pub async fn spawn_node(
&daemon_tx,
dataflow_descriptor.communication.local,
queue_sizes,
clock.clone(),
)
.await?;

@@ -261,8 +266,13 @@ pub async fn spawn_node(
dataflow_id,
node_id,
exit_status,
}
.into();
let event = Timestamped {
inner: event,
timestamp: clock.new_timestamp(),
};
let _ = daemon_tx.send(event.into()).await;
let _ = daemon_tx.send(event).await;
});

// Log to file stream.


+ 2
- 3
examples/c-dataflow/sink.c View File

@@ -12,9 +12,8 @@ int main()
{
fprintf(stderr, "failed to init dora context\n");
return -1;

printf("[c sink] dora context initialized\n");
}
printf("[c sink] dora context initialized\n");

while (1)
{
@@ -39,7 +38,7 @@ int main()

printf("[c sink] received input `");
fwrite(id, id_len, 1, stdout);
printf("` with data: %s\n", data);
printf("` with data: %.*s\n", (int)data_len, data);
}
else if (ty == DoraEventType_InputClosed)
{


+ 9
- 3
libraries/core/src/daemon_messages.rs View File

@@ -9,7 +9,7 @@ use crate::{
config::{DataId, NodeId, NodeRunConfig, OperatorId},
descriptor::{Descriptor, OperatorDefinition, ResolvedNode},
};
use dora_message::Metadata;
use dora_message::{uhlc, Metadata};
use uuid::Uuid;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
@@ -132,11 +132,17 @@ type SharedMemoryId = String;
pub enum DaemonReply {
Result(Result<(), String>),
PreparedMessage { shared_memory_id: SharedMemoryId },
NextEvents(Vec<NodeEvent>),
NextDropEvents(Vec<NodeDropEvent>),
NextEvents(Vec<Timestamped<NodeEvent>>),
NextDropEvents(Vec<Timestamped<NodeDropEvent>>),
Empty,
}

/// Wrapper attaching a hybrid-logical-clock timestamp to a message.
///
/// Used throughout the daemon/node protocol so receivers can update their own
/// HLC from the sender's timestamp and order events across machines.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Timestamped<T> {
    /// The wrapped message payload.
    pub inner: T,
    /// HLC timestamp taken when the message was created/sent.
    pub timestamp: uhlc::Timestamp,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum NodeEvent {
Stop,


Loading…
Cancel
Save