diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 37a555f7..f683d6eb 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -125,7 +125,6 @@ fn resolve_name( #[derive(Default)] struct DaemonConnections { daemons: BTreeMap, - default_deamon_id: Option, } impl DaemonConnections { @@ -136,24 +135,14 @@ impl DaemonConnections { } } - fn get_default_mut(&mut self) -> Option<&mut DaemonConnection> { - self.default_deamon_id - .as_ref() - .and_then(|id| self.daemons.get_mut(id)) - } - fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> { self.daemons.get_mut(id) } - fn get_matching_daemon_id(&self, machine_id: Option<&str>) -> Option<&DaemonId> { - match machine_id { - Some(machine_id) => self - .daemons - .keys() - .find(|id| id.matches_machine_id(machine_id)), - None => self.default_deamon_id.as_ref(), - } + fn get_matching_daemon_id(&self, machine_id: &str) -> Option<&DaemonId> { + self.daemons + .keys() + .find(|id| id.matches_machine_id(machine_id)) } fn drain(&mut self) -> impl Iterator { @@ -179,6 +168,10 @@ impl DaemonConnections { fn remove(&mut self, daemon_id: &DaemonId) -> Option { self.daemons.remove(daemon_id) } + + fn unnamed(&self) -> impl Iterator { + self.daemons.keys().filter(|id| id.machine_id().is_none()) + } } async fn start_inner( @@ -926,12 +919,21 @@ async fn retrieve_logs( ) }; - let daemon_connection = match machine_id { - None => daemon_connections - .get_default_mut() - .wrap_err("no default daemon connection")?, - Some(machine_id) => unimplemented!(), + let daemon_ids: Vec<_> = match machine_id { + None => daemon_connections.unnamed().collect(), + Some(machine_id) => daemon_connections + .get_matching_daemon_id(machine_id) + .into_iter() + .collect(), + }; + let daemon_id = match &daemon_ids[..] { + [id] => (*id).clone(), + [] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"), + _ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"), }; + let daemon_connection = daemon_connections + .get_mut(&daemon_id) + .wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?; tcp_send(&mut daemon_connection.stream, &message) .await .wrap_err("failed to send logs message to daemon")?; diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 42e00061..53c8f2aa 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -72,10 +72,17 @@ async fn spawn_dataflow_on_machine( machine: Option<&str>, message: &[u8], ) -> Result { - let daemon_id = daemon_connections - .get_matching_daemon_id(machine) - .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))? - .clone(); + let daemon_id = match machine { + Some(machine) => daemon_connections + .get_matching_daemon_id(machine) + .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))? + .clone(), + None => daemon_connections + .unnamed() + .next() + .wrap_err("no unnamed daemon connections")? + .clone(), + }; let daemon_connection = daemon_connections .get_mut(&daemon_id) diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index 2470cd6b..a9d5d049 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -211,6 +211,10 @@ impl DaemonId { .map(|id| id == machine_id) .unwrap_or_default() } + + pub fn machine_id(&self) -> Option<&str> { + self.machine_id.as_deref() + } } impl std::fmt::Display for DaemonId {