
Assign unique ID to each daemon

The previous machine ID is still used, but it is now optional. Users no longer need to ensure that their chosen machine IDs are unique, because each daemon ID is augmented with a UUID.
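For illustration, this is roughly how a daemon ID is derived — a sketch mirroring the `format!` call replaced in the coordinator and the new `DaemonId` type added in `libraries/message/src/common.rs` (assumes the `uuid` crate with the `v4` feature; the machine ID value is invented):

```rust
use uuid::Uuid;

fn main() {
    // An optional, user-chosen machine ID...
    let machine_id: Option<String> = Some("robot-1".into());

    // ...augmented with a random UUID, so two daemons configured with the
    // same machine ID still end up with distinct daemon IDs.
    let daemon_id = format!(
        "{}{}",
        machine_id.map(|id| format!("{id}-")).unwrap_or_default(),
        Uuid::new_v4()
    );
    println!("{daemon_id}"); // e.g. "robot-1-8f14e51f-..."
}
```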
tags/v0.3.10-rc3
Philipp Oppermann, 11 months ago
parent commit 0f3c0fb2fe
19 changed files with 296 additions and 185 deletions
  1. Cargo.lock (+10 -0)
  2. binaries/cli/src/attach.rs (+1 -1)
  3. binaries/cli/src/lib.rs (+1 -1)
  4. binaries/coordinator/Cargo.toml (+1 -0)
  5. binaries/coordinator/src/lib.rs (+120 -63)
  6. binaries/coordinator/src/listener.rs (+6 -9)
  7. binaries/coordinator/src/run/mod.rs (+42 -27)
  8. binaries/daemon/src/coordinator.rs (+2 -2)
  9. binaries/daemon/src/lib.rs (+29 -26)
  10. binaries/daemon/src/pending.rs (+5 -4)
  11. examples/multiple-daemons/run.rs (+1 -1)
  12. libraries/core/src/descriptor/mod.rs (+16 -20)
  13. libraries/core/src/descriptor/validate.rs (+12 -11)
  14. libraries/core/src/descriptor/visualize.rs (+6 -6)
  15. libraries/message/src/common.rs (+31 -0)
  16. libraries/message/src/coordinator_to_cli.rs (+2 -2)
  17. libraries/message/src/coordinator_to_daemon.rs (+8 -4)
  18. libraries/message/src/daemon_to_coordinator.rs (+2 -2)
  19. libraries/message/src/descriptor.rs (+1 -6)

Cargo.lock (+10 -0)

@@ -2527,6 +2527,7 @@ dependencies = [
"eyre",
"futures",
"futures-concurrency",
"itertools 0.14.0",
"log",
"names",
"serde_json",
@@ -4941,6 +4942,15 @@ dependencies = [
"either",
]

[[package]]
name = "itertools"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
dependencies = [
"either",
]

[[package]]
name = "itoa"
version = "1.0.11"


binaries/cli/src/attach.rs (+1 -1)

@@ -40,7 +40,7 @@ pub fn attach_dataflow(
.ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
.to_owned();

for node in nodes {
for node in nodes.into_values() {
match node.kind {
// Reloading Custom Nodes is not supported. See: https://github.com/dora-rs/dora/pull/239#discussion_r1154313139
CoreNodeKind::Custom(_cn) => (),


binaries/cli/src/lib.rs (+1 -1)

@@ -265,7 +265,7 @@ enum Lang {
pub fn lib_main(args: Args) {
if let Err(err) = run(args) {
eprintln!("\n\n{}", "[ERROR]".bold().red());
eprintln!("{err:#}");
eprintln!("{err:?}");
std::process::exit(1);
}
}


binaries/coordinator/Cargo.toml (+1 -0)

@@ -28,3 +28,4 @@ names = "0.14.0"
ctrlc = "3.2.5"
log = { version = "0.4.21", features = ["serde"] }
dora-message = { workspace = true }
itertools = "0.14.0"

binaries/coordinator/src/lib.rs (+120 -63)

@@ -9,6 +9,7 @@ use dora_core::{
};
use dora_message::{
cli_to_coordinator::ControlRequest,
common::DaemonId,
coordinator_to_cli::{
ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
DataflowStatus, LogMessage,
@@ -121,6 +122,65 @@ fn resolve_name(
}
}

#[derive(Default)]
struct DaemonConnections {
daemons: BTreeMap<DaemonId, DaemonConnection>,
default_deamon_id: Option<DaemonId>,
}

impl DaemonConnections {
fn add(&mut self, daemon_id: DaemonId, connection: DaemonConnection) {
let previous = self.daemons.insert(daemon_id.clone(), connection);
if previous.is_some() {
tracing::info!("closing previous connection `{daemon_id}` on new register");
}
}

fn get_default_mut(&mut self) -> Option<&mut DaemonConnection> {
self.default_deamon_id
.as_ref()
.and_then(|id| self.daemons.get_mut(id))
}

fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
self.daemons.get_mut(id)
}

fn get_matching_daemon_id(&self, machine_id: Option<&str>) -> Option<&DaemonId> {
match machine_id {
Some(machine_id) => self
.daemons
.keys()
.find(|id| id.matches_machine_id(machine_id)),
None => self.default_deamon_id.as_ref(),
}
}

fn drain(&mut self) -> impl Iterator<Item = (DaemonId, DaemonConnection)> {
std::mem::take(&mut self.daemons).into_iter()
}

fn is_empty(&self) -> bool {
self.daemons.is_empty()
}

fn keys(&self) -> impl Iterator<Item = &DaemonId> {
self.daemons.keys()
}

fn iter(&self) -> impl Iterator<Item = (&DaemonId, &DaemonConnection)> {
self.daemons.iter()
}

fn iter_mut(&mut self) -> impl Iterator<Item = (&DaemonId, &mut DaemonConnection)> {
self.daemons.iter_mut()
}

fn remove(&mut self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
self.daemons.remove(daemon_id)
}
}

async fn start_inner(
events: impl Stream<Item = Event> + Unpin,
tasks: &FuturesUnordered<JoinHandle<()>>,
@@ -142,10 +202,10 @@ async fn start_inner(
let mut events = (abortable_events, daemon_events).merge();

let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
let mut dataflow_results: HashMap<Uuid, BTreeMap<String, DataflowDaemonResult>> =
let mut dataflow_results: HashMap<Uuid, BTreeMap<DaemonId, DataflowDaemonResult>> =
HashMap::new();
let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new();
let mut daemon_connections = DaemonConnections::default();

while let Some(event) = events.next().await {
if event.log() {
@@ -178,11 +238,7 @@ async fn start_inner(
version_check_result,
} => {
// assign a unique ID to the daemon
let daemon_id = format!(
"{}{}",
machine_id.map(|id| format!("{id}-")).unwrap_or_default(),
Uuid::new_v4()
);
let daemon_id = DaemonId::new(machine_id);

let reply: Timestamped<RegisterResult> = Timestamped {
inner: match &version_check_result {
@@ -193,23 +249,19 @@ async fn start_inner(
},
timestamp: clock.new_timestamp(),
};

let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
.await
.context("tcp send failed");
match version_check_result.map_err(|e| eyre!(e)).and(send_result) {
Ok(()) => {
let previous = daemon_connections.insert(
daemon_connections.add(
daemon_id.clone(),
DaemonConnection {
stream: connection,
last_heartbeat: Instant::now(),
},
);
if let Some(_previous) = previous {
tracing::info!(
"closing previous connection `{daemon_id}` on new register"
);
}
}
Err(err) => {
tracing::warn!("failed to register daemon connection for daemon `{daemon_id}`: {err}");
@@ -218,18 +270,18 @@ async fn start_inner(
}
},
Event::Dataflow { uuid, event } => match event {
DataflowEvent::ReadyOnMachine {
machine_id,
DataflowEvent::ReadyOnDeamon {
daemon_id: machine_id,
exited_before_subscribe,
} => {
match running_dataflows.entry(uuid) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
let dataflow = entry.get_mut();
dataflow.pending_machines.remove(&machine_id);
dataflow.pending_daemons.remove(&machine_id);
dataflow
.exited_before_subscribe
.extend(exited_before_subscribe);
if dataflow.pending_machines.is_empty() {
if dataflow.pending_daemons.is_empty() {
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::AllNodesReady {
dataflow_id: uuid,
@@ -242,7 +294,7 @@ async fn start_inner(
.wrap_err("failed to serialize AllNodesReady message")?;

// notify all machines that run parts of the dataflow
for machine_id in &dataflow.machines {
for machine_id in &dataflow.daemons {
let Some(connection) = daemon_connections.get_mut(machine_id)
else {
tracing::warn!(
@@ -266,21 +318,21 @@ async fn start_inner(
}
}
}
DataflowEvent::DataflowFinishedOnMachine { machine_id, result } => {
DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result } => {
match running_dataflows.entry(uuid) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
let dataflow = entry.get_mut();
dataflow.machines.remove(&machine_id);
dataflow.daemons.remove(&daemon_id);
tracing::info!(
"removed machine id: {machine_id} from dataflow: {:#?}",
"removed machine id: {daemon_id} from dataflow: {:#?}",
dataflow.uuid
);
dataflow_results
.entry(uuid)
.or_default()
.insert(machine_id, result);
.insert(daemon_id, result);

if dataflow.machines.is_empty() {
if dataflow.daemons.is_empty() {
// Archive finished dataflow
archived_dataflows
.entry(uuid)
@@ -301,7 +353,7 @@ async fn start_inner(
}
}
std::collections::hash_map::Entry::Vacant(_) => {
tracing::warn!("dataflow not running on DataflowFinishedOnMachine");
tracing::warn!("dataflow not running on DataflowFinishedOnDaemon");
}
}
}
@@ -538,7 +590,7 @@ async fn start_inner(
.send(Ok(ControlRequestReply::DaemonConnected(running)));
}
ControlRequest::ConnectedMachines => {
let reply = Ok(ControlRequestReply::ConnectedMachines(
let reply = Ok(ControlRequestReply::ConnectedDaemons(
daemon_connections.keys().cloned().collect(),
));
let _ = reply_sender.send(reply);
@@ -565,7 +617,7 @@ async fn start_inner(
},
Event::DaemonHeartbeatInterval => {
let mut disconnected = BTreeSet::new();
for (machine_id, connection) in &mut daemon_connections {
for (machine_id, connection) in daemon_connections.iter_mut() {
if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
tracing::warn!(
"no heartbeat message from machine `{machine_id}` since {:?}",
@@ -609,7 +661,9 @@ async fn start_inner(
)
.await?;
}
Event::DaemonHeartbeat { machine_id } => {
Event::DaemonHeartbeat {
daemon_id: machine_id,
} => {
if let Some(connection) = daemon_connections.get_mut(&machine_id) {
connection.last_heartbeat = Instant::now();
}
@@ -638,7 +692,7 @@ async fn start_inner(
}

fn dataflow_result(
results: &BTreeMap<String, DataflowDaemonResult>,
results: &BTreeMap<DaemonId, DataflowDaemonResult>,
dataflow_uuid: Uuid,
clock: &uhlc::HLC,
) -> DataflowResult {
@@ -664,7 +718,7 @@ struct DaemonConnection {

async fn handle_destroy(
running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
daemon_connections: &mut DaemonConnections,
abortable_events: &futures::stream::AbortHandle,
daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
clock: &HLC,
@@ -704,12 +758,12 @@ async fn send_heartbeat_message(
struct RunningDataflow {
name: Option<String>,
uuid: Uuid,
/// The IDs of the machines that the dataflow is running on.
machines: BTreeSet<String>,
/// IDs of machines that are waiting until all nodes are started.
pending_machines: BTreeSet<String>,
/// The IDs of the daemons that the dataflow is running on.
daemons: BTreeSet<DaemonId>,
/// IDs of daemons that are waiting until all nodes are started.
pending_daemons: BTreeSet<DaemonId>,
exited_before_subscribe: Vec<NodeId>,
nodes: Vec<ResolvedNode>,
nodes: BTreeMap<NodeId, ResolvedNode>,

reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,

@@ -718,7 +772,7 @@ struct RunningDataflow {

struct ArchivedDataflow {
name: Option<String>,
nodes: Vec<ResolvedNode>,
nodes: BTreeMap<NodeId, ResolvedNode>,
}

impl From<&RunningDataflow> for ArchivedDataflow {
@@ -732,7 +786,7 @@ impl From<&RunningDataflow> for ArchivedDataflow {

impl PartialEq for RunningDataflow {
fn eq(&self, other: &Self) -> bool {
self.name == other.name && self.uuid == other.uuid && self.machines == other.machines
self.name == other.name && self.uuid == other.uuid && self.daemons == other.daemons
}
}

@@ -741,7 +795,7 @@ impl Eq for RunningDataflow {}
async fn stop_dataflow<'a>(
running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>,
dataflow_uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
daemon_connections: &mut DaemonConnections,
timestamp: uhlc::Timestamp,
grace_duration: Option<Duration>,
) -> eyre::Result<&'a mut RunningDataflow> {
@@ -757,9 +811,9 @@ async fn stop_dataflow<'a>(
timestamp,
})?;

for machine_id in &dataflow.machines {
for daemon_id in &dataflow.daemons {
let daemon_connection = daemon_connections
.get_mut(machine_id)
.get_mut(daemon_id)
.wrap_err("no daemon connection")?; // TODO: take from dataflow spec
tcp_send(&mut daemon_connection.stream, &message)
.await
@@ -789,7 +843,7 @@ async fn reload_dataflow(
dataflow_id: Uuid,
node_id: NodeId,
operator_id: Option<OperatorId>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
daemon_connections: &mut DaemonConnections,
timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
@@ -804,7 +858,7 @@ async fn reload_dataflow(
timestamp,
})?;

for machine_id in &dataflow.machines {
for machine_id in &dataflow.daemons {
let daemon_connection = daemon_connections
.get_mut(machine_id)
.wrap_err("no daemon connection")?; // TODO: take from dataflow spec
@@ -835,7 +889,7 @@ async fn retrieve_logs(
archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
dataflow_id: Uuid,
node_id: NodeId,
daemon_connections: &mut HashMap<String, DaemonConnection>,
daemon_connections: &mut DaemonConnections,
timestamp: uhlc::Timestamp,
) -> eyre::Result<Vec<u8>> {
let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
@@ -854,8 +908,8 @@ async fn retrieve_logs(
timestamp,
})?;

let machine_ids: Vec<String> = nodes
.iter()
let machine_ids: Vec<Option<String>> = nodes
.values()
.filter(|node| node.id == node_id)
.map(|node| node.deploy.machine.clone())
.collect();
@@ -872,9 +926,12 @@ async fn retrieve_logs(
)
};

let daemon_connection = daemon_connections
.get_mut(machine_id.as_str())
.wrap_err("no daemon connection")?;
let daemon_connection = match machine_id {
None => daemon_connections
.get_default_mut()
.wrap_err("no default daemon connection")?,
Some(machine_id) => unimplemented!(),
};
tcp_send(&mut daemon_connection.stream, &message)
.await
.wrap_err("failed to send logs message to daemon")?;
@@ -898,24 +955,24 @@ async fn start_dataflow(
dataflow: Descriptor,
working_dir: PathBuf,
name: Option<String>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
daemon_connections: &mut DaemonConnections,
clock: &HLC,
) -> eyre::Result<RunningDataflow> {
let SpawnedDataflow {
uuid,
machines,
daemons,
nodes,
} = spawn_dataflow(dataflow, working_dir, daemon_connections, clock).await?;
Ok(RunningDataflow {
uuid,
name,
pending_machines: if machines.len() > 1 {
machines.clone()
pending_daemons: if daemons.len() > 1 {
daemons.clone()
} else {
BTreeSet::new()
},
exited_before_subscribe: Default::default(),
machines,
daemons,
nodes,
reply_senders: Vec::new(),
log_subscribers: Vec::new(),
@@ -923,7 +980,7 @@ async fn start_dataflow(
}

async fn destroy_daemon(
machine_id: String,
daemon_id: DaemonId,
mut daemon_connection: DaemonConnection,

timestamp: uhlc::Timestamp,
@@ -936,7 +993,7 @@ async fn destroy_daemon(
tcp_send(&mut daemon_connection.stream, &message)
.await
.wrap_err(format!(
"failed to send destroy message to daemon `{machine_id}`"
"failed to send destroy message to daemon `{daemon_id}`"
))?;

// wait for reply
@@ -952,18 +1009,18 @@ async fn destroy_daemon(
other => bail!("unexpected reply after sending `destroy`: {other:?}"),
}

tracing::info!("successfully destroyed daemon `{machine_id}`");
tracing::info!("successfully destroyed daemon `{daemon_id}`");
Ok(())
}

async fn destroy_daemons(
daemon_connections: &mut HashMap<String, DaemonConnection>,
daemon_connections: &mut DaemonConnections,
timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
let futures = daemon_connections
.drain()
.map(|(machine_id, daemon_connection)| {
destroy_daemon(machine_id, daemon_connection, timestamp)
.map(|(daemon_id, daemon_connection)| {
destroy_daemon(daemon_id, daemon_connection, timestamp)
})
.collect::<Vec<_>>();
let results: Vec<std::result::Result<(), eyre::Error>> =
@@ -978,7 +1035,7 @@ async fn destroy_daemons(
pub enum Event {
NewDaemonConnection(TcpStream),
DaemonConnectError(eyre::Report),
DaemonHeartbeat { machine_id: String },
DaemonHeartbeat { daemon_id: DaemonId },
Dataflow { uuid: Uuid, event: DataflowEvent },
Control(ControlEvent),
Daemon(DaemonRequest),
@@ -1000,12 +1057,12 @@ impl Event {

#[derive(Debug)]
pub enum DataflowEvent {
DataflowFinishedOnMachine {
machine_id: String,
DataflowFinishedOnDaemon {
daemon_id: DaemonId,
result: DataflowDaemonResult,
},
ReadyOnMachine {
machine_id: String,
ReadyOnDeamon {
daemon_id: DaemonId,
exited_before_subscribe: Vec<NodeId>,
},
}
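
A condensed, self-contained sketch of the lookup rules the new `DaemonConnections` type implements (stand-in types instead of real UUIDs and TCP connections; how `default_deamon_id` gets populated is outside this hunk, so it is set manually here):

```rust
use std::collections::BTreeMap;

// Stand-ins for dora_message::common::DaemonId and DaemonConnection.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct DaemonId {
    machine_id: Option<String>,
    uuid: u128, // placeholder for a real UUID
}

#[derive(Default)]
struct DaemonConnections {
    daemons: BTreeMap<DaemonId, ()>, // () instead of DaemonConnection
    default_deamon_id: Option<DaemonId>,
}

impl DaemonConnections {
    /// Mirrors `get_matching_daemon_id` above: an explicit machine ID picks
    /// the first daemon registered under it; `None` falls back to the
    /// default daemon.
    fn get_matching_daemon_id(&self, machine_id: Option<&str>) -> Option<&DaemonId> {
        match machine_id {
            Some(m) => self
                .daemons
                .keys()
                .find(|id| id.machine_id.as_deref() == Some(m)),
            None => self.default_deamon_id.as_ref(),
        }
    }
}

fn main() {
    let mut connections = DaemonConnections::default();
    let robot = DaemonId { machine_id: Some("robot-1".into()), uuid: 1 };
    let default = DaemonId { machine_id: None, uuid: 2 };
    connections.daemons.insert(robot.clone(), ());
    connections.daemons.insert(default.clone(), ());
    connections.default_deamon_id = Some(default);

    assert_eq!(connections.get_matching_daemon_id(Some("robot-1")), Some(&robot));
    assert!(connections.get_matching_daemon_id(Some("unknown")).is_none());
    assert!(connections.get_matching_daemon_id(None).is_some());
}
```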


binaries/coordinator/src/listener.rs (+6 -9)

@@ -36,7 +36,7 @@ pub async fn handle_connection(
}
};
let message: Timestamped<CoordinatorRequest> =
match serde_json::from_slice(&raw).wrap_err("failed to deserialize node message") {
match serde_json::from_slice(&raw).wrap_err("failed to deserialize message") {
Ok(e) => e,
Err(err) => {
tracing::warn!("{err:?}");
@@ -59,18 +59,15 @@ pub async fn handle_connection(
let _ = events_tx.send(Event::Daemon(event)).await;
break;
}
CoordinatorRequest::Event {
daemon_id: machine_id,
event,
} => match event {
CoordinatorRequest::Event { daemon_id, event } => match event {
DaemonEvent::AllNodesReady {
dataflow_id,
exited_before_subscribe,
} => {
let event = Event::Dataflow {
uuid: dataflow_id,
event: DataflowEvent::ReadyOnMachine {
machine_id,
event: DataflowEvent::ReadyOnDeamon {
daemon_id,
exited_before_subscribe,
},
};
@@ -84,14 +81,14 @@ pub async fn handle_connection(
} => {
let event = Event::Dataflow {
uuid: dataflow_id,
event: DataflowEvent::DataflowFinishedOnMachine { machine_id, result },
event: DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result },
};
if events_tx.send(event).await.is_err() {
break;
}
}
DaemonEvent::Heartbeat => {
let event = Event::DaemonHeartbeat { machine_id };
let event = Event::DaemonHeartbeat { daemon_id };
if events_tx.send(event).await.is_err() {
break;
}


binaries/coordinator/src/run/mod.rs (+42 -27)

@@ -1,17 +1,20 @@
use crate::{
tcp_utils::{tcp_receive, tcp_send},
DaemonConnection,
DaemonConnections,
};

use dora_core::{descriptor::DescriptorExt, uhlc::HLC};
use dora_message::{
common::DaemonId,
coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes, Timestamped},
daemon_to_coordinator::DaemonCoordinatorReply,
descriptor::{Descriptor, ResolvedNode},
id::NodeId,
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use itertools::Itertools;
use std::{
collections::{BTreeSet, HashMap},
collections::{BTreeMap, BTreeSet},
path::PathBuf,
};
use uuid::{NoContext, Timestamp, Uuid};
@@ -20,7 +23,7 @@ use uuid::{NoContext, Timestamp, Uuid};
pub(super) async fn spawn_dataflow(
dataflow: Descriptor,
working_dir: PathBuf,
daemon_connections: &mut HashMap<String, DaemonConnection>,
daemon_connections: &mut DaemonConnections,
clock: &HLC,
) -> eyre::Result<SpawnedDataflow> {
dataflow.check_in_daemon(&working_dir, false)?;
@@ -28,43 +31,55 @@ pub(super) async fn spawn_dataflow(
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let uuid = Uuid::new_v7(Timestamp::now(NoContext));

let machines: BTreeSet<_> = nodes.iter().map(|n| n.deploy.machine.clone()).collect();
let nodes_by_daemon = nodes.values().into_group_map_by(|n| &n.deploy.machine);

let spawn_command = SpawnDataflowNodes {
dataflow_id: uuid,
working_dir,
nodes: nodes.clone(),
dataflow_descriptor: dataflow,
};
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Spawn(spawn_command),
timestamp: clock.new_timestamp(),
})?;
let mut daemons = BTreeSet::new();
for (machine, nodes_on_machine) in &nodes_by_daemon {
let spawn_nodes = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
tracing::trace!(
"Spawning dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})"
);

for machine in &machines {
tracing::trace!("Spawning dataflow `{uuid}` on machine `{machine}`");
spawn_dataflow_on_machine(daemon_connections, machine, &message)
let spawn_command = SpawnDataflowNodes {
dataflow_id: uuid,
working_dir: working_dir.clone(),
nodes: nodes.clone(),
dataflow_descriptor: dataflow.clone(),
spawn_nodes,
};
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Spawn(spawn_command),
timestamp: clock.new_timestamp(),
})?;

let daemon_id = spawn_dataflow_on_machine(daemon_connections, machine.as_deref(), &message)
.await
.wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine}`"))?;
.wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine:?}`"))?;
daemons.insert(daemon_id);
}

tracing::info!("successfully spawned dataflow `{uuid}`");

Ok(SpawnedDataflow {
uuid,
machines,
daemons,
nodes,
})
}

async fn spawn_dataflow_on_machine(
daemon_connections: &mut HashMap<String, DaemonConnection>,
machine: &str,
daemon_connections: &mut DaemonConnections,
machine: Option<&str>,
message: &[u8],
) -> Result<(), eyre::ErrReport> {
) -> Result<DaemonId, eyre::ErrReport> {
let daemon_id = daemon_connections
.get_matching_daemon_id(machine)
.wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
.clone();

let daemon_connection = daemon_connections
.get_mut(machine)
.wrap_err_with(|| format!("no daemon connection for machine `{machine}`"))?;
.get_mut(&daemon_id)
.wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?;
tcp_send(&mut daemon_connection.stream, message)
.await
.wrap_err("failed to send spawn message to daemon")?;
@@ -79,11 +94,11 @@ async fn spawn_dataflow_on_machine(
.wrap_err("daemon returned an error")?,
_ => bail!("unexpected reply"),
}
Ok(())
Ok(daemon_id)
}

pub struct SpawnedDataflow {
pub uuid: Uuid,
pub machines: BTreeSet<String>,
pub nodes: Vec<ResolvedNode>,
pub daemons: BTreeSet<DaemonId>,
pub nodes: BTreeMap<NodeId, ResolvedNode>,
}
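
The per-daemon spawn logic relies on `itertools::Itertools::into_group_map_by` — the new dependency added to `binaries/coordinator/Cargo.toml`. A self-contained sketch of the grouping step, with node and machine names invented for illustration:

```rust
use itertools::Itertools;

fn main() {
    // (node id, optional machine id) pairs, standing in for
    // `n.deploy.machine` on the resolved nodes.
    let nodes = vec![
        ("camera", Some("robot-1")),
        ("detector", Some("gpu-box")),
        ("plotter", None), // no machine id -> default daemon
    ];

    // Same call shape as `nodes.values().into_group_map_by(|n| &n.deploy.machine)`
    // in the hunk above: one spawn message is built per group.
    let nodes_by_daemon = nodes.into_iter().into_group_map_by(|(_, machine)| *machine);

    for (machine, group) in &nodes_by_daemon {
        let spawn_nodes: Vec<_> = group.iter().map(|(id, _)| *id).collect();
        println!("machine {machine:?} -> spawn {spawn_nodes:?}");
    }
}
```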

binaries/daemon/src/coordinator.rs (+2 -2)

@@ -4,7 +4,7 @@ use crate::{
};
use dora_core::uhlc::HLC;
use dora_message::{
common::Timestamped,
common::{DaemonId, Timestamped},
coordinator_to_daemon::RegisterResult,
daemon_to_coordinator::{CoordinatorRequest, DaemonCoordinatorReply, DaemonRegisterRequest},
};
@@ -30,7 +30,7 @@ pub async fn register(
addr: SocketAddr,
machine_id: Option<String>,
clock: &HLC,
) -> eyre::Result<(String, impl Stream<Item = Timestamped<CoordinatorEvent>>)> {
) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<CoordinatorEvent>>)> {
let mut stream = loop {
match TcpStream::connect(addr)
.await


binaries/daemon/src/lib.rs (+29 -26)

@@ -11,7 +11,9 @@ use dora_core::{
uhlc::{self, HLC},
};
use dora_message::{
common::{DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus},
common::{
DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
},
coordinator_to_cli::DataflowResult,
coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
daemon_to_coordinator::{
@@ -78,7 +80,7 @@ pub struct Daemon {

coordinator_connection: Option<TcpStream>,
last_coordinator_heartbeat: Instant,
daemon_id: String,
daemon_id: DaemonId,

/// used for testing and examples
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
@@ -148,6 +150,7 @@ impl Daemon {
let spawn_command = SpawnDataflowNodes {
dataflow_id,
working_dir,
spawn_nodes: nodes.keys().cloned().collect(),
nodes,
dataflow_descriptor: descriptor,
};
@@ -158,7 +161,7 @@ impl Daemon {

let exit_when_done = spawn_command
.nodes
.iter()
.values()
.map(|n| (spawn_command.dataflow_id, n.id.clone()))
.collect();
let (reply_tx, reply_rx) = oneshot::channel();
@@ -176,7 +179,7 @@ impl Daemon {
let run_result = Self::run_general(
Box::pin(events),
None,
"default".into(),
DaemonId::new(None),
Some(exit_when_done),
clock.clone(),
);
@@ -206,7 +209,7 @@ impl Daemon {
async fn run_general(
external_events: impl Stream<Item = Timestamped<Event>> + Unpin,
coordinator_addr: Option<SocketAddr>,
daemon_id: String,
daemon_id: DaemonId,
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
clock: Arc<HLC>,
) -> eyre::Result<DaemonRunResult> {
@@ -387,6 +390,7 @@ impl Daemon {
working_dir,
nodes,
dataflow_descriptor,
spawn_nodes,
}) => {
match dataflow_descriptor.communication.remote {
dora_core::config::RemoteCommunicationConfig::Tcp => {}
@@ -400,7 +404,13 @@ impl Daemon {
};

let result = self
.spawn_dataflow(dataflow_id, working_dir, nodes, dataflow_descriptor)
.spawn_dataflow(
dataflow_id,
working_dir,
nodes,
dataflow_descriptor,
spawn_nodes,
)
.await;
if let Err(err) = &result {
tracing::error!("{err:?}");
@@ -619,8 +629,9 @@ impl Daemon {
&mut self,
dataflow_id: uuid::Uuid,
working_dir: PathBuf,
nodes: Vec<ResolvedNode>,
nodes: BTreeMap<NodeId, ResolvedNode>,
dataflow_descriptor: Descriptor,
spawn_nodes: BTreeSet<NodeId>,
) -> eyre::Result<()> {
let dataflow = RunningDataflow::new(dataflow_id, self.daemon_id.clone());
let dataflow = match self.running.entry(dataflow_id) {
@@ -634,8 +645,8 @@ impl Daemon {
};

let mut log_messages = Vec::new();
for node in nodes {
let local = node.deploy.machine == self.daemon_id;
for node in nodes.into_values() {
let local = spawn_nodes.contains(&node.id);

let inputs = node_inputs(&node);
for (input_id, input) in inputs {
@@ -664,11 +675,7 @@ impl Daemon {
} else if let InputMapping::User(mapping) = input.mapping {
dataflow
.open_external_mappings
.entry(OutputId(mapping.source, mapping.output))
.or_default()
.entry(node.deploy.machine.clone())
.or_default()
.insert((node.id.clone(), input_id));
.insert(OutputId(mapping.source, mapping.output));
}
}
if local {
@@ -969,12 +976,8 @@ impl Daemon {
.await?;

let output_id = OutputId(node_id, output_id);
let remote_receivers: Vec<_> = dataflow
.open_external_mappings
.get(&output_id)
.map(|m| m.keys().cloned().collect())
.unwrap_or_default();
if !remote_receivers.is_empty() {
let remote_receivers = dataflow.open_external_mappings.contains(&output_id);
if remote_receivers {
let event = InterDaemonEvent::Output {
dataflow_id,
node_id: output_id.0.clone(),
@@ -1049,7 +1052,7 @@ impl Daemon {
}

let mut closed = Vec::new();
for output_id in dataflow.open_external_mappings.keys() {
for output_id in &dataflow.open_external_mappings {
if output_id.0 == node_id && outputs.contains(&output_id.1) {
closed.push(output_id.clone());
}
@@ -1393,7 +1396,7 @@ async fn set_up_event_stream(
remote_daemon_events_rx: flume::Receiver<Timestamped<InterDaemonEvent>>,
// used for dynamic nodes
local_listen_port: u16,
) -> eyre::Result<(String, impl Stream<Item = Timestamped<Event>> + Unpin)> {
) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
let remote_daemon_events = remote_daemon_events_rx.into_stream().map(|e| Timestamped {
inner: Event::Daemon(e.inner),
timestamp: e.timestamp,
@@ -1602,7 +1605,7 @@ pub struct RunningDataflow {
/// to know which nodes are dynamic.
dynamic_nodes: BTreeSet<NodeId>,

open_external_mappings: HashMap<OutputId, BTreeMap<String, BTreeSet<InputId>>>,
open_external_mappings: BTreeSet<OutputId>,

pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

@@ -1625,10 +1628,10 @@ pub struct RunningDataflow {
}

impl RunningDataflow {
fn new(dataflow_id: Uuid, machine_id: String) -> RunningDataflow {
fn new(dataflow_id: Uuid, daemon_id: DaemonId) -> RunningDataflow {
Self {
id: dataflow_id,
pending_nodes: PendingNodes::new(dataflow_id, machine_id),
pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
subscribe_channels: HashMap::new(),
drop_channels: HashMap::new(),
mappings: HashMap::new(),
@@ -1636,7 +1639,7 @@ impl RunningDataflow {
open_inputs: BTreeMap::new(),
running_nodes: BTreeMap::new(),
dynamic_nodes: BTreeSet::new(),
open_external_mappings: HashMap::new(),
open_external_mappings: Default::default(),
pending_drop_tokens: HashMap::new(),
_timer_handles: Vec::new(),
stop_sent: false,
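
Since node-to-daemon assignment is now resolved by the coordinator, the daemon no longer compares `node.deploy.machine` against its own ID; it just checks membership in the `spawn_nodes` set it received. A trimmed sketch of that loop (node payloads reduced to placeholders):

```rust
use std::collections::{BTreeMap, BTreeSet};

fn main() {
    // All nodes of the dataflow, keyed by node ID, as sent in SpawnDataflowNodes.
    let nodes: BTreeMap<&str, &str> =
        [("camera", "..."), ("detector", "..."), ("plotter", "...")]
            .into_iter()
            .collect();
    // The subset this daemon should spawn locally.
    let spawn_nodes: BTreeSet<&str> = ["camera", "plotter"].into_iter().collect();

    for (id, _node) in nodes {
        let local = spawn_nodes.contains(id);
        if local {
            println!("spawning `{id}` locally");
        } else {
            println!("`{id}` runs on another daemon; only tracking its outputs");
        }
    }
}
```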


binaries/daemon/src/pending.rs (+5 -4)

@@ -5,6 +5,7 @@ use dora_core::{
uhlc::{Timestamp, HLC},
};
use dora_message::{
common::DaemonId,
daemon_to_coordinator::{CoordinatorRequest, DaemonEvent, LogLevel, LogMessage, Timestamped},
daemon_to_node::DaemonReply,
DataflowId,
@@ -16,7 +17,7 @@ use crate::{socket_stream_utils::socket_stream_send, CascadingErrorCauses};

pub struct PendingNodes {
dataflow_id: DataflowId,
machine_id: String,
daemon_id: DaemonId,

/// The local nodes that are still waiting to start.
local_nodes: HashSet<NodeId>,
@@ -38,10 +39,10 @@ pub struct PendingNodes {
}

impl PendingNodes {
pub fn new(dataflow_id: DataflowId, machine_id: String) -> Self {
pub fn new(dataflow_id: DataflowId, daemon_id: DaemonId) -> Self {
Self {
dataflow_id,
machine_id,
daemon_id,
local_nodes: HashSet::new(),
external_nodes: false,
waiting_subscribers: HashMap::new(),
@@ -205,7 +206,7 @@ impl PendingNodes {

let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
daemon_id: self.machine_id.clone(),
daemon_id: self.daemon_id.clone(),
event: DaemonEvent::AllNodesReady {
dataflow_id: self.dataflow_id,
exited_before_subscribe: self.exited_before_subscribe.clone(),


examples/multiple-daemons/run.rs (+1 -1)

@@ -160,7 +160,7 @@ async fn connected_machines(
.await?;
let result = reply.await??;
let machines = match result {
ControlRequestReply::ConnectedMachines(machines) => machines,
ControlRequestReply::ConnectedDaemons(machines) => machines,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
};


libraries/core/src/descriptor/mod.rs (+16 -20)

@@ -1,6 +1,6 @@
use dora_message::{
config::{Input, InputMapping, NodeRunConfig},
id::{DataId, OperatorId},
id::{DataId, NodeId, OperatorId},
};
use eyre::{bail, Context, OptionExt, Result};
use std::{
@@ -12,8 +12,8 @@ use std::{
// reexport for compatibility
pub use dora_message::descriptor::{
CoreNodeKind, CustomNode, Descriptor, Node, OperatorConfig, OperatorDefinition, OperatorSource,
PythonSource, ResolvedDeploy, ResolvedNode, RuntimeNode, SingleOperatorDefinition,
DYNAMIC_SOURCE, SHELL_SOURCE,
PythonSource, ResolvedNode, RuntimeNode, SingleOperatorDefinition, DYNAMIC_SOURCE,
SHELL_SOURCE,
};
pub use validate::ResolvedNodeExt;
pub use visualize::collect_dora_timers;
@@ -22,7 +22,7 @@ mod validate;
mod visualize;

pub trait DescriptorExt {
fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<Vec<ResolvedNode>>;
fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>>;
fn visualize_as_mermaid(&self) -> eyre::Result<String>;
fn blocking_read(path: &Path) -> eyre::Result<Descriptor>;
fn parse(buf: Vec<u8>) -> eyre::Result<Descriptor>;
@@ -33,7 +33,7 @@ pub trait DescriptorExt {
pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op";

impl DescriptorExt for Descriptor {
fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<Vec<ResolvedNode>> {
fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>> {
let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());

let single_operator_nodes: HashMap<_, _> = self
@@ -46,7 +46,7 @@ impl DescriptorExt for Descriptor {
})
.collect();

let mut resolved = vec![];
let mut resolved = BTreeMap::new();
for mut node in self.nodes.clone() {
// adjust input mappings
let mut node_kind = node_kind_mut(&mut node)?;
@@ -95,21 +95,17 @@ impl DescriptorExt for Descriptor {
}),
};

resolved.push(ResolvedNode {
id: node.id,
name: node.name,
description: node.description,
env: node.env,
deploy: {
let default_machine = self.deploy.machine.as_deref().unwrap_or_default();
let machine = match node.deploy.machine {
Some(m) => m,
None => default_machine.to_owned(),
};
ResolvedDeploy { machine }
resolved.insert(
node.id.clone(),
ResolvedNode {
id: node.id,
name: node.name,
description: node.description,
env: node.env,
deploy: node.deploy,
kind,
},
kind,
});
);
}

Ok(resolved)
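
One benefit of returning a `BTreeMap` keyed by `NodeId` is that callers can look nodes up by ID directly and iterate in deterministic order; iteration sites switch from the `Vec` itself to `.values()` or `.into_values()`, as the other hunks in this commit show. A trivial sketch of the caller-side pattern:

```rust
use std::collections::BTreeMap;

fn main() {
    // Stand-in for BTreeMap<NodeId, ResolvedNode>.
    let nodes: BTreeMap<String, &str> =
        BTreeMap::from([("camera".to_string(), "..."), ("plotter".to_string(), "...")]);

    // Direct lookup by node ID (previously a linear search over a Vec).
    assert!(nodes.get("camera").is_some());

    // Iteration switches from `for node in &nodes` to the map's values.
    for node in nodes.values() {
        let _ = node;
    }
}
```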


libraries/core/src/descriptor/validate.rs (+12 -11)

@@ -7,10 +7,10 @@ use crate::{
use dora_message::{
config::{Input, InputMapping, UserInputMapping},
descriptor::{CoreNodeKind, OperatorSource, ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE},
id::{DataId, OperatorId},
id::{DataId, NodeId, OperatorId},
};
use eyre::{bail, eyre, Context};
use std::{path::Path, process::Command};
use std::{collections::BTreeMap, path::Path, process::Command};
use tracing::info;

use super::{resolve_path, Descriptor, DescriptorExt};
@@ -26,7 +26,7 @@ pub fn check_dataflow(
let mut has_python_operator = false;

// check that nodes and operators exist
for node in &nodes {
for node in nodes.values() {
match &node.kind {
descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() {
SHELL_SOURCE => (),
@@ -35,10 +35,11 @@ pub fn check_dataflow(
if source_is_url(source) {
info!("{source} is a URL."); // TODO: Implement url check.
} else if let Some(remote_daemon_id) = remote_daemon_id {
if remote_daemon_id.contains(&node.deploy.machine.as_str())
|| coordinator_is_remote
{
info!("skipping path check for remote node `{}`", node.id);
if let Some(machine) = &node.deploy.machine {
if remote_daemon_id.contains(&machine.as_str()) || coordinator_is_remote
{
info!("skipping path check for remote node `{}`", node.id);
}
}
} else {
resolve_path(source, working_dir)
@@ -82,7 +83,7 @@ pub fn check_dataflow(
}

// check that all inputs mappings point to an existing output
for node in &nodes {
for node in nodes.values() {
match &node.kind {
descriptor::CoreNodeKind::Custom(custom_node) => {
for (input_id, input) in &custom_node.run_config.inputs {
@@ -104,7 +105,7 @@ pub fn check_dataflow(
}

// Check that nodes can resolve `send_stdout_as`
for node in &nodes {
for node in nodes.values() {
node.send_stdout_as()
.context("Could not resolve `send_stdout_as` configuration")?;
}
@@ -149,13 +150,13 @@ impl ResolvedNodeExt for ResolvedNode {

fn check_input(
input: &Input,
nodes: &[super::ResolvedNode],
nodes: &BTreeMap<NodeId, super::ResolvedNode>,
input_id_str: &str,
) -> Result<(), eyre::ErrReport> {
match &input.mapping {
InputMapping::Timer { interval: _ } => {}
InputMapping::User(UserInputMapping { source, output }) => {
let source_node = nodes.iter().find(|n| &n.id == source).ok_or_else(|| {
let source_node = nodes.values().find(|n| &n.id == source).ok_or_else(|| {
eyre!("source node `{source}` mapped to input `{input_id_str}` does not exist",)
})?;
match &source_node.kind {


libraries/core/src/descriptor/visualize.rs (+6 -6)

@@ -11,16 +11,16 @@ use std::{
time::Duration,
};

pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String {
pub fn visualize_nodes(nodes: &BTreeMap<NodeId, ResolvedNode>) -> String {
let mut flowchart = "flowchart TB\n".to_owned();
let mut all_nodes = HashMap::new();

for node in nodes {
for node in nodes.values() {
visualize_node(node, &mut flowchart);
all_nodes.insert(&node.id, node);
}

let dora_timers = collect_dora_timers(nodes);
let dora_timers = collect_dora_timers(&nodes);
if !dora_timers.is_empty() {
writeln!(flowchart, "subgraph ___dora___ [dora]").unwrap();
writeln!(flowchart, " subgraph ___timer_timer___ [timer]").unwrap();
@@ -32,16 +32,16 @@ pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String {
flowchart.push_str("end\n");
}

for node in nodes {
for node in nodes.values() {
visualize_node_inputs(node, &mut flowchart, &all_nodes)
}

flowchart
}

pub fn collect_dora_timers(nodes: &[ResolvedNode]) -> BTreeSet<Duration> {
pub fn collect_dora_timers(nodes: &BTreeMap<NodeId, ResolvedNode>) -> BTreeSet<Duration> {
let mut dora_timers = BTreeSet::new();
for node in nodes {
for node in nodes.values() {
match &node.kind {
CoreNodeKind::Runtime(node) => {
for operator in &node.operators {


libraries/message/src/common.rs (+31 -0)

@@ -190,3 +190,34 @@ impl DropToken {
Self(Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext)))
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
pub struct DaemonId {
machine_id: Option<String>,
uuid: Uuid,
}

impl DaemonId {
pub fn new(machine_id: Option<String>) -> Self {
DaemonId {
machine_id,
uuid: Uuid::new_v4(),
}
}

pub fn matches_machine_id(&self, machine_id: &str) -> bool {
self.machine_id
.as_ref()
.map(|id| id == machine_id)
.unwrap_or_default()
}
}

impl std::fmt::Display for DaemonId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(id) = &self.machine_id {
write!(f, "{id}-")?;
}
write!(f, "{}", self.uuid)
}
}
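
Assuming the updated `dora-message` crate as a dependency, the new type behaves like this (a usage sketch, not part of the commit):

```rust
use dora_message::common::DaemonId;

fn main() {
    let named = DaemonId::new(Some("robot-1".to_string()));
    let anonymous = DaemonId::new(None);

    // Display is "<machine-id>-<uuid>" with a machine ID, bare "<uuid>" without.
    println!("{named}");
    println!("{anonymous}");

    // Only a daemon that was started with a machine ID matches it.
    assert!(named.matches_machine_id("robot-1"));
    assert!(!named.matches_machine_id("robot-2"));
    assert!(!anonymous.matches_machine_id("robot-1"));
}
```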

libraries/message/src/coordinator_to_cli.rs (+2 -2)

@@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BTreeSet};
use uuid::Uuid;

pub use crate::common::{LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
use crate::id::NodeId;
use crate::{common::DaemonId, id::NodeId};

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequestReply {
@@ -15,7 +15,7 @@ pub enum ControlRequestReply {
DataflowList(DataflowList),
DestroyOk,
DaemonConnected(bool),
ConnectedMachines(BTreeSet<String>),
ConnectedDaemons(BTreeSet<DaemonId>),
Logs(Vec<u8>),
}



libraries/message/src/coordinator_to_daemon.rs (+8 -4)

@@ -1,6 +1,11 @@
use std::{path::PathBuf, time::Duration};
use std::{
collections::{BTreeMap, BTreeSet},
path::PathBuf,
time::Duration,
};

use crate::{
common::DaemonId,
descriptor::{Descriptor, ResolvedNode},
id::{NodeId, OperatorId},
DataflowId,
@@ -17,8 +22,6 @@ pub enum RegisterResult {
Err(String),
}

type DaemonId = String;

impl RegisterResult {
pub fn to_result(self) -> eyre::Result<DaemonId> {
match self {
@@ -58,6 +61,7 @@ pub enum DaemonCoordinatorEvent {
pub struct SpawnDataflowNodes {
pub dataflow_id: DataflowId,
pub working_dir: PathBuf,
pub nodes: Vec<ResolvedNode>,
pub nodes: BTreeMap<NodeId, ResolvedNode>,
pub dataflow_descriptor: Descriptor,
pub spawn_nodes: BTreeSet<NodeId>,
}

libraries/message/src/daemon_to_coordinator.rs (+2 -2)

@@ -3,13 +3,13 @@ use std::collections::BTreeMap;
pub use crate::common::{
DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped,
};
use crate::{current_crate_version, id::NodeId, versions_compatible, DataflowId};
use crate::{common::DaemonId, current_crate_version, id::NodeId, versions_compatible, DataflowId};

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
Register(DaemonRegisterRequest),
Event {
daemon_id: String,
daemon_id: DaemonId,
event: DaemonEvent,
},
}


libraries/message/src/descriptor.rs (+1 -6)

@@ -81,17 +81,12 @@ pub struct ResolvedNode {
pub env: Option<BTreeMap<String, EnvValue>>,

#[serde(default)]
pub deploy: ResolvedDeploy,
pub deploy: Deploy,

#[serde(flatten)]
pub kind: CoreNodeKind,
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResolvedDeploy {
pub machine: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CoreNodeKind {

