@@ -17,8 +17,8 @@ use eyre::{bail, eyre, Context, ContextCompat};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
use inter_daemon::InterDaemonConnection;
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use std::collections::HashSet;
use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet, HashMap},
@@ -37,6 +37,7 @@ use uuid::Uuid;
mod coordinator;
mod inter_daemon;
mod node_communication;
mod pending;
mod spawn;
mod tcp_utils;
@@ -45,6 +46,8 @@ use dora_tracing::telemetry::serialize_context;
#[cfg(feature = "telemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt;
use crate::pending::DataflowStatus;
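// For orientation: a minimal sketch of the interface that the new `pending`
// module appears to expose, reconstructed purely from the call sites in this
// diff. The type, variant, and method names below all occur in the hunks that
// follow; the field layout, import paths, and method bodies are illustrative
// assumptions, not the actual implementation (e.g. the real module must also
// register spawned local nodes somewhere, which no hunk here shows).
//
// use std::collections::{HashMap, HashSet};
//
// use dora_core::{config::NodeId, daemon_messages::DaemonReply};
// use tokio::sync::oneshot;
//
// /// Startup status of a dataflow from this daemon's point of view.
// pub enum DataflowStatus {
//     /// Every node (local and remote) is ready; the dataflow may start.
//     AllNodesReady,
//     /// Some local nodes have not subscribed yet.
//     LocalNodesPending,
//     /// All local nodes reported in; `success` is `false` if one of them
//     /// stopped before subscribing.
//     LocalNodesReady { success: bool },
// }
//
// /// Local nodes that are not ready yet, together with the subscribe
// /// requests that are blocked until the whole dataflow is ready.
// #[derive(Default)]
// pub struct PendingNodes {
//     local_nodes: HashSet<NodeId>,
//     /// Whether the dataflow also has nodes on other machines.
//     external_nodes: bool,
//     waiting_subscribers: HashMap<NodeId, oneshot::Sender<DaemonReply>>,
//     exited_before_subscribe: bool,
// }
//
// impl PendingNodes {
//     pub fn set_external_nodes(&mut self, value: bool) {
//         self.external_nodes = value;
//     }
//
//     pub async fn handle_node_subscription(
//         &mut self,
//         node_id: NodeId,
//         reply_sender: oneshot::Sender<DaemonReply>,
//     ) -> eyre::Result<DataflowStatus> {
//         self.waiting_subscribers.insert(node_id.clone(), reply_sender);
//         self.local_nodes.remove(&node_id);
//         Ok(self.local_status().await)
//     }
//
//     pub async fn handle_node_stop(&mut self, node_id: &NodeId) -> DataflowStatus {
//         if self.local_nodes.remove(node_id) {
//             self.exited_before_subscribe = true;
//         }
//         self.local_status().await
//     }
//
//     pub async fn handle_external_all_nodes_ready(&mut self, success: bool) {
//         // The coordinator decided for the whole dataflow -> unblock all
//         // blocked local subscribe requests with its verdict.
//         self.answer_subscribe_requests(success).await;
//     }
//
//     async fn local_status(&mut self) -> DataflowStatus {
//         if !self.local_nodes.is_empty() {
//             DataflowStatus::LocalNodesPending
//         } else if self.external_nodes {
//             // Remote nodes exist -> the coordinator makes the final call.
//             DataflowStatus::LocalNodesReady {
//                 success: !self.exited_before_subscribe,
//             }
//         } else {
//             self.answer_subscribe_requests(!self.exited_before_subscribe).await;
//             DataflowStatus::AllNodesReady
//         }
//     }
//
//     async fn answer_subscribe_requests(&mut self, success: bool) {
//         let result: Result<(), String> = if success {
//             Ok(())
//         } else {
//             Err("some nodes exited before subscribing".into())
//         };
//         for (_, sender) in std::mem::take(&mut self.waiting_subscribers) {
//             let _ = sender.send(DaemonReply::Result(result.clone()));
//         }
//     }
// }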
pub struct Daemon {
running: HashMap<DataflowId, RunningDataflow>,
@@ -289,11 +292,20 @@ impl Daemon {
DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}")));
(Some(reply), RunStatus::Continue)
}
DaemonCoordinatorEvent::AllNodesReady { dataflow_id } => {
DaemonCoordinatorEvent::AllNodesReady {
dataflow_id,
success,
} => {
match self.running.get_mut(&dataflow_id) {
Some(dataflow) => {
tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
dataflow.start(&self.events_tx).await?;
dataflow
.pending_nodes
.handle_external_all_nodes_ready(success)
.await;
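                        // The coordinator's verdict unblocks the subscribe requests
                        // that `PendingNodes` is holding; on `success == false` the
                        // blocked nodes presumably get an error reply instead, and
                        // the dataflow below is never started.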
if success {
tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
dataflow.start(&self.events_tx).await?;
}
}
None => {
tracing::warn!(
@@ -463,7 +475,7 @@ impl Daemon {
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?;
dataflow.running_nodes.insert(node_id);
} else {
                dataflow.external_nodes.insert(node.id.clone(), node);
                dataflow.pending_nodes.set_external_nodes(true);
}
}
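                // Resolved external nodes are no longer stored per dataflow (the
                // `external_nodes` map disappears from `RunningDataflow` below);
                // for start synchronization, a boolean flag on `PendingNodes`
                // suffices.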
@@ -481,39 +493,44 @@ impl Daemon {
event_sender,
reply_sender,
} => {
let result = self
.subscribe(dataflow_id, node_id.clone(), event_sender)
.await;
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
})?;
tracing::debug!("node `{node_id}` is ready");
let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
});
dataflow
.subscribe_replies
.insert(node_id.clone(), (reply_sender, result));
dataflow.pending_nodes.remove(&node_id);
if dataflow.pending_nodes.is_empty() {
if dataflow.external_nodes.is_empty() {
tracing::info!("all nodes are ready, starting dataflow `{dataflow_id}`");
dataflow.start(&self.events_tx).await?;
} else {
tracing::info!(
"all local nodes are ready, waiting for remote nodes \
for dataflow `{dataflow_id}`"
);
match dataflow {
Err(err) => {
let _ = reply_sender.send(DaemonReply::Result(Err(err)));
}
Ok(dataflow) => {
tracing::debug!("node `{node_id}` is ready");
Self::subscribe(dataflow, node_id.clone(), event_sender).await;
// dataflow is split across multiple daemons -> synchronize with dora-coordinator
let Some(connection) = &mut self.coordinator_connection else {
bail!("no coordinator connection to send AllNodesReady");
};
let msg = serde_json::to_vec(&CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesReady { dataflow_id },
})?;
tcp_send(connection, &msg)
.await
.wrap_err("failed to send AllNodesReady message to dora-coordinator")?;
let status = dataflow
.pending_nodes
.handle_node_subscription(node_id.clone(), reply_sender)
.await?;
match status {
DataflowStatus::AllNodesReady => {
tracing::info!(
"all nodes are ready, starting dataflow `{dataflow_id}`"
);
dataflow.start(&self.events_tx).await?;
}
DataflowStatus::LocalNodesPending => {}
DataflowStatus::LocalNodesReady { success } => {
tracing::info!(
"all local nodes are ready, waiting for remote nodes"
);
Self::report_nodes_ready(
&mut self.coordinator_connection,
self.machine_id.clone(),
dataflow_id,
success,
)
.await?;
}
}
}
}
}
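                // Multi-machine start handshake: each daemon sends
                // `DaemonEvent::AllNodesReady { dataflow_id, success }` once its
                // local nodes are ready (or failed to start), and the coordinator
                // answers with `DaemonCoordinatorEvent::AllNodesReady`, presumably
                // aggregating the `success` flags of all machines.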
@@ -551,8 +568,17 @@ impl Daemon {
let _ = reply_sender.send(DaemonReply::Result(reply));
}
DaemonNodeEvent::OutputsDone { reply_sender } => {
let _ = reply_sender.send(DaemonReply::Result(Ok(())));
self.handle_outputs_done(dataflow_id, &node_id).await?;
let result = match self.running.get_mut(&dataflow_id) {
Some(dataflow) => {
Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, &node_id)
.await
},
None => Err(eyre!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")),
};
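                // Reply only after the handling finished, so the node learns
                // whether closing its downstream inputs actually succeeded.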
let _ = reply_sender.send(DaemonReply::Result(
result.map_err(|err| format!("{err:?}")),
));
}
DaemonNodeEvent::SendOut {
output_id,
@@ -602,6 +628,28 @@ impl Daemon {
Ok(())
}
async fn report_nodes_ready(
coordinator_connection: &mut Option<TcpStream>,
machine_id: String,
dataflow_id: Uuid,
success: bool,
) -> Result<(), eyre::ErrReport> {
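        // Associated function over split borrows instead of a `&mut self` method:
        // callers typically already hold a `&mut RunningDataflow` out of
        // `self.running`, so borrowing all of `self` again would be rejected by
        // the borrow checker. (`handle_outputs_done` below follows the same
        // pattern.)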
let Some(connection) = coordinator_connection else {
bail!("no coordinator connection to send AllNodesReady");
};
let msg = serde_json::to_vec(&CoordinatorRequest::Event {
machine_id: machine_id.clone(),
event: DaemonEvent::AllNodesReady {
dataflow_id,
success,
},
})?;
tcp_send(connection, &msg)
.await
.wrap_err("failed to send AllNodesReady message to dora-coordinator")?;
Ok(())
}
async fn send_reload(
&mut self,
dataflow_id: Uuid,
@@ -670,15 +718,10 @@ impl Daemon {
}
async fn subscribe(
&mut self,
dataflow_id: Uuid,
dataflow: &mut RunningDataflow,
node_id: NodeId,
event_sender: UnboundedSender<daemon_messages::NodeEvent>,
) -> Result<(), String> {
let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
})?;
) {
// some inputs might have been closed already -> report those events
let closed_inputs = dataflow
.mappings
@@ -709,22 +752,17 @@ impl Daemon {
}
dataflow.subscribe_channels.insert(node_id, event_sender);
Ok(())
}
#[tracing::instrument(skip(self), level = "trace")]
    #[tracing::instrument(skip(dataflow, inter_daemon_connections), fields(uuid = %dataflow.id), level = "trace")]
async fn handle_outputs_done(
        &mut self,
        dataflow_id: Uuid,
        dataflow: &mut RunningDataflow,
        inter_daemon_connections: &mut BTreeMap<String, InterDaemonConnection>,
node_id: &NodeId,
) -> eyre::Result<()> {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
})?;
send_input_closed_events(
dataflow,
            &mut self.inter_daemon_connections,
inter_daemon_connections,
|OutputId(source_id, _)| source_id == node_id,
)
.await?;
@@ -732,13 +770,28 @@ impl Daemon {
Ok(())
}
#[tracing::instrument(skip(self), level = "trace")]
async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
self.handle_outputs_done(dataflow_id, node_id).await?;
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
})?;
tracing::warn!("node `{node_id}` exited before initializing dora connection");
match dataflow.pending_nodes.handle_node_stop(node_id).await {
DataflowStatus::AllNodesReady => {}
DataflowStatus::LocalNodesPending => {}
DataflowStatus::LocalNodesReady { success } => {
Self::report_nodes_ready(
&mut self.coordinator_connection,
self.machine_id.clone(),
dataflow_id,
success,
)
.await?;
}
}
Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, node_id).await?;
dataflow.running_nodes.remove(node_id);
if dataflow.running_nodes.is_empty() {
tracing::info!(
@@ -1050,11 +1103,7 @@ fn close_input(dataflow: &mut RunningDataflow, receiver_id: &NodeId, input_id: &
pub struct RunningDataflow {
id: Uuid,
/// Local nodes that are not started yet
pending_nodes: HashSet<NodeId>,
/// Used to synchronize node starts.
///
/// Subscribe requests block the node until all other nodes are ready too.
subscribe_replies: HashMap<NodeId, (oneshot::Sender<DaemonReply>, Result<(), String>)>,
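    /// Local nodes that are not ready yet, plus the blocked subscribe replies
    /// that previously lived in `subscribe_replies`.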
pending_nodes: PendingNodes,
subscribe_channels: HashMap<NodeId, UnboundedSender<daemon_messages::NodeEvent>>,
drop_channels: HashMap<NodeId, UnboundedSender<daemon_messages::NodeDropEvent>>,
@@ -1063,7 +1112,6 @@ pub struct RunningDataflow {
open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
running_nodes: BTreeSet<NodeId>,
external_nodes: BTreeMap<NodeId, ResolvedNode>,
open_external_mappings: HashMap<OutputId, BTreeMap<String, BTreeSet<InputId>>>,
pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,
@@ -1082,15 +1130,13 @@ impl RunningDataflow {
fn new(id: Uuid) -> RunningDataflow {
Self {
id,
pending_nodes: HashSet::new(),
subscribe_replies: HashMap::new(),
pending_nodes: PendingNodes::default(),
subscribe_channels: HashMap::new(),
drop_channels: HashMap::new(),
mappings: HashMap::new(),
timers: BTreeMap::new(),
open_inputs: BTreeMap::new(),
running_nodes: BTreeSet::new(),
external_nodes: BTreeMap::new(),
open_external_mappings: HashMap::new(),
pending_drop_tokens: HashMap::new(),
_timer_handles: Vec::new(),
@@ -1100,12 +1146,6 @@ impl RunningDataflow {
}
async fn start(&mut self, events_tx: &mpsc::Sender<Event>) -> eyre::Result<()> {
// answer all subscribe requests
let subscribe_replies = std::mem::take(&mut self.subscribe_replies);
for (reply_sender, subscribe_result) in subscribe_replies.into_values() {
let _ = reply_sender.send(DaemonReply::Result(subscribe_result));
}
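        // Subscribe requests are answered by `PendingNodes` as soon as readiness
        // is decided, so `start` no longer needs to flush them here.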
for interval in self.timers.keys().copied() {
let events_tx = events_tx.clone();
let dataflow_id = self.id;