| @@ -128,9 +128,11 @@ pub fn attach_dataflow( | |||
| serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; | |||
| match result { | |||
| ControlRequestReply::DataflowStarted { uuid: _ } => (), | |||
| ControlRequestReply::DataflowStopped { uuid } => { | |||
| ControlRequestReply::DataflowStopped { uuid, result } => { | |||
| info!("dataflow {uuid} stopped"); | |||
| break; | |||
| break result | |||
| .map_err(|err| eyre::eyre!(err)) | |||
| .wrap_err("dataflow failed"); | |||
| } | |||
| ControlRequestReply::DataflowReloaded { uuid } => { | |||
| info!("dataflow {uuid} reloaded") | |||
| @@ -138,6 +140,4 @@ pub fn attach_dataflow( | |||
| other => error!("Received unexpected Coordinator Reply: {:#?}", other), | |||
| }; | |||
| } | |||
| Ok(()) | |||
| } | |||
| @@ -288,7 +288,9 @@ fn stop_dataflow( | |||
| let result: ControlRequestReply = | |||
| serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; | |||
| match result { | |||
| ControlRequestReply::DataflowStopped { uuid: _ } => Ok(()), | |||
| ControlRequestReply::DataflowStopped { uuid: _, result } => result | |||
| .map_err(|err| eyre::eyre!(err)) | |||
| .wrap_err("dataflow failed"), | |||
| ControlRequestReply::Error(err) => bail!("{err}"), | |||
| other => bail!("unexpected stop dataflow reply: {other:?}"), | |||
| } | |||
| @@ -304,7 +306,9 @@ fn stop_dataflow_by_name( | |||
| let result: ControlRequestReply = | |||
| serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; | |||
| match result { | |||
| ControlRequestReply::DataflowStopped { uuid: _ } => Ok(()), | |||
| ControlRequestReply::DataflowStopped { uuid: _, result } => result | |||
| .map_err(|err| eyre::eyre!(err)) | |||
| .wrap_err("dataflow failed"), | |||
| ControlRequestReply::Error(err) => bail!("{err}"), | |||
| other => bail!("unexpected stop dataflow reply: {other:?}"), | |||
| } | |||
| @@ -156,6 +156,7 @@ async fn start_inner( | |||
| let mut events = (abortable_events, daemon_events).merge(); | |||
| let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new(); | |||
| let mut dataflow_results: HashMap<Uuid, Result<(), String>> = HashMap::new(); | |||
| let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new(); | |||
| let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new(); | |||
| @@ -270,19 +271,21 @@ async fn start_inner( | |||
| .insert(uuid, ArchivedDataflow::from(entry.get())); | |||
| } | |||
| entry.get_mut().machines.remove(&machine_id); | |||
| match result { | |||
| let result = result.wrap_err(format!( | |||
| "error occured in dataflow `{uuid}` on machine `{machine_id}`" | |||
| )); | |||
| match &result { | |||
| Ok(()) => { | |||
| tracing::info!("dataflow `{uuid}` finished successfully on machine `{machine_id}`"); | |||
| } | |||
| Err(err) => { | |||
| let err = | |||
| err.wrap_err(format!("error occured in dataflow `{uuid}` on machine `{machine_id}`")); | |||
| tracing::error!("{err:?}"); | |||
| } | |||
| } | |||
| if entry.get_mut().machines.is_empty() { | |||
| entry.remove(); | |||
| tracing::info!("dataflow `{uuid}` finished"); | |||
| dataflow_results | |||
| .insert(uuid, result.map_err(|err| format!("{err:?}"))); | |||
| } | |||
| } | |||
| std::collections::hash_map::Entry::Vacant(_) => { | |||
| @@ -337,6 +340,10 @@ async fn start_inner( | |||
| }, | |||
| None => ControlRequestReply::DataflowStopped { | |||
| uuid: dataflow_uuid, | |||
| result: dataflow_results | |||
| .get(&dataflow_uuid) | |||
| .cloned() | |||
| .unwrap_or(Ok(())), | |||
| }, | |||
| }; | |||
| Ok(status) | |||
| @@ -375,6 +382,10 @@ async fn start_inner( | |||
| }; | |||
| stop.await.map(|()| ControlRequestReply::DataflowStopped { | |||
| uuid: dataflow_uuid, | |||
| result: dataflow_results | |||
| .get(&dataflow_uuid) | |||
| .cloned() | |||
| .unwrap_or(Ok(())), | |||
| }) | |||
| } | |||
| ControlRequest::StopByName { name } => { | |||
| @@ -388,8 +399,10 @@ async fn start_inner( | |||
| .await?; | |||
| Result::<_, eyre::Report>::Ok(dataflow_uuid) | |||
| }; | |||
| stop.await | |||
| .map(|uuid| ControlRequestReply::DataflowStopped { uuid }) | |||
| stop.await.map(|uuid| ControlRequestReply::DataflowStopped { | |||
| uuid, | |||
| result: dataflow_results.get(&uuid).cloned().unwrap_or(Ok(())), | |||
| }) | |||
| } | |||
| ControlRequest::Logs { uuid, name, node } => { | |||
| let dataflow_uuid = if let Some(uuid) = uuid { | |||
| @@ -57,10 +57,19 @@ pub enum ControlRequest { | |||
| pub enum ControlRequestReply { | |||
| Error(String), | |||
| CoordinatorStopped, | |||
| DataflowStarted { uuid: Uuid }, | |||
| DataflowReloaded { uuid: Uuid }, | |||
| DataflowStopped { uuid: Uuid }, | |||
| DataflowList { dataflows: Vec<DataflowId> }, | |||
| DataflowStarted { | |||
| uuid: Uuid, | |||
| }, | |||
| DataflowReloaded { | |||
| uuid: Uuid, | |||
| }, | |||
| DataflowStopped { | |||
| uuid: Uuid, | |||
| result: Result<(), String>, | |||
| }, | |||
| DataflowList { | |||
| dataflows: Vec<DataflowId>, | |||
| }, | |||
| DestroyOk, | |||
| DaemonConnected(bool), | |||
| ConnectedMachines(BTreeSet<String>), | |||