Browse Source

Merge branch 'main' into error-logging

tags/v0.3.5-rc0
Philipp Oppermann 1 year ago
parent
commit
2d0f0dcf5e
Failed to extract signature
9 changed files with 259 additions and 73 deletions
  1. +6
    -6
      .github/workflows/ci.yml
  2. +126
    -33
      Cargo.lock
  3. +1
    -0
      binaries/cli/Cargo.toml
  4. +43
    -20
      binaries/cli/src/main.rs
  5. +26
    -9
      binaries/coordinator/src/lib.rs
  6. +1
    -0
      binaries/daemon/Cargo.toml
  7. +13
    -1
      binaries/daemon/src/lib.rs
  8. +1
    -1
      examples/multiple-daemons/run.rs
  9. +42
    -3
      libraries/core/src/topics.rs

+ 6
- 6
.github/workflows/ci.yml View File

@@ -286,12 +286,12 @@ jobs:
cargo build --all
dora up
dora list
dora start dataflow.yml --name ci-rust-test
dora start dataflow.yml --name ci-rust-test --detach
sleep 10
dora stop --name ci-rust-test --grace-duration 5s
cd ..
dora build examples/rust-dataflow/dataflow_dynamic.yml
dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic
dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic --detach
cargo run -p rust-dataflow-example-sink-dynamic
sleep 5
dora stop --name ci-rust-dynamic --grace-duration 5s
@@ -315,11 +315,11 @@ jobs:
cd test_python_project
dora up
dora list
dora start dataflow.yml --name ci-python-test
dora start dataflow.yml --name ci-python-test --detach
sleep 10
dora stop --name ci-python-test --grace-duration 5s
pip install "numpy<2.0.0" opencv-python
dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic
dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic --detach
python ../examples/python-dataflow/plot_dynamic.py
sleep 5
dora stop --name ci-python-test --grace-duration 5s
@@ -339,7 +339,7 @@ jobs:
cmake -B build
cmake --build build
cmake --install build
dora start dataflow.yml --name ci-c-test
dora start dataflow.yml --name ci-c-test --detach
sleep 10
dora stop --name ci-c-test --grace-duration 5s
dora destroy
@@ -358,7 +358,7 @@ jobs:
cmake -B build
cmake --build build
cmake --install build
dora start dataflow.yml --name ci-cxx-test
dora start dataflow.yml --name ci-cxx-test --detach
sleep 10
dora stop --name ci-cxx-test --grace-duration 5s
dora destroy


+ 126
- 33
Cargo.lock View File

