@@ -106,6 +106,12 @@ pub struct Daemon {
/// Nested map of per-node outcomes: the outer map is keyed by a `Uuid`, the inner map
/// by `NodeId`, and each node maps to `Ok(())` or the `NodeError` recorded for it.
// NOTE(review): the outer `Uuid` is presumably the dataflow ID — confirm against the
// sites that populate this map.
type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
/// A node-preparation task bundled with the metadata needed to report its outcome.
///
/// `spawn_prepared_nodes` takes a `Vec` of these, awaits each `task`, and, when the
/// task fails, sends a `SpawnNodeResult` event carrying `node_id` and `dynamic_node`.
struct NodePrepareTask<F> {
/// ID of the node that `task` belongs to.
node_id: NodeId,
/// Whether the node is dynamic; set from `node.kind.dynamic()` where the task is built.
dynamic_node: bool,
/// The pending preparation work. `spawn_prepared_nodes` requires this to be a future
/// yielding `eyre::Result<spawn::PreparedNode>`.
task: F,
}
impl Daemon {
pub async fn run(
coordinator_addr: SocketAddr,
@@ -393,6 +399,7 @@ impl Daemon {
Event::SpawnNodeResult {
dataflow_id,
node_id,
dynamic_node,
result,
} => match result {
Ok(running_node) => {
@@ -407,7 +414,8 @@ impl Daemon {
.entry(dataflow_id)
.or_default()
.insert(node_id.clone(), Err(error));
self.handle_node_stop(dataflow_id, &node_id).await?;
self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
.await?;
}
},
Event::SpawnDataflowResult {
@@ -845,7 +853,8 @@ impl Daemon {
let mut logger = logger.reborrow().for_node(node.id.clone());
let local = spawn_nodes.contains(&node.id);
if local {
if node.kind.dynamic() {
let dynamic_node = node.kind.dynamic();
if dynamic_node {
dataflow.dynamic_nodes.insert(node.id.clone());
} else {
dataflow.pending_nodes.insert(node.id.clone());
@@ -872,7 +881,11 @@ impl Daemon {
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
{
Ok(result) => {
tasks.push((node_id, result));
tasks.push(NodePrepareTask {
node_id,
task: result,
dynamic_node,
});
}
Err(err) => {
logger
@@ -889,7 +902,7 @@ impl Daemon {
exit_status: NodeExitStatus::Unknown,
}),
);
stopped.push(node_id.clone());
stopped.push(( node_id.clone(), dynamic_node ));
}
}
} else {
@@ -947,8 +960,9 @@ impl Daemon {
}
}
}
for node_id in stopped {
self.handle_node_stop(dataflow_id, &node_id).await?;
for (node_id, dynamic) in stopped {
self.handle_node_stop(dataflow_id, &node_id, dynamic)
.await?;
}
let spawn_result = Self::spawn_prepared_nodes(
@@ -965,24 +979,27 @@ impl Daemon {
async fn spawn_prepared_nodes(
dataflow_id: Uuid,
mut logger: DataflowLogger<'_>,
tasks: Vec<(
NodeId,
impl Future<Output = std::result::Result<spawn::PreparedNode, eyre::Error>>,
)>,
tasks: Vec<NodePrepareTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
events_tx: mpsc::Sender<Timestamped<Event>>,
clock: Arc<HLC>,
) -> eyre::Result<()> {
let node_result = |node_id, result| Timestamped {
let node_result = |node_id, dynamic_node, result| Timestamped {
inner: Event::SpawnNodeResult {
dataflow_id,
node_id,
dynamic_node,
result,
},
timestamp: clock.new_timestamp(),
};
let mut failed_to_prepare = None;
let mut prepared_nodes = Vec::new();
for (node_id, task) in tasks {
for task in tasks {
let NodePrepareTask {
node_id,
dynamic_node,
task,
} = task;
match task.await {
Ok(node) => prepared_nodes.push(node),
Err(err) => {
@@ -996,7 +1013,9 @@ impl Daemon {
)),
exit_status: NodeExitStatus::Unknown,
};
let send_result = events_tx.send(node_result(node_id, Err(node_err))).await;
let send_result = events_tx
.send(node_result(node_id, dynamic_node, Err(node_err)))
.await;
if send_result.is_err() {
tracing::error!("failed to send SpawnNodeResult to main daemon task")
}
@@ -1016,7 +1035,11 @@ impl Daemon {
exit_status: NodeExitStatus::Unknown,
};
let send_result = events_tx
.send(node_result(node.node_id().clone(), Err(err)))
.send(node_result(
node.node_id().clone(),
node.dynamic(),
Err(err),
))
.await;
if send_result.is_err() {
tracing::error!("failed to send SpawnNodeResult to main daemon task")
@@ -1038,6 +1061,7 @@ impl Daemon {
// spawn the nodes
for node in prepared_nodes {
let node_id = node.node_id().clone();
let dynamic_node = node.dynamic();
let mut logger = logger.reborrow().for_node(node_id.clone());
let result = node.spawn(&mut logger).await;
let node_spawn_result = match result {
@@ -1055,7 +1079,7 @@ impl Daemon {
}
};
let send_result = events_tx
.send(node_result(node_id, node_spawn_result))
.send(node_result(node_id, dynamic_node, node_spawn_result))
.await;
if send_result.is_err() {
tracing::error!("failed to send SpawnNodeResult to main daemon task")
@@ -1486,11 +1510,27 @@ impl Daemon {
Ok(())
}
async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
async fn handle_node_stop(
&mut self,
dataflow_id: Uuid,
node_id: &NodeId,
dynamic_node: bool,
) -> eyre::Result<()> {
let mut logger = self.logger.for_dataflow(dataflow_id);
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
})?;
let dataflow = match self.running.get_mut(&dataflow_id) {
Some(dataflow) => dataflow,
None if dynamic_node => {
// The dataflow might be done already as we don't wait for dynamic nodes. In this
// case, we don't need to do anything to handle the node stop.
tracing::debug!(
"dynamic node {dataflow_id}/{node_id} stopped after dataflow was done"
);
return Ok(());
}
None => eyre::bail!(
"failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"
),
};
dataflow
.pending_nodes
@@ -1652,6 +1692,7 @@ impl Daemon {
DoraEvent::SpawnedNodeResult {
dataflow_id,
node_id,
dynamic_node,
exit_status,
} => {
let mut logger = self
@@ -1739,7 +1780,8 @@ impl Daemon {
.or_default()
.insert(node_id.clone(), node_result);
self.handle_node_stop(dataflow_id, &node_id).await?;
self.handle_node_stop(dataflow_id, &node_id, dynamic_node)
.await?;
if let Some(exit_when_done) = &mut self.exit_when_done {
exit_when_done.remove(&(dataflow_id, node_id));
@@ -2227,6 +2269,7 @@ pub enum Event {
SpawnNodeResult {
dataflow_id: DataflowId,
node_id: NodeId,
dynamic_node: bool,
result: Result<RunningNode, NodeError>,
},
SpawnDataflowResult {
@@ -2306,6 +2349,7 @@ pub enum DoraEvent {
SpawnedNodeResult {
dataflow_id: DataflowId,
node_id: NodeId,
dynamic_node: bool,
exit_status: NodeExitStatus,
},
}