| @@ -103,14 +103,26 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { | |||
| dataflow_path, | |||
| name, | |||
| } => { | |||
| let result = start_dataflow( | |||
| &dataflow_path, | |||
| name, | |||
| runtime_path, | |||
| &dataflow_events_tx, | |||
| ) | |||
| .await; | |||
| let reply = match result { | |||
| let inner = async { | |||
| if let Some(name) = name.as_deref() { | |||
| // check that name is unique | |||
| if running_dataflows | |||
| .values() | |||
| .any(|d: &RunningDataflow| d.name.as_deref() == Some(name)) | |||
| { | |||
| bail!("there is already a running dataflow with name `{name}`"); | |||
| } | |||
| } | |||
| let dataflow = start_dataflow( | |||
| &dataflow_path, | |||
| name, | |||
| runtime_path, | |||
| &dataflow_events_tx, | |||
| ) | |||
| .await?; | |||
| Ok(dataflow) | |||
| }; | |||
| let reply = match inner.await { | |||
| Ok(dataflow) => { | |||
| let uuid = dataflow.uuid; | |||
| running_dataflows.insert(uuid, dataflow); | |||
| @@ -137,14 +149,20 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { | |||
| } | |||
| ControlRequest::StopByName { name } => { | |||
| let stop = async { | |||
| let dataflow_uuid = running_dataflows | |||
| let uuids: Vec<_> = running_dataflows | |||
| .iter() | |||
| .find(|(_, v)| v.name.as_deref() == Some(name.as_str())) | |||
| .filter(|(_, v)| v.name.as_deref() == Some(name.as_str())) | |||
| .map(|(k, _)| k) | |||
| .copied() | |||
| .ok_or_else(|| { | |||
| eyre!("no running dataflow with name `{name}`") | |||
| })?; | |||
| .collect(); | |||
| let dataflow_uuid = if uuids.is_empty() { | |||
| bail!("no running dataflow with name `{name}`"); | |||
| } else if let [uuid] = uuids.as_slice() { | |||
| *uuid | |||
| } else { | |||
| bail!("multiple dataflows found with name `{name}`"); | |||
| }; | |||
| stop_dataflow(&running_dataflows, dataflow_uuid).await?; | |||
| Result::<_, eyre::Report>::Ok(()) | |||
| }; | |||