diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index aaeee0e9..0af6b283 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -269,16 +269,22 @@ async fn start_inner( DataflowEvent::DataflowFinishedOnMachine { machine_id, result } => { match running_dataflows.entry(uuid) { std::collections::hash_map::Entry::Occupied(mut entry) => { - // Archive finished dataflow - archived_dataflows - .entry(uuid) - .or_insert_with(|| ArchivedDataflow::from(entry.get())); - entry.get_mut().machines.remove(&machine_id); + let dataflow = entry.get_mut(); + dataflow.machines.remove(&machine_id); + tracing::info!( + "removed machine id: {machine_id} from dataflow: {:#?}", + dataflow.uuid + ); dataflow_results .entry(uuid) .or_default() .insert(machine_id, result); - if entry.get_mut().machines.is_empty() { + + if dataflow.machines.is_empty() { + // Archive finished dataflow + archived_dataflows + .entry(uuid) + .or_insert_with(|| ArchivedDataflow::from(entry.get())); let finished_dataflow = entry.remove(); let reply = ControlRequestReply::DataflowStopped { uuid,