From 0f3c0fb2fe31f092c14bba1a24a66a558f252317 Mon Sep 17 00:00:00 2001
From: Philipp Oppermann
Date: Sun, 9 Feb 2025 15:16:05 +0100
Subject: [PATCH] Assign unique ID to each daemon

The previous machine ID is still supported, but it is now optional. Users
no longer need to ensure that the chosen machine IDs are unique because
each daemon ID is augmented with a UUID.
---
 Cargo.lock                                  |  10 +
 binaries/cli/src/attach.rs                  |   2 +-
 binaries/cli/src/lib.rs                     |   2 +-
 binaries/coordinator/Cargo.toml             |   1 +
 binaries/coordinator/src/lib.rs             | 183 ++++++++++++------
 binaries/coordinator/src/listener.rs        |  15 +-
 binaries/coordinator/src/run/mod.rs         |  69 ++++---
 binaries/daemon/src/coordinator.rs          |   4 +-
 binaries/daemon/src/lib.rs                  |  55 +++---
 binaries/daemon/src/pending.rs              |   9 +-
 examples/multiple-daemons/run.rs            |   2 +-
 libraries/core/src/descriptor/mod.rs        |  36 ++--
 libraries/core/src/descriptor/validate.rs   |  23 +--
 libraries/core/src/descriptor/visualize.rs  |  12 +-
 libraries/message/src/common.rs             |  31 +++
 libraries/message/src/coordinator_to_cli.rs |   4 +-
 .../message/src/coordinator_to_daemon.rs    |  12 +-
 .../message/src/daemon_to_coordinator.rs    |   4 +-
 libraries/message/src/descriptor.rs         |   7 +-
 19 files changed, 296 insertions(+), 185 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 382876b7..f17c7497 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2527,6 +2527,7 @@ dependencies = [
  "eyre",
  "futures",
  "futures-concurrency",
+ "itertools 0.14.0",
  "log",
  "names",
  "serde_json",
@@ -4941,6 +4942,15 @@ dependencies = [
  "either",
 ]
 
+[[package]]
+name = "itertools"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
+dependencies = [
+ "either",
+]
+
 [[package]]
 name = "itoa"
 version = "1.0.11"
diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs
index 59971526..7f65446e 100644
--- a/binaries/cli/src/attach.rs
+++ b/binaries/cli/src/attach.rs
@@ -40,7 +40,7 @@ pub fn attach_dataflow(
         .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
         .to_owned();
 
-    for node in nodes {
+    for node in nodes.into_values() {
         match node.kind {
             // Reloading Custom Nodes is not supported.
             // See: https://github.com/dora-rs/dora/pull/239#discussion_r1154313139
             CoreNodeKind::Custom(_cn) => (),
diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs
index 6595b9c4..cb79ca5b 100644
--- a/binaries/cli/src/lib.rs
+++ b/binaries/cli/src/lib.rs
@@ -265,7 +265,7 @@ enum Lang {
 pub fn lib_main(args: Args) {
     if let Err(err) = run(args) {
         eprintln!("\n\n{}", "[ERROR]".bold().red());
-        eprintln!("{err:#}");
+        eprintln!("{err:?}");
         std::process::exit(1);
     }
 }
diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml
index 02c72679..c11454f9 100644
--- a/binaries/coordinator/Cargo.toml
+++ b/binaries/coordinator/Cargo.toml
@@ -28,3 +28,4 @@ names = "0.14.0"
 ctrlc = "3.2.5"
 log = { version = "0.4.21", features = ["serde"] }
 dora-message = { workspace = true }
+itertools = "0.14.0"
diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs
index 72576e90..37a555f7 100644
--- a/binaries/coordinator/src/lib.rs
+++ b/binaries/coordinator/src/lib.rs
@@ -9,6 +9,7 @@ use dora_core::{
 };
 use dora_message::{
     cli_to_coordinator::ControlRequest,
+    common::DaemonId,
     coordinator_to_cli::{
         ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
         DataflowStatus, LogMessage,
@@ -121,6 +122,65 @@ fn resolve_name(
     }
 }
 
+#[derive(Default)]
+struct DaemonConnections {
+    daemons: BTreeMap<DaemonId, DaemonConnection>,
+    default_daemon_id: Option<DaemonId>,
+}
+
+impl DaemonConnections {
+    fn add(&mut self, daemon_id: DaemonId, connection: DaemonConnection) {
+        let previous = self.daemons.insert(daemon_id.clone(), connection);
+        if previous.is_some() {
+            tracing::info!("closing previous connection `{daemon_id}` on new register");
+        }
+    }
+
+    fn get_default_mut(&mut self) -> Option<&mut DaemonConnection> {
+        self.default_daemon_id
+            .as_ref()
+            .and_then(|id| self.daemons.get_mut(id))
+    }
+
+    fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
+        self.daemons.get_mut(id)
+    }
+
+    fn get_matching_daemon_id(&self, machine_id: Option<&str>) -> Option<&DaemonId> {
+        match machine_id {
+            Some(machine_id) => self
+                .daemons
+                .keys()
+                .find(|id| id.matches_machine_id(machine_id)),
+            None => self.default_daemon_id.as_ref(),
+        }
+    }
+
+    fn drain(&mut self) -> impl Iterator<Item = (DaemonId, DaemonConnection)> {
+        std::mem::take(&mut self.daemons).into_iter()
+    }
+
+    fn is_empty(&self) -> bool {
+        self.daemons.is_empty()
+    }
+
+    fn keys(&self) -> impl Iterator<Item = &DaemonId> {
+        self.daemons.keys()
+    }
+
+    fn iter(&self) -> impl Iterator<Item = (&DaemonId, &DaemonConnection)> {
+        self.daemons.iter()
+    }
+
+    fn iter_mut(&mut self) -> impl Iterator<Item = (&DaemonId, &mut DaemonConnection)> {
+        self.daemons.iter_mut()
+    }
+
+    fn remove(&mut self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
+        self.daemons.remove(daemon_id)
+    }
+}
+
 async fn start_inner(
     events: impl Stream<Item = Event> + Unpin,
     tasks: &FuturesUnordered<JoinHandle<()>>,
@@ -142,10 +202,10 @@ async fn start_inner(
     let mut events = (abortable_events, daemon_events).merge();
 
     let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
-    let mut dataflow_results: HashMap<Uuid, BTreeMap<String, DataflowDaemonResult>> =
+    let mut dataflow_results: HashMap<Uuid, BTreeMap<DaemonId, DataflowDaemonResult>> =
         HashMap::new();
     let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
-    let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new();
+    let mut daemon_connections = DaemonConnections::default();
 
     while let Some(event) = events.next().await {
         if event.log() {
@@ -178,11 +238,7 @@ async fn start_inner(
                     version_check_result,
                 } => {
                     // assign a unique ID to the daemon
-                    let daemon_id = format!(
-                        "{}{}",
-                        machine_id.map(|id| format!("{id}-")).unwrap_or_default(),
-                        Uuid::new_v4()
-                    );
+                    let daemon_id = DaemonId::new(machine_id);
 
                     let reply: Timestamped<RegisterResult> = Timestamped {
                        inner: match &version_check_result {
@@ -193,23 +249,19 @@ async fn start_inner(
                        },
                        timestamp: clock.new_timestamp(),
                    };
+
                    let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
                        .await
                        .context("tcp send failed");
                    match version_check_result.map_err(|e| eyre!(e)).and(send_result) {
                        Ok(()) => {
-                            let previous = daemon_connections.insert(
+                            daemon_connections.add(
                                daemon_id.clone(),
                                DaemonConnection {
                                    stream: connection,
                                    last_heartbeat: Instant::now(),
                                },
                            );
-                            if let Some(_previous) = previous {
-                                tracing::info!(
-                                    "closing previous connection `{daemon_id}` on new register"
-                                );
-                            }
                        }
                        Err(err) => {
                            tracing::warn!("failed to register daemon connection for daemon `{daemon_id}`: {err}");
@@ -218,18 +270,18 @@ async fn start_inner(
                }
            },
            Event::Dataflow { uuid, event } => match event {
-                DataflowEvent::ReadyOnMachine {
-                    machine_id,
+                DataflowEvent::ReadyOnDaemon {
+                    daemon_id: machine_id,
                    exited_before_subscribe,
                } => {
                    match running_dataflows.entry(uuid) {
                        std::collections::hash_map::Entry::Occupied(mut entry) => {
                            let dataflow = entry.get_mut();
-                            dataflow.pending_machines.remove(&machine_id);
+                            dataflow.pending_daemons.remove(&machine_id);
                            dataflow
                                .exited_before_subscribe
                                .extend(exited_before_subscribe);
-                            if dataflow.pending_machines.is_empty() {
+                            if dataflow.pending_daemons.is_empty() {
                                let message = serde_json::to_vec(&Timestamped {
                                    inner: DaemonCoordinatorEvent::AllNodesReady {
                                        dataflow_id: uuid,
@@ -242,7 +294,7 @@ async fn start_inner(
                                .wrap_err("failed to serialize AllNodesReady message")?;
 
                                // notify all machines that run parts of the dataflow
-                                for machine_id in &dataflow.machines {
+                                for machine_id in &dataflow.daemons {
                                    let Some(connection) = daemon_connections.get_mut(machine_id)
                                    else {
                                        tracing::warn!(
@@ -266,21 +318,21 @@ async fn start_inner(
                            }
                        }
                    }
-                DataflowEvent::DataflowFinishedOnMachine { machine_id, result } => {
+                DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result } => {
                    match running_dataflows.entry(uuid) {
                        std::collections::hash_map::Entry::Occupied(mut entry) => {
                            let dataflow = entry.get_mut();
-                            dataflow.machines.remove(&machine_id);
+                            dataflow.daemons.remove(&daemon_id);
                            tracing::info!(
-                                "removed machine id: {machine_id} from dataflow: {:#?}",
+                                "removed machine id: {daemon_id} from dataflow: {:#?}",
                                dataflow.uuid
                            );
                            dataflow_results
                                .entry(uuid)
                                .or_default()
-                                .insert(machine_id, result);
+                                .insert(daemon_id, result);
 
-                            if dataflow.machines.is_empty() {
+                            if dataflow.daemons.is_empty() {
                                // Archive finished dataflow
                                archived_dataflows
                                    .entry(uuid)
@@ -301,7 +353,7 @@ async fn start_inner(
                            }
                        }
                        std::collections::hash_map::Entry::Vacant(_) => {
-                            tracing::warn!("dataflow not running on DataflowFinishedOnMachine");
+                            tracing::warn!("dataflow not running on DataflowFinishedOnDaemon");
                        }
                    }
                }
@@ -538,7 +590,7 @@ async fn start_inner(
                            .send(Ok(ControlRequestReply::DaemonConnected(running)));
                    }
                    ControlRequest::ConnectedMachines => {
-                        let reply = Ok(ControlRequestReply::ConnectedMachines(
+                        let reply = Ok(ControlRequestReply::ConnectedDaemons(
                            daemon_connections.keys().cloned().collect(),
                        ));
                        let _ = reply_sender.send(reply);
@@ -565,7 +617,7 @@ async fn start_inner(
            },
            Event::DaemonHeartbeatInterval => {
                let mut disconnected = BTreeSet::new();
-                for (machine_id, connection) in &mut daemon_connections {
+                for (machine_id, connection) in daemon_connections.iter_mut() {
                    if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
                        tracing::warn!(
                            "no heartbeat message from machine `{machine_id}` since {:?}",
@@ -609,7 +661,9 @@ async fn start_inner(
                )
                .await?;
            }
-            Event::DaemonHeartbeat { machine_id } => {
+            Event::DaemonHeartbeat {
+                daemon_id: machine_id,
+            } => {
                if let Some(connection) = daemon_connections.get_mut(&machine_id) {
                    connection.last_heartbeat = Instant::now();
                }
@@ -638,7 +692,7 @@ async fn start_inner(
 }
 
 fn dataflow_result(
-    results: &BTreeMap<String, DataflowDaemonResult>,
+    results: &BTreeMap<DaemonId, DataflowDaemonResult>,
     dataflow_uuid: Uuid,
     clock: &uhlc::HLC,
 ) -> DataflowResult {
@@ -664,7 +718,7 @@ struct DaemonConnection {
 
 async fn handle_destroy(
     running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
-    daemon_connections: &mut HashMap<String, DaemonConnection>,
+    daemon_connections: &mut DaemonConnections,
     abortable_events: &futures::stream::AbortHandle,
     daemon_events_tx: &mut Option<mpsc::Sender<Timestamped<Event>>>,
     clock: &HLC,
@@ -704,12 +758,12 @@ async fn send_heartbeat_message(
 struct RunningDataflow {
     name: Option<String>,
     uuid: Uuid,
-    /// The IDs of the machines that the dataflow is running on.
-    machines: BTreeSet<String>,
-    /// IDs of machines that are waiting until all nodes are started.
-    pending_machines: BTreeSet<String>,
+    /// The IDs of the daemons that the dataflow is running on.
+    daemons: BTreeSet<DaemonId>,
+    /// IDs of daemons that are waiting until all nodes are started.
+    pending_daemons: BTreeSet<DaemonId>,
     exited_before_subscribe: Vec<NodeId>,
-    nodes: Vec<ResolvedNode>,
+    nodes: BTreeMap<NodeId, ResolvedNode>,
 
     reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
@@ -718,7 +772,7 @@ struct RunningDataflow {
 
 struct ArchivedDataflow {
     name: Option<String>,
-    nodes: Vec<ResolvedNode>,
+    nodes: BTreeMap<NodeId, ResolvedNode>,
 }
 
 impl From<&RunningDataflow> for ArchivedDataflow {
@@ -732,7 +786,7 @@ impl From<&RunningDataflow> for ArchivedDataflow {
 
 impl PartialEq for RunningDataflow {
     fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.uuid == other.uuid && self.machines == other.machines
+        self.name == other.name && self.uuid == other.uuid && self.daemons == other.daemons
    }
 }
 
 impl Eq for RunningDataflow {}
@@ -741,7 +795,7 @@ impl Eq for RunningDataflow {}
 async fn stop_dataflow<'a>(
     running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>,
     dataflow_uuid: Uuid,
-    daemon_connections: &mut HashMap<String, DaemonConnection>,
+    daemon_connections: &mut DaemonConnections,
     timestamp: uhlc::Timestamp,
     grace_duration: Option<Duration>,
 ) -> eyre::Result<&'a mut RunningDataflow> {
@@ -757,9 +811,9 @@ async fn stop_dataflow<'a>(
        timestamp,
    })?;
 
-    for machine_id in &dataflow.machines {
+    for daemon_id in &dataflow.daemons {
        let daemon_connection = daemon_connections
-            .get_mut(machine_id)
+            .get_mut(daemon_id)
            .wrap_err("no daemon connection")?; // TODO: take from dataflow spec
        tcp_send(&mut daemon_connection.stream, &message)
            .await
@@ -789,7 +843,7 @@ async fn reload_dataflow(
     dataflow_id: Uuid,
     node_id: NodeId,
     operator_id: Option<OperatorId>,
-    daemon_connections: &mut HashMap<String, DaemonConnection>,
+    daemon_connections: &mut DaemonConnections,
     timestamp: uhlc::Timestamp,
 ) -> eyre::Result<()> {
     let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
@@ -804,7 +858,7 @@ async fn reload_dataflow(
        timestamp,
    })?;
 
-    for machine_id in &dataflow.machines {
+    for machine_id in &dataflow.daemons {
        let daemon_connection = daemon_connections
            .get_mut(machine_id)
            .wrap_err("no daemon connection")?; // TODO: take from dataflow spec
@@ -835,7 +889,7 @@ async fn retrieve_logs(
     archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
     dataflow_id: Uuid,
     node_id: NodeId,
-    daemon_connections: &mut HashMap<String, DaemonConnection>,
+    daemon_connections: &mut DaemonConnections,
     timestamp: uhlc::Timestamp,
 ) -> eyre::Result<Vec<u8>> {
     let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
@@ -854,8 +908,8 @@ async fn retrieve_logs(
        timestamp,
    })?;
 
-    let machine_ids: Vec<String> = nodes
-        .iter()
+    let machine_ids: Vec<Option<String>> = nodes
+        .values()
        .filter(|node| node.id == node_id)
        .map(|node| node.deploy.machine.clone())
        .collect();
@@ -872,9 +926,12 @@ async fn retrieve_logs(
        )
    };
 
-    let daemon_connection = daemon_connections
-        .get_mut(machine_id.as_str())
-        .wrap_err("no daemon connection")?;
+    let daemon_connection = match machine_id {
+        None => daemon_connections
+            .get_default_mut()
+            .wrap_err("no default daemon connection")?,
+        Some(machine_id) => unimplemented!(),
+    };
    tcp_send(&mut daemon_connection.stream, &message)
        .await
        .wrap_err("failed to send logs message to daemon")?;
@@ -898,24 +955,24 @@ async fn start_dataflow(
     dataflow: Descriptor,
     working_dir: PathBuf,
     name: Option<String>,
-    daemon_connections: &mut HashMap<String, DaemonConnection>,
+    daemon_connections: &mut DaemonConnections,
     clock: &HLC,
 ) -> eyre::Result<RunningDataflow> {
     let SpawnedDataflow {
        uuid,
-        machines,
+        daemons,
        nodes,
    } = spawn_dataflow(dataflow, working_dir, daemon_connections, clock).await?;
     Ok(RunningDataflow {
        uuid,
        name,
-        pending_machines: if machines.len() > 1 {
-            machines.clone()
+        pending_daemons: if daemons.len() > 1 {
+            daemons.clone()
        } else {
            BTreeSet::new()
        },
        exited_before_subscribe: Default::default(),
-        machines,
+        daemons,
        nodes,
        reply_senders: Vec::new(),
        log_subscribers: Vec::new(),
    })
 }
@@ -923,7 +980,7 @@ async fn start_dataflow(
 
 async fn destroy_daemon(
-    machine_id: String,
+    daemon_id: DaemonId,
     mut daemon_connection: DaemonConnection,
     timestamp: uhlc::Timestamp,
@@ -936,7 +993,7 @@ async fn destroy_daemon(
    tcp_send(&mut daemon_connection.stream, &message)
        .await
        .wrap_err(format!(
-            "failed to send destroy message to daemon `{machine_id}`"
+            "failed to send destroy message to daemon `{daemon_id}`"
        ))?;
 
    // wait for reply
@@ -952,18 +1009,18 @@ async fn destroy_daemon(
        other => bail!("unexpected reply after sending `destroy`: {other:?}"),
    }
 
-    tracing::info!("successfully destroyed daemon `{machine_id}`");
+    tracing::info!("successfully destroyed daemon `{daemon_id}`");
    Ok(())
 }
 
 async fn destroy_daemons(
-    daemon_connections: &mut HashMap<String, DaemonConnection>,
+    daemon_connections: &mut DaemonConnections,
     timestamp: uhlc::Timestamp,
 ) -> eyre::Result<()> {
     let futures = daemon_connections
        .drain()
-        .map(|(machine_id, daemon_connection)| {
-            destroy_daemon(machine_id, daemon_connection, timestamp)
+        .map(|(daemon_id, daemon_connection)| {
+            destroy_daemon(daemon_id, daemon_connection, timestamp)
        })
        .collect::<Vec<_>>();
    let results: Vec<eyre::Result<()>> =
@@ -978,7 +1035,7 @@ async fn destroy_daemons(
 pub enum Event {
     NewDaemonConnection(TcpStream),
     DaemonConnectError(eyre::Report),
-    DaemonHeartbeat { machine_id: String },
+    DaemonHeartbeat { daemon_id: DaemonId },
     Dataflow { uuid: Uuid, event: DataflowEvent },
     Control(ControlEvent),
     Daemon(DaemonRequest),
@@ -1000,12 +1057,12 @@ impl Event {
 
 #[derive(Debug)]
 pub enum DataflowEvent {
-    DataflowFinishedOnMachine {
-        machine_id: String,
+    DataflowFinishedOnDaemon {
+        daemon_id: DaemonId,
        result: DataflowDaemonResult,
    },
-    ReadyOnMachine {
-        machine_id: String,
+    ReadyOnDaemon {
+        daemon_id: DaemonId,
        exited_before_subscribe: Vec<NodeId>,
    },
 }
diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs
index 67e85d7c..fe0de99b 100644
--- a/binaries/coordinator/src/listener.rs
+++ b/binaries/coordinator/src/listener.rs
@@ -36,7 +36,7 @@ pub async fn handle_connection(
        }
    };
    let message: Timestamped<CoordinatorRequest> =
-        match serde_json::from_slice(&raw).wrap_err("failed to deserialize node message") {
+        match serde_json::from_slice(&raw).wrap_err("failed to deserialize message") {
            Ok(e) => e,
            Err(err) => {
                tracing::warn!("{err:?}");
@@ -59,18 +59,15 @@ pub async fn handle_connection(
                let _ = events_tx.send(Event::Daemon(event)).await;
                break;
            }
-            CoordinatorRequest::Event {
-                daemon_id: machine_id,
-                event,
-            } => match event {
+            CoordinatorRequest::Event { daemon_id, event } => match event {
                DaemonEvent::AllNodesReady {
                    dataflow_id,
                    exited_before_subscribe,
                } => {
                    let event = Event::Dataflow {
                        uuid: dataflow_id,
-                        event: DataflowEvent::ReadyOnMachine {
-                            machine_id,
+                        event: DataflowEvent::ReadyOnDaemon {
+                            daemon_id,
                            exited_before_subscribe,
                        },
                    };
@@ -84,14 +81,14 @@ pub async fn handle_connection(
                } => {
                    let event = Event::Dataflow {
                        uuid: dataflow_id,
-                        event: DataflowEvent::DataflowFinishedOnMachine { machine_id, result },
+                        event: DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result },
                    };
                    if events_tx.send(event).await.is_err() {
                        break;
                    }
                }
                DaemonEvent::Heartbeat => {
-                    let event = Event::DaemonHeartbeat { machine_id };
+                    let event = Event::DaemonHeartbeat { daemon_id };
                    if events_tx.send(event).await.is_err() {
                        break;
                    }
diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs
index 36ce859b..42e00061 100644
--- a/binaries/coordinator/src/run/mod.rs
+++ b/binaries/coordinator/src/run/mod.rs
@@ -1,17 +1,20 @@
 use crate::{
     tcp_utils::{tcp_receive, tcp_send},
-    DaemonConnection,
+    DaemonConnections,
 };
 use dora_core::{descriptor::DescriptorExt, uhlc::HLC};
 use dora_message::{
+    common::DaemonId,
     coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes, Timestamped},
     daemon_to_coordinator::DaemonCoordinatorReply,
     descriptor::{Descriptor, ResolvedNode},
+    id::NodeId,
 };
 use eyre::{bail, eyre, ContextCompat, WrapErr};
+use itertools::Itertools;
 use std::{
-    collections::{BTreeSet, HashMap},
+    collections::{BTreeMap, BTreeSet},
     path::PathBuf,
 };
 use uuid::{NoContext, Timestamp, Uuid};
@@ -20,7 +23,7 @@ use uuid::{NoContext, Timestamp, Uuid};
 pub(super) async fn spawn_dataflow(
     dataflow: Descriptor,
     working_dir: PathBuf,
-    daemon_connections: &mut HashMap<String, DaemonConnection>,
+    daemon_connections: &mut DaemonConnections,
     clock: &HLC,
 ) -> eyre::Result<SpawnedDataflow> {
     dataflow.check_in_daemon(&working_dir, false)?;
@@ -28,43 +31,55 @@ pub(super) async fn spawn_dataflow(
     let nodes = dataflow.resolve_aliases_and_set_defaults()?;
     let uuid = Uuid::new_v7(Timestamp::now(NoContext));
 
-    let machines: BTreeSet<_> = nodes.iter().map(|n| n.deploy.machine.clone()).collect();
+    let nodes_by_daemon = nodes.values().into_group_map_by(|n| &n.deploy.machine);
 
-    let spawn_command = SpawnDataflowNodes {
-        dataflow_id: uuid,
-        working_dir,
-        nodes: nodes.clone(),
-        dataflow_descriptor: dataflow,
-    };
-    let message = serde_json::to_vec(&Timestamped {
-        inner: DaemonCoordinatorEvent::Spawn(spawn_command),
-        timestamp: clock.new_timestamp(),
-    })?;
+    let mut daemons = BTreeSet::new();
+    for (machine, nodes_on_machine) in &nodes_by_daemon {
+        let spawn_nodes = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
+        tracing::trace!(
+            "Spawning dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})"
+        );
 
-    for machine in &machines {
-        tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`");
-        spawn_dataflow_on_machine(daemon_connections, machine, &message)
+        let spawn_command = SpawnDataflowNodes {
+            dataflow_id: uuid,
+            working_dir: working_dir.clone(),
+            nodes: nodes.clone(),
+            dataflow_descriptor: dataflow.clone(),
+            spawn_nodes,
+        };
+        let message = serde_json::to_vec(&Timestamped {
+            inner: DaemonCoordinatorEvent::Spawn(spawn_command),
+            timestamp: clock.new_timestamp(),
+        })?;
+
+        let daemon_id = spawn_dataflow_on_machine(daemon_connections, machine.as_deref(), &message)
            .await
-            .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine}`"))?;
format!("failed to spawn dataflow on machine `{machine}`"))?; + .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine:?}`"))?; + daemons.insert(daemon_id); } tracing::info!("successfully spawned dataflow `{uuid}`"); Ok(SpawnedDataflow { uuid, - machines, + daemons, nodes, }) } async fn spawn_dataflow_on_machine( - daemon_connections: &mut HashMap, - machine: &str, + daemon_connections: &mut DaemonConnections, + machine: Option<&str>, message: &[u8], -) -> Result<(), eyre::ErrReport> { +) -> Result { + let daemon_id = daemon_connections + .get_matching_daemon_id(machine) + .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))? + .clone(); + let daemon_connection = daemon_connections - .get_mut(machine) - .wrap_err_with(|| format!("no daemon connection for machine `{machine}`"))?; + .get_mut(&daemon_id) + .wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?; tcp_send(&mut daemon_connection.stream, message) .await .wrap_err("failed to send spawn message to daemon")?; @@ -79,11 +94,11 @@ async fn spawn_dataflow_on_machine( .wrap_err("daemon returned an error")?, _ => bail!("unexpected reply"), } - Ok(()) + Ok(daemon_id) } pub struct SpawnedDataflow { pub uuid: Uuid, - pub machines: BTreeSet, - pub nodes: Vec, + pub daemons: BTreeSet, + pub nodes: BTreeMap, } diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index 6f9c25ea..4e81bfa2 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -4,7 +4,7 @@ use crate::{ }; use dora_core::uhlc::HLC; use dora_message::{ - common::Timestamped, + common::{DaemonId, Timestamped}, coordinator_to_daemon::RegisterResult, daemon_to_coordinator::{CoordinatorRequest, DaemonCoordinatorReply, DaemonRegisterRequest}, }; @@ -30,7 +30,7 @@ pub async fn register( addr: SocketAddr, machine_id: Option, clock: &HLC, -) -> eyre::Result<(String, impl Stream>)> { +) -> eyre::Result<(DaemonId, impl Stream>)> { let mut stream = loop { match TcpStream::connect(addr) .await diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index b37979f6..103f65e2 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -11,7 +11,9 @@ use dora_core::{ uhlc::{self, HLC}, }; use dora_message::{ - common::{DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus}, + common::{ + DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus, + }, coordinator_to_cli::DataflowResult, coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes}, daemon_to_coordinator::{ @@ -78,7 +80,7 @@ pub struct Daemon { coordinator_connection: Option, last_coordinator_heartbeat: Instant, - daemon_id: String, + daemon_id: DaemonId, /// used for testing and examples exit_when_done: Option>, @@ -148,6 +150,7 @@ impl Daemon { let spawn_command = SpawnDataflowNodes { dataflow_id, working_dir, + spawn_nodes: nodes.keys().cloned().collect(), nodes, dataflow_descriptor: descriptor, }; @@ -158,7 +161,7 @@ impl Daemon { let exit_when_done = spawn_command .nodes - .iter() + .values() .map(|n| (spawn_command.dataflow_id, n.id.clone())) .collect(); let (reply_tx, reply_rx) = oneshot::channel(); @@ -176,7 +179,7 @@ impl Daemon { let run_result = Self::run_general( Box::pin(events), None, - "default".into(), + DaemonId::new(None), Some(exit_when_done), clock.clone(), ); @@ -206,7 +209,7 @@ impl Daemon { async fn run_general( external_events: impl Stream> + Unpin, coordinator_addr: Option, - daemon_id: String, 
+        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
    ) -> eyre::Result<DataflowResult> {
@@ -387,6 +390,7 @@ impl Daemon {
                working_dir,
                nodes,
                dataflow_descriptor,
+                spawn_nodes,
            }) => {
                match dataflow_descriptor.communication.remote {
                    dora_core::config::RemoteCommunicationConfig::Tcp => {}
@@ -400,7 +404,13 @@ impl Daemon {
                };
 
                let result = self
-                    .spawn_dataflow(dataflow_id, working_dir, nodes, dataflow_descriptor)
+                    .spawn_dataflow(
+                        dataflow_id,
+                        working_dir,
+                        nodes,
+                        dataflow_descriptor,
+                        spawn_nodes,
+                    )
                    .await;
                if let Err(err) = &result {
                    tracing::error!("{err:?}");
@@ -619,8 +629,9 @@ impl Daemon {
        &mut self,
        dataflow_id: uuid::Uuid,
        working_dir: PathBuf,
-        nodes: Vec<ResolvedNode>,
+        nodes: BTreeMap<NodeId, ResolvedNode>,
        dataflow_descriptor: Descriptor,
+        spawn_nodes: BTreeSet<NodeId>,
    ) -> eyre::Result<()> {
        let dataflow = RunningDataflow::new(dataflow_id, self.daemon_id.clone());
        let dataflow = match self.running.entry(dataflow_id) {
@@ -634,8 +645,8 @@ impl Daemon {
        };
 
        let mut log_messages = Vec::new();
-        for node in nodes {
-            let local = node.deploy.machine == self.daemon_id;
+        for node in nodes.into_values() {
+            let local = spawn_nodes.contains(&node.id);
 
            let inputs = node_inputs(&node);
            for (input_id, input) in inputs {
@@ -664,11 +675,7 @@ impl Daemon {
                } else if let InputMapping::User(mapping) = input.mapping {
                    dataflow
                        .open_external_mappings
-                        .entry(OutputId(mapping.source, mapping.output))
-                        .or_default()
-                        .entry(node.deploy.machine.clone())
-                        .or_default()
-                        .insert((node.id.clone(), input_id));
+                        .insert(OutputId(mapping.source, mapping.output));
                }
            }
            if local {
@@ -969,12 +976,8 @@ impl Daemon {
            .await?;
 
        let output_id = OutputId(node_id, output_id);
-        let remote_receivers: Vec<_> = dataflow
-            .open_external_mappings
-            .get(&output_id)
-            .map(|m| m.keys().cloned().collect())
-            .unwrap_or_default();
-        if !remote_receivers.is_empty() {
+        let remote_receivers = dataflow.open_external_mappings.contains(&output_id);
+        if remote_receivers {
            let event = InterDaemonEvent::Output {
                dataflow_id,
                node_id: output_id.0.clone(),
@@ -1049,7 +1052,7 @@ impl Daemon {
        }
 
        let mut closed = Vec::new();
-        for output_id in dataflow.open_external_mappings.keys() {
+        for output_id in &dataflow.open_external_mappings {
            if output_id.0 == node_id && outputs.contains(&output_id.1) {
                closed.push(output_id.clone());
            }
@@ -1393,7 +1396,7 @@ async fn set_up_event_stream(
     remote_daemon_events_rx: flume::Receiver<Timestamped<InterDaemonEvent>>,
     // used for dynamic nodes
     local_listen_port: u16,
-) -> eyre::Result<(String, impl Stream<Item = Timestamped<Event>> + Unpin)> {
+) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
     let remote_daemon_events = remote_daemon_events_rx.into_stream().map(|e| Timestamped {
        inner: Event::Daemon(e.inner),
        timestamp: e.timestamp,
@@ -1602,7 +1605,7 @@ pub struct RunningDataflow {
     /// to know which nodes are dynamic.
     dynamic_nodes: BTreeSet<NodeId>,
 
-    open_external_mappings: HashMap<OutputId, BTreeMap<String, BTreeSet<(NodeId, DataId)>>>,
+    open_external_mappings: BTreeSet<OutputId>,
 
     pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,
@@ -1625,10 +1628,10 @@ pub struct RunningDataflow {
 }
 
 impl RunningDataflow {
-    fn new(dataflow_id: Uuid, machine_id: String) -> RunningDataflow {
+    fn new(dataflow_id: Uuid, daemon_id: DaemonId) -> RunningDataflow {
        Self {
            id: dataflow_id,
-            pending_nodes: PendingNodes::new(dataflow_id, machine_id),
+            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
            subscribe_channels: HashMap::new(),
            drop_channels: HashMap::new(),
            mappings: HashMap::new(),
@@ -1636,7 +1639,7 @@ impl RunningDataflow {
            open_inputs: BTreeMap::new(),
            running_nodes: BTreeMap::new(),
            dynamic_nodes: BTreeSet::new(),
-            open_external_mappings: HashMap::new(),
+            open_external_mappings: Default::default(),
            pending_drop_tokens: HashMap::new(),
            _timer_handles: Vec::new(),
            stop_sent: false,
diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs
index 2d4ce5e4..b36807d1 100644
--- a/binaries/daemon/src/pending.rs
+++ b/binaries/daemon/src/pending.rs
@@ -5,6 +5,7 @@ use dora_core::{
     uhlc::{Timestamp, HLC},
 };
 use dora_message::{
+    common::DaemonId,
     daemon_to_coordinator::{CoordinatorRequest, DaemonEvent, LogLevel, LogMessage, Timestamped},
     daemon_to_node::DaemonReply,
     DataflowId,
@@ -16,7 +17,7 @@ use crate::{socket_stream_utils::socket_stream_send, CascadingErrorCauses};
 
 pub struct PendingNodes {
     dataflow_id: DataflowId,
-    machine_id: String,
+    daemon_id: DaemonId,
 
     /// The local nodes that are still waiting to start.
     local_nodes: HashSet<NodeId>,
@@ -38,10 +39,10 @@ pub struct PendingNodes {
 }
 
 impl PendingNodes {
-    pub fn new(dataflow_id: DataflowId, machine_id: String) -> Self {
+    pub fn new(dataflow_id: DataflowId, daemon_id: DaemonId) -> Self {
        Self {
            dataflow_id,
-            machine_id,
+            daemon_id,
            local_nodes: HashSet::new(),
            external_nodes: false,
            waiting_subscribers: HashMap::new(),
@@ -205,7 +206,7 @@ impl PendingNodes {
 
        let msg = serde_json::to_vec(&Timestamped {
            inner: CoordinatorRequest::Event {
-                daemon_id: self.machine_id.clone(),
+                daemon_id: self.daemon_id.clone(),
                event: DaemonEvent::AllNodesReady {
                    dataflow_id: self.dataflow_id,
                    exited_before_subscribe: self.exited_before_subscribe.clone(),
diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs
index 77020401..f1a4fe46 100644
--- a/examples/multiple-daemons/run.rs
+++ b/examples/multiple-daemons/run.rs
@@ -160,7 +160,7 @@ async fn connected_machines(
        .await?;
    let result = reply.await??;
    let machines = match result {
-        ControlRequestReply::ConnectedMachines(machines) => machines,
+        ControlRequestReply::ConnectedDaemons(machines) => machines,
        ControlRequestReply::Error(err) => bail!("{err}"),
        other => bail!("unexpected start dataflow reply: {other:?}"),
    };
diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs
index 6b6d78e8..38dcd3f3 100644
--- a/libraries/core/src/descriptor/mod.rs
+++ b/libraries/core/src/descriptor/mod.rs
@@ -1,6 +1,6 @@
 use dora_message::{
     config::{Input, InputMapping, NodeRunConfig},
-    id::{DataId, OperatorId},
+    id::{DataId, NodeId, OperatorId},
 };
 use eyre::{bail, Context, OptionExt, Result};
 use std::{
@@ -12,8 +12,8 @@ use std::{
 // reexport for compatibility
 pub use dora_message::descriptor::{
     CoreNodeKind, CustomNode, Descriptor, Node, OperatorConfig, OperatorDefinition, OperatorSource,
-    PythonSource, ResolvedDeploy, ResolvedNode, RuntimeNode, SingleOperatorDefinition,
-    DYNAMIC_SOURCE, SHELL_SOURCE,
+    PythonSource, ResolvedNode, RuntimeNode, SingleOperatorDefinition, DYNAMIC_SOURCE,
+    SHELL_SOURCE,
 };
 pub use validate::ResolvedNodeExt;
 pub use visualize::collect_dora_timers;
@@ -22,7 +22,7 @@ mod validate;
 mod visualize;
 
 pub trait DescriptorExt {
-    fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<Vec<ResolvedNode>>;
+    fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>>;
     fn visualize_as_mermaid(&self) -> eyre::Result<String>;
     fn blocking_read(path: &Path) -> eyre::Result<Descriptor>;
     fn parse(buf: Vec<u8>) -> eyre::Result<Descriptor>;
@@ -33,7 +33,7 @@ pub trait DescriptorExt {
 pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op";
 
 impl DescriptorExt for Descriptor {
-    fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<Vec<ResolvedNode>> {
+    fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>> {
        let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());
 
        let single_operator_nodes: HashMap<_, _> = self
@@ -46,7 +46,7 @@ impl DescriptorExt for Descriptor {
            })
            .collect();
 
-        let mut resolved = vec![];
+        let mut resolved = BTreeMap::new();
        for mut node in self.nodes.clone() {
            // adjust input mappings
            let mut node_kind = node_kind_mut(&mut node)?;
@@ -95,21 +95,17 @@ impl DescriptorExt for Descriptor {
                }),
            };
 
-            resolved.push(ResolvedNode {
-                id: node.id,
-                name: node.name,
-                description: node.description,
-                env: node.env,
-                deploy: {
-                    let default_machine = self.deploy.machine.as_deref().unwrap_or_default();
-                    let machine = match node.deploy.machine {
-                        Some(m) => m,
-                        None => default_machine.to_owned(),
-                    };
-                    ResolvedDeploy { machine }
+            resolved.insert(
+                node.id.clone(),
+                ResolvedNode {
+                    id: node.id,
+                    name: node.name,
+                    description: node.description,
+                    env: node.env,
+                    deploy: node.deploy,
+                    kind,
                },
-                kind,
-            });
+            );
        }
 
        Ok(resolved)
diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs
index cffc7f82..c28bd451 100644
--- a/libraries/core/src/descriptor/validate.rs
+++ b/libraries/core/src/descriptor/validate.rs
@@ -7,10 +7,10 @@ use crate::{
 use dora_message::{
     config::{Input, InputMapping, UserInputMapping},
     descriptor::{CoreNodeKind, OperatorSource, ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE},
-    id::{DataId, OperatorId},
+    id::{DataId, NodeId, OperatorId},
 };
 use eyre::{bail, eyre, Context};
-use std::{path::Path, process::Command};
+use std::{collections::BTreeMap, path::Path, process::Command};
 use tracing::info;
 
 use super::{resolve_path, Descriptor, DescriptorExt};
@@ -26,7 +26,7 @@ pub fn check_dataflow(
     let mut has_python_operator = false;
 
     // check that nodes and operators exist
-    for node in &nodes {
+    for node in nodes.values() {
        match &node.kind {
            descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() {
                SHELL_SOURCE => (),
@@ -35,10 +35,11 @@ pub fn check_dataflow(
                    if source_is_url(source) {
                        info!("{source} is a URL."); // TODO: Implement url check.
                    } else if let Some(remote_daemon_id) = remote_daemon_id {
-                        if remote_daemon_id.contains(&node.deploy.machine.as_str())
-                            || coordinator_is_remote
-                        {
-                            info!("skipping path check for remote node `{}`", node.id);
+                        if let Some(machine) = &node.deploy.machine {
+                            if remote_daemon_id.contains(&machine.as_str()) || coordinator_is_remote
+                            {
+                                info!("skipping path check for remote node `{}`", node.id);
+                            }
                        }
                    } else {
                        resolve_path(source, working_dir)
@@ -82,7 +83,7 @@ pub fn check_dataflow(
    }
 
    // check that all inputs mappings point to an existing output
-    for node in &nodes {
+    for node in nodes.values() {
        match &node.kind {
            descriptor::CoreNodeKind::Custom(custom_node) => {
                for (input_id, input) in &custom_node.run_config.inputs {
@@ -104,7 +105,7 @@ pub fn check_dataflow(
    }
 
    // Check that nodes can resolve `send_stdout_as`
-    for node in &nodes {
+    for node in nodes.values() {
        node.send_stdout_as()
            .context("Could not resolve `send_stdout_as` configuration")?;
    }
@@ -149,13 +150,13 @@ impl ResolvedNodeExt for ResolvedNode {
 
 fn check_input(
     input: &Input,
-    nodes: &[super::ResolvedNode],
+    nodes: &BTreeMap<NodeId, ResolvedNode>,
     input_id_str: &str,
 ) -> Result<(), eyre::ErrReport> {
     match &input.mapping {
        InputMapping::Timer { interval: _ } => {}
        InputMapping::User(UserInputMapping { source, output }) => {
-            let source_node = nodes.iter().find(|n| &n.id == source).ok_or_else(|| {
+            let source_node = nodes.values().find(|n| &n.id == source).ok_or_else(|| {
                eyre!("source node `{source}` mapped to input `{input_id_str}` does not exist",)
            })?;
            match &source_node.kind {
diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs
index cfd00cac..b16bd555 100644
--- a/libraries/core/src/descriptor/visualize.rs
+++ b/libraries/core/src/descriptor/visualize.rs
@@ -11,16 +11,16 @@ use std::{
     time::Duration,
 };
 
-pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String {
+pub fn visualize_nodes(nodes: &BTreeMap<NodeId, ResolvedNode>) -> String {
     let mut flowchart = "flowchart TB\n".to_owned();
     let mut all_nodes = HashMap::new();
 
-    for node in nodes {
+    for node in nodes.values() {
        visualize_node(node, &mut flowchart);
        all_nodes.insert(&node.id, node);
    }
 
    let dora_timers = collect_dora_timers(&nodes);
    if !dora_timers.is_empty() {
        writeln!(flowchart, "subgraph ___dora___ [dora]").unwrap();
        writeln!(flowchart, "  subgraph ___timer_timer___ [timer]").unwrap();
@@ -32,16 +32,16 @@ pub fn visualize_nodes(
        flowchart.push_str("end\n");
    }
 
-    for node in nodes {
+    for node in nodes.values() {
        visualize_node_inputs(node, &mut flowchart, &all_nodes)
    }
 
    flowchart
 }
 
-pub fn collect_dora_timers(nodes: &[ResolvedNode]) -> BTreeSet<Duration> {
+pub fn collect_dora_timers(nodes: &BTreeMap<NodeId, ResolvedNode>) -> BTreeSet<Duration> {
     let mut dora_timers = BTreeSet::new();
-    for node in nodes {
+    for node in nodes.values() {
        match &node.kind {
            CoreNodeKind::Runtime(node) => {
                for operator in &node.operators {
diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs
index 830bf4aa..2470cd6b 100644
--- a/libraries/message/src/common.rs
+++ b/libraries/message/src/common.rs
@@ -190,3 +190,34 @@ impl DropToken {
        Self(Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)))
    }
 }
+
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
+pub struct DaemonId {
+    machine_id: Option<String>,
+    uuid: Uuid,
+}
+
+impl DaemonId {
+    pub fn new(machine_id: Option<String>) -> Self {
+        DaemonId {
+            machine_id,
+            uuid: Uuid::new_v4(),
+        }
+    }
+
+    pub fn matches_machine_id(&self, machine_id: &str) -> bool {
+        self.machine_id
+            .as_ref()
+            .map(|id| id == machine_id)
+            .unwrap_or_default()
+    }
+}
+
+impl std::fmt::Display for DaemonId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        if let Some(id) = &self.machine_id {
+            write!(f, "{id}-")?;
+        }
+        write!(f, "{}", self.uuid)
+    }
+}
diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs
index e8eb64d0..d5477a07 100644
--- a/libraries/message/src/coordinator_to_cli.rs
+++ b/libraries/message/src/coordinator_to_cli.rs
@@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BTreeSet};
 use uuid::Uuid;
 
 pub use crate::common::{LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
-use crate::id::NodeId;
+use crate::{common::DaemonId, id::NodeId};
 
 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
 pub enum ControlRequestReply {
@@ -15,7 +15,7 @@ pub enum ControlRequestReply {
     DataflowList(DataflowList),
     DestroyOk,
     DaemonConnected(bool),
-    ConnectedMachines(BTreeSet<String>),
+    ConnectedDaemons(BTreeSet<DaemonId>),
     Logs(Vec<u8>),
 }
diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs
index c91c8fa9..17e1537a 100644
--- a/libraries/message/src/coordinator_to_daemon.rs
+++ b/libraries/message/src/coordinator_to_daemon.rs
@@ -1,6 +1,11 @@
-use std::{path::PathBuf, time::Duration};
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    path::PathBuf,
+    time::Duration,
+};
 
 use crate::{
+    common::DaemonId,
     descriptor::{Descriptor, ResolvedNode},
     id::{NodeId, OperatorId},
     DataflowId,
@@ -17,8 +22,6 @@ pub enum RegisterResult {
     Err(String),
 }
 
-type DaemonId = String;
-
 impl RegisterResult {
     pub fn to_result(self) -> eyre::Result<DaemonId> {
        match self {
@@ -58,6 +61,7 @@ pub enum DaemonCoordinatorEvent {
 pub struct SpawnDataflowNodes {
     pub dataflow_id: DataflowId,
     pub working_dir: PathBuf,
-    pub nodes: Vec<ResolvedNode>,
+    pub nodes: BTreeMap<NodeId, ResolvedNode>,
     pub dataflow_descriptor: Descriptor,
+    pub spawn_nodes: BTreeSet<NodeId>,
 }
diff --git a/libraries/message/src/daemon_to_coordinator.rs b/libraries/message/src/daemon_to_coordinator.rs
index 1e57607f..0c9d0845 100644
--- a/libraries/message/src/daemon_to_coordinator.rs
+++ b/libraries/message/src/daemon_to_coordinator.rs
@@ -3,13 +3,13 @@ use std::collections::BTreeMap;
 pub use crate::common::{
     DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped,
 };
-use crate::{current_crate_version, id::NodeId, versions_compatible, DataflowId};
+use crate::{common::DaemonId, current_crate_version, id::NodeId, versions_compatible, DataflowId};
 
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
 pub enum CoordinatorRequest {
     Register(DaemonRegisterRequest),
     Event {
-        daemon_id: String,
+        daemon_id: DaemonId,
        event: DaemonEvent,
    },
 }
diff --git a/libraries/message/src/descriptor.rs b/libraries/message/src/descriptor.rs
index a84963e3..6b7a5320 100644
--- a/libraries/message/src/descriptor.rs
+++ b/libraries/message/src/descriptor.rs
@@ -81,17 +81,12 @@ pub struct ResolvedNode {
     pub env: Option<BTreeMap<String, EnvValue>>,
 
     #[serde(default)]
-    pub deploy: ResolvedDeploy,
+    pub deploy: Deploy,
 
     #[serde(flatten)]
     pub kind: CoreNodeKind,
 }
 
-#[derive(Debug, Clone, Default, Serialize, Deserialize)]
-pub struct ResolvedDeploy {
-    pub machine: String,
-}
-
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "lowercase")]
 pub enum CoreNodeKind {
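
A minimal, standalone sketch of the `DaemonId` semantics introduced in
libraries/message/src/common.rs above — not part of the patch itself. It
assumes the `uuid` crate with the `v4` feature enabled, and the machine
names below are hypothetical:

use uuid::Uuid; // requires the `uuid` crate with the `v4` feature

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct DaemonId {
    machine_id: Option<String>,
    uuid: Uuid,
}

impl DaemonId {
    fn new(machine_id: Option<String>) -> Self {
        DaemonId {
            machine_id,
            uuid: Uuid::new_v4(),
        }
    }

    /// Only a daemon that registered with a machine ID can match one.
    fn matches_machine_id(&self, machine_id: &str) -> bool {
        self.machine_id.as_deref() == Some(machine_id)
    }
}

impl std::fmt::Display for DaemonId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // optional user-chosen machine ID as prefix, UUID as unique suffix
        if let Some(id) = &self.machine_id {
            write!(f, "{id}-")?;
        }
        write!(f, "{}", self.uuid)
    }
}

fn main() {
    // Two daemons may now register the same (hypothetical) machine ID;
    // the UUID part keeps their daemon IDs distinct.
    let a = DaemonId::new(Some("gpu-node".into()));
    let b = DaemonId::new(Some("gpu-node".into()));
    assert_ne!(a, b);
    assert!(a.matches_machine_id("gpu-node") && b.matches_machine_id("gpu-node"));

    // A daemon registered without a machine ID never matches one; the
    // coordinator routes to it via its default-daemon handling instead.
    let default_daemon = DaemonId::new(None);
    assert!(!default_daemon.matches_machine_id("gpu-node"));

    println!("{a}");              // e.g. gpu-node-67e55044-10b1-426f-9247-bb680e5fe0c8
    println!("{default_daemon}"); // e.g. 67e55044-10b1-426f-9247-bb680e5fe0c8
}

Because equality and ordering include the UUID, duplicate machine IDs no
longer collide in coordinator maps such as `DaemonConnections`, while
`matches_machine_id` still lets spawn commands be routed by the
user-chosen name.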