Browse Source

Merge remote-tracking branch 'upstream/main' into proc_macro_dir

tags/v0.3.5
EricLBuehler 1 year ago
parent
commit
72f604295b
16 changed files with 607 additions and 336 deletions
  1. +6
    -6
      .github/workflows/ci.yml
  2. +42
    -17
      Cargo.lock
  3. +1
    -0
      binaries/cli/Cargo.toml
  4. +3
    -3
      binaries/cli/src/attach.rs
  5. +50
    -0
      binaries/cli/src/formatting.rs
  6. +71
    -27
      binaries/cli/src/main.rs
  7. +70
    -75
      binaries/coordinator/src/lib.rs
  8. +4
    -7
      binaries/coordinator/src/listener.rs
  9. +2
    -0
      binaries/daemon/Cargo.toml
  10. +127
    -137
      binaries/daemon/src/lib.rs
  11. +49
    -41
      binaries/daemon/src/pending.rs
  12. +9
    -3
      binaries/daemon/src/spawn.rs
  13. +1
    -1
      examples/multiple-daemons/run.rs
  14. +3
    -3
      libraries/core/src/coordinator_messages.rs
  15. +1
    -1
      libraries/core/src/daemon_messages.rs
  16. +168
    -15
      libraries/core/src/topics.rs

+ 6
- 6
.github/workflows/ci.yml View File

@@ -286,12 +286,12 @@ jobs:
cargo build --all cargo build --all
dora up dora up
dora list dora list
dora start dataflow.yml --name ci-rust-test
dora start dataflow.yml --name ci-rust-test --detach
sleep 10 sleep 10
dora stop --name ci-rust-test --grace-duration 5s dora stop --name ci-rust-test --grace-duration 5s
cd .. cd ..
dora build examples/rust-dataflow/dataflow_dynamic.yml dora build examples/rust-dataflow/dataflow_dynamic.yml
dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic
dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic --detach
cargo run -p rust-dataflow-example-sink-dynamic cargo run -p rust-dataflow-example-sink-dynamic
sleep 5 sleep 5
dora stop --name ci-rust-dynamic --grace-duration 5s dora stop --name ci-rust-dynamic --grace-duration 5s
@@ -315,11 +315,11 @@ jobs:
cd test_python_project cd test_python_project
dora up dora up
dora list dora list
dora start dataflow.yml --name ci-python-test
dora start dataflow.yml --name ci-python-test --detach
sleep 10 sleep 10
dora stop --name ci-python-test --grace-duration 5s dora stop --name ci-python-test --grace-duration 5s
pip install "numpy<2.0.0" opencv-python pip install "numpy<2.0.0" opencv-python
dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic
dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic --detach
python ../examples/python-dataflow/plot_dynamic.py python ../examples/python-dataflow/plot_dynamic.py
sleep 5 sleep 5
dora stop --name ci-python-test --grace-duration 5s dora stop --name ci-python-test --grace-duration 5s
@@ -339,7 +339,7 @@ jobs:
cmake -B build cmake -B build
cmake --build build cmake --build build
cmake --install build cmake --install build
dora start dataflow.yml --name ci-c-test
dora start dataflow.yml --name ci-c-test --detach
sleep 10 sleep 10
dora stop --name ci-c-test --grace-duration 5s dora stop --name ci-c-test --grace-duration 5s
dora destroy dora destroy
@@ -358,7 +358,7 @@ jobs:
cmake -B build cmake -B build
cmake --build build cmake --build build
cmake --install build cmake --install build
dora start dataflow.yml --name ci-cxx-test
dora start dataflow.yml --name ci-cxx-test --detach
sleep 10 sleep 10
dora stop --name ci-cxx-test --grace-duration 5s dora stop --name ci-cxx-test --grace-duration 5s
dora destroy dora destroy


+ 42
- 17
Cargo.lock View File

@@ -885,9 +885,12 @@ dependencies = [


[[package]] [[package]]
name = "atomic" name = "atomic"
version = "0.5.3"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994"
dependencies = [
"bytemuck",
]


[[package]] [[package]]
name = "atomic-waker" name = "atomic-waker"
@@ -1925,6 +1928,16 @@ dependencies = [
"crossbeam-utils", "crossbeam-utils",
] ]


[[package]]
name = "crossbeam-skiplist"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]

