|
|
|
@@ -269,16 +269,22 @@ async fn start_inner( |
|
|
|
DataflowEvent::DataflowFinishedOnMachine { machine_id, result } => { |
|
|
|
match running_dataflows.entry(uuid) { |
|
|
|
std::collections::hash_map::Entry::Occupied(mut entry) => { |
|
|
|
// Archive finished dataflow |
|
|
|
archived_dataflows |
|
|
|
.entry(uuid) |
|
|
|
.or_insert_with(|| ArchivedDataflow::from(entry.get())); |
|
|
|
entry.get_mut().machines.remove(&machine_id); |
|
|
|
let dataflow = entry.get_mut(); |
|
|
|
dataflow.machines.remove(&machine_id); |
|
|
|
tracing::info!( |
|
|
|
"removed machine id: {machine_id} from dataflow: {:#?}", |
|
|
|
dataflow.uuid |
|
|
|
); |
|
|
|
dataflow_results |
|
|
|
.entry(uuid) |
|
|
|
.or_default() |
|
|
|
.insert(machine_id, result); |
|
|
|
if entry.get_mut().machines.is_empty() { |
|
|
|
|
|
|
|
if dataflow.machines.is_empty() { |
|
|
|
// Archive finished dataflow |
|
|
|
archived_dataflows |
|
|
|
.entry(uuid) |
|
|
|
.or_insert_with(|| ArchivedDataflow::from(entry.get())); |
|
|
|
let finished_dataflow = entry.remove(); |
|
|
|
let reply = ControlRequestReply::DataflowStopped { |
|
|
|
uuid, |
|
|
|
|