@@ -885,9 +885,12 @@ dependencies = [

[[package]]
name = "atomic"
version = "0.5.3"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994"
dependencies = [
"bytemuck",
]

[[package]]
name = "atomic-waker"
@@ -982,7 +985,7 @@ dependencies = [
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper",
"sync_wrapper 0.1.2",
"tower",
"tower-layer",
"tower-service",
@@ -1935,6 +1938,16 @@ dependencies = [
"crossbeam-utils",
]

[[package]]
name = "crossbeam-skiplist"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]

[[package]]
name = "crossbeam-utils"
version = "0.8.20"
@@ -2021,9 +2034,9 @@ checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991"

[[package]]
name = "cxx"
version = "1.0.123"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8194f089b6da4751d6c1da1ef37c17255df51f9346cdb160f8b096562ae4a85c"
checksum = "273dcfd3acd4e1e276af13ed2a43eea7001318823e7a726a6b3ed39b4acc0b82"
dependencies = [
"cc",
"cxxbridge-flags",
@@ -2033,9 +2046,9 @@ dependencies = [

[[package]]
name = "cxx-build"
version = "1.0.123"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e8df9a089caae66634d754672d5f909395f30f38af6ff19366980d8a8b57501"
checksum = "d8b2766fbd92be34e9ed143898fce6c572dc009de39506ed6903e5a05b68914e"
dependencies = [
"cc",
"codespan-reporting",
@@ -2048,15 +2061,15 @@ dependencies = [

[[package]]
name = "cxxbridge-flags"
version = "1.0.123"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25290be4751803672a70b98c68b51c1e7d0a640ab5a4377f240f9d2e70054cd1"
checksum = "839fcd5e43464614ffaa989eaf1c139ef1f0c51672a1ed08023307fa1b909ccd"

[[package]]
name = "cxxbridge-macro"
version = "1.0.123"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8cb317cb13604b4752416783bb25070381c36e844743e4146b7f8e55de7d140"
checksum = "4b2c1c1776b986979be68bb2285da855f8d8a35851a769fca8740df7c3d07877"
dependencies = [
"proc-macro2",
"quote",
@@ -2281,6 +2294,7 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml 0.9.34+deprecated",
"tabwriter",
"termcolor",
"tokio",
"tokio-stream",
@@ -2336,6 +2350,7 @@ dependencies = [
"async-trait",
"bincode",
"crossbeam",
"crossbeam-skiplist",
"ctrlc",
"dora-arrow-convert",
"dora-core",
@@ -3998,7 +4013,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.5.7",
"socket2 0.4.10",
"tokio",
"tower-service",
"tracing",
@@ -4026,19 +4041,20 @@ dependencies = [

[[package]]
name = "hyper-rustls"
version = "0.26.0"
version = "0.27.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.3.1",
"hyper-util",
"rustls 0.22.4",
"rustls 0.23.10",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tower-service",
"webpki-roots 0.26.1",
]

[[package]]
@@ -4618,7 +4634,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19"
dependencies = [
"cfg-if 1.0.0",
"windows-targets 0.52.5",
"windows-targets 0.48.5",
]

[[package]]
@@ -6427,9 +6443,9 @@ dependencies = [

[[package]]
name = "proc-macro2"
version = "1.0.85"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23"
checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
dependencies = [
"unicode-ident",
]
@@ -6620,8 +6636,8 @@ checksum = "2e8b432585672228923edbbf64b8b12c14e1112f62e88737655b4a083dbcd78e"
dependencies = [
"bytes",
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"quinn-proto 0.9.6",
"quinn-udp 0.3.2",
"rustc-hash",
"rustls 0.20.9",
"thiserror",
@@ -6630,6 +6646,23 @@ dependencies = [
"webpki",
]

[[package]]
name = "quinn"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4ceeeeabace7857413798eb1ffa1e9c905a9946a57d81fb69b4b71c4d8eb3ad"
dependencies = [
"bytes",
"pin-project-lite",
"quinn-proto 0.11.3",
"quinn-udp 0.5.2",
"rustc-hash",
"rustls 0.23.10",
"thiserror",
"tokio",
"tracing",
]

[[package]]
name = "quinn-proto"
version = "0.9.6"
@@ -6649,6 +6682,23 @@ dependencies = [
"webpki",
]

[[package]]
name = "quinn-proto"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe"
dependencies = [
"bytes",
"rand",
"ring 0.17.8",
"rustc-hash",
"rustls 0.23.10",
"slab",
"thiserror",
"tinyvec",
"tracing",
]

[[package]]
name = "quinn-udp"
version = "0.3.2"
@@ -6656,12 +6706,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "641538578b21f5e5c8ea733b736895576d0fe329bb883b937db6f4d163dbaaf4"
dependencies = [
"libc",
"quinn-proto",
"quinn-proto 0.9.6",
"socket2 0.4.10",
"tracing",
"windows-sys 0.42.0",
]

[[package]]
name = "quinn-udp"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46"
dependencies = [
"libc",
"once_cell",
"socket2 0.5.7",
"tracing",
"windows-sys 0.52.0",
]

[[package]]
name = "quote"
version = "1.0.36"
@@ -7869,9 +7932,9 @@ checksum = "19b30a45b0cd0bcca8037f3d0dc3421eaf95327a17cad11964fb8179b4fc4832"

[[package]]
name = "reqwest"
version = "0.12.4"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10"
checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37"
dependencies = [
"base64 0.22.1",
"bytes",
@@ -7890,13 +7953,14 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.22.4",
"quinn 0.11.2",
"rustls 0.23.10",
"rustls-pemfile 2.1.2",
"rustls-pki-types",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"sync_wrapper 1.0.1",
"tokio",
"tokio-rustls",
"tower-service",
@@ -8274,6 +8338,20 @@ dependencies = [
"zeroize",
]

[[package]]
name = "rustls"
version = "0.23.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402"
dependencies = [
"once_cell",
"ring 0.17.8",
"rustls-pki-types",
"rustls-webpki",
"subtle",
"zeroize",
]

[[package]]
name = "rustls-native-certs"
version = "0.6.3"
@@ -9066,6 +9144,12 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"

[[package]]
name = "sync_wrapper"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394"

[[package]]
name = "syntect"
version = "5.2.0"
@@ -9128,6 +9212,15 @@ dependencies = [
"windows 0.52.0",
]

[[package]]
name = "tabwriter"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a327282c4f64f6dc37e3bba4c2b6842cc3a992f204fa58d917696a89f691e5f6"
dependencies = [
"unicode-width",
]

[[package]]
name = "tap"
version = "1.0.1"
@@ -9369,11 +9462,11 @@ dependencies = [

[[package]]
name = "tokio-rustls"
version = "0.25.0"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
dependencies = [
"rustls 0.22.4",
"rustls 0.23.10",
"rustls-pki-types",
"tokio",
]
@@ -9842,9 +9935,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"

[[package]]
name = "uuid"
version = "1.8.0"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
checksum = "3ea73390fe27785838dcbf75b91b1d84799e28f1ce71e6f372a5dc2200c80de5"
dependencies = [
"atomic",
"getrandom",
@@ -9856,9 +9949,9 @@ dependencies = [

[[package]]
name = "uuid-macro-internal"
version = "1.8.0"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2"
checksum = "cdb394c528d1cb434c2f0522027e50c0705305caf6d20405c07a6f7e4cf9543c"
dependencies = [
"proc-macro2",
"quote",
@@ -11168,7 +11261,7 @@ dependencies = [
"async-trait",
"futures",
"log",
"quinn",
"quinn 0.9.4",
"rustls 0.20.9",
"rustls-native-certs",
"rustls-pemfile 1.0.4",


+ 1
- 0
binaries/cli/Cargo.toml View File

@@ -42,6 +42,7 @@ tokio = { version = "1.20.1", features = ["full"] }
tokio-stream = { version = "0.1.8", features = ["io-util", "net"] }
futures = "0.3.21"
duration-str = "0.5"
tabwriter = "1.4.0"
log = { version = "0.4.21", features = ["serde"] }
colored = "2.1.0"
env_logger = "0.11.3"

+ 43
- 20
binaries/cli/src/main.rs View File

@@ -6,7 +6,7 @@ use dora_coordinator::Event;
use dora_core::{
descriptor::Descriptor,
topics::{
ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
},
};
@@ -17,12 +17,13 @@ use dora_tracing::set_up_tracing_opts;
use duration_str::parse;
use eyre::{bail, Context};
use formatting::FormatDataflowError;
use std::net::SocketAddr;
use std::{io::Write, net::SocketAddr};
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
time::Duration,
};
use tabwriter::TabWriter;
use tokio::runtime::Builder;
use uuid::Uuid;

@@ -120,6 +121,9 @@ enum Command {
/// Attach to the dataflow and wait for its completion
#[clap(long, action)]
attach: bool,
/// Run the dataflow in background
#[clap(long, action)]
detach: bool,
/// Enable hot reloading (Python only)
#[clap(long, action)]
hot_reload: bool,
@@ -330,17 +334,18 @@ fn run() -> eyre::Result<()> {
} => {
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
let uuids = query_running_dataflows(&mut *session)
let list = query_running_dataflows(&mut *session)
.wrap_err("failed to query running dataflows")?;
if let Some(dataflow) = dataflow {
let uuid = Uuid::parse_str(&dataflow).ok();
let name = if uuid.is_some() { None } else { Some(dataflow) };
logs::logs(&mut *session, uuid, name, node)?
} else {
let uuid = match &uuids[..] {
let active = list.get_active();
let uuid = match &active[..] {
[] => bail!("No dataflows are running"),
[uuid] => uuid.clone(),
_ => inquire::Select::new("Choose dataflow to show logs:", uuids).prompt()?,
_ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?,
};
logs::logs(&mut *session, Some(uuid.uuid), None, node)?
}
@@ -351,6 +356,7 @@ fn run() -> eyre::Result<()> {
coordinator_addr,
coordinator_port,
attach,
detach,
hot_reload,
} => {
let dataflow_descriptor =
@@ -379,6 +385,16 @@ fn run() -> eyre::Result<()> {
&mut *session,
)?;

let attach = match (attach, detach) {
(true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
(true, false) => true,
(false, true) => false,
(false, false) => {
println!("attaching to dataflow (use `--detach` to run in background)");
true
}
};

if attach {
attach_dataflow(
dataflow_descriptor,
@@ -522,11 +538,12 @@ fn stop_dataflow_interactive(
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> eyre::Result<()> {
let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
if uuids.is_empty() {
let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
let active = list.get_active();
if active.is_empty() {
eprintln!("No dataflows are running");
} else {
let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?;
let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?;
stop_dataflow(selection.uuid, grace_duration, session)?;
}

@@ -602,30 +619,36 @@ fn stop_dataflow_by_name(
}

fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> {
let ids = query_running_dataflows(session)?;
let list = query_running_dataflows(session)?;

if ids.is_empty() {
eprintln!("No dataflows are running");
} else {
println!("Running dataflows:");
for id in ids {
println!("- {id}");
}
let mut tw = TabWriter::new(vec![]);
tw.write_all(b"UUID\tName\tStatus\n")?;
for entry in list.0 {
let uuid = entry.id.uuid;
let name = entry.id.name.unwrap_or_default();
let status = match entry.status {
dora_core::topics::DataflowStatus::Running => "Running",
dora_core::topics::DataflowStatus::Finished => "Succeeded",
dora_core::topics::DataflowStatus::Failed => "Failed",
};
tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?;
}
tw.flush()?;
let formatted = String::from_utf8(tw.into_inner()?)?;

println!("{formatted}");

Ok(())
}

fn query_running_dataflows(
session: &mut TcpRequestReplyConnection,
) -> Result<Vec<DataflowId>, eyre::ErrReport> {
fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result<DataflowList> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::List).unwrap())
.wrap_err("failed to send list message")?;
let reply: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let ids = match reply {
ControlRequestReply::DataflowList { dataflows } => dataflows,
ControlRequestReply::DataflowList(list) => list,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected list dataflow reply: {other:?}"),
};


+ 26
- 9
binaries/coordinator/src/lib.rs View File

@@ -10,7 +10,8 @@ use dora_core::{
descriptor::{Descriptor, ResolvedNode},
message::uhlc::{self, HLC},
topics::{
ControlRequest, ControlRequestReply, DataflowDaemonResult, DataflowId, DataflowResult,
ControlRequest, ControlRequestReply, DataflowDaemonResult, DataflowId, DataflowListEntry,
DataflowResult,
},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
@@ -468,15 +469,31 @@ async fn start_inner(
let mut dataflows: Vec<_> = running_dataflows.values().collect();
dataflows.sort_by_key(|d| (&d.name, d.uuid));

let reply = Ok(ControlRequestReply::DataflowList {
dataflows: dataflows
.into_iter()
.map(|d| DataflowId {
uuid: d.uuid,
name: d.name.clone(),
})
.collect(),
let running = dataflows.into_iter().map(|d| DataflowListEntry {
id: DataflowId {
uuid: d.uuid,
name: d.name.clone(),
},
status: dora_core::topics::DataflowStatus::Running,
});
let finished_failed =
dataflow_results.iter().map(|(&uuid, results)| {
let name =
archived_dataflows.get(&uuid).and_then(|d| d.name.clone());
let id = DataflowId { uuid, name };
let status = if results.values().all(|r| r.is_ok()) {
dora_core::topics::DataflowStatus::Finished
} else {
dora_core::topics::DataflowStatus::Failed
};
DataflowListEntry { id, status }
});

let reply = Ok(ControlRequestReply::DataflowList(
dora_core::topics::DataflowList(
running.chain(finished_failed).collect(),
),
));
let _ = reply_sender.send(reply);
}
ControlRequest::DaemonConnected => {


+ 1
- 0
binaries/daemon/Cargo.toml View File

@@ -40,3 +40,4 @@ ctrlc = "3.2.5"
which = "5.0.0"
sysinfo = "0.30.11"
crossbeam = "0.8.4"
crossbeam-skiplist = "0.1.3"

+ 13
- 1
binaries/daemon/src/lib.rs View File

@@ -1209,17 +1209,25 @@ impl Daemon {
dataflow.cascading_error_causes.error_caused_by(&node_id)
})
.cloned();
let grace_duration_kill = dataflow
.map(|d| d.grace_duration_kills.contains(&node_id))
.unwrap_or_default();

let cause = match caused_by_node {
Some(caused_by_node) => {
tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`");
NodeErrorCause::Cascading { caused_by_node }
}
None if grace_duration_kill => NodeErrorCause::GraceDuration,
None => NodeErrorCause::Other {
stderr: dataflow
.and_then(|d| d.node_stderr_most_recent.get(&node_id))
.map(|queue| {
let mut s = String::new();
let mut s = if queue.is_full() {
"[...]".into()
} else {
String::new()
};
while let Some(line) = queue.pop() {
s += &line;
}
@@ -1469,6 +1477,7 @@ pub struct RunningDataflow {

/// Contains the node that caused the error for nodes that experienced a cascading error.
cascading_error_causes: CascadingErrorCauses,
grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,
}
@@ -1490,6 +1499,7 @@ impl RunningDataflow {
stop_sent: false,
empty_set: BTreeSet::new(),
cascading_error_causes: Default::default(),
grace_duration_kills: Default::default(),
node_stderr_most_recent: BTreeMap::new(),
}
}
@@ -1553,6 +1563,7 @@ impl RunningDataflow {
}

let running_nodes = self.running_nodes.clone();
let grace_duration_kills = self.grace_duration_kills.clone();
tokio::spawn(async move {
let duration = grace_duration.unwrap_or(Duration::from_millis(500));
tokio::time::sleep(duration).await;
@@ -1562,6 +1573,7 @@ impl RunningDataflow {
for (node, node_details) in running_nodes.iter() {
if let Some(pid) = node_details.pid {
if let Some(process) = system.process(Pid::from(pid as usize)) {
grace_duration_kills.insert(node.clone());
process.kill();
warn!(
"{node} was killed due to not stopping within the {:#?} grace period",


+ 1
- 1
examples/multiple-daemons/run.rs View File

@@ -176,7 +176,7 @@ async fn running_dataflows(coordinator_events_tx: &Sender<Event>) -> eyre::Resul
.await?;
let result = reply.await??;
let dataflows = match result {
ControlRequestReply::DataflowList { dataflows } => dataflows,
ControlRequestReply::DataflowList(list) => list.get_active(),
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
};


+ 42
- 3
libraries/core/src/topics.rs View File

@@ -61,6 +61,32 @@ pub enum ControlRequest {
},
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowList(pub Vec<DataflowListEntry>);

impl DataflowList {
pub fn get_active(&self) -> Vec<DataflowId> {
self.0
.iter()
.filter(|d| d.status == DataflowStatus::Running)
.map(|d| d.id.clone())
.collect()
}
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowListEntry {
pub id: DataflowId,
pub status: DataflowStatus,
}

#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
pub enum DataflowStatus {
Running,
Finished,
Failed,
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequestReply {
Error(String),
@@ -68,7 +94,7 @@ pub enum ControlRequestReply {
DataflowStarted { uuid: Uuid },
DataflowReloaded { uuid: Uuid },
DataflowStopped { uuid: Uuid, result: DataflowResult },
DataflowList { dataflows: Vec<DataflowId> },
DataflowList(DataflowList),
DestroyOk,
DaemonConnected(bool),
ConnectedMachines(BTreeSet<String>),
@@ -118,6 +144,12 @@ pub struct DataflowDaemonResult {
pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
}

impl DataflowDaemonResult {
pub fn is_ok(&self) -> bool {
self.node_results.values().all(|r| r.is_ok())
}
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
pub timestamp: uhlc::Timestamp,
@@ -148,12 +180,17 @@ impl std::fmt::Display for NodeError {
23 => "NSIG".into(),
other => other.to_string().into(),
};
write!(f, "exited because of signal {signal_str}")
if matches!(self.cause, NodeErrorCause::GraceDuration) {
write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})")
} else {
write!(f, "exited because of signal {signal_str}")
}
}
NodeExitStatus::Unknown => write!(f, "unknown exit status"),
}?;

match &self.cause {
NodeErrorCause::GraceDuration => {}, // handled above
NodeErrorCause::Cascading { caused_by_node } => write!(
f,
"\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora."
@@ -161,7 +198,7 @@ impl std::fmt::Display for NodeError {
NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
NodeErrorCause::Other { stderr } => {
let line: &str = "---------------------------------------------------------------------------------\n";
write!(f, "with stderr output:\n{line}[...]\n{stderr}{line}")?
write!(f, " with stderr output:\n{line}{stderr}{line}")?
},
}

@@ -171,6 +208,8 @@ impl std::fmt::Display for NodeError {

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeErrorCause {
/// Node was killed because it didn't react to a stop message in time.
GraceDuration,
/// Node failed because another node failed before,
Cascading {
caused_by_node: NodeId,


Loading…
Cancel
Save