[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.20" version = "0.8.20"
@@ -2011,9 +2024,9 @@ checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991"


[[package]] [[package]]
name = "cxx" name = "cxx"
version = "1.0.123"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8194f089b6da4751d6c1da1ef37c17255df51f9346cdb160f8b096562ae4a85c"
checksum = "273dcfd3acd4e1e276af13ed2a43eea7001318823e7a726a6b3ed39b4acc0b82"
dependencies = [ dependencies = [
"cc", "cc",
"cxxbridge-flags", "cxxbridge-flags",
@@ -2023,9 +2036,9 @@ dependencies = [


[[package]] [[package]]
name = "cxx-build" name = "cxx-build"
version = "1.0.123"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e8df9a089caae66634d754672d5f909395f30f38af6ff19366980d8a8b57501"
checksum = "d8b2766fbd92be34e9ed143898fce6c572dc009de39506ed6903e5a05b68914e"
dependencies = [ dependencies = [
"cc", "cc",
"codespan-reporting", "codespan-reporting",
@@ -2038,15 +2051,15 @@ dependencies = [


[[package]] [[package]]
name = "cxxbridge-flags" name = "cxxbridge-flags"
version = "1.0.123"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25290be4751803672a70b98c68b51c1e7d0a640ab5a4377f240f9d2e70054cd1"
checksum = "839fcd5e43464614ffaa989eaf1c139ef1f0c51672a1ed08023307fa1b909ccd"


[[package]] [[package]]
name = "cxxbridge-macro" name = "cxxbridge-macro"
version = "1.0.123"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8cb317cb13604b4752416783bb25070381c36e844743e4146b7f8e55de7d140"
checksum = "4b2c1c1776b986979be68bb2285da855f8d8a35851a769fca8740df7c3d07877"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -2268,6 +2281,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"serde_yaml 0.9.34+deprecated", "serde_yaml 0.9.34+deprecated",
"tabwriter",
"termcolor", "termcolor",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
@@ -2320,6 +2334,8 @@ dependencies = [
"aligned-vec", "aligned-vec",
"async-trait", "async-trait",
"bincode", "bincode",
"crossbeam",
"crossbeam-skiplist",
"ctrlc", "ctrlc",
"dora-arrow-convert", "dora-arrow-convert",
"dora-core", "dora-core",
@@ -4582,7 +4598,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"windows-targets 0.52.5",
"windows-targets 0.48.5",
] ]


[[package]] [[package]]
@@ -6390,9 +6406,9 @@ dependencies = [


[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.85"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23"
checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@@ -9195,6 +9211,15 @@ dependencies = [
"windows 0.52.0", "windows 0.52.0",
] ]


[[package]]
name = "tabwriter"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a327282c4f64f6dc37e3bba4c2b6842cc3a992f204fa58d917696a89f691e5f6"
dependencies = [
"unicode-width",
]

[[package]] [[package]]
name = "tap" name = "tap"
version = "1.0.1" version = "1.0.1"
@@ -9909,9 +9934,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"


[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.8.0"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
checksum = "3ea73390fe27785838dcbf75b91b1d84799e28f1ce71e6f372a5dc2200c80de5"
dependencies = [ dependencies = [
"atomic", "atomic",
"getrandom", "getrandom",
@@ -9923,9 +9948,9 @@ dependencies = [


[[package]] [[package]]
name = "uuid-macro-internal" name = "uuid-macro-internal"
version = "1.8.0"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2"
checksum = "cdb394c528d1cb434c2f0522027e50c0705305caf6d20405c07a6f7e4cf9543c"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",


+ 1
- 0
binaries/cli/Cargo.toml View File

@@ -42,3 +42,4 @@ tokio = { version = "1.20.1", features = ["full"] }
tokio-stream = { version = "0.1.8", features = ["io-util", "net"] } tokio-stream = { version = "0.1.8", features = ["io-util", "net"] }
futures = "0.3.21" futures = "0.3.21"
duration-str = "0.5" duration-str = "0.5"
tabwriter = "1.4.0"

+ 3
- 3
binaries/cli/src/attach.rs View File

@@ -11,6 +11,8 @@ use std::{path::PathBuf, sync::mpsc, time::Duration};
use tracing::{error, info}; use tracing::{error, info};
use uuid::Uuid; use uuid::Uuid;


use crate::handle_dataflow_result;

pub fn attach_dataflow( pub fn attach_dataflow(
dataflow: Descriptor, dataflow: Descriptor,
dataflow_path: PathBuf, dataflow_path: PathBuf,
@@ -133,9 +135,7 @@ pub fn attach_dataflow(
ControlRequestReply::DataflowStarted { uuid: _ } => (), ControlRequestReply::DataflowStarted { uuid: _ } => (),
ControlRequestReply::DataflowStopped { uuid, result } => { ControlRequestReply::DataflowStopped { uuid, result } => {
info!("dataflow {uuid} stopped"); info!("dataflow {uuid} stopped");
break result
.map_err(|err| eyre::eyre!(err))
.wrap_err("dataflow failed");
break handle_dataflow_result(result, Some(uuid));
} }
ControlRequestReply::DataflowReloaded { uuid } => { ControlRequestReply::DataflowReloaded { uuid } => {
info!("dataflow {uuid} reloaded") info!("dataflow {uuid} reloaded")


+ 50
- 0
binaries/cli/src/formatting.rs View File

@@ -0,0 +1,50 @@
use dora_core::topics::{DataflowResult, NodeErrorCause};

pub struct FormatDataflowError<'a>(pub &'a DataflowResult);

impl std::fmt::Display for FormatDataflowError<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f)?;
let failed = self
.0
.node_results
.iter()
.filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e)));
let total_failed = failed.clone().count();

let mut non_cascading: Vec<_> = failed
.clone()
.filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. }))
.collect();
non_cascading.sort_by_key(|(_, e)| e.timestamp);
// try to print earliest non-cascading error
let hidden = if !non_cascading.is_empty() {
let printed = non_cascading.len();
for (id, err) in non_cascading {
writeln!(f, "Node `{id}` failed: {err}")?;
}
total_failed - printed
} else {
// no non-cascading errors -> print earliest cascading
let mut all: Vec<_> = failed.collect();
all.sort_by_key(|(_, e)| e.timestamp);
if let Some((id, err)) = all.first() {
write!(f, "Node `{id}` failed: {err}")?;
total_failed - 1
} else {
write!(f, "unknown error")?;
0
}
};

if hidden > 1 {
write!(
f,
"\n\nThere are {hidden} consequential errors. Check the `out/{}` folder for full details.",
self.0.uuid
)?;
}

Ok(())
}
}

+ 71
- 27
binaries/cli/src/main.rs View File

@@ -5,7 +5,7 @@ use dora_coordinator::Event;
use dora_core::{ use dora_core::{
descriptor::Descriptor, descriptor::Descriptor,
topics::{ topics::{
ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
}, },
}; };
@@ -15,18 +15,21 @@ use dora_tracing::set_up_tracing;
use dora_tracing::set_up_tracing_opts; use dora_tracing::set_up_tracing_opts;
use duration_str::parse; use duration_str::parse;
use eyre::{bail, Context}; use eyre::{bail, Context};
use std::net::SocketAddr;
use formatting::FormatDataflowError;
use std::{io::Write, net::SocketAddr};
use std::{ use std::{
net::{IpAddr, Ipv4Addr}, net::{IpAddr, Ipv4Addr},
path::PathBuf, path::PathBuf,
time::Duration, time::Duration,
}; };
use tabwriter::TabWriter;
use tokio::runtime::Builder; use tokio::runtime::Builder;
use uuid::Uuid; use uuid::Uuid;


mod attach; mod attach;
mod build; mod build;
mod check; mod check;
mod formatting;
mod graph; mod graph;
mod logs; mod logs;
mod template; mod template;
@@ -117,6 +120,9 @@ enum Command {
/// Attach to the dataflow and wait for its completion /// Attach to the dataflow and wait for its completion
#[clap(long, action)] #[clap(long, action)]
attach: bool, attach: bool,
/// Run the dataflow in background
#[clap(long, action)]
detach: bool,
/// Enable hot reloading (Python only) /// Enable hot reloading (Python only)
#[clap(long, action)] #[clap(long, action)]
hot_reload: bool, hot_reload: bool,
@@ -320,17 +326,18 @@ fn run() -> eyre::Result<()> {
} => { } => {
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?; .wrap_err("failed to connect to dora coordinator")?;
let uuids = query_running_dataflows(&mut *session)
let list = query_running_dataflows(&mut *session)
.wrap_err("failed to query running dataflows")?; .wrap_err("failed to query running dataflows")?;
if let Some(dataflow) = dataflow { if let Some(dataflow) = dataflow {
let uuid = Uuid::parse_str(&dataflow).ok(); let uuid = Uuid::parse_str(&dataflow).ok();
let name = if uuid.is_some() { None } else { Some(dataflow) }; let name = if uuid.is_some() { None } else { Some(dataflow) };
logs::logs(&mut *session, uuid, name, node)? logs::logs(&mut *session, uuid, name, node)?
} else { } else {
let uuid = match &uuids[..] {
let active = list.get_active();
let uuid = match &active[..] {
[] => bail!("No dataflows are running"), [] => bail!("No dataflows are running"),
[uuid] => uuid.clone(), [uuid] => uuid.clone(),
_ => inquire::Select::new("Choose dataflow to show logs:", uuids).prompt()?,
_ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?,
}; };
logs::logs(&mut *session, Some(uuid.uuid), None, node)? logs::logs(&mut *session, Some(uuid.uuid), None, node)?
} }
@@ -341,6 +348,7 @@ fn run() -> eyre::Result<()> {
coordinator_addr, coordinator_addr,
coordinator_port, coordinator_port,
attach, attach,
detach,
hot_reload, hot_reload,
} => { } => {
let dataflow_descriptor = let dataflow_descriptor =
@@ -368,6 +376,16 @@ fn run() -> eyre::Result<()> {
&mut *session, &mut *session,
)?; )?;


let attach = match (attach, detach) {
(true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
(true, false) => true,
(false, true) => false,
(false, false) => {
println!("attaching to dataflow (use `--detach` to run in background)");
true
}
};

if attach { if attach {
attach_dataflow( attach_dataflow(
dataflow_descriptor, dataflow_descriptor,
@@ -457,7 +475,8 @@ fn run() -> eyre::Result<()> {
); );
} }


Daemon::run_dataflow(&dataflow_path).await
let result = Daemon::run_dataflow(&dataflow_path).await?;
handle_dataflow_result(result, None)
} }
None => { None => {
if coordinator_addr.ip() == LOCALHOST { if coordinator_addr.ip() == LOCALHOST {
@@ -508,11 +527,12 @@ fn stop_dataflow_interactive(
grace_duration: Option<Duration>, grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection, session: &mut TcpRequestReplyConnection,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
if uuids.is_empty() {
let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
let active = list.get_active();
if active.is_empty() {
eprintln!("No dataflows are running"); eprintln!("No dataflows are running");
} else { } else {
let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?;
let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?;
stop_dataflow(selection.uuid, grace_duration, session)?; stop_dataflow(selection.uuid, grace_duration, session)?;
} }


@@ -536,14 +556,32 @@ fn stop_dataflow(
let result: ControlRequestReply = let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result { match result {
ControlRequestReply::DataflowStopped { uuid: _, result } => result
.map_err(|err| eyre::eyre!(err))
.wrap_err("dataflow failed"),
ControlRequestReply::DataflowStopped { uuid, result } => {
handle_dataflow_result(result, Some(uuid))
}
ControlRequestReply::Error(err) => bail!("{err}"), ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected stop dataflow reply: {other:?}"), other => bail!("unexpected stop dataflow reply: {other:?}"),
} }
} }


fn handle_dataflow_result(
result: dora_core::topics::DataflowResult,
uuid: Option<Uuid>,
) -> Result<(), eyre::Error> {
if result.is_ok() {
Ok(())
} else {
Err(match uuid {
Some(uuid) => {
eyre::eyre!("Dataflow {uuid} failed:\n{}", FormatDataflowError(&result))
}
None => {
eyre::eyre!("Dataflow failed:\n{}", FormatDataflowError(&result))
}
})
}
}

fn stop_dataflow_by_name( fn stop_dataflow_by_name(
name: String, name: String,
grace_duration: Option<Duration>, grace_duration: Option<Duration>,
@@ -561,39 +599,45 @@ fn stop_dataflow_by_name(
let result: ControlRequestReply = let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result { match result {
ControlRequestReply::DataflowStopped { uuid: _, result } => result
.map_err(|err| eyre::eyre!(err))
.wrap_err("dataflow failed"),
ControlRequestReply::DataflowStopped { uuid, result } => {
handle_dataflow_result(result, Some(uuid))
}
ControlRequestReply::Error(err) => bail!("{err}"), ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected stop dataflow reply: {other:?}"), other => bail!("unexpected stop dataflow reply: {other:?}"),
} }
} }


fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> {
let ids = query_running_dataflows(session)?;
let list = query_running_dataflows(session)?;


if ids.is_empty() {
eprintln!("No dataflows are running");
} else {
println!("Running dataflows:");
for id in ids {
println!("- {id}");
}
let mut tw = TabWriter::new(vec![]);
tw.write_all(b"UUID\tName\tStatus\n")?;
for entry in list.0 {
let uuid = entry.id.uuid;
let name = entry.id.name.unwrap_or_default();
let status = match entry.status {
dora_core::topics::DataflowStatus::Running => "Running",
dora_core::topics::DataflowStatus::Finished => "Succeeded",
dora_core::topics::DataflowStatus::Failed => "Failed",
};
tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?;
} }
tw.flush()?;
let formatted = String::from_utf8(tw.into_inner()?)?;

println!("{formatted}");


Ok(()) Ok(())
} }


fn query_running_dataflows(
session: &mut TcpRequestReplyConnection,
) -> Result<Vec<DataflowId>, eyre::ErrReport> {
fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result<DataflowList> {
let reply_raw = session let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::List).unwrap()) .request(&serde_json::to_vec(&ControlRequest::List).unwrap())
.wrap_err("failed to send list message")?; .wrap_err("failed to send list message")?;
let reply: ControlRequestReply = let reply: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let ids = match reply { let ids = match reply {
ControlRequestReply::DataflowList { dataflows } => dataflows,
ControlRequestReply::DataflowList(list) => list,
ControlRequestReply::Error(err) => bail!("{err}"), ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected list dataflow reply: {other:?}"), other => bail!("unexpected list dataflow reply: {other:?}"),
}; };


+ 70
- 75
binaries/coordinator/src/lib.rs View File

@@ -9,7 +9,10 @@ use dora_core::{
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped},
descriptor::{Descriptor, ResolvedNode}, descriptor::{Descriptor, ResolvedNode},
message::uhlc::{self, HLC}, message::uhlc::{self, HLC},
topics::{ControlRequest, ControlRequestReply, DataflowId},
topics::{
ControlRequest, ControlRequestReply, DataflowDaemonResult, DataflowId, DataflowListEntry,
DataflowResult,
},
}; };
use eyre::{bail, eyre, ContextCompat, WrapErr}; use eyre::{bail, eyre, ContextCompat, WrapErr};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
@@ -134,7 +137,8 @@ async fn start_inner(
let mut events = (abortable_events, daemon_events).merge(); let mut events = (abortable_events, daemon_events).merge();


let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new(); let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
let mut dataflow_results: HashMap<Uuid, BTreeMap<String, Result<(), String>>> = HashMap::new();
let mut dataflow_results: HashMap<Uuid, BTreeMap<String, DataflowDaemonResult>> =
HashMap::new();
let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new(); let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new(); let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new();


@@ -220,18 +224,22 @@ async fn start_inner(
Event::Dataflow { uuid, event } => match event { Event::Dataflow { uuid, event } => match event {
DataflowEvent::ReadyOnMachine { DataflowEvent::ReadyOnMachine {
machine_id, machine_id,
success,
exited_before_subscribe,
} => { } => {
match running_dataflows.entry(uuid) { match running_dataflows.entry(uuid) {
std::collections::hash_map::Entry::Occupied(mut entry) => { std::collections::hash_map::Entry::Occupied(mut entry) => {
let dataflow = entry.get_mut(); let dataflow = entry.get_mut();
dataflow.pending_machines.remove(&machine_id); dataflow.pending_machines.remove(&machine_id);
dataflow.init_success &= success;
dataflow
.exited_before_subscribe
.extend(exited_before_subscribe);
if dataflow.pending_machines.is_empty() { if dataflow.pending_machines.is_empty() {
let message = serde_json::to_vec(&Timestamped { let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::AllNodesReady { inner: DaemonCoordinatorEvent::AllNodesReady {
dataflow_id: uuid, dataflow_id: uuid,
success: dataflow.init_success,
exited_before_subscribe: dataflow
.exited_before_subscribe
.clone(),
}, },
timestamp: clock.new_timestamp(), timestamp: clock.new_timestamp(),
}) })
@@ -271,26 +279,20 @@ async fn start_inner(
.insert(uuid, ArchivedDataflow::from(entry.get())); .insert(uuid, ArchivedDataflow::from(entry.get()));
} }
entry.get_mut().machines.remove(&machine_id); entry.get_mut().machines.remove(&machine_id);
match &result {
Ok(()) => {
tracing::info!("dataflow `{uuid}` finished successfully on machine `{machine_id}`");
}
Err(err) => {
tracing::error!("{err:?}");
}
}
dataflow_results dataflow_results
.entry(uuid) .entry(uuid)
.or_default() .or_default()
.insert(machine_id, result.map_err(|err| format!("{err:?}")));
.insert(machine_id, result);
if entry.get_mut().machines.is_empty() { if entry.get_mut().machines.is_empty() {
let finished_dataflow = entry.remove(); let finished_dataflow = entry.remove();
let reply = ControlRequestReply::DataflowStopped { let reply = ControlRequestReply::DataflowStopped {
uuid, uuid,
result: dataflow_results result: dataflow_results
.get(&uuid) .get(&uuid)
.map(|r| dataflow_result(r, uuid))
.unwrap_or(Ok(())),
.map(|r| dataflow_result(r, uuid, &clock))
.unwrap_or_else(|| {
DataflowResult::ok_empty(uuid, clock.new_timestamp())
}),
}; };
for sender in finished_dataflow.reply_senders { for sender in finished_dataflow.reply_senders {
let _ = sender.send(Ok(reply.clone())); let _ = sender.send(Ok(reply.clone()));
@@ -353,8 +355,13 @@ async fn start_inner(
uuid: dataflow_uuid, uuid: dataflow_uuid,
result: dataflow_results result: dataflow_results
.get(&dataflow_uuid) .get(&dataflow_uuid)
.map(|r| dataflow_result(r, dataflow_uuid))
.unwrap_or(Ok(())),
.map(|r| dataflow_result(r, dataflow_uuid, &clock))
.unwrap_or_else(|| {
DataflowResult::ok_empty(
dataflow_uuid,
clock.new_timestamp(),
)
}),
}, },
}; };
let _ = reply_sender.send(Ok(status)); let _ = reply_sender.send(Ok(status));
@@ -396,6 +403,7 @@ async fn start_inner(
reply_sender, reply_sender,
clock.new_timestamp(), clock.new_timestamp(),
grace_duration, grace_duration,
&clock,
) )
.await?; .await?;
} }
@@ -412,6 +420,7 @@ async fn start_inner(
reply_sender, reply_sender,
clock.new_timestamp(), clock.new_timestamp(),
grace_duration, grace_duration,
&clock,
) )
.await? .await?
} }
@@ -458,15 +467,31 @@ async fn start_inner(
let mut dataflows: Vec<_> = running_dataflows.values().collect(); let mut dataflows: Vec<_> = running_dataflows.values().collect();
dataflows.sort_by_key(|d| (&d.name, d.uuid)); dataflows.sort_by_key(|d| (&d.name, d.uuid));


let reply = Ok(ControlRequestReply::DataflowList {
dataflows: dataflows
.into_iter()
.map(|d| DataflowId {
uuid: d.uuid,
name: d.name.clone(),
})
.collect(),
let running = dataflows.into_iter().map(|d| DataflowListEntry {
id: DataflowId {
uuid: d.uuid,
name: d.name.clone(),
},
status: dora_core::topics::DataflowStatus::Running,
}); });
let finished_failed =
dataflow_results.iter().map(|(&uuid, results)| {
let name =
archived_dataflows.get(&uuid).and_then(|d| d.name.clone());
let id = DataflowId { uuid, name };
let status = if results.values().all(|r| r.is_ok()) {
dora_core::topics::DataflowStatus::Finished
} else {
dora_core::topics::DataflowStatus::Failed
};
DataflowListEntry { id, status }
});

let reply = Ok(ControlRequestReply::DataflowList(
dora_core::topics::DataflowList(
running.chain(finished_failed).collect(),
),
));
let _ = reply_sender.send(reply); let _ = reply_sender.send(reply);
} }
ControlRequest::DaemonConnected => { ControlRequest::DaemonConnected => {
@@ -545,18 +570,19 @@ async fn start_inner(


async fn stop_dataflow_by_uuid( async fn stop_dataflow_by_uuid(
running_dataflows: &mut HashMap<Uuid, RunningDataflow>, running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
dataflow_results: &HashMap<Uuid, BTreeMap<String, Result<(), String>>>,
dataflow_results: &HashMap<Uuid, BTreeMap<String, DataflowDaemonResult>>,
dataflow_uuid: Uuid, dataflow_uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>, daemon_connections: &mut HashMap<String, DaemonConnection>,
reply_sender: tokio::sync::oneshot::Sender<Result<ControlRequestReply, eyre::ErrReport>>, reply_sender: tokio::sync::oneshot::Sender<Result<ControlRequestReply, eyre::ErrReport>>,
timestamp: uhlc::Timestamp, timestamp: uhlc::Timestamp,
grace_duration: Option<Duration>, grace_duration: Option<Duration>,
clock: &uhlc::HLC,
) -> Result<(), eyre::ErrReport> { ) -> Result<(), eyre::ErrReport> {
let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else { let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
if let Some(result) = dataflow_results.get(&dataflow_uuid) { if let Some(result) = dataflow_results.get(&dataflow_uuid) {
let reply = ControlRequestReply::DataflowStopped { let reply = ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid, uuid: dataflow_uuid,
result: dataflow_result(result, dataflow_uuid),
result: dataflow_result(result, dataflow_uuid, clock),
}; };
let _ = reply_sender.send(Ok(reply)); let _ = reply_sender.send(Ok(reply));
return Ok(()); return Ok(());
@@ -585,36 +611,23 @@ async fn stop_dataflow_by_uuid(
Ok(()) Ok(())
} }


fn format_error(machine: &str, err: &str) -> String {
let mut error = err
.lines()
.fold(format!("- machine `{machine}`:\n"), |mut output, line| {
output.push_str(" ");
output.push_str(line);
output.push('\n');
output
});
error.push('\n');
error
}

fn dataflow_result( fn dataflow_result(
results: &BTreeMap<String, Result<(), String>>,
results: &BTreeMap<String, DataflowDaemonResult>,
dataflow_uuid: Uuid, dataflow_uuid: Uuid,
) -> Result<(), String> {
let mut errors = Vec::new();
for (machine, result) in results {
if let Err(err) = result {
errors.push(format_error(machine, err));
clock: &uhlc::HLC,
) -> DataflowResult {
let mut node_results = BTreeMap::new();
for (_machine, result) in results {
node_results.extend(result.node_results.clone());
if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
tracing::warn!("failed to update HLC: {err}");
} }
} }


if errors.is_empty() {
Ok(())
} else {
let mut formatted = format!("errors occurred in dataflow {dataflow_uuid}:\n");
formatted.push_str(&errors.join("\n"));
Err(formatted)
DataflowResult {
uuid: dataflow_uuid,
timestamp: clock.new_timestamp(),
node_results,
} }
} }


@@ -669,7 +682,7 @@ struct RunningDataflow {
machines: BTreeSet<String>, machines: BTreeSet<String>,
/// IDs of machines that are waiting until all nodes are started. /// IDs of machines that are waiting until all nodes are started.
pending_machines: BTreeSet<String>, pending_machines: BTreeSet<String>,
init_success: bool,
exited_before_subscribe: Vec<NodeId>,
nodes: Vec<ResolvedNode>, nodes: Vec<ResolvedNode>,


reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>, reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
@@ -868,7 +881,7 @@ async fn start_dataflow(
} else { } else {
BTreeSet::new() BTreeSet::new()
}, },
init_success: true,
exited_before_subscribe: Default::default(),
machines, machines,
nodes, nodes,
reply_senders: Vec::new(), reply_senders: Vec::new(),
@@ -935,11 +948,11 @@ impl Event {
pub enum DataflowEvent { pub enum DataflowEvent {
DataflowFinishedOnMachine { DataflowFinishedOnMachine {
machine_id: String, machine_id: String,
result: eyre::Result<()>,
result: DataflowDaemonResult,
}, },
ReadyOnMachine { ReadyOnMachine {
machine_id: String, machine_id: String,
success: bool,
exited_before_subscribe: Vec<NodeId>,
}, },
} }


@@ -974,21 +987,3 @@ fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport>


Ok(ReceiverStream::new(ctrlc_rx)) Ok(ReceiverStream::new(ctrlc_rx))
} }

#[cfg(test)]
mod test {
#[test]
fn test_format_error() {
let machine = "machine A";
let err = "foo\nbar\nbuzz";

// old method
let old_error = {
#[allow(clippy::format_collect)]
let err: String = err.lines().map(|line| format!(" {line}\n")).collect();
format!("- machine `{machine}`:\n{err}\n")
};
let new_error = super::format_error(machine, err);
assert_eq!(old_error, new_error)
}
}

+ 4
- 7
binaries/coordinator/src/listener.rs View File

@@ -1,6 +1,6 @@
use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event}; use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC}; use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC};
use eyre::{eyre, Context};
use eyre::Context;
use std::{io::ErrorKind, net::SocketAddr, sync::Arc}; use std::{io::ErrorKind, net::SocketAddr, sync::Arc};
use tokio::{ use tokio::{
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
@@ -66,13 +66,13 @@ pub async fn handle_connection(
coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event { coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event {
coordinator_messages::DaemonEvent::AllNodesReady { coordinator_messages::DaemonEvent::AllNodesReady {
dataflow_id, dataflow_id,
success,
exited_before_subscribe,
} => { } => {
let event = Event::Dataflow { let event = Event::Dataflow {
uuid: dataflow_id, uuid: dataflow_id,
event: DataflowEvent::ReadyOnMachine { event: DataflowEvent::ReadyOnMachine {
machine_id, machine_id,
success,
exited_before_subscribe,
}, },
}; };
if events_tx.send(event).await.is_err() { if events_tx.send(event).await.is_err() {
@@ -85,10 +85,7 @@ pub async fn handle_connection(
} => { } => {
let event = Event::Dataflow { let event = Event::Dataflow {
uuid: dataflow_id, uuid: dataflow_id,
event: DataflowEvent::DataflowFinishedOnMachine {
machine_id,
result: result.map_err(|e| eyre!(e)),
},
event: DataflowEvent::DataflowFinishedOnMachine { machine_id, result },
}; };
if events_tx.send(event).await.is_err() { if events_tx.send(event).await.is_err() {
break; break;


+ 2
- 0
binaries/daemon/Cargo.toml View File

@@ -39,3 +39,5 @@ aligned-vec = "0.5.0"
ctrlc = "3.2.5" ctrlc = "3.2.5"
which = "5.0.0" which = "5.0.0"
sysinfo = "0.30.11" sysinfo = "0.30.11"
crossbeam = "0.8.4"
crossbeam-skiplist = "0.1.3"

+ 127
- 137
binaries/daemon/src/lib.rs View File

@@ -1,5 +1,6 @@
use aligned_vec::{AVec, ConstAlign}; use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent; use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::config::{Input, OperatorId}; use dora_core::config::{Input, OperatorId};
use dora_core::coordinator_messages::CoordinatorRequest; use dora_core::coordinator_messages::CoordinatorRequest;
use dora_core::daemon_messages::{ use dora_core::daemon_messages::{
@@ -9,6 +10,9 @@ use dora_core::descriptor::runtime_node_inputs;
use dora_core::message::uhlc::{self, HLC}; use dora_core::message::uhlc::{self, HLC};
use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters}; use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters};
use dora_core::topics::LOCALHOST; use dora_core::topics::LOCALHOST;
use dora_core::topics::{
DataflowDaemonResult, DataflowResult, NodeError, NodeErrorCause, NodeExitStatus,
};
use dora_core::{ use dora_core::{
config::{DataId, InputMapping, NodeId}, config::{DataId, InputMapping, NodeId},
coordinator_messages::DaemonEvent, coordinator_messages::DaemonEvent,
@@ -29,9 +33,7 @@ use shared_memory_server::ShmemConf;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use std::{ use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet, HashMap}, collections::{BTreeMap, BTreeSet, HashMap},
io,
net::SocketAddr, net::SocketAddr,
path::{Path, PathBuf}, path::{Path, PathBuf},
time::Duration, time::Duration,
@@ -64,6 +66,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;


use crate::pending::DataflowStatus; use crate::pending::DataflowStatus;


const STDERR_LOG_LINES: usize = 10;

pub struct Daemon { pub struct Daemon {
running: HashMap<DataflowId, RunningDataflow>, running: HashMap<DataflowId, RunningDataflow>,
working_dir: HashMap<DataflowId, PathBuf>, working_dir: HashMap<DataflowId, PathBuf>,
@@ -78,7 +82,7 @@ pub struct Daemon {
/// used for testing and examples /// used for testing and examples
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>, exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
/// used to record dataflow results when `exit_when_done` is used /// used to record dataflow results when `exit_when_done` is used
dataflow_errors: BTreeMap<Uuid, BTreeMap<NodeId, eyre::Report>>,
dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,


clock: Arc<uhlc::HLC>, clock: Arc<uhlc::HLC>,
} }
@@ -148,7 +152,7 @@ impl Daemon {
.map(|_| ()) .map(|_| ())
} }


pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<()> {
pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<DataflowResult> {
let working_dir = dataflow_path let working_dir = dataflow_path
.canonicalize() .canonicalize()
.context("failed to canoncialize dataflow path")? .context("failed to canoncialize dataflow path")?
@@ -160,8 +164,9 @@ impl Daemon {
descriptor.check(&working_dir)?; descriptor.check(&working_dir)?;
let nodes = descriptor.resolve_aliases_and_set_defaults()?; let nodes = descriptor.resolve_aliases_and_set_defaults()?;


let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
let spawn_command = SpawnDataflowNodes { let spawn_command = SpawnDataflowNodes {
dataflow_id: Uuid::new_v7(Timestamp::now(NoContext)),
dataflow_id,
working_dir, working_dir,
nodes, nodes,
machine_listen_ports: BTreeMap::new(), machine_listen_ports: BTreeMap::new(),
@@ -191,7 +196,7 @@ impl Daemon {
None, None,
"".to_string(), "".to_string(),
Some(exit_when_done), Some(exit_when_done),
clock,
clock.clone(),
); );


let spawn_result = reply_rx let spawn_result = reply_rx
@@ -205,20 +210,15 @@ impl Daemon {
} }
}); });


let (dataflow_errors, ()) = future::try_join(run_result, spawn_result).await?;
let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;


if dataflow_errors.is_empty() {
Ok(())
} else {
let mut output = "some nodes failed:".to_owned();
for (dataflow, node_errors) in dataflow_errors {
for (node, error) in node_errors {
use std::fmt::Write;
write!(&mut output, "\n - {dataflow}/{node}: {error}").unwrap();
}
}
bail!("{output}");
}
Ok(DataflowResult {
uuid: dataflow_id,
timestamp: clock.new_timestamp(),
node_results: dataflow_results
.remove(&dataflow_id)
.context("no node results for dataflow_id")?,
})
} }


async fn run_general( async fn run_general(
@@ -227,7 +227,7 @@ impl Daemon {
machine_id: String, machine_id: String,
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>, exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
clock: Arc<HLC>, clock: Arc<HLC>,
) -> eyre::Result<BTreeMap<Uuid, BTreeMap<NodeId, eyre::Report>>> {
) -> eyre::Result<BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>> {
let coordinator_connection = match coordinator_addr { let coordinator_connection = match coordinator_addr {
Some(addr) => { Some(addr) => {
let stream = TcpStream::connect(addr) let stream = TcpStream::connect(addr)
@@ -251,7 +251,7 @@ impl Daemon {
inter_daemon_connections: BTreeMap::new(), inter_daemon_connections: BTreeMap::new(),
machine_id, machine_id,
exit_when_done, exit_when_done,
dataflow_errors: BTreeMap::new(),
dataflow_node_results: BTreeMap::new(),
clock, clock,
}; };


@@ -272,7 +272,7 @@ impl Daemon {
async fn run_inner( async fn run_inner(
mut self, mut self,
incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin, incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
) -> eyre::Result<BTreeMap<Uuid, BTreeMap<NodeId, eyre::Report>>> {
) -> eyre::Result<BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>> {
let mut events = incoming_events; let mut events = incoming_events;


while let Some(event) = events.next().await { while let Some(event) = events.next().await {
@@ -329,7 +329,7 @@ impl Daemon {
} }
} }


Ok(self.dataflow_errors)
Ok(self.dataflow_node_results)
} }


async fn handle_coordinator_event( async fn handle_coordinator_event(
@@ -376,15 +376,19 @@ impl Daemon {
} }
DaemonCoordinatorEvent::AllNodesReady { DaemonCoordinatorEvent::AllNodesReady {
dataflow_id, dataflow_id,
success,
exited_before_subscribe,
} => { } => {
match self.running.get_mut(&dataflow_id) { match self.running.get_mut(&dataflow_id) {
Some(dataflow) => { Some(dataflow) => {
let ready = exited_before_subscribe.is_empty();
dataflow dataflow
.pending_nodes .pending_nodes
.handle_external_all_nodes_ready(success)
.handle_external_all_nodes_ready(
exited_before_subscribe,
&mut dataflow.cascading_error_causes,
)
.await?; .await?;
if success {
if ready {
tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
dataflow.start(&self.events_tx, &self.clock).await?; dataflow.start(&self.events_tx, &self.clock).await?;
} }
@@ -614,6 +618,11 @@ impl Daemon {
dataflow.pending_nodes.insert(node.id.clone()); dataflow.pending_nodes.insert(node.id.clone());


let node_id = node.id.clone(); let node_id = node.id.clone();
let node_stderr_most_recent = dataflow
.node_stderr_most_recent
.entry(node.id.clone())
.or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
.clone();
match spawn::spawn_node( match spawn::spawn_node(
dataflow_id, dataflow_id,
&working_dir, &working_dir,
@@ -621,6 +630,7 @@ impl Daemon {
self.events_tx.clone(), self.events_tx.clone(),
dataflow_descriptor.clone(), dataflow_descriptor.clone(),
self.clock.clone(), self.clock.clone(),
node_stderr_most_recent,
) )
.await .await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`")) .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
@@ -636,6 +646,7 @@ impl Daemon {
&node_id, &node_id,
&mut self.coordinator_connection, &mut self.coordinator_connection,
&self.clock, &self.clock,
&mut dataflow.cascading_error_causes,
) )
.await?; .await?;
} }
@@ -742,6 +753,7 @@ impl Daemon {
reply_sender, reply_sender,
&mut self.coordinator_connection, &mut self.coordinator_connection,
&self.clock, &self.clock,
&mut dataflow.cascading_error_causes,
) )
.await?; .await?;
match status { match status {
@@ -996,7 +1008,12 @@ impl Daemon {


dataflow dataflow
.pending_nodes .pending_nodes
.handle_node_stop(node_id, &mut self.coordinator_connection, &self.clock)
.handle_node_stop(
node_id,
&mut self.coordinator_connection,
&self.clock,
&mut dataflow.cascading_error_causes,
)
.await?; .await?;


Self::handle_outputs_done( Self::handle_outputs_done(
@@ -1013,17 +1030,15 @@ impl Daemon {
.iter() .iter()
.all(|(_id, n)| n.node_config.dynamic) .all(|(_id, n)| n.node_config.dynamic)
{ {
let result = match self.dataflow_errors.get(&dataflow.id) {
None => Ok(()),
Some(errors) => {
let mut output = "some nodes failed:".to_owned();
for (node, error) in errors {
use std::fmt::Write;
write!(&mut output, "\n - {node}: {error}").unwrap();
}
Err(output)
}
let result = DataflowDaemonResult {
timestamp: self.clock.new_timestamp(),
node_results: self
.dataflow_node_results
.get(&dataflow.id)
.context("failed to get dataflow node results")?
.clone(),
}; };

tracing::info!( tracing::info!(
"Dataflow `{dataflow_id}` finished on machine `{}`", "Dataflow `{dataflow_id}` finished on machine `{}`",
self.machine_id self.machine_id
@@ -1142,80 +1157,57 @@ impl Daemon {
node_id, node_id,
exit_status, exit_status,
} => { } => {
let node_error = match exit_status {
let node_result = match exit_status {
NodeExitStatus::Success => { NodeExitStatus::Success => {
tracing::info!("node {dataflow_id}/{node_id} finished successfully"); tracing::info!("node {dataflow_id}/{node_id} finished successfully");
None
}
NodeExitStatus::IoError(err) => {
let err = eyre!(err).wrap_err(format!(
"
I/O error while waiting for node `{dataflow_id}/{node_id}.

Check logs using: dora logs {dataflow_id} {node_id}
"
));
tracing::error!("{err:?}");
Some(err)
Ok(())
} }
NodeExitStatus::ExitCode(code) => {
let err = eyre!(
"
{dataflow_id}/{node_id} failed with exit code {code}.

Check logs using: dora logs {dataflow_id} {node_id}
"
);
tracing::error!("{err}");
Some(err)
}
NodeExitStatus::Signal(signal) => {
let signal: Cow<_> = match signal {
1 => "SIGHUP".into(),
2 => "SIGINT".into(),
3 => "SIGQUIT".into(),
4 => "SIGILL".into(),
6 => "SIGABRT".into(),
8 => "SIGFPE".into(),
9 => "SIGKILL".into(),
11 => "SIGSEGV".into(),
13 => "SIGPIPE".into(),
14 => "SIGALRM".into(),
15 => "SIGTERM".into(),
22 => "SIGABRT".into(),
23 => "NSIG".into(),

other => other.to_string().into(),
exit_status => {
let dataflow = self.running.get(&dataflow_id);
let caused_by_node = dataflow
.and_then(|dataflow| {
dataflow.cascading_error_causes.error_caused_by(&node_id)
})
.cloned();
let grace_duration_kill = dataflow
.map(|d| d.grace_duration_kills.contains(&node_id))
.unwrap_or_default();

let cause = match caused_by_node {
Some(caused_by_node) => {
tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`");
NodeErrorCause::Cascading { caused_by_node }
}
None if grace_duration_kill => NodeErrorCause::GraceDuration,
None => NodeErrorCause::Other {
stderr: dataflow
.and_then(|d| d.node_stderr_most_recent.get(&node_id))
.map(|queue| {
let mut s = if queue.is_full() {
"[...]".into()
} else {
String::new()
};
while let Some(line) = queue.pop() {
s += &line;
}
s
})
.unwrap_or_default(),
},
}; };
let err = eyre!(
"
{dataflow_id}/{node_id} failed with signal `{signal}`

Check logs using: dora logs {dataflow_id} {node_id}
"
);
tracing::error!("{err}");
Some(err)
}
NodeExitStatus::Unknown => {
let err = eyre!(
"
{dataflow_id}/{node_id} failed with unknown exit code
Check logs using: dora logs {dataflow_id} {node_id}
"
);
tracing::error!("{err}");
Some(err)
Err(NodeError {
timestamp: self.clock.new_timestamp(),
cause,
exit_status,
})
} }
}; };


if let Some(err) = node_error {
self.dataflow_errors
.entry(dataflow_id)
.or_default()
.insert(node_id.clone(), err);
}
self.dataflow_node_results
.entry(dataflow_id)
.or_default()
.insert(node_id.clone(), node_result);


self.handle_node_stop(dataflow_id, &node_id).await?; self.handle_node_stop(dataflow_id, &node_id).await?;


@@ -1423,6 +1415,12 @@ pub struct RunningDataflow {
/// ///
/// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable.
empty_set: BTreeSet<DataId>, empty_set: BTreeSet<DataId>,

/// Contains the node that caused the error for nodes that experienced a cascading error.
cascading_error_causes: CascadingErrorCauses,
grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,
} }


impl RunningDataflow { impl RunningDataflow {
@@ -1441,6 +1439,9 @@ impl RunningDataflow {
_timer_handles: Vec::new(), _timer_handles: Vec::new(),
stop_sent: false, stop_sent: false,
empty_set: BTreeSet::new(), empty_set: BTreeSet::new(),
cascading_error_causes: Default::default(),
grace_duration_kills: Default::default(),
node_stderr_most_recent: BTreeMap::new(),
} }
} }


@@ -1503,6 +1504,7 @@ impl RunningDataflow {
} }


let running_nodes = self.running_nodes.clone(); let running_nodes = self.running_nodes.clone();
let grace_duration_kills = self.grace_duration_kills.clone();
tokio::spawn(async move { tokio::spawn(async move {
let duration = grace_duration.unwrap_or(Duration::from_millis(500)); let duration = grace_duration.unwrap_or(Duration::from_millis(500));
tokio::time::sleep(duration).await; tokio::time::sleep(duration).await;
@@ -1512,6 +1514,7 @@ impl RunningDataflow {
for (node, node_details) in running_nodes.iter() { for (node, node_details) in running_nodes.iter() {
if let Some(pid) = node_details.pid { if let Some(pid) = node_details.pid {
if let Some(process) = system.process(Pid::from(pid as usize)) { if let Some(process) = system.process(Pid::from(pid as usize)) {
grace_duration_kills.insert(node.clone());
process.kill(); process.kill();
warn!( warn!(
"{node} was killed due to not stopping within the {:#?} grace period", "{node} was killed due to not stopping within the {:#?} grace period",
@@ -1644,39 +1647,6 @@ pub enum DoraEvent {
}, },
} }


#[derive(Debug)]
pub enum NodeExitStatus {
Success,
IoError(io::Error),
ExitCode(i32),
Signal(i32),
Unknown,
}

impl From<Result<std::process::ExitStatus, io::Error>> for NodeExitStatus {
fn from(result: Result<std::process::ExitStatus, io::Error>) -> Self {
match result {
Ok(status) => {
if status.success() {
NodeExitStatus::Success
} else if let Some(code) = status.code() {
Self::ExitCode(code)
} else {
#[cfg(unix)]
{
use std::os::unix::process::ExitStatusExt;
if let Some(signal) = status.signal() {
return Self::Signal(signal);
}
}
Self::Unknown
}
}
Err(err) => Self::IoError(err),
}
}
}

#[must_use] #[must_use]
enum RunStatus { enum RunStatus {
Continue, Continue,
@@ -1723,3 +1693,23 @@ fn set_up_ctrlc_handler(


Ok(ReceiverStream::new(ctrlc_rx)) Ok(ReceiverStream::new(ctrlc_rx))
} }

/// Tracks which nodes failed as a consequence of another node's failure.
///
/// Maps each affected node to the node whose earlier failure caused it to fail.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
    caused_by: BTreeMap<NodeId, NodeId>,
}

impl CascadingErrorCauses {
    /// Returns `true` if the given node's failure was caused by another node.
    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
        self.error_caused_by(node).is_some()
    }

    /// Return the ID of the node that caused a cascading error for the given node, if any.
    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
        self.caused_by.get(node)
    }

    /// Record that `affected_node` failed because of `causing_node`.
    ///
    /// Only the first reported cause per affected node is kept; later reports
    /// for the same node are ignored.
    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
        self.caused_by.entry(affected_node).or_insert(causing_node);
    }
}

+ 49
- 41
binaries/daemon/src/pending.rs View File

@@ -9,7 +9,7 @@ use dora_core::{
use eyre::{bail, Context}; use eyre::{bail, Context};
use tokio::{net::TcpStream, sync::oneshot}; use tokio::{net::TcpStream, sync::oneshot};


use crate::tcp_utils::tcp_send;
use crate::{tcp_utils::tcp_send, CascadingErrorCauses};


pub struct PendingNodes { pub struct PendingNodes {
dataflow_id: DataflowId, dataflow_id: DataflowId,
@@ -28,7 +28,7 @@ pub struct PendingNodes {
/// ///
/// If this list is non-empty, we should not start the dataflow at all. Instead, /// If this list is non-empty, we should not start the dataflow at all. Instead,
/// we report an error to the other nodes. /// we report an error to the other nodes.
exited_before_subscribe: HashSet<NodeId>,
exited_before_subscribe: Vec<NodeId>,


/// Whether the local init result was already reported to the coordinator. /// Whether the local init result was already reported to the coordinator.
reported_init_to_coordinator: bool, reported_init_to_coordinator: bool,
@@ -42,7 +42,7 @@ impl PendingNodes {
local_nodes: HashSet::new(), local_nodes: HashSet::new(),
external_nodes: false, external_nodes: false,
waiting_subscribers: HashMap::new(), waiting_subscribers: HashMap::new(),
exited_before_subscribe: HashSet::new(),
exited_before_subscribe: Default::default(),
reported_init_to_coordinator: false, reported_init_to_coordinator: false,
} }
} }
@@ -61,12 +61,13 @@ impl PendingNodes {
reply_sender: oneshot::Sender<DaemonReply>, reply_sender: oneshot::Sender<DaemonReply>,
coordinator_connection: &mut Option<TcpStream>, coordinator_connection: &mut Option<TcpStream>,
clock: &HLC, clock: &HLC,
cascading_errors: &mut CascadingErrorCauses,
) -> eyre::Result<DataflowStatus> { ) -> eyre::Result<DataflowStatus> {
self.waiting_subscribers self.waiting_subscribers
.insert(node_id.clone(), reply_sender); .insert(node_id.clone(), reply_sender);
self.local_nodes.remove(&node_id); self.local_nodes.remove(&node_id);


self.update_dataflow_status(coordinator_connection, clock)
self.update_dataflow_status(coordinator_connection, clock, cascading_errors)
.await .await
} }


@@ -75,26 +76,28 @@ impl PendingNodes {
node_id: &NodeId, node_id: &NodeId,
coordinator_connection: &mut Option<TcpStream>, coordinator_connection: &mut Option<TcpStream>,
clock: &HLC, clock: &HLC,
cascading_errors: &mut CascadingErrorCauses,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
if self.local_nodes.remove(node_id) { if self.local_nodes.remove(node_id) {
tracing::warn!("node `{node_id}` exited before initializing dora connection"); tracing::warn!("node `{node_id}` exited before initializing dora connection");
self.exited_before_subscribe.insert(node_id.clone());
self.update_dataflow_status(coordinator_connection, clock)
self.exited_before_subscribe.push(node_id.clone());
self.update_dataflow_status(coordinator_connection, clock, cascading_errors)
.await?; .await?;
} }
Ok(()) Ok(())
} }


pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> {
pub async fn handle_external_all_nodes_ready(
&mut self,
exited_before_subscribe: Vec<NodeId>,
cascading_errors: &mut CascadingErrorCauses,
) -> eyre::Result<()> {
if !self.local_nodes.is_empty() { if !self.local_nodes.is_empty() {
bail!("received external `all_nodes_ready` event before local nodes were ready"); bail!("received external `all_nodes_ready` event before local nodes were ready");
} }
let external_error = if success {
None
} else {
Some("some nodes failed to initialize on remote machines".to_string())
};
self.answer_subscribe_requests(external_error).await;

self.answer_subscribe_requests(exited_before_subscribe, cascading_errors)
.await;


Ok(()) Ok(())
} }
@@ -103,6 +106,7 @@ impl PendingNodes {
&mut self, &mut self,
coordinator_connection: &mut Option<TcpStream>, coordinator_connection: &mut Option<TcpStream>,
clock: &HLC, clock: &HLC,
cascading_errors: &mut CascadingErrorCauses,
) -> eyre::Result<DataflowStatus> { ) -> eyre::Result<DataflowStatus> {
if self.local_nodes.is_empty() { if self.local_nodes.is_empty() {
if self.external_nodes { if self.external_nodes {
@@ -113,7 +117,8 @@ impl PendingNodes {
} }
Ok(DataflowStatus::Pending) Ok(DataflowStatus::Pending)
} else { } else {
self.answer_subscribe_requests(None).await;
self.answer_subscribe_requests(Vec::new(), cascading_errors)
.await;
Ok(DataflowStatus::AllNodesReady) Ok(DataflowStatus::AllNodesReady)
} }
} else { } else {
@@ -121,33 +126,34 @@ impl PendingNodes {
} }
} }


async fn answer_subscribe_requests(&mut self, external_error: Option<String>) {
let result = if self.exited_before_subscribe.is_empty() {
match external_error {
Some(err) => Err(err),
None => Ok(()),
}
} else {
let node_id_message = if self.exited_before_subscribe.len() == 1 {
self.exited_before_subscribe
.iter()
.next()
.map(|node_id| node_id.to_string())
.unwrap_or("<node_id>".to_string())
} else {
"<node_id>".to_string()
};
Err(format!(
"Some nodes exited before subscribing to dora: {:?}\n\n\
This is typically happens when an initialization error occurs
in the node or operator. To check the output of the failed
nodes, run `dora logs {} {node_id_message}`.",
self.exited_before_subscribe, self.dataflow_id
))
async fn answer_subscribe_requests(
&mut self,
exited_before_subscribe_external: Vec<NodeId>,
cascading_errors: &mut CascadingErrorCauses,
) {
let node_exited_before_subscribe = match self.exited_before_subscribe.as_slice() {
[first, ..] => Some(first),
[] => match exited_before_subscribe_external.as_slice() {
[first, ..] => Some(first),
[] => None,
},
}; };

let result = match &node_exited_before_subscribe {
Some(causing_node) => Err(format!(
"Node {causing_node} exited before initializing dora. For \
more information, run `dora logs {} {causing_node}`.",
self.dataflow_id
)),
None => Ok(()),
};

// answer all subscribe requests // answer all subscribe requests
let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); let subscribe_replies = std::mem::take(&mut self.waiting_subscribers);
for reply_sender in subscribe_replies.into_values() {
for (node_id, reply_sender) in subscribe_replies.into_iter() {
if let Some(causing_node) = node_exited_before_subscribe {
cascading_errors.report_cascading_error(causing_node.clone(), node_id.clone());
}
let _ = reply_sender.send(DaemonReply::Result(result.clone())); let _ = reply_sender.send(DaemonReply::Result(result.clone()));
} }
} }
@@ -161,15 +167,17 @@ impl PendingNodes {
bail!("no coordinator connection to send AllNodesReady"); bail!("no coordinator connection to send AllNodesReady");
}; };


let success = self.exited_before_subscribe.is_empty();
tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes");
tracing::info!(
"all local nodes are ready (exit before subscribe: {:?}), waiting for remote nodes",
self.exited_before_subscribe
);


let msg = serde_json::to_vec(&Timestamped { let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event { inner: CoordinatorRequest::Event {
machine_id: self.machine_id.clone(), machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesReady { event: DaemonEvent::AllNodesReady {
dataflow_id: self.dataflow_id, dataflow_id: self.dataflow_id,
success,
exited_before_subscribe: self.exited_before_subscribe.clone(),
}, },
}, },
timestamp, timestamp,


+ 9
- 3
binaries/daemon/src/spawn.rs View File

@@ -3,6 +3,7 @@ use crate::{
OutputId, RunningNode, OutputId, RunningNode,
}; };
use aligned_vec::{AVec, ConstAlign}; use aligned_vec::{AVec, ConstAlign};
use crossbeam::queue::ArrayQueue;
use dora_arrow_convert::IntoArrow; use dora_arrow_convert::IntoArrow;
use dora_core::{ use dora_core::{
config::DataId, config::DataId,
@@ -42,6 +43,7 @@ pub async fn spawn_node(
daemon_tx: mpsc::Sender<Timestamped<Event>>, daemon_tx: mpsc::Sender<Timestamped<Event>>,
dataflow_descriptor: Descriptor, dataflow_descriptor: Descriptor,
clock: Arc<HLC>, clock: Arc<HLC>,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
) -> eyre::Result<RunningNode> { ) -> eyre::Result<RunningNode> {
let node_id = node.id.clone(); let node_id = node.id.clone();
tracing::debug!("Spawning node `{dataflow_id}/{node_id}`"); tracing::debug!("Spawning node `{dataflow_id}/{node_id}`");
@@ -358,18 +360,22 @@ pub async fn spawn_node(
} }
}; };


match String::from_utf8(raw) {
Ok(s) => buffer.push_str(&s),
let new = match String::from_utf8(raw) {
Ok(s) => s,
Err(err) => { Err(err) => {
let lossy = String::from_utf8_lossy(err.as_bytes()); let lossy = String::from_utf8_lossy(err.as_bytes());
tracing::warn!( tracing::warn!(
"stderr not valid UTF-8 string (node {node_id}): {}: {lossy}", "stderr not valid UTF-8 string (node {node_id}): {}: {lossy}",
err.utf8_error() err.utf8_error()
); );
buffer.push_str(&lossy)
lossy.into_owned()
} }
}; };


buffer.push_str(&new);

node_stderr_most_recent.force_push(new);

if buffer.starts_with("Traceback (most recent call last):") { if buffer.starts_with("Traceback (most recent call last):") {
if !finished { if !finished {
continue; continue;


+ 1
- 1
examples/multiple-daemons/run.rs View File

@@ -176,7 +176,7 @@ async fn running_dataflows(coordinator_events_tx: &Sender<Event>) -> eyre::Resul
.await?; .await?;
let result = reply.await??; let result = reply.await??;
let dataflows = match result { let dataflows = match result {
ControlRequestReply::DataflowList { dataflows } => dataflows,
ControlRequestReply::DataflowList(list) => list.get_active(),
ControlRequestReply::Error(err) => bail!("{err}"), ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"), other => bail!("unexpected start dataflow reply: {other:?}"),
}; };


+ 3
- 3
libraries/core/src/coordinator_messages.rs View File

@@ -1,4 +1,4 @@
use crate::daemon_messages::DataflowId;
use crate::{config::NodeId, daemon_messages::DataflowId, topics::DataflowDaemonResult};
use eyre::eyre; use eyre::eyre;


#[derive(Debug, serde::Serialize, serde::Deserialize)] #[derive(Debug, serde::Serialize, serde::Deserialize)]
@@ -18,11 +18,11 @@ pub enum CoordinatorRequest {
pub enum DaemonEvent { pub enum DaemonEvent {
AllNodesReady { AllNodesReady {
dataflow_id: DataflowId, dataflow_id: DataflowId,
success: bool,
exited_before_subscribe: Vec<NodeId>,
}, },
AllNodesFinished { AllNodesFinished {
dataflow_id: DataflowId, dataflow_id: DataflowId,
result: Result<(), String>,
result: DataflowDaemonResult,
}, },
Heartbeat, Heartbeat,
} }


+ 1
- 1
libraries/core/src/daemon_messages.rs View File

@@ -234,7 +234,7 @@ pub enum DaemonCoordinatorEvent {
Spawn(SpawnDataflowNodes), Spawn(SpawnDataflowNodes),
AllNodesReady { AllNodesReady {
dataflow_id: DataflowId, dataflow_id: DataflowId,
success: bool,
exited_before_subscribe: Vec<NodeId>,
}, },
StopDataflow { StopDataflow {
dataflow_id: DataflowId, dataflow_id: DataflowId,


+ 168
- 15
libraries/core/src/topics.rs View File

@@ -1,5 +1,7 @@
use dora_message::uhlc;
use std::{ use std::{
collections::BTreeSet,
borrow::Cow,
collections::{BTreeMap, BTreeSet},
fmt::Display, fmt::Display,
net::{IpAddr, Ipv4Addr}, net::{IpAddr, Ipv4Addr},
path::PathBuf, path::PathBuf,
@@ -55,24 +57,40 @@ pub enum ControlRequest {
ConnectedMachines, ConnectedMachines,
} }


/// List of dataflows known to the coordinator, each paired with its status.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowList(pub Vec<DataflowListEntry>);

impl DataflowList {
    /// Returns the IDs of all dataflows whose status is [`DataflowStatus::Running`].
    pub fn get_active(&self) -> Vec<DataflowId> {
        let mut active = Vec::new();
        for entry in &self.0 {
            if matches!(entry.status, DataflowStatus::Running) {
                active.push(entry.id.clone());
            }
        }
        active
    }
}

/// A single entry of a [`DataflowList`]: a dataflow ID paired with its status.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowListEntry {
    /// Unique identifier of the dataflow.
    pub id: DataflowId,
    /// Current lifecycle status of the dataflow.
    pub status: DataflowStatus,
}

/// Lifecycle status of a dataflow.
#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
pub enum DataflowStatus {
    /// The dataflow is currently running.
    Running,
    /// The dataflow has finished.
    Finished,
    /// The dataflow failed.
    Failed,
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequestReply { pub enum ControlRequestReply {
Error(String), Error(String),
CoordinatorStopped, CoordinatorStopped,
DataflowStarted {
uuid: Uuid,
},
DataflowReloaded {
uuid: Uuid,
},
DataflowStopped {
uuid: Uuid,
result: Result<(), String>,
},

DataflowList {
dataflows: Vec<DataflowId>,
},
DataflowStarted { uuid: Uuid },
DataflowReloaded { uuid: Uuid },
DataflowStopped { uuid: Uuid, result: DataflowResult },
DataflowList(DataflowList),
DestroyOk, DestroyOk,
DaemonConnected(bool), DaemonConnected(bool),
ConnectedMachines(BTreeSet<String>), ConnectedMachines(BTreeSet<String>),
@@ -94,3 +112,138 @@ impl Display for DataflowId {
} }
} }
} }

/// Overall result of a dataflow run, keyed by node.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowResult {
    /// ID of the dataflow this result belongs to.
    pub uuid: Uuid,
    /// HLC timestamp at which this result was created.
    pub timestamp: uhlc::Timestamp,
    /// Outcome of every node; `Err` entries describe why a node failed.
    pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
}

impl DataflowResult {
    /// Creates a result with no node entries, which counts as successful.
    pub fn ok_empty(uuid: Uuid, timestamp: uhlc::Timestamp) -> Self {
        Self {
            uuid,
            timestamp,
            node_results: BTreeMap::new(),
        }
    }

    /// Returns `true` if no node reported an error.
    pub fn is_ok(&self) -> bool {
        !self.node_results.values().any(|result| result.is_err())
    }
}

/// Result of a dataflow run as reported by a single daemon.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowDaemonResult {
    /// HLC timestamp at which the daemon recorded this result.
    pub timestamp: uhlc::Timestamp,
    /// Outcome of every node that ran on this daemon.
    pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
}

impl DataflowDaemonResult {
    /// Returns `true` if every node on this daemon finished without error.
    pub fn is_ok(&self) -> bool {
        !self.node_results.values().any(|result| result.is_err())
    }
}

/// Describes the failure of a single node of a dataflow.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
    /// HLC timestamp at which the error was recorded.
    pub timestamp: uhlc::Timestamp,
    /// High-level reason for the failure (grace-duration kill, cascading, or other).
    pub cause: NodeErrorCause,
    /// Raw exit status of the node's process.
    pub exit_status: NodeExitStatus,
}

impl std::fmt::Display for NodeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // First part: describe the raw exit status.
        match &self.exit_status {
            NodeExitStatus::Success => write!(f, "<success>"),
            NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
            NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"),
            NodeExitStatus::Signal(signal) => {
                // Map well-known signal numbers to their conventional names.
                // NOTE(review): the 22 => SIGABRT and 23 => NSIG entries look
                // platform-specific (22 is SIGABRT on Windows, but SIGTTOU on
                // Linux) — confirm the intended mapping.
                let signal_str: Cow<_> = match signal {
                    1 => "SIGHUP".into(),
                    2 => "SIGINT".into(),
                    3 => "SIGQUIT".into(),
                    4 => "SIGILL".into(),
                    6 => "SIGABRT".into(),
                    8 => "SIGFPE".into(),
                    9 => "SIGKILL".into(),
                    11 => "SIGSEGV".into(),
                    13 => "SIGPIPE".into(),
                    14 => "SIGALRM".into(),
                    15 => "SIGTERM".into(),
                    22 => "SIGABRT".into(),
                    23 => "NSIG".into(),
                    other => other.to_string().into(),
                };
                if matches!(self.cause, NodeErrorCause::GraceDuration) {
                    write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})")
                } else {
                    write!(f, "exited because of signal {signal_str}")
                }
            }
            NodeExitStatus::Unknown => write!(f, "unknown exit status"),
        }?;

        // Second part: append extra context depending on the cause.
        match &self.cause {
            NodeErrorCause::GraceDuration => {}, // handled above
            NodeErrorCause::Cascading { caused_by_node } => write!(
                f,
                "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora."
            )?,
            // No stderr captured -> nothing more to print.
            NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
            NodeErrorCause::Other { stderr } => {
                let line: &str = "---------------------------------------------------------------------------------\n";
                write!(f, " with stderr output:\n{line}{stderr}{line}")?
            },
        }

        Ok(())
    }
}

/// High-level reason why a node failed.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeErrorCause {
    /// Node was killed because it didn't react to a stop message in time.
    GraceDuration,
    /// Node failed because another node failed before.
    Cascading {
        /// ID of the node whose failure triggered this one.
        caused_by_node: NodeId,
    },
    /// Any other failure; carries the most recent stderr output of the node.
    Other {
        stderr: String,
    },
}

/// Exit status of a node process, in a serializable form.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeExitStatus {
    /// The process terminated successfully (zero exit code).
    Success,
    /// Waiting for the process failed; contains the I/O error message.
    IoError(String),
    /// The process exited with the given non-zero exit code.
    ExitCode(i32),
    /// The process was terminated by the given signal (detected on Unix only).
    Signal(i32),
    /// The exit status could not be determined.
    Unknown,
}

impl From<Result<std::process::ExitStatus, std::io::Error>> for NodeExitStatus {
    fn from(result: Result<std::process::ExitStatus, std::io::Error>) -> Self {
        // An I/O error means we could not even observe the exit status.
        let status = match result {
            Ok(status) => status,
            Err(err) => return Self::IoError(err.to_string()),
        };
        if status.success() {
            return Self::Success;
        }
        if let Some(code) = status.code() {
            return Self::ExitCode(code);
        }
        // On Unix, a missing exit code typically means the process was
        // terminated by a signal.
        #[cfg(unix)]
        {
            use std::os::unix::process::ExitStatusExt;
            if let Some(signal) = status.signal() {
                return Self::Signal(signal);
            }
        }
        Self::Unknown
    }
}

Loading…
Cancel
Save