diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 260aa5b4..372ccc55 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -82,7 +82,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: "Build cli and binaries" - timeout-minutes: 30 + timeout-minutes: 45 run: | cargo install --path binaries/coordinator cargo install --path binaries/daemon diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 99f72bfe..c47a5674 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -59,11 +59,19 @@ impl EventStream { mut close_channel: DaemonChannel, ) -> eyre::Result { channel.register(dataflow_id, node_id.clone())?; - channel + let reply = channel .request(&DaemonRequest::Subscribe) .map_err(|e| eyre!(e)) .wrap_err("failed to create subscription with dora-daemon")?; + match reply { + daemon_messages::DaemonReply::Result(Ok(())) => {} + daemon_messages::DaemonReply::Result(Err(err)) => { + eyre::bail!("subscribe failed: {err}") + } + other => eyre::bail!("unexpected subscribe reply: {other:?}"), + } + close_channel.register(dataflow_id, node_id.clone())?; let (tx, rx) = flume::bounded(0); diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index c594a252..405b9d3e 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -4,7 +4,7 @@ use crate::daemon_connection::DaemonChannel; use dora_core::{ config::NodeId, daemon_messages::{ - DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent, + self, DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent, }, }; use eyre::{eyre, Context}; @@ -45,11 +45,19 @@ impl DropStream { ) -> eyre::Result { channel.register(dataflow_id, node_id.clone())?; - channel + let reply = channel .request(&DaemonRequest::SubscribeDrop) .map_err(|e| eyre!(e)) .wrap_err("failed to create subscription with dora-daemon")?; + match reply { + daemon_messages::DaemonReply::Result(Ok(())) => {} + daemon_messages::DaemonReply::Result(Err(err)) => { + eyre::bail!("drop subscribe failed: {err}") + } + other => eyre::bail!("unexpected drop subscribe reply: {other:?}"), + } + let (tx, rx) = flume::bounded(0); let node_id_cloned = node_id.clone(); diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs index 51a8219c..c8987a9d 100644 --- a/binaries/coordinator/src/control.rs +++ b/binaries/coordinator/src/control.rs @@ -44,7 +44,7 @@ async fn listen( let incoming = match result { Ok(incoming) => incoming, Err(err) => { - let _ = tx.blocking_send(err.into()); + let _ = tx.send(err.into()).await; return; } }; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 881eed14..40758ad7 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -222,15 +222,20 @@ async fn start_inner( } }, Event::Dataflow { uuid, event } => match event { - DataflowEvent::ReadyOnMachine { machine_id } => { + DataflowEvent::ReadyOnMachine { + machine_id, + success, + } => { match running_dataflows.entry(uuid) { std::collections::hash_map::Entry::Occupied(mut entry) => { let dataflow = entry.get_mut(); dataflow.pending_machines.remove(&machine_id); + dataflow.init_success &= success; if dataflow.pending_machines.is_empty() { let message = serde_json::to_vec(&DaemonCoordinatorEvent::AllNodesReady { dataflow_id: uuid, + success: dataflow.init_success, }) .wrap_err("failed to serialize AllNodesReady message")?; @@ -451,7 +456,7 @@ async fn start_inner( continue; } let result: eyre::Result<()> = - tokio::time::timeout(Duration::from_millis(100), send_watchdog_message(&mut connection.stream)) + tokio::time::timeout(Duration::from_millis(500), send_watchdog_message(&mut connection.stream)) .await .wrap_err("timeout") .and_then(|r| r).wrap_err_with(|| @@ -551,6 +556,7 @@ struct RunningDataflow { machines: BTreeSet, /// IDs of machines that are waiting until all nodes are started. pending_machines: BTreeSet, + init_success: bool, nodes: Vec, } @@ -733,6 +739,7 @@ async fn start_dataflow( } else { BTreeSet::new() }, + init_success: true, machines, nodes, }) @@ -798,6 +805,7 @@ pub enum DataflowEvent { }, ReadyOnMachine { machine_id: String, + success: bool, }, } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index f8561865..6ea02894 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -57,10 +57,16 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende break; } coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event { - coordinator_messages::DaemonEvent::AllNodesReady { dataflow_id } => { + coordinator_messages::DaemonEvent::AllNodesReady { + dataflow_id, + success, + } => { let event = Event::Dataflow { uuid: dataflow_id, - event: DataflowEvent::ReadyOnMachine { machine_id }, + event: DataflowEvent::ReadyOnMachine { + machine_id, + success, + }, }; if events_tx.send(event).await.is_err() { break; diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index bd4b0786..1f84ba5e 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -17,8 +17,8 @@ use eyre::{bail, eyre, Context, ContextCompat}; use futures::{future, stream, FutureExt, TryFutureExt}; use futures_concurrency::stream::Merge; use inter_daemon::InterDaemonConnection; +use pending::PendingNodes; use shared_memory_server::ShmemConf; -use std::collections::HashSet; use std::env::temp_dir; use std::time::Instant; use std::{ @@ -44,6 +44,7 @@ mod coordinator; mod inter_daemon; mod log; mod node_communication; +mod pending; mod spawn; mod tcp_utils; @@ -52,6 +53,8 @@ use dora_tracing::telemetry::serialize_context; #[cfg(feature = "telemetry")] use tracing_opentelemetry::OpenTelemetrySpanExt; +use crate::pending::DataflowStatus; + pub struct Daemon { running: HashMap, @@ -298,11 +301,20 @@ impl Daemon { }); RunStatus::Continue } - DaemonCoordinatorEvent::AllNodesReady { dataflow_id } => { + DaemonCoordinatorEvent::AllNodesReady { + dataflow_id, + success, + } => { match self.running.get_mut(&dataflow_id) { Some(dataflow) => { - tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); - dataflow.start(&self.events_tx).await?; + dataflow + .pending_nodes + .handle_external_all_nodes_ready(success) + .await?; + if success { + tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); + dataflow.start(&self.events_tx).await?; + } } None => { tracing::warn!( @@ -454,7 +466,7 @@ impl Daemon { nodes: Vec, daemon_communication_config: LocalCommunicationConfig, ) -> eyre::Result<()> { - let dataflow = RunningDataflow::new(dataflow_id); + let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone()); let dataflow = match self.running.entry(dataflow_id) { std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow), std::collections::hash_map::Entry::Occupied(_) => { @@ -503,7 +515,7 @@ impl Daemon { dataflow.pending_nodes.insert(node.id.clone()); let node_id = node.id.clone(); - spawn::spawn_node( + match spawn::spawn_node( dataflow_id, &working_dir, node, @@ -511,10 +523,21 @@ impl Daemon { daemon_communication_config, ) .await - .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?; - dataflow.running_nodes.insert(node_id); + .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) + { + Ok(()) => { + dataflow.running_nodes.insert(node_id); + } + Err(err) => { + tracing::error!("{err:?}"); + dataflow + .pending_nodes + .handle_node_stop(&node_id, &mut self.coordinator_connection) + .await?; + } + } } else { - dataflow.external_nodes.insert(node.id.clone(), node); + dataflow.pending_nodes.set_external_nodes(true); } } @@ -532,39 +555,35 @@ impl Daemon { event_sender, reply_sender, } => { - let result = self - .subscribe(dataflow_id, node_id.clone(), event_sender) - .await; - let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { - format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`") - })?; - tracing::debug!("node `{node_id}` is ready"); + let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| { + format!("subscribe failed: no running dataflow with ID `{dataflow_id}`") + }); - dataflow - .subscribe_replies - .insert(node_id.clone(), (reply_sender, result)); - dataflow.pending_nodes.remove(&node_id); - if dataflow.pending_nodes.is_empty() { - if dataflow.external_nodes.is_empty() { - tracing::info!("all nodes are ready, starting dataflow `{dataflow_id}`"); - dataflow.start(&self.events_tx).await?; - } else { - tracing::info!( - "all local nodes are ready, waiting for remote nodes \ - for dataflow `{dataflow_id}`" - ); + match dataflow { + Err(err) => { + let _ = reply_sender.send(DaemonReply::Result(Err(err))); + } + Ok(dataflow) => { + tracing::debug!("node `{node_id}` is ready"); + Self::subscribe(dataflow, node_id.clone(), event_sender).await; - // dataflow is split across multiple daemons -> synchronize with dora-coordinator - let Some(connection) = &mut self.coordinator_connection else { - bail!("no coordinator connection to send AllNodesReady"); - }; - let msg = serde_json::to_vec(&CoordinatorRequest::Event { - machine_id: self.machine_id.clone(), - event: DaemonEvent::AllNodesReady { dataflow_id }, - })?; - tcp_send(connection, &msg) - .await - .wrap_err("failed to send AllNodesReady message to dora-coordinator")?; + let status = dataflow + .pending_nodes + .handle_node_subscription( + node_id.clone(), + reply_sender, + &mut self.coordinator_connection, + ) + .await?; + match status { + DataflowStatus::AllNodesReady => { + tracing::info!( + "all nodes are ready, starting dataflow `{dataflow_id}`" + ); + dataflow.start(&self.events_tx).await?; + } + DataflowStatus::Pending => {} + } } } } @@ -602,8 +621,17 @@ impl Daemon { let _ = reply_sender.send(DaemonReply::Result(reply)); } DaemonNodeEvent::OutputsDone { reply_sender } => { - let _ = reply_sender.send(DaemonReply::Result(Ok(()))); - self.handle_outputs_done(dataflow_id, &node_id).await?; + let result = match self.running.get_mut(&dataflow_id) { + Some(dataflow) => { + Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, &node_id) + .await + }, + None => Err(eyre!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")), + }; + + let _ = reply_sender.send(DaemonReply::Result( + result.map_err(|err| format!("{err:?}")), + )); } DaemonNodeEvent::SendOut { output_id, @@ -721,15 +749,10 @@ impl Daemon { } async fn subscribe( - &mut self, - dataflow_id: Uuid, + dataflow: &mut RunningDataflow, node_id: NodeId, event_sender: UnboundedSender, - ) -> Result<(), String> { - let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| { - format!("subscribe failed: no running dataflow with ID `{dataflow_id}`") - })?; - + ) { // some inputs might have been closed already -> report those events let closed_inputs = dataflow .mappings @@ -760,22 +783,17 @@ impl Daemon { } dataflow.subscribe_channels.insert(node_id, event_sender); - - Ok(()) } - #[tracing::instrument(skip(self), level = "trace")] + #[tracing::instrument(skip(dataflow, inter_daemon_connections), fields(uuid = %dataflow.id), level = "trace")] async fn handle_outputs_done( - &mut self, - dataflow_id: Uuid, + dataflow: &mut RunningDataflow, + inter_daemon_connections: &mut BTreeMap, node_id: &NodeId, ) -> eyre::Result<()> { - let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { - format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`") - })?; send_input_closed_events( dataflow, - &mut self.inter_daemon_connections, + inter_daemon_connections, |OutputId(source_id, _)| source_id == node_id, ) .await?; @@ -783,13 +801,18 @@ impl Daemon { Ok(()) } - #[tracing::instrument(skip(self), level = "trace")] async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> { - self.handle_outputs_done(dataflow_id, node_id).await?; - let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`") })?; + + dataflow + .pending_nodes + .handle_node_stop(node_id, &mut self.coordinator_connection) + .await?; + + Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, node_id).await?; + dataflow.running_nodes.remove(node_id); if dataflow.running_nodes.is_empty() { tracing::info!( @@ -1119,11 +1142,7 @@ fn close_input(dataflow: &mut RunningDataflow, receiver_id: &NodeId, input_id: & pub struct RunningDataflow { id: Uuid, /// Local nodes that are not started yet - pending_nodes: HashSet, - /// Used to synchronize node starts. - /// - /// Subscribe requests block the node until all other nodes are ready too. - subscribe_replies: HashMap, Result<(), String>)>, + pending_nodes: PendingNodes, subscribe_channels: HashMap>, drop_channels: HashMap>, @@ -1132,7 +1151,6 @@ pub struct RunningDataflow { open_inputs: BTreeMap>, running_nodes: BTreeSet, - external_nodes: BTreeMap, open_external_mappings: HashMap>>, pending_drop_tokens: HashMap, @@ -1148,18 +1166,16 @@ pub struct RunningDataflow { } impl RunningDataflow { - fn new(id: Uuid) -> RunningDataflow { + fn new(dataflow_id: Uuid, machine_id: String) -> RunningDataflow { Self { - id, - pending_nodes: HashSet::new(), - subscribe_replies: HashMap::new(), + id: dataflow_id, + pending_nodes: PendingNodes::new(dataflow_id, machine_id), subscribe_channels: HashMap::new(), drop_channels: HashMap::new(), mappings: HashMap::new(), timers: BTreeMap::new(), open_inputs: BTreeMap::new(), running_nodes: BTreeSet::new(), - external_nodes: BTreeMap::new(), open_external_mappings: HashMap::new(), pending_drop_tokens: HashMap::new(), _timer_handles: Vec::new(), @@ -1169,12 +1185,6 @@ impl RunningDataflow { } async fn start(&mut self, events_tx: &mpsc::Sender) -> eyre::Result<()> { - // answer all subscribe requests - let subscribe_replies = std::mem::take(&mut self.subscribe_replies); - for (reply_sender, subscribe_result) in subscribe_replies.into_values() { - let _ = reply_sender.send(DaemonReply::Result(subscribe_result)); - } - for interval in self.timers.keys().copied() { let events_tx = events_tx.clone(); let dataflow_id = self.id; diff --git a/binaries/daemon/src/main.rs b/binaries/daemon/src/main.rs index c58db3cc..be189d0b 100644 --- a/binaries/daemon/src/main.rs +++ b/binaries/daemon/src/main.rs @@ -36,9 +36,6 @@ async fn main() -> eyre::Result<()> { } async fn run() -> eyre::Result<()> { - #[cfg(feature = "tracing")] - set_up_tracing("dora-daemon").wrap_err("failed to set up tracing subscriber")?; - let Args { run_dataflow, machine_id, @@ -50,6 +47,9 @@ async fn run() -> eyre::Result<()> { return tokio::task::block_in_place(dora_daemon::run_dora_runtime); } + #[cfg(feature = "tracing")] + set_up_tracing("dora-daemon").wrap_err("failed to set up tracing subscriber")?; + let ctrl_c_events = { let (ctrl_c_tx, ctrl_c_rx) = mpsc::channel(1); let mut ctrlc_sent = false; diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs new file mode 100644 index 00000000..85cbe56f --- /dev/null +++ b/binaries/daemon/src/pending.rs @@ -0,0 +1,167 @@ +use std::collections::{HashMap, HashSet}; + +use dora_core::{ + config::NodeId, + coordinator_messages::{CoordinatorRequest, DaemonEvent}, + daemon_messages::{DaemonReply, DataflowId}, +}; +use eyre::{bail, Context}; +use tokio::{net::TcpStream, sync::oneshot}; + +use crate::tcp_utils::tcp_send; + +pub struct PendingNodes { + dataflow_id: DataflowId, + machine_id: String, + + /// The local nodes that are still waiting to start. + local_nodes: HashSet, + /// Whether there are external nodes for this dataflow. + external_nodes: bool, + + /// Used to synchronize node starts. + /// + /// Subscribe requests block the node until all other nodes are ready too. + waiting_subscribers: HashMap>, + /// List of nodes that finished before connecting to the dora daemon. + /// + /// If this list is non-empty, we should not start the dataflow at all. Instead, + /// we report an error to the other nodes. + exited_before_subscribe: HashSet, + + /// Whether the local init result was already reported to the coordinator. + reported_init_to_coordinator: bool, +} + +impl PendingNodes { + pub fn new(dataflow_id: DataflowId, machine_id: String) -> Self { + Self { + dataflow_id, + machine_id, + local_nodes: HashSet::new(), + external_nodes: false, + waiting_subscribers: HashMap::new(), + exited_before_subscribe: HashSet::new(), + reported_init_to_coordinator: false, + } + } + + pub fn insert(&mut self, node_id: NodeId) { + self.local_nodes.insert(node_id); + } + + pub fn set_external_nodes(&mut self, value: bool) { + self.external_nodes = value; + } + + pub async fn handle_node_subscription( + &mut self, + node_id: NodeId, + reply_sender: oneshot::Sender, + coordinator_connection: &mut Option, + ) -> eyre::Result { + self.waiting_subscribers + .insert(node_id.clone(), reply_sender); + self.local_nodes.remove(&node_id); + + self.update_dataflow_status(coordinator_connection).await + } + + pub async fn handle_node_stop( + &mut self, + node_id: &NodeId, + coordinator_connection: &mut Option, + ) -> eyre::Result<()> { + if self.local_nodes.remove(node_id) { + tracing::warn!("node `{node_id}` exited before initializing dora connection"); + self.exited_before_subscribe.insert(node_id.clone()); + self.update_dataflow_status(coordinator_connection).await?; + } + Ok(()) + } + + pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> { + if !self.local_nodes.is_empty() { + bail!("received external `all_nodes_ready` event before local nodes were ready"); + } + let external_error = if success { + None + } else { + Some("some nodes failed to initalize on remote machines".to_string()) + }; + self.answer_subscribe_requests(external_error).await; + + Ok(()) + } + + async fn update_dataflow_status( + &mut self, + coordinator_connection: &mut Option, + ) -> eyre::Result { + if self.local_nodes.is_empty() { + if self.external_nodes { + if !self.reported_init_to_coordinator { + self.report_nodes_ready(coordinator_connection).await?; + self.reported_init_to_coordinator = true; + } + Ok(DataflowStatus::Pending) + } else { + self.answer_subscribe_requests(None).await; + Ok(DataflowStatus::AllNodesReady) + } + } else { + Ok(DataflowStatus::Pending) + } + } + + async fn answer_subscribe_requests(&mut self, external_error: Option) { + let result = if self.exited_before_subscribe.is_empty() { + match external_error { + Some(err) => Err(err), + None => Ok(()), + } + } else { + Err(format!( + "Some nodes exited before subscribing to dora: {:?}\n\n\ + This is typically happens when an initialization error occurs + in the node or operator. To check the output of the failed + nodes, run `dora logs {} `.", + self.exited_before_subscribe, self.dataflow_id + )) + }; + // answer all subscribe requests + let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); + for reply_sender in subscribe_replies.into_values() { + let _ = reply_sender.send(DaemonReply::Result(result.clone())); + } + } + + async fn report_nodes_ready( + &self, + coordinator_connection: &mut Option, + ) -> eyre::Result<()> { + let Some(connection) = coordinator_connection else { + bail!("no coordinator connection to send AllNodesReady"); + }; + + let success = self.exited_before_subscribe.is_empty(); + tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes"); + + let msg = serde_json::to_vec(&CoordinatorRequest::Event { + machine_id: self.machine_id.clone(), + event: DaemonEvent::AllNodesReady { + dataflow_id: self.dataflow_id, + success, + }, + })?; + tcp_send(connection, &msg) + .await + .wrap_err("failed to send AllNodesReady message to dora-coordinator")?; + Ok(()) + } +} + +pub enum DataflowStatus { + AllNodesReady, + Pending, +} diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index 7505b098..e83ee784 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -19,6 +19,7 @@ pub enum CoordinatorRequest { pub enum DaemonEvent { AllNodesReady { dataflow_id: DataflowId, + success: bool, }, AllNodesFinished { dataflow_id: DataflowId, diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 66704e7a..5929fb6a 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -127,6 +127,7 @@ impl fmt::Debug for Data { type SharedMemoryId = String; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[must_use] pub enum DaemonReply { Result(Result<(), String>), PreparedMessage { shared_memory_id: SharedMemoryId }, @@ -200,6 +201,7 @@ pub enum DaemonCoordinatorEvent { Spawn(SpawnDataflowNodes), AllNodesReady { dataflow_id: DataflowId, + success: bool, }, StopDataflow { dataflow_id: DataflowId,