From a124fb11c983f71a8ec759d813fb8ce4d9e3caa3 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 14 Nov 2022 12:18:23 +0100 Subject: [PATCH] Ensure that dataflow names are unique if set --- binaries/coordinator/src/lib.rs | 44 +++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index b8754959..aec5212e 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -103,14 +103,26 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { dataflow_path, name, } => { - let result = start_dataflow( - &dataflow_path, - name, - runtime_path, - &dataflow_events_tx, - ) - .await; - let reply = match result { + let inner = async { + if let Some(name) = name.as_deref() { + // check that name is unique + if running_dataflows + .values() + .any(|d: &RunningDataflow| d.name.as_deref() == Some(name)) + { + bail!("there is already a running dataflow with name `{name}`"); + } + } + let dataflow = start_dataflow( + &dataflow_path, + name, + runtime_path, + &dataflow_events_tx, + ) + .await?; + Ok(dataflow) + }; + let reply = match inner.await { Ok(dataflow) => { let uuid = dataflow.uuid; running_dataflows.insert(uuid, dataflow); @@ -137,14 +149,20 @@ async fn start(runtime_path: &Path) -> eyre::Result<()> { } ControlRequest::StopByName { name } => { let stop = async { - let dataflow_uuid = running_dataflows + let uuids: Vec<_> = running_dataflows .iter() - .find(|(_, v)| v.name.as_deref() == Some(name.as_str())) + .filter(|(_, v)| v.name.as_deref() == Some(name.as_str())) .map(|(k, _)| k) .copied() - .ok_or_else(|| { - eyre!("no running dataflow with name `{name}`") - })?; + .collect(); + let dataflow_uuid = if uuids.is_empty() { + bail!("no running dataflow with name `{name}`"); + } else if let [uuid] = uuids.as_slice() { + *uuid + } else { + bail!("multiple dataflows found with name `{name}`"); + }; + stop_dataflow(&running_dataflows, dataflow_uuid).await?; Result::<_, eyre::Report>::Ok(()) };