|
|
|
@@ -655,6 +655,7 @@ impl Daemon { |
|
|
|
}; |
|
|
|
|
|
|
|
let mut log_messages = Vec::new(); |
|
|
|
let mut stopped = Vec::new(); |
|
|
|
for node in nodes.into_values() { |
|
|
|
let local = spawn_nodes.contains(&node.id); |
|
|
|
|
|
|
|
@@ -727,16 +728,7 @@ impl Daemon { |
|
|
|
line: None, |
|
|
|
message: format!("{err:?}"), |
|
|
|
}); |
|
|
|
let messages = dataflow |
|
|
|
.pending_nodes |
|
|
|
.handle_node_stop( |
|
|
|
&node_id, |
|
|
|
&mut self.coordinator_connection, |
|
|
|
&self.clock, |
|
|
|
&mut dataflow.cascading_error_causes, |
|
|
|
) |
|
|
|
.await?; |
|
|
|
log_messages.extend(messages); |
|
|
|
stopped.push(node_id.clone()); |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
@@ -792,6 +784,9 @@ impl Daemon { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
for node_id in stopped { |
|
|
|
self.handle_node_stop(dataflow_id, &node_id).await?; |
|
|
|
} |
|
|
|
|
|
|
|
for log_message in log_messages { |
|
|
|
self.send_log_message(log_message).await?; |
|
|
|
|