Browse Source

Merge branch 'main' into heartbeat

tags/v0.2.3-rc6
Philipp Oppermann 2 years ago
parent
commit
97bc9d0f92
Failed to extract signature
11 changed files with 300 additions and 90 deletions
  1. +1
    -1
      .github/workflows/ci.yml
  2. +9
    -1
      apis/rust/node/src/event_stream/mod.rs
  3. +10
    -2
      apis/rust/node/src/node/drop_stream.rs
  4. +1
    -1
      binaries/coordinator/src/control.rs
  5. +10
    -2
      binaries/coordinator/src/lib.rs
  6. +8
    -2
      binaries/coordinator/src/listener.rs
  7. +88
    -78
      binaries/daemon/src/lib.rs
  8. +3
    -3
      binaries/daemon/src/main.rs
  9. +167
    -0
      binaries/daemon/src/pending.rs
  10. +1
    -0
      libraries/core/src/coordinator_messages.rs
  11. +2
    -0
      libraries/core/src/daemon_messages.rs

+ 1
- 1
.github/workflows/ci.yml View File

@@ -82,7 +82,7 @@ jobs:
- uses: Swatinem/rust-cache@v2

- name: "Build cli and binaries"
timeout-minutes: 30
timeout-minutes: 45
run: |
cargo install --path binaries/coordinator
cargo install --path binaries/daemon


+ 9
- 1
apis/rust/node/src/event_stream/mod.rs View File

