diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index e362d88d..1410ce8f 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -18,7 +18,7 @@ use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; use futures_concurrency::stream::Merge; use run::SpawnedDataflow; use std::{ - collections::{BTreeSet, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap}, net::SocketAddr, path::PathBuf, time::{Duration, Instant}, @@ -156,7 +156,7 @@ async fn start_inner( let mut events = (abortable_events, daemon_events).merge(); let mut running_dataflows: HashMap = HashMap::new(); - let mut dataflow_results: HashMap> = HashMap::new(); + let mut dataflow_results: HashMap>> = HashMap::new(); let mut archived_dataflows: HashMap = HashMap::new(); let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new(); @@ -271,9 +271,6 @@ async fn start_inner( .insert(uuid, ArchivedDataflow::from(entry.get())); } entry.get_mut().machines.remove(&machine_id); - let result = result.wrap_err(format!( - "error occured in dataflow `{uuid}` on machine `{machine_id}`" - )); match &result { Ok(()) => { tracing::info!("dataflow `{uuid}` finished successfully on machine `{machine_id}`"); @@ -282,10 +279,19 @@ async fn start_inner( tracing::error!("{err:?}"); } } + dataflow_results + .entry(uuid) + .or_default() + .insert(machine_id, result.map_err(|err| format!("{err:?}"))); if entry.get_mut().machines.is_empty() { - entry.remove(); - dataflow_results - .insert(uuid, result.map_err(|err| format!("{err:?}"))); + let finished_dataflow = entry.remove(); + let reply = ControlRequestReply::DataflowStopped { + uuid, + result: dataflow_result(&dataflow_results, uuid), + }; + for sender in finished_dataflow.reply_senders { + let _ = sender.send(Ok(reply.clone())); + } } } std::collections::hash_map::Entry::Vacant(_) => { @@ -300,7 +306,7 @@ async fn start_inner( request, reply_sender, } => { - let reply = match request { + match request { ControlRequest::Start { dataflow, name, @@ -327,11 +333,12 @@ async fn start_inner( .await?; Ok(dataflow) }; - inner.await.map(|dataflow| { + let reply = inner.await.map(|dataflow| { let uuid = dataflow.uuid; running_dataflows.insert(uuid, dataflow); ControlRequestReply::DataflowStarted { uuid } - }) + }); + let _ = reply_sender.send(reply); } ControlRequest::Check { dataflow_uuid } => { let status = match &running_dataflows.get(&dataflow_uuid) { @@ -340,13 +347,10 @@ async fn start_inner( }, None => ControlRequestReply::DataflowStopped { uuid: dataflow_uuid, - result: dataflow_results - .get(&dataflow_uuid) - .cloned() - .unwrap_or(Ok(())), + result: dataflow_result(&dataflow_results, dataflow_uuid), }, }; - Ok(status) + let _ = reply_sender.send(Ok(status)); } ControlRequest::Reload { dataflow_id, @@ -364,45 +368,38 @@ async fn start_inner( .await?; Result::<_, eyre::Report>::Ok(()) }; - reload - .await - .map(|()| ControlRequestReply::DataflowReloaded { - uuid: dataflow_id, - }) + let reply = + reload + .await + .map(|()| ControlRequestReply::DataflowReloaded { + uuid: dataflow_id, + }); + let _ = reply_sender.send(reply); } ControlRequest::Stop { dataflow_uuid } => { - let stop = async { - stop_dataflow( - &running_dataflows, - dataflow_uuid, - &mut daemon_connections, - ) - .await?; - Result::<_, eyre::Report>::Ok(()) - }; - stop.await.map(|()| ControlRequestReply::DataflowStopped { - uuid: dataflow_uuid, - result: dataflow_results - .get(&dataflow_uuid) - .cloned() - .unwrap_or(Ok(())), - }) + stop_dataflow_by_uuid( + &mut running_dataflows, + dataflow_uuid, + &mut daemon_connections, + reply_sender, + ) + .await?; } ControlRequest::StopByName { name } => { - let stop = async { - let dataflow_uuid = resolve_name(name, &running_dataflows, None)?; - stop_dataflow( - &running_dataflows, - dataflow_uuid, - &mut daemon_connections, - ) - .await?; - Result::<_, eyre::Report>::Ok(dataflow_uuid) - }; - stop.await.map(|uuid| ControlRequestReply::DataflowStopped { - uuid, - result: dataflow_results.get(&uuid).cloned().unwrap_or(Ok(())), - }) + match resolve_name(name, &running_dataflows, None) { + Ok(uuid) => { + stop_dataflow_by_uuid( + &mut running_dataflows, + uuid, + &mut daemon_connections, + reply_sender, + ) + .await? + } + Err(err) => { + let _ = reply_sender.send(Err(err)); + } + } } ControlRequest::Logs { uuid, name, node } => { let dataflow_uuid = if let Some(uuid) = uuid { @@ -413,7 +410,7 @@ async fn start_inner( bail!("No uuid") }; - retrieve_logs( + let reply = retrieve_logs( &running_dataflows, &archived_dataflows, dataflow_uuid, @@ -421,25 +418,27 @@ async fn start_inner( &mut daemon_connections, ) .await - .map(|logs| ControlRequestReply::Logs(logs)) + .map(|logs| ControlRequestReply::Logs(logs)); + let _ = reply_sender.send(reply); } ControlRequest::Destroy => { tracing::info!("Received destroy command"); - handle_destroy( + let reply = handle_destroy( &running_dataflows, &mut daemon_connections, &abort_handle, &mut daemon_events_tx, ) .await - .map(|()| ControlRequestReply::DestroyOk) + .map(|()| ControlRequestReply::DestroyOk); + let _ = reply_sender.send(reply); } ControlRequest::List => { let mut dataflows: Vec<_> = running_dataflows.values().collect(); dataflows.sort_by_key(|d| (&d.name, d.uuid)); - Ok(ControlRequestReply::DataflowList { + let reply = Ok(ControlRequestReply::DataflowList { dataflows: dataflows .into_iter() .map(|d| DataflowId { @@ -447,19 +446,21 @@ async fn start_inner( name: d.name.clone(), }) .collect(), - }) + }); + let _ = reply_sender.send(reply); } ControlRequest::DaemonConnected => { let running = !daemon_connections.is_empty(); - Ok(ControlRequestReply::DaemonConnected(running)) + let _ = reply_sender + .send(Ok(ControlRequestReply::DaemonConnected(running))); } ControlRequest::ConnectedMachines => { - Ok(ControlRequestReply::ConnectedMachines( + let reply = Ok(ControlRequestReply::ConnectedMachines( daemon_connections.keys().cloned().collect(), - )) + )); + let _ = reply_sender.send(reply); } - }; - let _ = reply_sender.send(reply); + } } ControlEvent::Error(err) => tracing::error!("{err:?}"), }, @@ -512,6 +513,53 @@ async fn start_inner( Ok(()) } +async fn stop_dataflow_by_uuid( + running_dataflows: &mut HashMap, + dataflow_uuid: Uuid, + daemon_connections: &mut HashMap, + reply_sender: tokio::sync::oneshot::Sender>, +) -> Result<(), eyre::ErrReport> { + let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else { + bail!("No running dataflow found with UUID `{dataflow_uuid}`") + }; + let stop = async { + stop_dataflow(dataflow, dataflow_uuid, daemon_connections).await?; + Result::<_, eyre::Report>::Ok(()) + }; + match stop.await { + Ok(()) => { + dataflow.reply_senders.push(reply_sender); + } + Err(err) => { + let _ = reply_sender.send(Err(err)); + } + }; + Ok(()) +} + +fn dataflow_result( + dataflow_results: &HashMap>>, + dataflow_uuid: Uuid, +) -> Result<(), String> { + let Some(results) = dataflow_results.get(&dataflow_uuid) else { return Ok(()) }; + + let mut errors = Vec::new(); + for (machine, result) in results { + if let Err(err) = result { + let err: String = err.lines().map(|line| format!(" {line}\n")).collect(); + errors.push(format!("- machine `{machine}`:\n{err}\n")); + } + } + + if errors.is_empty() { + Ok(()) + } else { + let mut formatted = format!("errors occurred in dataflow {dataflow_uuid}:\n"); + formatted.push_str(&errors.join("\n")); + Err(formatted) + } +} + struct DaemonConnection { stream: TcpStream, listen_socket: SocketAddr, @@ -547,8 +595,8 @@ async fn handle_destroy( daemon_events_tx: &mut Option>, ) -> Result<(), eyre::ErrReport> { abortable_events.abort(); - for &uuid in running_dataflows.keys() { - stop_dataflow(running_dataflows, uuid, daemon_connections).await?; + for (&uuid, dataflow) in running_dataflows { + stop_dataflow(dataflow, uuid, daemon_connections).await?; } destroy_daemons(daemon_connections).await?; *daemon_events_tx = None; @@ -563,7 +611,6 @@ async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> { .wrap_err("failed to send watchdog message to daemon") } -#[allow(dead_code)] // Keeping the communication layer for later use. struct RunningDataflow { name: Option, uuid: Uuid, @@ -573,6 +620,8 @@ struct RunningDataflow { pending_machines: BTreeSet, init_success: bool, nodes: Vec, + + reply_senders: Vec>>, } struct ArchivedDataflow { @@ -598,13 +647,10 @@ impl PartialEq for RunningDataflow { impl Eq for RunningDataflow {} async fn stop_dataflow( - running_dataflows: &HashMap, + dataflow: &RunningDataflow, uuid: Uuid, daemon_connections: &mut HashMap, ) -> eyre::Result<()> { - let Some(dataflow) = running_dataflows.get(&uuid) else { - bail!("No running dataflow found with UUID `{uuid}`") - }; let message = serde_json::to_vec(&DaemonCoordinatorEvent::StopDataflow { dataflow_id: uuid })?; for machine_id in &dataflow.machines { @@ -757,6 +803,7 @@ async fn start_dataflow( init_success: true, machines, nodes, + reply_senders: Vec::new(), }) } diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index a5d4a7c6..658d59ee 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -53,7 +53,7 @@ pub enum ControlRequest { ConnectedMachines, } -#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum ControlRequestReply { Error(String), CoordinatorStopped, @@ -67,6 +67,7 @@ pub enum ControlRequestReply { uuid: Uuid, result: Result<(), String>, }, + DataflowList { dataflows: Vec, },