Browse Source

Fix: Pass `cascading_errors` as arg to ensure that it is always applied

Before this commit, some callers ignored the returned `DataflowStatus`, so the `cascading_errors` it carried were never applied to the dataflow.
tags/v0.3.5-rc0
Philipp Oppermann 1 year ago
parent
commit
4e69495012
Failed to extract signature
2 changed files with 30 additions and 14 deletions
  1. +13
    -4
      binaries/daemon/src/lib.rs
  2. +17
    -10
      binaries/daemon/src/pending.rs

+ 13
- 4
binaries/daemon/src/lib.rs View File

@@ -379,7 +379,10 @@ impl Daemon {
Some(dataflow) => {
dataflow
.pending_nodes
.handle_external_all_nodes_ready(success)
.handle_external_all_nodes_ready(
success,
&mut dataflow.cascading_errors,
)
.await?;
if success {
tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
@@ -633,6 +636,7 @@ impl Daemon {
&node_id,
&mut self.coordinator_connection,
&self.clock,
&mut dataflow.cascading_errors,
)
.await?;
}
@@ -739,11 +743,11 @@ impl Daemon {
reply_sender,
&mut self.coordinator_connection,
&self.clock,
&mut dataflow.cascading_errors,
)
.await?;
match status {
DataflowStatus::AllNodesReady { cascading_errors } => {
dataflow.cascading_errors.extend(cascading_errors);
DataflowStatus::AllNodesReady => {
tracing::info!(
"all nodes are ready, starting dataflow `{dataflow_id}`"
);
@@ -994,7 +998,12 @@ impl Daemon {

dataflow
.pending_nodes
.handle_node_stop(node_id, &mut self.coordinator_connection, &self.clock)
.handle_node_stop(
node_id,
&mut self.coordinator_connection,
&self.clock,
&mut dataflow.cascading_errors,
)
.await?;

Self::handle_outputs_done(


+ 17
- 10
binaries/daemon/src/pending.rs View File

@@ -61,12 +61,13 @@ impl PendingNodes {
reply_sender: oneshot::Sender<DaemonReply>,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
cascading_errors: &mut BTreeSet<NodeId>,
) -> eyre::Result<DataflowStatus> {
self.waiting_subscribers
.insert(node_id.clone(), reply_sender);
self.local_nodes.remove(&node_id);

self.update_dataflow_status(coordinator_connection, clock)
self.update_dataflow_status(coordinator_connection, clock, cascading_errors)
.await
}

@@ -75,17 +76,22 @@ impl PendingNodes {
node_id: &NodeId,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
cascading_errors: &mut BTreeSet<NodeId>,
) -> eyre::Result<()> {
if self.local_nodes.remove(node_id) {
tracing::warn!("node `{node_id}` exited before initializing dora connection");
self.exited_before_subscribe.insert(node_id.clone());
self.update_dataflow_status(coordinator_connection, clock)
self.update_dataflow_status(coordinator_connection, clock, cascading_errors)
.await?;
}
Ok(())
}

pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> {
pub async fn handle_external_all_nodes_ready(
&mut self,
success: bool,
cascading_errors: &mut BTreeSet<NodeId>,
) -> eyre::Result<()> {
if !self.local_nodes.is_empty() {
bail!("received external `all_nodes_ready` event before local nodes were ready");
}
@@ -94,7 +100,8 @@ impl PendingNodes {
} else {
Some("some nodes failed to initialize on remote machines".to_string())
};
self.answer_subscribe_requests(external_error).await;
self.answer_subscribe_requests(external_error, cascading_errors)
.await;

Ok(())
}
@@ -103,6 +110,7 @@ impl PendingNodes {
&mut self,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
cascading_errors: &mut BTreeSet<NodeId>,
) -> eyre::Result<DataflowStatus> {
if self.local_nodes.is_empty() {
if self.external_nodes {
@@ -113,8 +121,8 @@ impl PendingNodes {
}
Ok(DataflowStatus::Pending)
} else {
let cascading_errors = self.answer_subscribe_requests(None).await;
Ok(DataflowStatus::AllNodesReady { cascading_errors })
self.answer_subscribe_requests(None, cascading_errors).await;
Ok(DataflowStatus::AllNodesReady)
}
} else {
Ok(DataflowStatus::Pending)
@@ -124,7 +132,8 @@ impl PendingNodes {
async fn answer_subscribe_requests(
&mut self,
external_error: Option<String>,
) -> BTreeSet<NodeId> {
cascading_errors: &mut BTreeSet<NodeId>,
) {
let result = if self.exited_before_subscribe.is_empty() {
match external_error {
Some(err) => Err(err),
@@ -150,14 +159,12 @@ impl PendingNodes {
};
// answer all subscribe requests
let subscribe_replies = std::mem::take(&mut self.waiting_subscribers);
let mut cascading_errors = BTreeSet::new();
for (node_id, reply_sender) in subscribe_replies.into_iter() {
if result.is_err() {
cascading_errors.insert(node_id);
}
let _ = reply_sender.send(DaemonReply::Result(result.clone()));
}
cascading_errors
}

async fn report_nodes_ready(
@@ -190,6 +197,6 @@ impl PendingNodes {
}

pub enum DataflowStatus {
AllNodesReady { cascading_errors: BTreeSet<NodeId> },
AllNodesReady,
Pending,
}

Loading…
Cancel
Save