Browse Source

Rework default daemon selection

tags/v0.3.10-rc3
Philipp Oppermann 11 months ago
parent
commit
fc401b0f62
Failed to extract signature
3 changed files with 37 additions and 24 deletions
  1. +22
    -20
      binaries/coordinator/src/lib.rs
  2. +11
    -4
      binaries/coordinator/src/run/mod.rs
  3. +4
    -0
      libraries/message/src/common.rs

+ 22
- 20
binaries/coordinator/src/lib.rs View File

@@ -125,7 +125,6 @@ fn resolve_name(
#[derive(Default)]
struct DaemonConnections {
daemons: BTreeMap<DaemonId, DaemonConnection>,
default_deamon_id: Option<DaemonId>,
}

impl DaemonConnections {
@@ -136,24 +135,14 @@ impl DaemonConnections {
}
}

fn get_default_mut(&mut self) -> Option<&mut DaemonConnection> {
self.default_deamon_id
.as_ref()
.and_then(|id| self.daemons.get_mut(id))
}

fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
self.daemons.get_mut(id)
}

fn get_matching_daemon_id(&self, machine_id: Option<&str>) -> Option<&DaemonId> {
match machine_id {
Some(machine_id) => self
.daemons
.keys()
.find(|id| id.matches_machine_id(machine_id)),
None => self.default_deamon_id.as_ref(),
}
fn get_matching_daemon_id(&self, machine_id: &str) -> Option<&DaemonId> {
self.daemons
.keys()
.find(|id| id.matches_machine_id(machine_id))
}

fn drain(&mut self) -> impl Iterator<Item = (DaemonId, DaemonConnection)> {
@@ -179,6 +168,10 @@ impl DaemonConnections {
fn remove(&mut self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
self.daemons.remove(daemon_id)
}

fn unnamed(&self) -> impl Iterator<Item = &DaemonId> {
self.daemons.keys().filter(|id| id.machine_id().is_none())
}
}

async fn start_inner(
@@ -926,12 +919,21 @@ async fn retrieve_logs(
)
};

let daemon_connection = match machine_id {
None => daemon_connections
.get_default_mut()
.wrap_err("no default daemon connection")?,
Some(machine_id) => unimplemented!(),
let daemon_ids: Vec<_> = match machine_id {
None => daemon_connections.unnamed().collect(),
Some(machine_id) => daemon_connections
.get_matching_daemon_id(machine_id)
.into_iter()
.collect(),
};
let daemon_id = match &daemon_ids[..] {
[id] => (*id).clone(),
[] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"),
_ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"),
};
let daemon_connection = daemon_connections
.get_mut(&daemon_id)
.wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?;
tcp_send(&mut daemon_connection.stream, &message)
.await
.wrap_err("failed to send logs message to daemon")?;


+ 11
- 4
binaries/coordinator/src/run/mod.rs View File

@@ -72,10 +72,17 @@ async fn spawn_dataflow_on_machine(
machine: Option<&str>,
message: &[u8],
) -> Result<DaemonId, eyre::ErrReport> {
let daemon_id = daemon_connections
.get_matching_daemon_id(machine)
.wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
.clone();
let daemon_id = match machine {
Some(machine) => daemon_connections
.get_matching_daemon_id(machine)
.wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
.clone(),
None => daemon_connections
.unnamed()
.next()
.wrap_err("no unnamed daemon connections")?
.clone(),
};

let daemon_connection = daemon_connections
.get_mut(&daemon_id)


+ 4
- 0
libraries/message/src/common.rs View File

@@ -211,6 +211,10 @@ impl DaemonId {
.map(|id| id == machine_id)
.unwrap_or_default()
}

pub fn machine_id(&self) -> Option<&str> {
self.machine_id.as_deref()
}
}

impl std::fmt::Display for DaemonId {


Loading…
Cancel
Save