| @@ -390,34 +390,66 @@ async fn start_inner( | |||
| dataflow_uuid, | |||
| grace_duration, | |||
| } => { | |||
| stop_dataflow_by_uuid( | |||
| if let Some(result) = dataflow_results.get(&dataflow_uuid) { | |||
| let reply = ControlRequestReply::DataflowStopped { | |||
| uuid: dataflow_uuid, | |||
| result: dataflow_result(result, dataflow_uuid, &clock), | |||
| }; | |||
| let _ = reply_sender.send(Ok(reply)); | |||
| continue; | |||
| } | |||
| let dataflow = stop_dataflow( | |||
| &mut running_dataflows, | |||
| &dataflow_results, | |||
| dataflow_uuid, | |||
| &mut daemon_connections, | |||
| reply_sender, | |||
| clock.new_timestamp(), | |||
| grace_duration, | |||
| &clock, | |||
| ) | |||
| .await?; | |||
| .await; | |||
| match dataflow { | |||
| Ok(dataflow) => { | |||
| dataflow.reply_senders.push(reply_sender); | |||
| } | |||
| Err(err) => { | |||
| let _ = reply_sender.send(Err(err)); | |||
| } | |||
| } | |||
| } | |||
| ControlRequest::StopByName { | |||
| name, | |||
| grace_duration, | |||
| } => match resolve_name(name, &running_dataflows, &archived_dataflows) { | |||
| Ok(uuid) => { | |||
| stop_dataflow_by_uuid( | |||
| Ok(dataflow_uuid) => { | |||
| if let Some(result) = dataflow_results.get(&dataflow_uuid) { | |||
| let reply = ControlRequestReply::DataflowStopped { | |||
| uuid: dataflow_uuid, | |||
| result: dataflow_result(result, dataflow_uuid, &clock), | |||
| }; | |||
| let _ = reply_sender.send(Ok(reply)); | |||
| continue; | |||
| } | |||
| let dataflow = stop_dataflow( | |||
| &mut running_dataflows, | |||
| &dataflow_results, | |||
| uuid, | |||
| dataflow_uuid, | |||
| &mut daemon_connections, | |||
| reply_sender, | |||
| clock.new_timestamp(), | |||
| grace_duration, | |||
| &clock, | |||
| ) | |||
| .await? | |||
| .await; | |||
| match dataflow { | |||
| Ok(dataflow) => { | |||
| dataflow.reply_senders.push(reply_sender); | |||
| } | |||
| Err(err) => { | |||
| let _ = reply_sender.send(Err(err)); | |||
| } | |||
| } | |||
| } | |||
| Err(err) => { | |||
| let _ = reply_sender.send(Err(err)); | |||
| @@ -425,30 +457,37 @@ async fn start_inner( | |||
| }, | |||
| ControlRequest::Logs { uuid, name, node } => { | |||
| let dataflow_uuid = if let Some(uuid) = uuid { | |||
| uuid | |||
| Ok(uuid) | |||
| } else if let Some(name) = name { | |||
| resolve_name(name, &running_dataflows, &archived_dataflows)? | |||
| resolve_name(name, &running_dataflows, &archived_dataflows) | |||
| } else { | |||
| bail!("No uuid") | |||
| Err(eyre!("No uuid")) | |||
| }; | |||
| let reply = retrieve_logs( | |||
| &running_dataflows, | |||
| &archived_dataflows, | |||
| dataflow_uuid, | |||
| node.into(), | |||
| &mut daemon_connections, | |||
| clock.new_timestamp(), | |||
| ) | |||
| .await | |||
| .map(ControlRequestReply::Logs); | |||
| let _ = reply_sender.send(reply); | |||
| match dataflow_uuid { | |||
| Ok(uuid) => { | |||
| let reply = retrieve_logs( | |||
| &running_dataflows, | |||
| &archived_dataflows, | |||
| uuid, | |||
| node.into(), | |||
| &mut daemon_connections, | |||
| clock.new_timestamp(), | |||
| ) | |||
| .await | |||
| .map(ControlRequestReply::Logs); | |||
| let _ = reply_sender.send(reply); | |||
| } | |||
| Err(err) => { | |||
| let _ = reply_sender.send(Err(err)); | |||
| } | |||
| } | |||
| } | |||
| ControlRequest::Destroy => { | |||
| tracing::info!("Received destroy command"); | |||
| let reply = handle_destroy( | |||
| &running_dataflows, | |||
| &mut running_dataflows, | |||
| &mut daemon_connections, | |||
| &abort_handle, | |||
| &mut daemon_events_tx, | |||
| @@ -556,7 +595,7 @@ async fn start_inner( | |||
| Event::CtrlC => { | |||
| tracing::info!("Destroying coordinator after receiving Ctrl-C signal"); | |||
| handle_destroy( | |||
| &running_dataflows, | |||
| &mut running_dataflows, | |||
| &mut daemon_connections, | |||
| &abort_handle, | |||
| &mut daemon_events_tx, | |||
| @@ -592,50 +631,6 @@ async fn start_inner( | |||
| Ok(()) | |||
| } | |||
| #[allow(clippy::too_many_arguments)] | |||
| async fn stop_dataflow_by_uuid( | |||
| running_dataflows: &mut HashMap<Uuid, RunningDataflow>, | |||
| dataflow_results: &HashMap<Uuid, BTreeMap<String, DataflowDaemonResult>>, | |||
| dataflow_uuid: Uuid, | |||
| daemon_connections: &mut HashMap<String, DaemonConnection>, | |||
| reply_sender: tokio::sync::oneshot::Sender<Result<ControlRequestReply, eyre::ErrReport>>, | |||
| timestamp: uhlc::Timestamp, | |||
| grace_duration: Option<Duration>, | |||
| clock: &uhlc::HLC, | |||
| ) -> Result<(), eyre::ErrReport> { | |||
| let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else { | |||
| if let Some(result) = dataflow_results.get(&dataflow_uuid) { | |||
| let reply = ControlRequestReply::DataflowStopped { | |||
| uuid: dataflow_uuid, | |||
| result: dataflow_result(result, dataflow_uuid, clock), | |||
| }; | |||
| let _ = reply_sender.send(Ok(reply)); | |||
| return Ok(()); | |||
| } | |||
| bail!("no known dataflow found with UUID `{dataflow_uuid}`") | |||
| }; | |||
| let stop = async { | |||
| stop_dataflow( | |||
| dataflow, | |||
| dataflow_uuid, | |||
| daemon_connections, | |||
| timestamp, | |||
| grace_duration, | |||
| ) | |||
| .await?; | |||
| Result::<_, eyre::Report>::Ok(()) | |||
| }; | |||
| match stop.await { | |||
| Ok(()) => { | |||
| dataflow.reply_senders.push(reply_sender); | |||
| } | |||
| Err(err) => { | |||
| let _ = reply_sender.send(Err(err)); | |||
| } | |||
| }; | |||
| Ok(()) | |||
| } | |||
| fn dataflow_result( | |||
| results: &BTreeMap<String, DataflowDaemonResult>, | |||
| dataflow_uuid: Uuid, | |||
| @@ -663,17 +658,17 @@ struct DaemonConnection { | |||
| } | |||
| async fn handle_destroy( | |||
| running_dataflows: &HashMap<Uuid, RunningDataflow>, | |||
| running_dataflows: &mut HashMap<Uuid, RunningDataflow>, | |||
| daemon_connections: &mut HashMap<String, DaemonConnection>, | |||
| abortable_events: &futures::stream::AbortHandle, | |||
| daemon_events_tx: &mut Option<mpsc::Sender<Event>>, | |||
| clock: &HLC, | |||
| ) -> Result<(), eyre::ErrReport> { | |||
| abortable_events.abort(); | |||
| for (&uuid, dataflow) in running_dataflows { | |||
| stop_dataflow( | |||
| dataflow, | |||
| uuid, | |||
| for dataflow_uuid in running_dataflows.keys().cloned().collect::<Vec<_>>() { | |||
| let _ = stop_dataflow( | |||
| running_dataflows, | |||
| dataflow_uuid, | |||
| daemon_connections, | |||
| clock.new_timestamp(), | |||
| None, | |||
| @@ -737,16 +732,20 @@ impl PartialEq for RunningDataflow { | |||
| impl Eq for RunningDataflow {} | |||
| async fn stop_dataflow( | |||
| dataflow: &RunningDataflow, | |||
| uuid: Uuid, | |||
| async fn stop_dataflow<'a>( | |||
| running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>, | |||
| dataflow_uuid: Uuid, | |||
| daemon_connections: &mut HashMap<String, DaemonConnection>, | |||
| timestamp: uhlc::Timestamp, | |||
| grace_duration: Option<Duration>, | |||
| ) -> eyre::Result<()> { | |||
| ) -> eyre::Result<&'a mut RunningDataflow> { | |||
| let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else { | |||
| bail!("no known running dataflow found with UUID `{dataflow_uuid}`") | |||
| }; | |||
| let message = serde_json::to_vec(&Timestamped { | |||
| inner: DaemonCoordinatorEvent::StopDataflow { | |||
| dataflow_id: uuid, | |||
| dataflow_id: dataflow_uuid, | |||
| grace_duration, | |||
| }, | |||
| timestamp, | |||
| @@ -773,9 +772,10 @@ async fn stop_dataflow( | |||
| other => bail!("unexpected reply after sending stop: {other:?}"), | |||
| } | |||
| } | |||
| tracing::info!("successfully send stop dataflow `{uuid}` to all daemons"); | |||
| Ok(()) | |||
| tracing::info!("successfully send stop dataflow `{dataflow_uuid}` to all daemons"); | |||
| Ok(dataflow) | |||
| } | |||
| async fn reload_dataflow( | |||