@@ -59,11 +59,19 @@ impl EventStream {
mut close_channel: DaemonChannel,
) -> eyre::Result<Self> {
channel.register(dataflow_id, node_id.clone())?;
channel
let reply = channel
.request(&DaemonRequest::Subscribe)
.map_err(|e| eyre!(e))
.wrap_err("failed to create subscription with dora-daemon")?;

match reply {
daemon_messages::DaemonReply::Result(Ok(())) => {}
daemon_messages::DaemonReply::Result(Err(err)) => {
eyre::bail!("subscribe failed: {err}")
}
other => eyre::bail!("unexpected subscribe reply: {other:?}"),
}

close_channel.register(dataflow_id, node_id.clone())?;

let (tx, rx) = flume::bounded(0);


+ 10
- 2
apis/rust/node/src/node/drop_stream.rs View File

@@ -4,7 +4,7 @@ use crate::daemon_connection::DaemonChannel;
use dora_core::{
config::NodeId,
daemon_messages::{
DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent,
self, DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, DropToken, NodeDropEvent,
},
};
use eyre::{eyre, Context};
@@ -45,11 +45,19 @@ impl DropStream {
) -> eyre::Result<Self> {
channel.register(dataflow_id, node_id.clone())?;

channel
let reply = channel
.request(&DaemonRequest::SubscribeDrop)
.map_err(|e| eyre!(e))
.wrap_err("failed to create subscription with dora-daemon")?;

match reply {
daemon_messages::DaemonReply::Result(Ok(())) => {}
daemon_messages::DaemonReply::Result(Err(err)) => {
eyre::bail!("drop subscribe failed: {err}")
}
other => eyre::bail!("unexpected drop subscribe reply: {other:?}"),
}

let (tx, rx) = flume::bounded(0);
let node_id_cloned = node_id.clone();



+ 1
- 1
binaries/coordinator/src/control.rs View File

@@ -44,7 +44,7 @@ async fn listen(
let incoming = match result {
Ok(incoming) => incoming,
Err(err) => {
let _ = tx.blocking_send(err.into());
let _ = tx.send(err.into()).await;
return;
}
};


+ 10
- 2
binaries/coordinator/src/lib.rs View File

@@ -222,15 +222,20 @@ async fn start_inner(
}
},
Event::Dataflow { uuid, event } => match event {
DataflowEvent::ReadyOnMachine { machine_id } => {
DataflowEvent::ReadyOnMachine {
machine_id,
success,
} => {
match running_dataflows.entry(uuid) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
let dataflow = entry.get_mut();
dataflow.pending_machines.remove(&machine_id);
dataflow.init_success &= success;
if dataflow.pending_machines.is_empty() {
let message =
serde_json::to_vec(&DaemonCoordinatorEvent::AllNodesReady {
dataflow_id: uuid,
success: dataflow.init_success,
})
.wrap_err("failed to serialize AllNodesReady message")?;

@@ -451,7 +456,7 @@ async fn start_inner(
continue;
}
let result: eyre::Result<()> =
tokio::time::timeout(Duration::from_millis(100), send_watchdog_message(&mut connection.stream))
tokio::time::timeout(Duration::from_millis(500), send_watchdog_message(&mut connection.stream))
.await
.wrap_err("timeout")
.and_then(|r| r).wrap_err_with(||
@@ -551,6 +556,7 @@ struct RunningDataflow {
machines: BTreeSet<String>,
/// IDs of machines that are waiting until all nodes are started.
pending_machines: BTreeSet<String>,
init_success: bool,
nodes: Vec<ResolvedNode>,
}

@@ -733,6 +739,7 @@ async fn start_dataflow(
} else {
BTreeSet::new()
},
init_success: true,
machines,
nodes,
})
@@ -798,6 +805,7 @@ pub enum DataflowEvent {
},
ReadyOnMachine {
machine_id: String,
success: bool,
},
}



+ 8
- 2
binaries/coordinator/src/listener.rs View File

@@ -57,10 +57,16 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende
break;
}
coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event {
coordinator_messages::DaemonEvent::AllNodesReady { dataflow_id } => {
coordinator_messages::DaemonEvent::AllNodesReady {
dataflow_id,
success,
} => {
let event = Event::Dataflow {
uuid: dataflow_id,
event: DataflowEvent::ReadyOnMachine { machine_id },
event: DataflowEvent::ReadyOnMachine {
machine_id,
success,
},
};
if events_tx.send(event).await.is_err() {
break;


+ 88
- 78
binaries/daemon/src/lib.rs View File

@@ -17,8 +17,8 @@ use eyre::{bail, eyre, Context, ContextCompat};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
use inter_daemon::InterDaemonConnection;
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use std::collections::HashSet;
use std::env::temp_dir;
use std::time::Instant;
use std::{
@@ -44,6 +44,7 @@ mod coordinator;
mod inter_daemon;
mod log;
mod node_communication;
mod pending;
mod spawn;
mod tcp_utils;

@@ -52,6 +53,8 @@ use dora_tracing::telemetry::serialize_context;
#[cfg(feature = "telemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::pending::DataflowStatus;

pub struct Daemon {
running: HashMap<DataflowId, RunningDataflow>,

@@ -298,11 +301,20 @@ impl Daemon {
});
RunStatus::Continue
}
DaemonCoordinatorEvent::AllNodesReady { dataflow_id } => {
DaemonCoordinatorEvent::AllNodesReady {
dataflow_id,
success,
} => {
match self.running.get_mut(&dataflow_id) {
Some(dataflow) => {
tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
dataflow.start(&self.events_tx).await?;
dataflow
.pending_nodes
.handle_external_all_nodes_ready(success)
.await?;
if success {
tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
dataflow.start(&self.events_tx).await?;
}
}
None => {
tracing::warn!(
@@ -454,7 +466,7 @@ impl Daemon {
nodes: Vec<ResolvedNode>,
daemon_communication_config: LocalCommunicationConfig,
) -> eyre::Result<()> {
let dataflow = RunningDataflow::new(dataflow_id);
let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone());
let dataflow = match self.running.entry(dataflow_id) {
std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow),
std::collections::hash_map::Entry::Occupied(_) => {
@@ -503,7 +515,7 @@ impl Daemon {
dataflow.pending_nodes.insert(node.id.clone());

let node_id = node.id.clone();
spawn::spawn_node(
match spawn::spawn_node(
dataflow_id,
&working_dir,
node,
@@ -511,10 +523,21 @@ impl Daemon {
daemon_communication_config,
)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?;
dataflow.running_nodes.insert(node_id);
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
{
Ok(()) => {
dataflow.running_nodes.insert(node_id);
}
Err(err) => {
tracing::error!("{err:?}");
dataflow
.pending_nodes
.handle_node_stop(&node_id, &mut self.coordinator_connection)
.await?;
}
}
} else {
dataflow.external_nodes.insert(node.id.clone(), node);
dataflow.pending_nodes.set_external_nodes(true);
}
}

@@ -532,39 +555,35 @@ impl Daemon {
event_sender,
reply_sender,
} => {
let result = self
.subscribe(dataflow_id, node_id.clone(), event_sender)
.await;
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("failed to subscribe: no running dataflow with ID `{dataflow_id}`")
})?;
tracing::debug!("node `{node_id}` is ready");
let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
});

dataflow
.subscribe_replies
.insert(node_id.clone(), (reply_sender, result));
dataflow.pending_nodes.remove(&node_id);
if dataflow.pending_nodes.is_empty() {
if dataflow.external_nodes.is_empty() {
tracing::info!("all nodes are ready, starting dataflow `{dataflow_id}`");
dataflow.start(&self.events_tx).await?;
} else {
tracing::info!(
"all local nodes are ready, waiting for remote nodes \
for dataflow `{dataflow_id}`"
);
match dataflow {
Err(err) => {
let _ = reply_sender.send(DaemonReply::Result(Err(err)));
}
Ok(dataflow) => {
tracing::debug!("node `{node_id}` is ready");
Self::subscribe(dataflow, node_id.clone(), event_sender).await;

// dataflow is split across multiple daemons -> synchronize with dora-coordinator
let Some(connection) = &mut self.coordinator_connection else {
bail!("no coordinator connection to send AllNodesReady");
};
let msg = serde_json::to_vec(&CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesReady { dataflow_id },
})?;
tcp_send(connection, &msg)
.await
.wrap_err("failed to send AllNodesReady message to dora-coordinator")?;
let status = dataflow
.pending_nodes
.handle_node_subscription(
node_id.clone(),
reply_sender,
&mut self.coordinator_connection,
)
.await?;
match status {
DataflowStatus::AllNodesReady => {
tracing::info!(
"all nodes are ready, starting dataflow `{dataflow_id}`"
);
dataflow.start(&self.events_tx).await?;
}
DataflowStatus::Pending => {}
}
}
}
}
@@ -602,8 +621,17 @@ impl Daemon {
let _ = reply_sender.send(DaemonReply::Result(reply));
}
DaemonNodeEvent::OutputsDone { reply_sender } => {
let _ = reply_sender.send(DaemonReply::Result(Ok(())));
self.handle_outputs_done(dataflow_id, &node_id).await?;
let result = match self.running.get_mut(&dataflow_id) {
Some(dataflow) => {
Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, &node_id)
.await
},
None => Err(eyre!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")),
};

let _ = reply_sender.send(DaemonReply::Result(
result.map_err(|err| format!("{err:?}")),
));
}
DaemonNodeEvent::SendOut {
output_id,
@@ -721,15 +749,10 @@ impl Daemon {
}

async fn subscribe(
&mut self,
dataflow_id: Uuid,
dataflow: &mut RunningDataflow,
node_id: NodeId,
event_sender: UnboundedSender<daemon_messages::NodeEvent>,
) -> Result<(), String> {
let dataflow = self.running.get_mut(&dataflow_id).ok_or_else(|| {
format!("subscribe failed: no running dataflow with ID `{dataflow_id}`")
})?;

) {
// some inputs might have been closed already -> report those events
let closed_inputs = dataflow
.mappings
@@ -760,22 +783,17 @@ impl Daemon {
}

dataflow.subscribe_channels.insert(node_id, event_sender);

Ok(())
}

#[tracing::instrument(skip(self), level = "trace")]
#[tracing::instrument(skip(dataflow, inter_daemon_connections), fields(uuid = %dataflow.id), level = "trace")]
async fn handle_outputs_done(
&mut self,
dataflow_id: Uuid,
dataflow: &mut RunningDataflow,
inter_daemon_connections: &mut BTreeMap<String, InterDaemonConnection>,
node_id: &NodeId,
) -> eyre::Result<()> {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
})?;
send_input_closed_events(
dataflow,
&mut self.inter_daemon_connections,
inter_daemon_connections,
|OutputId(source_id, _)| source_id == node_id,
)
.await?;
@@ -783,13 +801,18 @@ impl Daemon {
Ok(())
}

#[tracing::instrument(skip(self), level = "trace")]
async fn handle_node_stop(&mut self, dataflow_id: Uuid, node_id: &NodeId) -> eyre::Result<()> {
self.handle_outputs_done(dataflow_id, node_id).await?;

let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`")
})?;

dataflow
.pending_nodes
.handle_node_stop(node_id, &mut self.coordinator_connection)
.await?;

Self::handle_outputs_done(dataflow, &mut self.inter_daemon_connections, node_id).await?;

dataflow.running_nodes.remove(node_id);
if dataflow.running_nodes.is_empty() {
tracing::info!(
@@ -1119,11 +1142,7 @@ fn close_input(dataflow: &mut RunningDataflow, receiver_id: &NodeId, input_id: &
pub struct RunningDataflow {
id: Uuid,
/// Local nodes that are not started yet
pending_nodes: HashSet<NodeId>,
/// Used to synchronize node starts.
///
/// Subscribe requests block the node until all other nodes are ready too.
subscribe_replies: HashMap<NodeId, (oneshot::Sender<DaemonReply>, Result<(), String>)>,
pending_nodes: PendingNodes,

subscribe_channels: HashMap<NodeId, UnboundedSender<daemon_messages::NodeEvent>>,
drop_channels: HashMap<NodeId, UnboundedSender<daemon_messages::NodeDropEvent>>,
@@ -1132,7 +1151,6 @@ pub struct RunningDataflow {
open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>,
running_nodes: BTreeSet<NodeId>,

external_nodes: BTreeMap<NodeId, ResolvedNode>,
open_external_mappings: HashMap<OutputId, BTreeMap<String, BTreeSet<InputId>>>,

pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,
@@ -1148,18 +1166,16 @@ pub struct RunningDataflow {
}

impl RunningDataflow {
fn new(id: Uuid) -> RunningDataflow {
fn new(dataflow_id: Uuid, machine_id: String) -> RunningDataflow {
Self {
id,
pending_nodes: HashSet::new(),
subscribe_replies: HashMap::new(),
id: dataflow_id,
pending_nodes: PendingNodes::new(dataflow_id, machine_id),
subscribe_channels: HashMap::new(),
drop_channels: HashMap::new(),
mappings: HashMap::new(),
timers: BTreeMap::new(),
open_inputs: BTreeMap::new(),
running_nodes: BTreeSet::new(),
external_nodes: BTreeMap::new(),
open_external_mappings: HashMap::new(),
pending_drop_tokens: HashMap::new(),
_timer_handles: Vec::new(),
@@ -1169,12 +1185,6 @@ impl RunningDataflow {
}

async fn start(&mut self, events_tx: &mpsc::Sender<Event>) -> eyre::Result<()> {
// answer all subscribe requests
let subscribe_replies = std::mem::take(&mut self.subscribe_replies);
for (reply_sender, subscribe_result) in subscribe_replies.into_values() {
let _ = reply_sender.send(DaemonReply::Result(subscribe_result));
}

for interval in self.timers.keys().copied() {
let events_tx = events_tx.clone();
let dataflow_id = self.id;


+ 3
- 3
binaries/daemon/src/main.rs View File

@@ -36,9 +36,6 @@ async fn main() -> eyre::Result<()> {
}

async fn run() -> eyre::Result<()> {
#[cfg(feature = "tracing")]
set_up_tracing("dora-daemon").wrap_err("failed to set up tracing subscriber")?;

let Args {
run_dataflow,
machine_id,
@@ -50,6 +47,9 @@ async fn run() -> eyre::Result<()> {
return tokio::task::block_in_place(dora_daemon::run_dora_runtime);
}

#[cfg(feature = "tracing")]
set_up_tracing("dora-daemon").wrap_err("failed to set up tracing subscriber")?;

let ctrl_c_events = {
let (ctrl_c_tx, ctrl_c_rx) = mpsc::channel(1);
let mut ctrlc_sent = false;


+ 167
- 0
binaries/daemon/src/pending.rs View File

@@ -0,0 +1,167 @@
use std::collections::{HashMap, HashSet};

use dora_core::{
config::NodeId,
coordinator_messages::{CoordinatorRequest, DaemonEvent},
daemon_messages::{DaemonReply, DataflowId},
};
use eyre::{bail, Context};
use tokio::{net::TcpStream, sync::oneshot};

use crate::tcp_utils::tcp_send;

/// Startup bookkeeping for the nodes of a single dataflow on this machine.
///
/// Subscribe replies of local nodes are held back until every local node has
/// either subscribed or stopped, so that all waiting nodes are answered
/// together.
pub struct PendingNodes {
    /// ID of the dataflow; part of the `AllNodesReady` report and of the
    /// error message sent to subscribers.
    dataflow_id: DataflowId,
    /// ID of this machine; sent along with the `AllNodesReady` report.
    machine_id: String,

    /// Local nodes that have neither subscribed nor stopped so far.
    local_nodes: HashSet<NodeId>,
    /// Set when the dataflow also has nodes that are not spawned locally.
    external_nodes: bool,

    /// Reply channels of the nodes that already sent their subscribe request.
    ///
    /// The replies are kept here, leaving the requesting nodes blocked, until
    /// the subscribe requests are answered in one go.
    waiting_subscribers: HashMap<NodeId, oneshot::Sender<DaemonReply>>,
    /// Nodes that stopped before they subscribed to the dora daemon.
    ///
    /// As soon as this set is non-empty, waiting subscribers are answered
    /// with an error instead of a success reply.
    exited_before_subscribe: HashSet<NodeId>,

    /// Set once the local init result was sent to the coordinator, so that it
    /// is not reported a second time.
    reported_init_to_coordinator: bool,
}

impl PendingNodes {
pub fn new(dataflow_id: DataflowId, machine_id: String) -> Self {
Self {
dataflow_id,
machine_id,
local_nodes: HashSet::new(),
external_nodes: false,
waiting_subscribers: HashMap::new(),
exited_before_subscribe: HashSet::new(),
reported_init_to_coordinator: false,
}
}

pub fn insert(&mut self, node_id: NodeId) {
self.local_nodes.insert(node_id);
}

pub fn set_external_nodes(&mut self, value: bool) {
self.external_nodes = value;
}

pub async fn handle_node_subscription(
&mut self,
node_id: NodeId,
reply_sender: oneshot::Sender<DaemonReply>,
coordinator_connection: &mut Option<TcpStream>,
) -> eyre::Result<DataflowStatus> {
self.waiting_subscribers
.insert(node_id.clone(), reply_sender);
self.local_nodes.remove(&node_id);

self.update_dataflow_status(coordinator_connection).await
}

pub async fn handle_node_stop(
&mut self,
node_id: &NodeId,
coordinator_connection: &mut Option<TcpStream>,
) -> eyre::Result<()> {
if self.local_nodes.remove(node_id) {
tracing::warn!("node `{node_id}` exited before initializing dora connection");
self.exited_before_subscribe.insert(node_id.clone());
self.update_dataflow_status(coordinator_connection).await?;
}
Ok(())
}

pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> {
if !self.local_nodes.is_empty() {
bail!("received external `all_nodes_ready` event before local nodes were ready");
}
let external_error = if success {
None
} else {
Some("some nodes failed to initalize on remote machines".to_string())
};
self.answer_subscribe_requests(external_error).await;

Ok(())
}

async fn update_dataflow_status(
&mut self,
coordinator_connection: &mut Option<TcpStream>,
) -> eyre::Result<DataflowStatus> {
if self.local_nodes.is_empty() {
if self.external_nodes {
if !self.reported_init_to_coordinator {
self.report_nodes_ready(coordinator_connection).await?;
self.reported_init_to_coordinator = true;
}
Ok(DataflowStatus::Pending)
} else {
self.answer_subscribe_requests(None).await;
Ok(DataflowStatus::AllNodesReady)
}
} else {
Ok(DataflowStatus::Pending)
}
}

async fn answer_subscribe_requests(&mut self, external_error: Option<String>) {
let result = if self.exited_before_subscribe.is_empty() {
match external_error {
Some(err) => Err(err),
None => Ok(()),
}
} else {
Err(format!(
"Some nodes exited before subscribing to dora: {:?}\n\n\
This is typically happens when an initialization error occurs
in the node or operator. To check the output of the failed
nodes, run `dora logs {} <node_id>`.",
self.exited_before_subscribe, self.dataflow_id
))
};
// answer all subscribe requests
let subscribe_replies = std::mem::take(&mut self.waiting_subscribers);
for reply_sender in subscribe_replies.into_values() {
let _ = reply_sender.send(DaemonReply::Result(result.clone()));
}
}

async fn report_nodes_ready(
&self,
coordinator_connection: &mut Option<TcpStream>,
) -> eyre::Result<()> {
let Some(connection) = coordinator_connection else {
bail!("no coordinator connection to send AllNodesReady");
};

let success = self.exited_before_subscribe.is_empty();
tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes");

let msg = serde_json::to_vec(&CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesReady {
dataflow_id: self.dataflow_id,
success,
},
})?;
tcp_send(connection, &msg)
.await
.wrap_err("failed to send AllNodesReady message to dora-coordinator")?;
Ok(())
}
}

/// Startup state of a dataflow as seen by [`PendingNodes`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataflowStatus {
    /// No local node is pending anymore and the dataflow has no external
    /// nodes; the waiting subscribe requests have been answered.
    AllNodesReady,
    /// Some local nodes are still pending, or the dataflow has external
    /// nodes whose result is not known yet.
    Pending,
}

+ 1
- 0
libraries/core/src/coordinator_messages.rs View File

@@ -19,6 +19,7 @@ pub enum CoordinatorRequest {
pub enum DaemonEvent {
AllNodesReady {
dataflow_id: DataflowId,
success: bool,
},
AllNodesFinished {
dataflow_id: DataflowId,


+ 2
- 0
libraries/core/src/daemon_messages.rs View File

@@ -127,6 +127,7 @@ impl fmt::Debug for Data {
type SharedMemoryId = String;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[must_use]
pub enum DaemonReply {
Result(Result<(), String>),
PreparedMessage { shared_memory_id: SharedMemoryId },
@@ -200,6 +201,7 @@ pub enum DaemonCoordinatorEvent {
Spawn(SpawnDataflowNodes),
AllNodesReady {
dataflow_id: DataflowId,
success: bool,
},
StopDataflow {
dataflow_id: DataflowId,


Loading…
Cancel
Save