diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d74da80a..8708ba96 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -286,12 +286,12 @@ jobs: cargo build --all dora up dora list - dora start dataflow.yml --name ci-rust-test + dora start dataflow.yml --name ci-rust-test --detach sleep 10 dora stop --name ci-rust-test --grace-duration 5s cd .. dora build examples/rust-dataflow/dataflow_dynamic.yml - dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic + dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic --detach cargo run -p rust-dataflow-example-sink-dynamic sleep 5 dora stop --name ci-rust-dynamic --grace-duration 5s @@ -315,16 +315,54 @@ jobs: cd test_python_project dora up dora list - dora start dataflow.yml --name ci-python-test + dora start dataflow.yml --name ci-python-test --detach sleep 10 dora stop --name ci-python-test --grace-duration 5s - pip install opencv-python - dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic + pip install "numpy<2.0.0" opencv-python + dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic --detach python ../examples/python-dataflow/plot_dynamic.py sleep 5 dora stop --name ci-python-test --grace-duration 5s dora destroy + - name: "Test CLI (C)" + timeout-minutes: 30 + # fail-fast by using bash shell explicitly + shell: bash + if: runner.os == 'Linux' + run: | + # Test C template Project + dora new test_c_project --lang c --internal-create-with-path-dependencies + cd test_c_project + dora up + dora list + cmake -B build + cmake --build build + cmake --install build + dora start dataflow.yml --name ci-c-test --detach + sleep 10 + dora stop --name ci-c-test --grace-duration 5s + dora destroy + + - name: "Test CLI (C++)" + timeout-minutes: 30 + # fail-fast by using bash shell explicitly + shell: bash + if: runner.os == 'Linux' + run: | + # Test C++ template Project + dora new 
test_cxx_project --lang cxx --internal-create-with-path-dependencies + cd test_cxx_project + dora up + dora list + cmake -B build + cmake --build build + cmake --install build + dora start dataflow.yml --name ci-cxx-test --detach + sleep 10 + dora stop --name ci-cxx-test --grace-duration 5s + dora destroy + clippy: name: "Clippy" runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 8c26f680..5d6cf532 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -885,9 +885,12 @@ dependencies = [ [[package]] name = "atomic" -version = "0.5.3" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" +checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" +dependencies = [ + "bytemuck", +] [[package]] name = "atomic-waker" @@ -982,7 +985,7 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", - "sync_wrapper", + "sync_wrapper 0.1.2", "tower", "tower-layer", "tower-service", @@ -1925,6 +1928,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -2011,9 +2024,9 @@ checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991" [[package]] name = "cxx" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8194f089b6da4751d6c1da1ef37c17255df51f9346cdb160f8b096562ae4a85c" +checksum = "273dcfd3acd4e1e276af13ed2a43eea7001318823e7a726a6b3ed39b4acc0b82" dependencies = [ "cc", "cxxbridge-flags", @@ -2023,9 +2036,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.123" +version = "1.0.124" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e8df9a089caae66634d754672d5f909395f30f38af6ff19366980d8a8b57501" +checksum = "d8b2766fbd92be34e9ed143898fce6c572dc009de39506ed6903e5a05b68914e" dependencies = [ "cc", "codespan-reporting", @@ -2038,15 +2051,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25290be4751803672a70b98c68b51c1e7d0a640ab5a4377f240f9d2e70054cd1" +checksum = "839fcd5e43464614ffaa989eaf1c139ef1f0c51672a1ed08023307fa1b909ccd" [[package]] name = "cxxbridge-macro" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8cb317cb13604b4752416783bb25070381c36e844743e4146b7f8e55de7d140" +checksum = "4b2c1c1776b986979be68bb2285da855f8d8a35851a769fca8740df7c3d07877" dependencies = [ "proc-macro2", "quote", @@ -2321,6 +2334,8 @@ dependencies = [ "aligned-vec", "async-trait", "bincode", + "crossbeam", + "crossbeam-skiplist", "ctrlc", "dora-arrow-convert", "dora-core", @@ -3960,7 +3975,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.7", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -3988,19 +4003,20 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.26.0" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" +checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" dependencies = [ "futures-util", "http 1.1.0", "hyper 1.3.1", "hyper-util", - "rustls 0.22.4", + "rustls 0.23.10", "rustls-pki-types", "tokio", "tokio-rustls", "tower-service", + "webpki-roots 0.26.1", ] [[package]] @@ -4580,7 +4596,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ 
"cfg-if 1.0.0", - "windows-targets 0.52.5", + "windows-targets 0.48.5", ] [[package]] @@ -6388,9 +6404,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.85" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ "unicode-ident", ] @@ -6581,8 +6597,8 @@ checksum = "2e8b432585672228923edbbf64b8b12c14e1112f62e88737655b4a083dbcd78e" dependencies = [ "bytes", "pin-project-lite", - "quinn-proto", - "quinn-udp", + "quinn-proto 0.9.6", + "quinn-udp 0.3.2", "rustc-hash", "rustls 0.20.9", "thiserror", @@ -6591,6 +6607,23 @@ dependencies = [ "webpki", ] +[[package]] +name = "quinn" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4ceeeeabace7857413798eb1ffa1e9c905a9946a57d81fb69b4b71c4d8eb3ad" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto 0.11.3", + "quinn-udp 0.5.2", + "rustc-hash", + "rustls 0.23.10", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "quinn-proto" version = "0.9.6" @@ -6610,6 +6643,23 @@ dependencies = [ "webpki", ] +[[package]] +name = "quinn-proto" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe" +dependencies = [ + "bytes", + "rand", + "ring 0.17.8", + "rustc-hash", + "rustls 0.23.10", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + [[package]] name = "quinn-udp" version = "0.3.2" @@ -6617,12 +6667,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "641538578b21f5e5c8ea733b736895576d0fe329bb883b937db6f4d163dbaaf4" dependencies = [ "libc", - "quinn-proto", + "quinn-proto 0.9.6", "socket2 0.4.10", "tracing", "windows-sys 0.42.0", ] +[[package]] +name = "quinn-udp" +version = "0.5.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46" +dependencies = [ + "libc", + "once_cell", + "socket2 0.5.7", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.36" @@ -7830,9 +7893,9 @@ checksum = "19b30a45b0cd0bcca8037f3d0dc3421eaf95327a17cad11964fb8179b4fc4832" [[package]] name = "reqwest" -version = "0.12.4" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" +checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37" dependencies = [ "base64 0.22.1", "bytes", @@ -7851,13 +7914,14 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.22.4", + "quinn 0.11.2", + "rustls 0.23.10", "rustls-pemfile 2.1.2", "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.1", "tokio", "tokio-rustls", "tower-service", @@ -8235,6 +8299,20 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls" +version = "0.23.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" +dependencies = [ + "once_cell", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -9027,6 +9105,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "syntect" version = "5.2.0" @@ -9339,11 +9423,11 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.25.0" 
+version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.22.4", + "rustls 0.23.10", "rustls-pki-types", "tokio", ] @@ -9812,9 +9896,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +checksum = "3ea73390fe27785838dcbf75b91b1d84799e28f1ce71e6f372a5dc2200c80de5" dependencies = [ "atomic", "getrandom", @@ -9826,9 +9910,9 @@ dependencies = [ [[package]] name = "uuid-macro-internal" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2" +checksum = "cdb394c528d1cb434c2f0522027e50c0705305caf6d20405c07a6f7e4cf9543c" dependencies = [ "proc-macro2", "quote", @@ -11138,7 +11222,7 @@ dependencies = [ "async-trait", "futures", "log", - "quinn", + "quinn 0.9.4", "rustls 0.20.9", "rustls-native-certs", "rustls-pemfile 1.0.4", diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index e3476de8..fea50fba 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -1,8 +1,8 @@ -use std::any::Any; +use std::{any::Any, vec}; use dora_node_api::{ self, - arrow::array::{AsArray, BinaryArray}, + arrow::array::{AsArray, UInt8Array}, merged::{MergeExternal, MergedEvent}, Event, EventStream, }; @@ -138,18 +138,26 @@ fn event_type(event: &DoraEvent) -> ffi::DoraEventType { } fn event_as_input(event: Box) -> eyre::Result { - let Some(Event::Input { - id, - metadata: _, - data, - }) = event.0 - else { + let Some(Event::Input { id, metadata, data }) = event.0 else { bail!("not an input event"); }; 
- let data: Option<&BinaryArray> = data.as_binary_opt(); + let data = match metadata.type_info.data_type { + dora_node_api::arrow::datatypes::DataType::UInt8 => { + let array: &UInt8Array = data.as_primitive(); + array.values().to_vec() + } + dora_node_api::arrow::datatypes::DataType::Null => { + vec![] + } + _ => { // unsupported Arrow type: return an error to the caller rather than panicking + bail!("dora C++ Node does not yet support higher level type of arrow. Only UInt8. + The ultimate solution should be based on arrow FFI interface. Feel free to contribute :)") + } + }; + Ok(ffi::DoraInput { id: id.into(), - data: data.map(|d| d.value(0).to_owned()).unwrap_or_default(), + data, }) } diff --git a/apis/python/node/dora/__init__.py b/apis/python/node/dora/__init__.py index 7269bda1..354d9ad4 100644 --- a/apis/python/node/dora/__init__.py +++ b/apis/python/node/dora/__init__.py @@ -13,7 +13,6 @@ from .dora import * from .dora import ( Node, - PyEvent, Ros2Context, Ros2Node, Ros2NodeOptions, diff --git a/apis/python/node/dora/__init__.pyi b/apis/python/node/dora/__init__.pyi index 3dc33676..a6dcb659 100644 --- a/apis/python/node/dora/__init__.pyi +++ b/apis/python/node/dora/__init__.pyi @@ -22,7 +22,7 @@ from dora import Node node = Node() ```""" - def __init__(self) -> None: + def __init__(self, node_id: str=None) -> None: """The custom node API lets you integrate `dora` into your application. It allows you to retrieve input and send output in any fashion you want. @@ -46,7 +46,7 @@ This method returns the parsed dataflow YAML file.""" """Merge an external event stream with dora main loop. This currently only work with ROS2.""" - def next(self, timeout: float=None) -> dora.PyEvent: + def next(self, timeout: float=None) -> dict: """`.next()` gives you the next input that the node has received. It blocks until the next event becomes available. You can use timeout in seconds to return if no input is available. 
@@ -88,15 +88,6 @@ node.send_output("string", b"string", {"open_telemetry_context": "7632e76"}) def __next__(self) -> typing.Any: """Implement next(self).""" -@typing.final -class PyEvent: - """Dora Event""" - - def inner(self):... - - def __getitem__(self, key: typing.Any) -> typing.Any: - """Return self[key].""" - @typing.final class Ros2Context: """ROS2 Context holding all messages definition for receiving and sending messages to ROS2. diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 3c1fcb2a..45125ec0 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -24,6 +24,7 @@ use pyo3::types::{PyBytes, PyDict}; /// node = Node() /// ``` /// +/// :type node_id: str, optional #[pyclass] pub struct Node { events: Events, @@ -67,11 +68,18 @@ impl Node { /// ``` /// /// :type timeout: float, optional - /// :rtype: dora.PyEvent + /// :rtype: dict #[allow(clippy::should_implement_trait)] - pub fn next(&mut self, py: Python, timeout: Option) -> PyResult> { + pub fn next(&mut self, py: Python, timeout: Option) -> PyResult>> { let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32))); - Ok(event) + if let Some(event) = event { + let dict = event + .to_py_dict(py) + .context("Could not convert event into a dict")?; + Ok(Some(dict)) + } else { + Ok(None) + } } /// You can iterate over the event stream with a loop @@ -84,10 +92,11 @@ impl Node { /// case "image": /// ``` /// - /// :rtype: dora.PyEvent - pub fn __next__(&mut self, py: Python) -> PyResult> { - let event = py.allow_threads(|| self.events.recv(None)); - Ok(event) + /// Default behaviour is to timeout after 2 seconds. 
+ /// + /// :rtype: dict + pub fn __next__(&mut self, py: Python) -> PyResult>> { + self.next(py, None) } /// You can iterate over the event stream with a loop @@ -100,7 +109,7 @@ impl Node { /// case "image": /// ``` /// - /// :rtype: dora.PyEvent + /// :rtype: dict fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { slf } @@ -262,7 +271,6 @@ fn dora(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(start_runtime, &m)?)?; m.add_class::()?; - m.add_class::()?; m.setattr("__version__", env!("CARGO_PKG_VERSION"))?; m.setattr("__author__", "Dora-rs Authors")?; diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index bc74279e..2c027e9a 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -1,69 +1,52 @@ -use arrow::{array::ArrayRef, pyarrow::ToPyArrow}; +use std::collections::HashMap; + +use arrow::pyarrow::ToPyArrow; use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters}; use eyre::{Context, Result}; -use pyo3::{exceptions::PyLookupError, prelude::*, pybacked::PyBackedStr, types::PyDict}; +use pyo3::{ + prelude::*, + pybacked::PyBackedStr, + types::{IntoPyDict, PyDict}, +}; /// Dora Event -#[pyclass] #[derive(Debug)] pub struct PyEvent { event: MergedEvent, - data: Option, } -// Dora Event -#[pymethods] impl PyEvent { - /// - /// :rtype: dora.PyObject - pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult> { - if key == "kind" { - let kind = match &self.event { - MergedEvent::Dora(_) => "dora", - MergedEvent::External(_) => "external", - }; - return Ok(Some(kind.to_object(py))); - } + pub fn to_py_dict(self, py: Python<'_>) -> PyResult> { + let mut pydict = HashMap::new(); + match &self.event { + MergedEvent::Dora(_) => pydict.insert("kind", "dora".to_object(py)), + MergedEvent::External(_) => pydict.insert("kind", "external".to_object(py)), + }; match &self.event { MergedEvent::Dora(event) => { - let value = match key { - "type" => 
Some(Self::ty(event).to_object(py)), - "id" => Self::id(event).map(|v| v.to_object(py)), - "value" => self.value(py)?, - "metadata" => Self::metadata(event, py), - "error" => Self::error(event).map(|v| v.to_object(py)), - other => { - return Err(PyLookupError::new_err(format!( - "event has no property `{other}`" - ))) - } - }; - Ok(value) + if let Some(id) = Self::id(event) { + pydict.insert("id", id.into_py(py)); + } + pydict.insert("type", Self::ty(event).to_object(py)); + + if let Some(value) = self.value(py)? { + pydict.insert("value", value); + } + if let Some(metadata) = Self::metadata(event, py) { + pydict.insert("metadata", metadata); + } + if let Some(error) = Self::error(event) { + pydict.insert("error", error.to_object(py)); + } } MergedEvent::External(event) => { - let value = match key { - "value" => event, - _ => todo!(), - }; - - Ok(Some(value.clone())) + pydict.insert("value", event.clone()); } } - } - pub fn inner(&mut self) -> Option<&PyObject> { - match &self.event { - MergedEvent::Dora(_) => None, - MergedEvent::External(event) => Some(event), - } + Ok(pydict.into_py_dict_bound(py).unbind()) } - fn __str__(&self) -> PyResult { - Ok(format!("{:#?}", &self.event)) - } -} - -impl PyEvent { fn ty(event: &Event) -> &str { match event { Event::Stop => "STOP", @@ -84,9 +67,9 @@ impl PyEvent { /// Returns the payload of an input event as an arrow array (if any). fn value(&self, py: Python<'_>) -> PyResult> { - match (&self.event, &self.data) { - (MergedEvent::Dora(Event::Input { .. }), Some(data)) => { - // TODO: Does this call leak data? + match &self.event { + MergedEvent::Dora(Event::Input { data, .. }) => { + // TODO: Does this call leak data? let array_data = data.to_data().to_pyarrow(py)?; Ok(Some(array_data)) } @@ -116,13 +99,8 @@ impl From for PyEvent { } impl From> for PyEvent { - fn from(mut event: MergedEvent) -> Self { - let data = if let MergedEvent::Dora(Event::Input { data, .. 
}) = &mut event { - Some(data.clone()) - } else { - None - }; - Self { event, data } + fn from(event: MergedEvent) -> Self { + Self { event } } } diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index 62745e14..1d5f8275 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -11,6 +11,8 @@ use std::{path::PathBuf, sync::mpsc, time::Duration}; use tracing::{error, info}; use uuid::Uuid; +use crate::handle_dataflow_result; + pub fn attach_dataflow( dataflow: Descriptor, dataflow_path: PathBuf, @@ -133,9 +135,7 @@ pub fn attach_dataflow( ControlRequestReply::DataflowStarted { uuid: _ } => (), ControlRequestReply::DataflowStopped { uuid, result } => { info!("dataflow {uuid} stopped"); - break result - .map_err(|err| eyre::eyre!(err)) - .wrap_err("dataflow failed"); + break handle_dataflow_result(result, Some(uuid)); } ControlRequestReply::DataflowReloaded { uuid } => { info!("dataflow {uuid} reloaded") diff --git a/binaries/cli/src/formatting.rs b/binaries/cli/src/formatting.rs new file mode 100644 index 00000000..f19e1599 --- /dev/null +++ b/binaries/cli/src/formatting.rs @@ -0,0 +1,50 @@ +use dora_core::topics::{DataflowResult, NodeErrorCause}; + +pub struct FormatDataflowError<'a>(pub &'a DataflowResult); + +impl std::fmt::Display for FormatDataflowError<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f)?; + let failed = self + .0 + .node_results + .iter() + .filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e))); + let total_failed = failed.clone().count(); + + let mut non_cascading: Vec<_> = failed + .clone() + .filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. 
})) + .collect(); + non_cascading.sort_by_key(|(_, e)| e.timestamp); + // try to print earliest non-cascading error + let hidden = if !non_cascading.is_empty() { + let printed = non_cascading.len(); + for (id, err) in non_cascading { + writeln!(f, "Node `{id}` failed: {err}")?; + } + total_failed - printed + } else { + // no non-cascading errors -> print earliest cascading + let mut all: Vec<_> = failed.collect(); + all.sort_by_key(|(_, e)| e.timestamp); + if let Some((id, err)) = all.first() { + write!(f, "Node `{id}` failed: {err}")?; + total_failed - 1 + } else { + write!(f, "unknown error")?; + 0 + } + }; + + if hidden > 1 { + write!( + f, + "\n\nThere are {hidden} consequential errors. Check the `out/{}` folder for full details.", + self.0.uuid + )?; + } + + Ok(()) + } +} diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 630f3f19..8fdd6a2d 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -15,6 +15,7 @@ use dora_tracing::set_up_tracing; use dora_tracing::set_up_tracing_opts; use duration_str::parse; use eyre::{bail, Context}; +use formatting::FormatDataflowError; use std::{io::Write, net::SocketAddr}; use std::{ net::{IpAddr, Ipv4Addr}, @@ -28,6 +29,7 @@ use uuid::Uuid; mod attach; mod build; mod check; +mod formatting; mod graph; mod logs; mod template; @@ -76,7 +78,7 @@ enum Command { #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] dataflow: PathBuf, }, - /// Generate a new project, node or operator. Choose the language between Rust, Python, C or C++. + /// Generate a new project or node. Choose the language between Rust, Python, C or C++. 
New { #[clap(flatten)] args: CommandNew, @@ -118,6 +120,9 @@ enum Command { /// Attach to the dataflow and wait for its completion #[clap(long, action)] attach: bool, + /// Run the dataflow in background + #[clap(long, action)] + detach: bool, /// Enable hot reloading (Python only) #[clap(long, action)] hot_reload: bool, @@ -231,7 +236,6 @@ pub struct CommandNew { #[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] enum Kind { Dataflow, - Operator, CustomNode, } @@ -344,6 +348,7 @@ fn run() -> eyre::Result<()> { coordinator_addr, coordinator_port, attach, + detach, hot_reload, } => { let dataflow_descriptor = @@ -371,6 +376,16 @@ fn run() -> eyre::Result<()> { &mut *session, )?; + let attach = match (attach, detach) { + (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), + (true, false) => true, + (false, true) => false, + (false, false) => { + println!("attaching to dataflow (use `--detach` to run in background)"); + true + } + }; + if attach { attach_dataflow( dataflow_descriptor, @@ -460,7 +475,8 @@ fn run() -> eyre::Result<()> { ); } - Daemon::run_dataflow(&dataflow_path).await + let result = Daemon::run_dataflow(&dataflow_path).await?; + handle_dataflow_result(result, None) } None => { if coordinator_addr.ip() == LOCALHOST { @@ -540,14 +556,32 @@ fn stop_dataflow( let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { - ControlRequestReply::DataflowStopped { uuid: _, result } => result - .map_err(|err| eyre::eyre!(err)) - .wrap_err("dataflow failed"), + ControlRequestReply::DataflowStopped { uuid, result } => { + handle_dataflow_result(result, Some(uuid)) + } ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected stop dataflow reply: {other:?}"), } } +fn handle_dataflow_result( + result: dora_core::topics::DataflowResult, + uuid: Option, +) -> Result<(), eyre::Error> { + if result.is_ok() { + Ok(()) + } else { + Err(match uuid { + Some(uuid) 
=> { + eyre::eyre!("Dataflow {uuid} failed:\n{}", FormatDataflowError(&result)) + } + None => { + eyre::eyre!("Dataflow failed:\n{}", FormatDataflowError(&result)) + } + }) + } +} + fn stop_dataflow_by_name( name: String, grace_duration: Option, @@ -565,9 +599,9 @@ fn stop_dataflow_by_name( let result: ControlRequestReply = serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { - ControlRequestReply::DataflowStopped { uuid: _, result } => result - .map_err(|err| eyre::eyre!(err)) - .wrap_err("dataflow failed"), + ControlRequestReply::DataflowStopped { uuid, result } => { + handle_dataflow_result(result, Some(uuid)) + } ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected stop dataflow reply: {other:?}"), } diff --git a/binaries/cli/src/template/c/cmake-template.txt b/binaries/cli/src/template/c/cmake-template.txt new file mode 100644 index 00000000..32cb561f --- /dev/null +++ b/binaries/cli/src/template/c/cmake-template.txt @@ -0,0 +1,79 @@ +cmake_minimum_required(VERSION 3.21) +project(cxx-dataflow LANGUAGES C) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_FLAGS "-fPIC") + +set(DORA_ROOT_DIR "__DORA_PATH__" CACHE FILEPATH "Path to the root of dora") + +set(dora_c_include_dir "${CMAKE_CURRENT_BINARY_DIR}/include/c") +if(DORA_ROOT_DIR) + include(ExternalProject) + ExternalProject_Add(Dora + SOURCE_DIR ${DORA_ROOT_DIR} + BUILD_IN_SOURCE True + CONFIGURE_COMMAND "" + BUILD_COMMAND + cargo build + --package dora-node-api-c + INSTALL_COMMAND "" + ) + + add_custom_command(OUTPUT ${dora_c_include_dir} + WORKING_DIRECTORY ${DORA_ROOT_DIR} + DEPENDS Dora + COMMAND + mkdir ${CMAKE_CURRENT_BINARY_DIR}/include/c -p + && + cp apis/c/node ${CMAKE_CURRENT_BINARY_DIR}/include/c -r + ) + + add_custom_target(Dora_c DEPENDS ${dora_c_include_dir}) + set(dora_link_dirs ${DORA_ROOT_DIR}/target/debug) +else() + include(ExternalProject) + ExternalProject_Add(Dora + PREFIX ${CMAKE_CURRENT_BINARY_DIR}/dora + GIT_REPOSITORY 
https://github.com/dora-rs/dora.git + GIT_TAG main + BUILD_IN_SOURCE True + CONFIGURE_COMMAND "" + BUILD_COMMAND + cargo build + --package dora-node-api-c + --target-dir ${CMAKE_CURRENT_BINARY_DIR}/dora/src/Dora/target + INSTALL_COMMAND "" + ) + + add_custom_command(OUTPUT ${dora_c_include_dir} + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/dora/src/Dora/target + DEPENDS Dora + COMMAND + mkdir ${CMAKE_CURRENT_BINARY_DIR}/include/c -p + && + cp ../apis/c/node ${CMAKE_CURRENT_BINARY_DIR}/include/c -r + ) + + set(dora_link_dirs ${CMAKE_CURRENT_BINARY_DIR}/dora/src/Dora/target/debug) + + add_custom_target(Dora_c DEPENDS ${dora_c_include_dir}) +endif() + +link_directories(${dora_link_dirs}) + +add_executable(talker_1 talker_1/node.c) +add_dependencies(talker_1 Dora_c) +target_include_directories(talker_1 PRIVATE ${dora_c_include_dir}) +target_link_libraries(talker_1 dora_node_api_c m) + +add_executable(talker_2 talker_2/node.c) +add_dependencies(talker_2 Dora_c) +target_include_directories(talker_2 PRIVATE ${dora_c_include_dir}) +target_link_libraries(talker_2 dora_node_api_c m) + +add_executable(listener_1 listener_1/node.c) +add_dependencies(listener_1 Dora_c) +target_include_directories(listener_1 PRIVATE ${dora_c_include_dir}) +target_link_libraries(listener_1 dora_node_api_c m) + +install(TARGETS listener_1 talker_1 talker_2 DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin) \ No newline at end of file diff --git a/binaries/cli/src/template/c/dataflow-template.yml b/binaries/cli/src/template/c/dataflow-template.yml index f28ae8fc..72b121ea 100644 --- a/binaries/cli/src/template/c/dataflow-template.yml +++ b/binaries/cli/src/template/c/dataflow-template.yml @@ -1,24 +1,22 @@ nodes: - - id: op_1 - operator: - shared-library: build/op_1 + - id: talker_1 + custom: + source: bin/talker_1 inputs: - foo: dora/timer/millis/100 + tick: dora/timer/millis/100 outputs: - - bar - - id: op_2 - operator: - shared-library: build/op_2 + - speech + - id: talker_2 + custom: + source: 
bin/talker_2 inputs: - foo: dora/timer/secs/2 + tick: dora/timer/secs/2 outputs: - - bar + - speech - - id: custom-node_1 + - id: listener_1 custom: - source: build/node_1 + source: bin/listener_1 inputs: - input-1: op_1/bar - input-2: op_2/bar - outputs: - - foo + speech-1: talker_1/speech + speech-2: talker_2/speech diff --git a/binaries/cli/src/template/c/listener/listener-template.c b/binaries/cli/src/template/c/listener/listener-template.c new file mode 100644 index 00000000..1f2c16cd --- /dev/null +++ b/binaries/cli/src/template/c/listener/listener-template.c @@ -0,0 +1,69 @@ +#include +#include +#include +#include "node_api.h" + +// sleep +#ifdef _WIN32 +#include +#else +#include +#endif + +int main() +{ + void *dora_context = init_dora_context_from_env(); + if (dora_context == NULL) + { + fprintf(stderr, "[c node] init dora context failed\n"); + return -1; + } + + printf("[c node] dora context initialized\n"); + + for (char i = 0; i < 20; i++) + { + void *event = dora_next_event(dora_context); + if (event == NULL) + { + printf("[c node] ERROR: unexpected end of event\n"); + return -1; + } + + enum DoraEventType ty = read_dora_event_type(event); + + if (ty == DoraEventType_Input) + { + char *id_ptr; + size_t id_len; + read_dora_input_id(event, &id_ptr, &id_len); + + char *data_ptr; + size_t data_len; + read_dora_input_data(event, &data_ptr, &data_len); + + printf("I heard %.*s from %.*s\n", (int)data_len, data_ptr, (int)id_len, id_ptr); // bound by the returned lengths; buffers are not assumed NUL-terminated + } + else if (ty == DoraEventType_Stop) + { + printf("[c node] received stop event\n"); + free_dora_event(event); + break; + } + else if (ty == DoraEventType_InputClosed) { + printf("[c node] received input closed event\n"); + } + else + { + printf("[c node] received unexpected event: %d\n", ty); + free_dora_event(event); + break; + } + + free_dora_event(event); + } + + free_dora_context(dora_context); + + return 0; +} diff --git a/binaries/cli/src/template/c/mod.rs b/binaries/cli/src/template/c/mod.rs index 21bbef01..378c572c 100644 --- 
a/binaries/cli/src/template/c/mod.rs +++ b/binaries/cli/src/template/c/mod.rs @@ -1,12 +1,15 @@ use dora_node_api_c::HEADER_NODE_API; -use dora_operator_api_c::{HEADER_OPERATOR_API, HEADER_OPERATOR_TYPES}; -use eyre::{bail, Context}; +use eyre::{bail, Context, ContextCompat}; use std::{ fs, path::{Path, PathBuf}, }; -pub fn create(args: crate::CommandNew) -> eyre::Result<()> { +const NODE: &str = include_str!("node/node-template.c"); +const TALKER: &str = include_str!("talker/talker-template.c"); +const LISTENER: &str = include_str!("listener/listener-template.c"); + +pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> { let crate::CommandNew { kind, lang: _, @@ -15,13 +18,16 @@ pub fn create(args: crate::CommandNew) -> eyre::Result<()> { } = args; match kind { - crate::Kind::Operator => create_operator(name, path), - crate::Kind::CustomNode => create_custom_node(name, path), - crate::Kind::Dataflow => create_dataflow(name, path), + crate::Kind::CustomNode => create_custom_node(name, path, NODE), + crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps), } } -fn create_dataflow(name: String, path: Option) -> Result<(), eyre::ErrReport> { +fn create_dataflow( + name: String, + path: Option, + use_path_deps: bool, +) -> Result<(), eyre::ErrReport> { const DATAFLOW_YML: &str = include_str!("dataflow-template.yml"); if name.contains('/') { @@ -41,9 +47,10 @@ fn create_dataflow(name: String, path: Option) -> Result<(), eyre::ErrR fs::write(&dataflow_yml_path, dataflow_yml) .with_context(|| format!("failed to write `{}`", dataflow_yml_path.display()))?; - create_operator("op_1".into(), Some(root.join("op_1")))?; - create_operator("op_2".into(), Some(root.join("op_2")))?; - create_custom_node("node_1".into(), Some(root.join("node_1")))?; + create_custom_node("talker_1".into(), Some(root.join("talker_1")), TALKER)?; + create_custom_node("talker_2".into(), Some(root.join("talker_2")), TALKER)?; + 
create_custom_node("listener_1".into(), Some(root.join("listener_1")), LISTENER)?; + create_cmakefile(root.to_path_buf(), use_path_deps)?; println!( "Created new C dataflow at `{name}` at {}", @@ -53,47 +60,34 @@ fn create_dataflow(name: String, path: Option) -> Result<(), eyre::ErrR Ok(()) } -fn create_operator(name: String, path: Option) -> Result<(), eyre::ErrReport> { - const OPERATOR: &str = include_str!("operator/operator-template.c"); - - if name.contains('/') { - bail!("operator name must not contain `/` separators"); - } - if name.contains('-') { - bail!("operator name must not contain `-` separators"); - } - if !name.is_ascii() { - bail!("operator name must be ASCII"); - } - - // create directories - let root = path.as_deref().unwrap_or_else(|| Path::new(&name)); - fs::create_dir(root) - .with_context(|| format!("failed to create directory `{}`", root.display()))?; - - let operator_path = root.join("operator.c"); - fs::write(&operator_path, OPERATOR) - .with_context(|| format!("failed to write `{}`", operator_path.display()))?; - let header_api_path = root.join("operator_api.h"); - let header_type_path = root.join("operator_types.h"); - fs::write(&header_api_path, HEADER_OPERATOR_API) - .with_context(|| format!("failed to write `{}`", header_api_path.display()))?; - fs::write(&header_type_path, HEADER_OPERATOR_TYPES) - .with_context(|| format!("failed to write `{}`", header_type_path.display()))?; - - // TODO: Makefile? - - println!( - "Created new C operator `{name}` at {}", - Path::new(".").join(root).display() - ); - +fn create_cmakefile(root: PathBuf, use_path_deps: bool) -> Result<(), eyre::ErrReport> { + const CMAKEFILE: &str = include_str!("cmake-template.txt"); + + let cmake_file = if use_path_deps { + let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR")); + let workspace_dir = manifest_dir + .parent() + .context("Could not get manifest parent folder")? 
+ .parent() + .context("Could not get manifest grandparent folder")?; + CMAKEFILE.replace("__DORA_PATH__", workspace_dir.to_str().unwrap()) + } else { + CMAKEFILE.replace("__DORA_PATH__", "") + }; + + let cmake_path = root.join("CMakeLists.txt"); + fs::write(&cmake_path, cmake_file) + .with_context(|| format!("failed to write `{}`", cmake_path.display()))?; + + println!("Created new CMakeLists.txt at {}", cmake_path.display()); Ok(()) } -fn create_custom_node(name: String, path: Option) -> Result<(), eyre::ErrReport> { - const NODE: &str = include_str!("node/node-template.c"); - +fn create_custom_node( + name: String, + path: Option, + template_scripts: &str, +) -> Result<(), eyre::ErrReport> { if name.contains('/') { bail!("node name must not contain `/` separators"); } @@ -107,7 +101,7 @@ fn create_custom_node(name: String, path: Option) -> Result<(), eyre::E .with_context(|| format!("failed to create directory `{}`", root.display()))?; let node_path = root.join("node.c"); - fs::write(&node_path, NODE) + fs::write(&node_path, template_scripts) .with_context(|| format!("failed to write `{}`", node_path.display()))?; let header_path = root.join("node_api.h"); fs::write(&header_path, HEADER_NODE_API) diff --git a/binaries/cli/src/template/c/talker/talker-template.c b/binaries/cli/src/template/c/talker/talker-template.c new file mode 100644 index 00000000..31f9713b --- /dev/null +++ b/binaries/cli/src/template/c/talker/talker-template.c @@ -0,0 +1,71 @@ +#include +#include +#include +#include "node_api.h" + +// sleep +#ifdef _WIN32 +#include +#else +#include +#endif + +int main() +{ + void *dora_context = init_dora_context_from_env(); + if (dora_context == NULL) + { + fprintf(stderr, "[c node] init dora context failed\n"); + return -1; + } + + printf("[c node] dora context initialized\n"); + + for (char i = 0; i < 10; i++) + { + void *event = dora_next_event(dora_context); + if (event == NULL) + { + printf("[c node] ERROR: unexpected end of event\n"); + return -1; + 
} + + enum DoraEventType ty = read_dora_event_type(event); + + if (ty == DoraEventType_Input) + { + char *data; + size_t data_len; + read_dora_input_data(event, &data, &data_len); + + assert(data_len == 0); + + char out_id[] = "speech"; + char out_data[] = "Hello World"; + + dora_send_output(dora_context, out_id, strlen(out_id), out_data, strlen(out_data)); + } + else if (ty == DoraEventType_Stop) + { + printf("[c node] received stop event\n"); + free_dora_event(event); + break; + } + else + { + printf("[c node] received unexpected event: %d\n", ty); + free_dora_event(event); + break; + } + + free_dora_event(event); + } + + printf("[c node] talker 10 events\n"); + + free_dora_context(dora_context); + + printf("[c node] finished successfully\n"); + + return 0; +} diff --git a/binaries/cli/src/template/cxx/cmake-template.txt b/binaries/cli/src/template/cxx/cmake-template.txt new file mode 100644 index 00000000..bd3fe492 --- /dev/null +++ b/binaries/cli/src/template/cxx/cmake-template.txt @@ -0,0 +1,85 @@ +cmake_minimum_required(VERSION 3.21) +project(cxx-dataflow LANGUAGES CXX) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_FLAGS "-fPIC") + +set(DORA_ROOT_DIR "__DORA_PATH__" CACHE FILEPATH "Path to the root of dora") + +set(dora_cxx_include_dir "${CMAKE_CURRENT_BINARY_DIR}/include/cxx") +set(node_bridge "${CMAKE_CURRENT_BINARY_DIR}/node_bridge.cc") + +if(DORA_ROOT_DIR) + include(ExternalProject) + ExternalProject_Add(Dora + SOURCE_DIR ${DORA_ROOT_DIR} + BUILD_IN_SOURCE True + CONFIGURE_COMMAND "" + BUILD_COMMAND + cargo build + --package dora-node-api-cxx + INSTALL_COMMAND "" + ) + + add_custom_command(OUTPUT ${node_bridge} ${dora_cxx_include_dir} ${operator_bridge} ${dora_c_include_dir} + WORKING_DIRECTORY ${DORA_ROOT_DIR} + DEPENDS Dora + COMMAND + mkdir ${dora_cxx_include_dir} -p + && + cp target/cxxbridge/dora-node-api-cxx/src/lib.rs.cc ${node_bridge} + && + cp target/cxxbridge/dora-node-api-cxx/src/lib.rs.h ${dora_cxx_include_dir}/dora-node-api.h + ) + + 
add_custom_target(Dora_cxx DEPENDS ${node_bridge} ${dora_cxx_include_dir}) + set(dora_link_dirs ${DORA_ROOT_DIR}/target/debug) +else() + include(ExternalProject) + ExternalProject_Add(Dora + PREFIX ${CMAKE_CURRENT_BINARY_DIR}/dora + GIT_REPOSITORY https://github.com/dora-rs/dora.git + GIT_TAG main + BUILD_IN_SOURCE True + CONFIGURE_COMMAND "" + BUILD_COMMAND + cargo build + --package dora-node-api-cxx + --target-dir ${CMAKE_CURRENT_BINARY_DIR}/dora/src/Dora/target + INSTALL_COMMAND "" + ) + + add_custom_command(OUTPUT ${node_bridge} ${dora_cxx_include_dir} + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/dora/src/Dora/target + DEPENDS Dora + COMMAND + mkdir ${dora_cxx_include_dir} -p + && + cp cxxbridge/dora-node-api-cxx/src/lib.rs.cc ${node_bridge} + && + cp cxxbridge/dora-node-api-cxx/src/lib.rs.h ${dora_cxx_include_dir}/dora-node-api.h + ) + + set(dora_link_dirs ${CMAKE_CURRENT_BINARY_DIR}/dora/src/Dora/target/debug) + + add_custom_target(Dora_cxx DEPENDS ${node_bridge} ${dora_cxx_include_dir}) +endif() + +link_directories(${dora_link_dirs}) + +add_executable(talker_1 talker_1/node.cc ${node_bridge}) +add_dependencies(talker_1 Dora_cxx) +target_include_directories(talker_1 PRIVATE ${dora_cxx_include_dir}) +target_link_libraries(talker_1 dora_node_api_cxx) + +add_executable(talker_2 talker_2/node.cc ${node_bridge}) +add_dependencies(talker_2 Dora_cxx) +target_include_directories(talker_2 PRIVATE ${dora_cxx_include_dir}) +target_link_libraries(talker_2 dora_node_api_cxx) + +add_executable(listener_1 listener_1/node.cc ${node_bridge}) +add_dependencies(listener_1 Dora_cxx) +target_include_directories(listener_1 PRIVATE ${dora_cxx_include_dir}) +target_link_libraries(listener_1 dora_node_api_cxx) + +install(TARGETS listener_1 talker_1 talker_2 DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin) \ No newline at end of file diff --git a/binaries/cli/src/template/cxx/dataflow-template.yml b/binaries/cli/src/template/cxx/dataflow-template.yml index 9559eb88..72b121ea 100644 --- 
a/binaries/cli/src/template/cxx/dataflow-template.yml +++ b/binaries/cli/src/template/cxx/dataflow-template.yml @@ -1,23 +1,22 @@ nodes: - - id: runtime-node_1 - operators: - - id: op_1 - shared-library: build/op_1 - inputs: - tick: dora/timer/millis/100 - outputs: - - some-output - - id: op_2 - shared-library: build/op_2 - inputs: - tick: dora/timer/secs/2 - outputs: - - some-output + - id: talker_1 + custom: + source: bin/talker_1 + inputs: + tick: dora/timer/millis/100 + outputs: + - speech + - id: talker_2 + custom: + source: bin/talker_2 + inputs: + tick: dora/timer/secs/2 + outputs: + - speech - - id: custom-node_1 + - id: listener_1 custom: - source: build/node_1 + source: bin/listener_1 inputs: - tick: dora/timer/secs/1 - input-1: op_1/some-output - input-2: op_2/some-output + speech-1: talker_1/speech + speech-2: talker_2/speech diff --git a/binaries/cli/src/template/cxx/listener-template.cc b/binaries/cli/src/template/cxx/listener-template.cc new file mode 100644 index 00000000..1871dcc3 --- /dev/null +++ b/binaries/cli/src/template/cxx/listener-template.cc @@ -0,0 +1,35 @@ +#include "dora-node-api.h" // adjust this path if necessary + +#include +#include + +int main() +{ + std::cout << "HELLO FROM C++" << std::endl; + unsigned char counter = 0; + + auto dora_node = init_dora_node(); + + while (1) + { + auto event = dora_node.events->next(); + auto ty = event_type(event); + + if (ty == DoraEventType::AllInputsClosed) + { + break; + } + else if (ty == DoraEventType::Input) + { + auto input = event_as_input(std::move(event)); + auto input_id = input.id; + auto message = std::string(reinterpret_cast(input.data.data()), input.data.size()); + std::cout << "I heard " << message << " from " << std::string(input_id) << std::endl; + } + else { + std::cerr << "Unknown event type " << static_cast(ty) << std::endl; + } + } + + return 0;; +} diff --git a/binaries/cli/src/template/cxx/mod.rs b/binaries/cli/src/template/cxx/mod.rs index 579f39e1..09935031 100644 --- 
a/binaries/cli/src/template/cxx/mod.rs +++ b/binaries/cli/src/template/cxx/mod.rs @@ -1,10 +1,14 @@ -use eyre::{bail, Context}; +use eyre::{bail, Context, ContextCompat}; use std::{ fs, path::{Path, PathBuf}, }; -pub fn create(args: crate::CommandNew) -> eyre::Result<()> { +const NODE: &str = include_str!("node-template.cc"); +const TALKER: &str = include_str!("talker-template.cc"); +const LISTENER: &str = include_str!("listener-template.cc"); + +pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> { let crate::CommandNew { kind, lang: _, @@ -13,13 +17,16 @@ pub fn create(args: crate::CommandNew) -> eyre::Result<()> { } = args; match kind { - crate::Kind::Operator => create_operator(name, path), - crate::Kind::CustomNode => create_custom_node(name, path), - crate::Kind::Dataflow => create_dataflow(name, path), + crate::Kind::CustomNode => create_custom_node(name, path, NODE), + crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps), } } -fn create_dataflow(name: String, path: Option) -> Result<(), eyre::ErrReport> { +fn create_dataflow( + name: String, + path: Option, + use_path_deps: bool, +) -> Result<(), eyre::ErrReport> { const DATAFLOW_YML: &str = include_str!("dataflow-template.yml"); if name.contains('/') { @@ -39,9 +46,10 @@ fn create_dataflow(name: String, path: Option) -> Result<(), eyre::ErrR fs::write(&dataflow_yml_path, dataflow_yml) .with_context(|| format!("failed to write `{}`", dataflow_yml_path.display()))?; - create_operator("op_1".into(), Some(root.join("op_1")))?; - create_operator("op_2".into(), Some(root.join("op_2")))?; - create_custom_node("node_1".into(), Some(root.join("node_1")))?; + create_custom_node("talker_1".into(), Some(root.join("talker_1")), TALKER)?; + create_custom_node("talker_2".into(), Some(root.join("talker_2")), TALKER)?; + create_custom_node("listener_1".into(), Some(root.join("listener_1")), LISTENER)?; + create_cmakefile(root.to_path_buf(), use_path_deps)?; println!( "Created new 
C++ dataflow at `{name}` at {}", @@ -51,42 +59,34 @@ fn create_dataflow(name: String, path: Option) -> Result<(), eyre::ErrR Ok(()) } -fn create_operator(name: String, path: Option) -> Result<(), eyre::ErrReport> { - const OPERATOR: &str = include_str!("operator-template.cc"); - const HEADER: &str = include_str!("operator-template.h"); - - if name.contains('/') { - bail!("operator name must not contain `/` separators"); - } - if !name.is_ascii() { - bail!("operator name must be ASCII"); - } - - // create directories - let root = path.as_deref().unwrap_or_else(|| Path::new(&name)); - fs::create_dir(root) - .with_context(|| format!("failed to create directory `{}`", root.display()))?; - - let operator_path = root.join("operator.cc"); - fs::write(&operator_path, OPERATOR) - .with_context(|| format!("failed to write `{}`", operator_path.display()))?; - let header_path = root.join("operator.h"); - fs::write(&header_path, HEADER) - .with_context(|| format!("failed to write `{}`", header_path.display()))?; - - // TODO: Makefile? - - println!( - "Created new C++ operator `{name}` at {}", - Path::new(".").join(root).display() - ); - +fn create_cmakefile(root: PathBuf, use_path_deps: bool) -> Result<(), eyre::ErrReport> { + const CMAKEFILE: &str = include_str!("cmake-template.txt"); + + let cmake_file = if use_path_deps { + let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR")); + let workspace_dir = manifest_dir + .parent() + .context("Could not get manifest parent folder")? 
+ .parent() + .context("Could not get manifest grandparent folder")?; + CMAKEFILE.replace("__DORA_PATH__", workspace_dir.to_str().unwrap()) + } else { + CMAKEFILE.replace("__DORA_PATH__", "") + }; + + let cmake_path = root.join("CMakeLists.txt"); + fs::write(&cmake_path, cmake_file) + .with_context(|| format!("failed to write `{}`", cmake_path.display()))?; + + println!("Created new CMakeLists.txt at {}", cmake_path.display()); Ok(()) } -fn create_custom_node(name: String, path: Option) -> Result<(), eyre::ErrReport> { - const NODE: &str = include_str!("node-template.cc"); - +fn create_custom_node( + name: String, + path: Option, + template_scripts: &str, +) -> Result<(), eyre::ErrReport> { if name.contains('/') { bail!("node name must not contain `/` separators"); } @@ -100,7 +100,7 @@ fn create_custom_node(name: String, path: Option) -> Result<(), eyre::E .with_context(|| format!("failed to create directory `{}`", root.display()))?; let node_path = root.join("node.cc"); - fs::write(&node_path, NODE) + fs::write(&node_path, template_scripts) .with_context(|| format!("failed to write `{}`", node_path.display()))?; // TODO: Makefile? 
diff --git a/binaries/cli/src/template/cxx/talker-template.cc b/binaries/cli/src/template/cxx/talker-template.cc new file mode 100644 index 00000000..a1179e09 --- /dev/null +++ b/binaries/cli/src/template/cxx/talker-template.cc @@ -0,0 +1,39 @@ +#include "dora-node-api.h" // adjust this path if necessary + +#include +#include + + +int main() +{ + auto dora_node = init_dora_node(); + + for (int i = 0; i < 20; i++) + { + auto event = dora_node.events->next(); + auto ty = event_type(event); + + if (ty == DoraEventType::AllInputsClosed) + { + break; + } + else if (ty == DoraEventType::Input) + { + std::string message{"Hello World!"}; + rust::Slice message_slice{reinterpret_cast(message.c_str()), message.size()}; + auto result = send_output(dora_node.send_output, "speech", message_slice); + auto error = std::string(result.error); + if (!error.empty()) + { + std::cerr << "Error: " << error << std::endl; + return -1; + } + } + else + { + std::cerr << "Unknown event type " << static_cast(ty) << std::endl; + } + } + + return 0; +} diff --git a/binaries/cli/src/template/mod.rs b/binaries/cli/src/template/mod.rs index 4e68cb54..a1d59367 100644 --- a/binaries/cli/src/template/mod.rs +++ b/binaries/cli/src/template/mod.rs @@ -7,7 +7,7 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> match args.lang { crate::Lang::Rust => rust::create(args, use_path_deps), crate::Lang::Python => python::create(args), - crate::Lang::C => c::create(args), - crate::Lang::Cxx => cxx::create(args), + crate::Lang::C => c::create(args, use_path_deps), + crate::Lang::Cxx => cxx::create(args, use_path_deps), } } diff --git a/binaries/cli/src/template/python/dataflow-template.yml b/binaries/cli/src/template/python/dataflow-template.yml index 782d78a6..b44cacb7 100644 --- a/binaries/cli/src/template/python/dataflow-template.yml +++ b/binaries/cli/src/template/python/dataflow-template.yml @@ -1,23 +1,22 @@ nodes: - - id: op_1 - operator: - python: op_1/op_1.py + - id: 
talker_1 + custom: + source: talker_1/talker_1.py inputs: tick: dora/timer/millis/100 outputs: - - some-output - - id: op_2 - operator: - python: op_2/op_2.py + - speech + - id: talker_2 + custom: + source: talker_2/talker_2.py inputs: tick: dora/timer/secs/2 outputs: - - some-output + - speech - - id: custom-node_1 + - id: listener_1 custom: - source: ./node_1/node_1.py + source: listener_1/listener_1.py inputs: - tick: dora/timer/secs/1 - input-1: op_1/some-output - input-2: op_2/some-output + speech-1: talker_1/speech + speech-2: talker_2/speech diff --git a/binaries/cli/src/template/python/listener/listener-template.py b/binaries/cli/src/template/python/listener/listener-template.py new file mode 100644 index 00000000..82c8f809 --- /dev/null +++ b/binaries/cli/src/template/python/listener/listener-template.py @@ -0,0 +1,11 @@ +from dora import Node +import pyarrow as pa + +node = Node() + +for event in node: + if event["type"] == "INPUT": + message = event["value"][0].as_py() + print( + f"""I heard {message} from {event["id"]}""" + ) diff --git a/binaries/cli/src/template/python/mod.rs b/binaries/cli/src/template/python/mod.rs index a475fd85..4c8eb435 100644 --- a/binaries/cli/src/template/python/mod.rs +++ b/binaries/cli/src/template/python/mod.rs @@ -4,6 +4,10 @@ use std::{ path::{Path, PathBuf}, }; +const NODE_PY: &str = include_str!("node/node-template.py"); +const TALKER_PY: &str = include_str!("talker/talker-template.py"); +const LISTENER_PY: &str = include_str!("listener/listener-template.py"); + pub fn create(args: crate::CommandNew) -> eyre::Result<()> { let crate::CommandNew { kind, @@ -13,47 +17,23 @@ pub fn create(args: crate::CommandNew) -> eyre::Result<()> { } = args; match kind { - crate::Kind::Operator => create_operator(name, path), - crate::Kind::CustomNode => create_custom_node(name, path), + crate::Kind::CustomNode => create_custom_node(name, path, NODE_PY), crate::Kind::Dataflow => create_dataflow(name, path), } } -fn create_operator(name: 
String, path: Option) -> Result<(), eyre::ErrReport> { - const OPERATOR_PY: &str = include_str!("operator/operator-template.py"); - - if name.contains('/') { - bail!("Operator name must not contain `/` separators"); - } - if name.contains('.') { - bail!("Operator name must not contain `.` to not be confused for an extension"); - } - // create directories - let root = path.as_deref().unwrap_or_else(|| Path::new(&name)); - fs::create_dir(root) - .with_context(|| format!("failed to create directory `{}`", root.display()))?; - - let operator_path = root.join(format!("{name}.py")); - fs::write(&operator_path, OPERATOR_PY) - .with_context(|| format!("failed to write `{}`", operator_path.display()))?; - - println!( - "Created new Python operator `{name}` at {}", - Path::new(".").join(root).display() - ); - - Ok(()) -} -fn create_custom_node(name: String, path: Option) -> Result<(), eyre::ErrReport> { - const NODE_PY: &str = include_str!("node/node-template.py"); - +fn create_custom_node( + name: String, + path: Option, + template_scripts: &str, +) -> Result<(), eyre::ErrReport> { // create directories let root = path.as_deref().unwrap_or_else(|| Path::new(&name)); fs::create_dir(root) .with_context(|| format!("failed to create directory `{}`", root.display()))?; let node_path = root.join(format!("{name}.py")); - fs::write(&node_path, NODE_PY) + fs::write(&node_path, template_scripts) .with_context(|| format!("failed to write `{}`", node_path.display()))?; println!( @@ -84,9 +64,13 @@ fn create_dataflow(name: String, path: Option) -> Result<(), eyre::ErrR fs::write(&dataflow_yml_path, dataflow_yml) .with_context(|| format!("failed to write `{}`", dataflow_yml_path.display()))?; - create_operator("op_1".into(), Some(root.join("op_1")))?; - create_operator("op_2".into(), Some(root.join("op_2")))?; - create_custom_node("node_1".into(), Some(root.join("node_1")))?; + create_custom_node("talker_1".into(), Some(root.join("talker_1")), TALKER_PY)?; + 
create_custom_node("talker_2".into(), Some(root.join("talker_2")), TALKER_PY)?; + create_custom_node( + "listener_1".into(), + Some(root.join("listener_1")), + LISTENER_PY, + )?; println!( "Created new yaml dataflow `{name}` at {}", diff --git a/binaries/cli/src/template/python/talker/talker-template.py b/binaries/cli/src/template/python/talker/talker-template.py new file mode 100644 index 00000000..93324e40 --- /dev/null +++ b/binaries/cli/src/template/python/talker/talker-template.py @@ -0,0 +1,14 @@ +from dora import Node +import pyarrow as pa + +node = Node() + +for event in node: + if event["type"] == "INPUT": + print( + f"""Node received: + id: {event["id"]}, + value: {event["value"]}, + metadata: {event["metadata"]}""" + ) + node.send_output("speech", pa.array(["Hello World"])) diff --git a/binaries/cli/src/template/rust/Cargo-template.toml b/binaries/cli/src/template/rust/Cargo-template.toml index d28ac4f8..bf138465 100644 --- a/binaries/cli/src/template/rust/Cargo-template.toml +++ b/binaries/cli/src/template/rust/Cargo-template.toml @@ -1,3 +1,3 @@ [workspace] resolver = "2" -members = ["op_1", "op_2", "node_1"] +members = ["talker_1", "talker_2", "listener_1"] diff --git a/binaries/cli/src/template/rust/dataflow-template.yml b/binaries/cli/src/template/rust/dataflow-template.yml index 0f019fe4..7093abc5 100644 --- a/binaries/cli/src/template/rust/dataflow-template.yml +++ b/binaries/cli/src/template/rust/dataflow-template.yml @@ -1,26 +1,26 @@ nodes: - - id: op_1 - operator: - build: cargo build -p op_1 - shared-library: target/debug/op_1 + - id: talker_1 + custom: + build: cargo build -p talker_1 + source: target/debug/talker_1 inputs: tick: dora/timer/millis/100 outputs: - - some-output - - id: op_2 - operator: - build: cargo build -p op_2 - shared-library: target/debug/op_2 + - speech + - id: talker_2 + custom: + build: cargo build -p talker_2 + source: target/debug/talker_2 inputs: tick: dora/timer/secs/2 outputs: - - some-output + - speech - - id: 
custom-node_1 + - id: listener_1 custom: - build: cargo build -p node_1 - source: target/debug/node_1 + build: cargo build -p listener_1 + source: target/debug/listener_1 inputs: tick: dora/timer/secs/1 - input-1: op_1/some-output - input-2: op_2/some-output + speech-1: talker_1/speech + speech-2: talker_2/speech diff --git a/binaries/cli/src/template/rust/listener/Cargo-template.toml b/binaries/cli/src/template/rust/listener/Cargo-template.toml new file mode 100644 index 00000000..fa46f49a --- /dev/null +++ b/binaries/cli/src/template/rust/listener/Cargo-template.toml @@ -0,0 +1,9 @@ +[package] +name = "___name___" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = {} diff --git a/binaries/cli/src/template/rust/listener/main-template.rs b/binaries/cli/src/template/rust/listener/main-template.rs new file mode 100644 index 00000000..e2a769c6 --- /dev/null +++ b/binaries/cli/src/template/rust/listener/main-template.rs @@ -0,0 +1,25 @@ +use dora_node_api::{DoraNode, Event}; +use std::error::Error; + +fn main() -> Result<(), Box> { + let (mut node, mut events) = DoraNode::init_from_env()?; + + while let Some(event) = events.recv() { + match event { + Event::Input { + id, + metadata, + data, + } => match id.as_str() { + "speech" => { + let message: &str = (&data).try_into()?; + println!("I heard: {message} from {id}"); + } + other => println!("Received input `{other}`"), + }, + _ => {} + } + } + + Ok(()) +} diff --git a/binaries/cli/src/template/rust/mod.rs b/binaries/cli/src/template/rust/mod.rs index 84e5f30e..a0f4717c 100644 --- a/binaries/cli/src/template/rust/mod.rs +++ b/binaries/cli/src/template/rust/mod.rs @@ -4,6 +4,10 @@ use std::{ path::{Path, PathBuf}, }; +const MAIN_RS: &str = include_str!("node/main-template.rs"); +const TALKER_RS: &str = include_str!("talker/main-template.rs"); +const LISTENER_RS: &str = 
include_str!("listener/main-template.rs"); + const VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> { let crate::CommandNew { @@ -14,8 +18,7 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> } = args; match kind { - crate::Kind::Operator => create_operator(name, path, use_path_deps), - crate::Kind::CustomNode => create_custom_node(name, path, use_path_deps), + crate::Kind::CustomNode => create_custom_node(name, path, use_path_deps, MAIN_RS), crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps), } } @@ -49,9 +52,24 @@ fn create_dataflow( fs::write(&cargo_toml_path, cargo_toml) .with_context(|| format!("failed to write `{}`", cargo_toml_path.display()))?; - create_operator("op_1".into(), Some(root.join("op_1")), use_path_deps)?; - create_operator("op_2".into(), Some(root.join("op_2")), use_path_deps)?; - create_custom_node("node_1".into(), Some(root.join("node_1")), use_path_deps)?; + create_custom_node( + "talker_1".into(), + Some(root.join("talker_1")), + use_path_deps, + TALKER_RS, + )?; + create_custom_node( + "talker_2".into(), + Some(root.join("talker_2")), + use_path_deps, + TALKER_RS, + )?; + create_custom_node( + "listener_1".into(), + Some(root.join("listener_1")), + use_path_deps, + LISTENER_RS, + )?; println!( "Created new Rust dataflow at `{name}` at {}", @@ -61,67 +79,13 @@ fn create_dataflow( Ok(()) } -fn create_operator( - name: String, - path: Option, - use_path_deps: bool, -) -> Result<(), eyre::ErrReport> { - const CARGO_TOML: &str = include_str!("operator/Cargo-template.toml"); - const LIB_RS: &str = include_str!("operator/lib-template.rs"); - - if name.contains('/') { - bail!("operator name must not contain `/` separators"); - } - if name.contains('-') { - bail!( - "operator name must not contain `-` separators as - it get replaced by `_` as a static library." 
- ); - } - if !name.is_ascii() { - bail!("operator name must be ASCII"); - } - - // create directories - let root = path.as_deref().unwrap_or_else(|| Path::new(&name)); - fs::create_dir(root) - .with_context(|| format!("failed to create directory `{}`", root.display()))?; - let src = root.join("src"); - fs::create_dir(&src) - .with_context(|| format!("failed to create directory `{}`", src.display()))?; - - let dep = if use_path_deps { - r#"dora-operator-api = { path = "../../apis/rust/operator" }"#.to_string() - } else { - format!(r#"dora-operator-api = "{VERSION}""#) - }; - let cargo_toml = CARGO_TOML - .replace("___name___", &name) - .replace("dora-operator-api = {}", &dep); - - let cargo_toml_path = root.join("Cargo.toml"); - fs::write(&cargo_toml_path, cargo_toml) - .with_context(|| format!("failed to write `{}`", cargo_toml_path.display()))?; - - let lib_rs_path = src.join("lib.rs"); - fs::write(&lib_rs_path, LIB_RS) - .with_context(|| format!("failed to write `{}`", lib_rs_path.display()))?; - - println!( - "Created new Rust operator `{name}` at {}", - Path::new(".").join(root).display() - ); - - Ok(()) -} - fn create_custom_node( name: String, path: Option, use_path_deps: bool, + template_scripts: &str, ) -> Result<(), eyre::ErrReport> { const CARGO_TOML: &str = include_str!("node/Cargo-template.toml"); - const MAIN_RS: &str = include_str!("node/main-template.rs"); if name.contains('/') { bail!("node name must not contain `/` separators"); @@ -151,7 +115,7 @@ fn create_custom_node( .with_context(|| format!("failed to write `{}`", cargo_toml_path.display()))?; let main_rs_path = src.join("main.rs"); - fs::write(&main_rs_path, MAIN_RS) + fs::write(&main_rs_path, template_scripts) .with_context(|| format!("failed to write `{}`", main_rs_path.display()))?; println!( diff --git a/binaries/cli/src/template/rust/talker/Cargo-template.toml b/binaries/cli/src/template/rust/talker/Cargo-template.toml new file mode 100644 index 00000000..fa46f49a --- /dev/null +++ 
b/binaries/cli/src/template/rust/talker/Cargo-template.toml @@ -0,0 +1,9 @@ +[package] +name = "___name___" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = {} diff --git a/binaries/cli/src/template/rust/talker/main-template.rs b/binaries/cli/src/template/rust/talker/main-template.rs new file mode 100644 index 00000000..a334102e --- /dev/null +++ b/binaries/cli/src/template/rust/talker/main-template.rs @@ -0,0 +1,25 @@ +use dora_node_api::{dora_core::config::DataId, DoraNode, Event, IntoArrow}; +use std::error::Error; + +fn main() -> Result<(), Box> { + let (mut node, mut events) = DoraNode::init_from_env()?; + + while let Some(event) = events.recv() { + match event { + Event::Input { + id, + metadata, + data: _, + } => match id.as_str() { + "tick" => { + node.send_output(DataId::from("speech".to_owned()), metadata.parameters, String::from("Hello World!").into_arrow())?; + println!("Node received `{id}`"); + }, + _ => {} + }, + _ => {} + } + } + + Ok(()) +} diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index d00dfe22..67a830b3 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,7 +9,10 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{ControlRequest, ControlRequestReply, DataflowId, DataflowListEntry}, + topics::{ + ControlRequest, ControlRequestReply, DataflowDaemonResult, DataflowId, DataflowListEntry, + DataflowResult, + }, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -134,7 +137,8 @@ async fn start_inner( let mut events = (abortable_events, daemon_events).merge(); let mut running_dataflows: HashMap = HashMap::new(); - let mut dataflow_results: HashMap>> = 
HashMap::new(); + let mut dataflow_results: HashMap> = + HashMap::new(); let mut archived_dataflows: HashMap = HashMap::new(); let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new(); @@ -220,18 +224,22 @@ async fn start_inner( Event::Dataflow { uuid, event } => match event { DataflowEvent::ReadyOnMachine { machine_id, - success, + exited_before_subscribe, } => { match running_dataflows.entry(uuid) { std::collections::hash_map::Entry::Occupied(mut entry) => { let dataflow = entry.get_mut(); dataflow.pending_machines.remove(&machine_id); - dataflow.init_success &= success; + dataflow + .exited_before_subscribe + .extend(exited_before_subscribe); if dataflow.pending_machines.is_empty() { let message = serde_json::to_vec(&Timestamped { inner: DaemonCoordinatorEvent::AllNodesReady { dataflow_id: uuid, - success: dataflow.init_success, + exited_before_subscribe: dataflow + .exited_before_subscribe + .clone(), }, timestamp: clock.new_timestamp(), }) @@ -271,26 +279,20 @@ async fn start_inner( .insert(uuid, ArchivedDataflow::from(entry.get())); } entry.get_mut().machines.remove(&machine_id); - match &result { - Ok(()) => { - tracing::info!("dataflow `{uuid}` finished successfully on machine `{machine_id}`"); - } - Err(err) => { - tracing::error!("{err:?}"); - } - } dataflow_results .entry(uuid) .or_default() - .insert(machine_id, result.map_err(|err| format!("{err:?}"))); + .insert(machine_id, result); if entry.get_mut().machines.is_empty() { let finished_dataflow = entry.remove(); let reply = ControlRequestReply::DataflowStopped { uuid, result: dataflow_results .get(&uuid) - .map(|r| dataflow_result(r, uuid)) - .unwrap_or(Ok(())), + .map(|r| dataflow_result(r, uuid, &clock)) + .unwrap_or_else(|| { + DataflowResult::ok_empty(uuid, clock.new_timestamp()) + }), }; for sender in finished_dataflow.reply_senders { let _ = sender.send(Ok(reply.clone())); @@ -353,8 +355,13 @@ async fn start_inner( uuid: dataflow_uuid, result: dataflow_results 
.get(&dataflow_uuid) - .map(|r| dataflow_result(r, dataflow_uuid)) - .unwrap_or(Ok(())), + .map(|r| dataflow_result(r, dataflow_uuid, &clock)) + .unwrap_or_else(|| { + DataflowResult::ok_empty( + dataflow_uuid, + clock.new_timestamp(), + ) + }), }, }; let _ = reply_sender.send(Ok(status)); @@ -396,6 +403,7 @@ async fn start_inner( reply_sender, clock.new_timestamp(), grace_duration, + &clock, ) .await?; } @@ -412,6 +420,7 @@ async fn start_inner( reply_sender, clock.new_timestamp(), grace_duration, + &clock, ) .await? } @@ -561,18 +570,19 @@ async fn start_inner( async fn stop_dataflow_by_uuid( running_dataflows: &mut HashMap, - dataflow_results: &HashMap>>, + dataflow_results: &HashMap>, dataflow_uuid: Uuid, daemon_connections: &mut HashMap, reply_sender: tokio::sync::oneshot::Sender>, timestamp: uhlc::Timestamp, grace_duration: Option, + clock: &uhlc::HLC, ) -> Result<(), eyre::ErrReport> { let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else { if let Some(result) = dataflow_results.get(&dataflow_uuid) { let reply = ControlRequestReply::DataflowStopped { uuid: dataflow_uuid, - result: dataflow_result(result, dataflow_uuid), + result: dataflow_result(result, dataflow_uuid, clock), }; let _ = reply_sender.send(Ok(reply)); return Ok(()); @@ -601,36 +611,23 @@ async fn stop_dataflow_by_uuid( Ok(()) } -fn format_error(machine: &str, err: &str) -> String { - let mut error = err - .lines() - .fold(format!("- machine `{machine}`:\n"), |mut output, line| { - output.push_str(" "); - output.push_str(line); - output.push('\n'); - output - }); - error.push('\n'); - error -} - fn dataflow_result( - results: &BTreeMap>, + results: &BTreeMap, dataflow_uuid: Uuid, -) -> Result<(), String> { - let mut errors = Vec::new(); - for (machine, result) in results { - if let Err(err) = result { - errors.push(format_error(machine, err)); + clock: &uhlc::HLC, +) -> DataflowResult { + let mut node_results = BTreeMap::new(); + for (_machine, result) in results { + 
node_results.extend(result.node_results.clone()); + if let Err(err) = clock.update_with_timestamp(&result.timestamp) { + tracing::warn!("failed to update HLC: {err}"); } } - if errors.is_empty() { - Ok(()) - } else { - let mut formatted = format!("errors occurred in dataflow {dataflow_uuid}:\n"); - formatted.push_str(&errors.join("\n")); - Err(formatted) + DataflowResult { + uuid: dataflow_uuid, + timestamp: clock.new_timestamp(), + node_results, } } @@ -685,7 +682,7 @@ struct RunningDataflow { machines: BTreeSet, /// IDs of machines that are waiting until all nodes are started. pending_machines: BTreeSet, - init_success: bool, + exited_before_subscribe: Vec, nodes: Vec, reply_senders: Vec>>, @@ -884,7 +881,7 @@ async fn start_dataflow( } else { BTreeSet::new() }, - init_success: true, + exited_before_subscribe: Default::default(), machines, nodes, reply_senders: Vec::new(), @@ -951,11 +948,11 @@ impl Event { pub enum DataflowEvent { DataflowFinishedOnMachine { machine_id: String, - result: eyre::Result<()>, + result: DataflowDaemonResult, }, ReadyOnMachine { machine_id: String, - success: bool, + exited_before_subscribe: Vec, }, } @@ -990,21 +987,3 @@ fn set_up_ctrlc_handler() -> Result, eyre::ErrReport> Ok(ReceiverStream::new(ctrlc_rx)) } - -#[cfg(test)] -mod test { - #[test] - fn test_format_error() { - let machine = "machine A"; - let err = "foo\nbar\nbuzz"; - - // old method - let old_error = { - #[allow(clippy::format_collect)] - let err: String = err.lines().map(|line| format!(" {line}\n")).collect(); - format!("- machine `{machine}`:\n{err}\n") - }; - let new_error = super::format_error(machine, err); - assert_eq!(old_error, new_error) - } -} diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 86600a4b..f6f9b56c 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -1,6 +1,6 @@ use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event}; use 
dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC}; -use eyre::{eyre, Context}; +use eyre::Context; use std::{io::ErrorKind, net::SocketAddr, sync::Arc}; use tokio::{ net::{TcpListener, TcpStream}, @@ -66,13 +66,13 @@ pub async fn handle_connection( coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event { coordinator_messages::DaemonEvent::AllNodesReady { dataflow_id, - success, + exited_before_subscribe, } => { let event = Event::Dataflow { uuid: dataflow_id, event: DataflowEvent::ReadyOnMachine { machine_id, - success, + exited_before_subscribe, }, }; if events_tx.send(event).await.is_err() { @@ -85,10 +85,7 @@ pub async fn handle_connection( } => { let event = Event::Dataflow { uuid: dataflow_id, - event: DataflowEvent::DataflowFinishedOnMachine { - machine_id, - result: result.map_err(|e| eyre!(e)), - }, + event: DataflowEvent::DataflowFinishedOnMachine { machine_id, result }, }; if events_tx.send(event).await.is_err() { break; diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index a53607f9..2f1be890 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -39,3 +39,5 @@ aligned-vec = "0.5.0" ctrlc = "3.2.5" which = "5.0.0" sysinfo = "0.30.11" +crossbeam = "0.8.4" +crossbeam-skiplist = "0.1.3" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 521b5bd8..2f030a61 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1,5 +1,6 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; +use crossbeam::queue::ArrayQueue; use dora_core::config::{Input, OperatorId}; use dora_core::coordinator_messages::CoordinatorRequest; use dora_core::daemon_messages::{ @@ -9,6 +10,9 @@ use dora_core::descriptor::runtime_node_inputs; use dora_core::message::uhlc::{self, HLC}; use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters}; use dora_core::topics::LOCALHOST; +use dora_core::topics::{ + 
DataflowDaemonResult, DataflowResult, NodeError, NodeErrorCause, NodeExitStatus, +}; use dora_core::{ config::{DataId, InputMapping, NodeId}, coordinator_messages::DaemonEvent, @@ -29,9 +33,7 @@ use shared_memory_server::ShmemConf; use std::sync::Arc; use std::time::Instant; use std::{ - borrow::Cow, collections::{BTreeMap, BTreeSet, HashMap}, - io, net::SocketAddr, path::{Path, PathBuf}, time::Duration, @@ -64,6 +66,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::pending::DataflowStatus; +const STDERR_LOG_LINES: usize = 10; + pub struct Daemon { running: HashMap, working_dir: HashMap, @@ -78,7 +82,7 @@ pub struct Daemon { /// used for testing and examples exit_when_done: Option>, /// used to record dataflow results when `exit_when_done` is used - dataflow_errors: BTreeMap>, + dataflow_node_results: BTreeMap>>, clock: Arc, } @@ -148,7 +152,7 @@ impl Daemon { .map(|_| ()) } - pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<()> { + pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result { let working_dir = dataflow_path .canonicalize() .context("failed to canoncialize dataflow path")? 
@@ -160,8 +164,9 @@ impl Daemon { descriptor.check(&working_dir)?; let nodes = descriptor.resolve_aliases_and_set_defaults()?; + let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext)); let spawn_command = SpawnDataflowNodes { - dataflow_id: Uuid::new_v7(Timestamp::now(NoContext)), + dataflow_id, working_dir, nodes, machine_listen_ports: BTreeMap::new(), @@ -191,7 +196,7 @@ impl Daemon { None, "".to_string(), Some(exit_when_done), - clock, + clock.clone(), ); let spawn_result = reply_rx @@ -205,20 +210,15 @@ impl Daemon { } }); - let (dataflow_errors, ()) = future::try_join(run_result, spawn_result).await?; + let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?; - if dataflow_errors.is_empty() { - Ok(()) - } else { - let mut output = "some nodes failed:".to_owned(); - for (dataflow, node_errors) in dataflow_errors { - for (node, error) in node_errors { - use std::fmt::Write; - write!(&mut output, "\n - {dataflow}/{node}: {error}").unwrap(); - } - } - bail!("{output}"); - } + Ok(DataflowResult { + uuid: dataflow_id, + timestamp: clock.new_timestamp(), + node_results: dataflow_results + .remove(&dataflow_id) + .context("no node results for dataflow_id")?, + }) } async fn run_general( @@ -227,7 +227,7 @@ impl Daemon { machine_id: String, exit_when_done: Option>, clock: Arc, - ) -> eyre::Result>> { + ) -> eyre::Result>>> { let coordinator_connection = match coordinator_addr { Some(addr) => { let stream = TcpStream::connect(addr) @@ -251,7 +251,7 @@ impl Daemon { inter_daemon_connections: BTreeMap::new(), machine_id, exit_when_done, - dataflow_errors: BTreeMap::new(), + dataflow_node_results: BTreeMap::new(), clock, }; @@ -272,7 +272,7 @@ impl Daemon { async fn run_inner( mut self, incoming_events: impl Stream> + Unpin, - ) -> eyre::Result>> { + ) -> eyre::Result>>> { let mut events = incoming_events; while let Some(event) = events.next().await { @@ -329,7 +329,7 @@ impl Daemon { } } - Ok(self.dataflow_errors) + 
Ok(self.dataflow_node_results) } async fn handle_coordinator_event( @@ -376,15 +376,19 @@ impl Daemon { } DaemonCoordinatorEvent::AllNodesReady { dataflow_id, - success, + exited_before_subscribe, } => { match self.running.get_mut(&dataflow_id) { Some(dataflow) => { + let ready = exited_before_subscribe.is_empty(); dataflow .pending_nodes - .handle_external_all_nodes_ready(success) + .handle_external_all_nodes_ready( + exited_before_subscribe, + &mut dataflow.cascading_error_causes, + ) .await?; - if success { + if ready { tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`"); dataflow.start(&self.events_tx, &self.clock).await?; } @@ -614,6 +618,11 @@ impl Daemon { dataflow.pending_nodes.insert(node.id.clone()); let node_id = node.id.clone(); + let node_stderr_most_recent = dataflow + .node_stderr_most_recent + .entry(node.id.clone()) + .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES))) + .clone(); match spawn::spawn_node( dataflow_id, &working_dir, @@ -621,6 +630,7 @@ impl Daemon { self.events_tx.clone(), dataflow_descriptor.clone(), self.clock.clone(), + node_stderr_most_recent, ) .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) @@ -636,6 +646,7 @@ impl Daemon { &node_id, &mut self.coordinator_connection, &self.clock, + &mut dataflow.cascading_error_causes, ) .await?; } @@ -742,6 +753,7 @@ impl Daemon { reply_sender, &mut self.coordinator_connection, &self.clock, + &mut dataflow.cascading_error_causes, ) .await?; match status { @@ -996,7 +1008,12 @@ impl Daemon { dataflow .pending_nodes - .handle_node_stop(node_id, &mut self.coordinator_connection, &self.clock) + .handle_node_stop( + node_id, + &mut self.coordinator_connection, + &self.clock, + &mut dataflow.cascading_error_causes, + ) .await?; Self::handle_outputs_done( @@ -1013,17 +1030,15 @@ impl Daemon { .iter() .all(|(_id, n)| n.node_config.dynamic) { - let result = match self.dataflow_errors.get(&dataflow.id) { - None => Ok(()), 
- Some(errors) => { - let mut output = "some nodes failed:".to_owned(); - for (node, error) in errors { - use std::fmt::Write; - write!(&mut output, "\n - {node}: {error}").unwrap(); - } - Err(output) - } + let result = DataflowDaemonResult { + timestamp: self.clock.new_timestamp(), + node_results: self + .dataflow_node_results + .get(&dataflow.id) + .context("failed to get dataflow node results")? + .clone(), }; + tracing::info!( "Dataflow `{dataflow_id}` finished on machine `{}`", self.machine_id @@ -1142,80 +1157,57 @@ impl Daemon { node_id, exit_status, } => { - let node_error = match exit_status { + let node_result = match exit_status { NodeExitStatus::Success => { tracing::info!("node {dataflow_id}/{node_id} finished successfully"); - None - } - NodeExitStatus::IoError(err) => { - let err = eyre!(err).wrap_err(format!( - " - I/O error while waiting for node `{dataflow_id}/{node_id}. - - Check logs using: dora logs {dataflow_id} {node_id} - " - )); - tracing::error!("{err:?}"); - Some(err) + Ok(()) } - NodeExitStatus::ExitCode(code) => { - let err = eyre!( - " - {dataflow_id}/{node_id} failed with exit code {code}. 
- - Check logs using: dora logs {dataflow_id} {node_id} - " - ); - tracing::error!("{err}"); - Some(err) - } - NodeExitStatus::Signal(signal) => { - let signal: Cow<_> = match signal { - 1 => "SIGHUP".into(), - 2 => "SIGINT".into(), - 3 => "SIGQUIT".into(), - 4 => "SIGILL".into(), - 6 => "SIGABRT".into(), - 8 => "SIGFPE".into(), - 9 => "SIGKILL".into(), - 11 => "SIGSEGV".into(), - 13 => "SIGPIPE".into(), - 14 => "SIGALRM".into(), - 15 => "SIGTERM".into(), - 22 => "SIGABRT".into(), - 23 => "NSIG".into(), - - other => other.to_string().into(), + exit_status => { + let dataflow = self.running.get(&dataflow_id); + let caused_by_node = dataflow + .and_then(|dataflow| { + dataflow.cascading_error_causes.error_caused_by(&node_id) + }) + .cloned(); + let grace_duration_kill = dataflow + .map(|d| d.grace_duration_kills.contains(&node_id)) + .unwrap_or_default(); + + let cause = match caused_by_node { + Some(caused_by_node) => { + tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`"); + NodeErrorCause::Cascading { caused_by_node } + } + None if grace_duration_kill => NodeErrorCause::GraceDuration, + None => NodeErrorCause::Other { + stderr: dataflow + .and_then(|d| d.node_stderr_most_recent.get(&node_id)) + .map(|queue| { + let mut s = if queue.is_full() { + "[...]".into() + } else { + String::new() + }; + while let Some(line) = queue.pop() { + s += &line; + } + s + }) + .unwrap_or_default(), + }, }; - let err = eyre!( - " - {dataflow_id}/{node_id} failed with signal `{signal}` - - Check logs using: dora logs {dataflow_id} {node_id} - " - ); - tracing::error!("{err}"); - Some(err) - } - NodeExitStatus::Unknown => { - let err = eyre!( - " - {dataflow_id}/{node_id} failed with unknown exit code - - Check logs using: dora logs {dataflow_id} {node_id} - " - ); - tracing::error!("{err}"); - Some(err) + Err(NodeError { + timestamp: self.clock.new_timestamp(), + cause, + exit_status, + }) } }; - if let Some(err) = node_error { - self.dataflow_errors 
- .entry(dataflow_id) - .or_default() - .insert(node_id.clone(), err); - } + self.dataflow_node_results + .entry(dataflow_id) + .or_default() + .insert(node_id.clone(), node_result); self.handle_node_stop(dataflow_id, &node_id).await?; @@ -1423,6 +1415,12 @@ pub struct RunningDataflow { /// /// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable. empty_set: BTreeSet, + + /// Contains the node that caused the error for nodes that experienced a cascading error. + cascading_error_causes: CascadingErrorCauses, + grace_duration_kills: Arc>, + + node_stderr_most_recent: BTreeMap>>, } impl RunningDataflow { @@ -1441,6 +1439,9 @@ impl RunningDataflow { _timer_handles: Vec::new(), stop_sent: false, empty_set: BTreeSet::new(), + cascading_error_causes: Default::default(), + grace_duration_kills: Default::default(), + node_stderr_most_recent: BTreeMap::new(), } } @@ -1503,6 +1504,7 @@ impl RunningDataflow { } let running_nodes = self.running_nodes.clone(); + let grace_duration_kills = self.grace_duration_kills.clone(); tokio::spawn(async move { let duration = grace_duration.unwrap_or(Duration::from_millis(500)); tokio::time::sleep(duration).await; @@ -1512,6 +1514,7 @@ impl RunningDataflow { for (node, node_details) in running_nodes.iter() { if let Some(pid) = node_details.pid { if let Some(process) = system.process(Pid::from(pid as usize)) { + grace_duration_kills.insert(node.clone()); process.kill(); warn!( "{node} was killed due to not stopping within the {:#?} grace period", @@ -1644,39 +1647,6 @@ pub enum DoraEvent { }, } -#[derive(Debug)] -pub enum NodeExitStatus { - Success, - IoError(io::Error), - ExitCode(i32), - Signal(i32), - Unknown, -} - -impl From> for NodeExitStatus { - fn from(result: Result) -> Self { - match result { - Ok(status) => { - if status.success() { - NodeExitStatus::Success - } else if let Some(code) = status.code() { - Self::ExitCode(code) - } else { - #[cfg(unix)] - { - use std::os::unix::process::ExitStatusExt; - if let 
Some(signal) = status.signal() { - return Self::Signal(signal); - } - } - Self::Unknown - } - } - Err(err) => Self::IoError(err), - } - } -} - #[must_use] enum RunStatus { Continue, @@ -1723,3 +1693,23 @@ fn set_up_ctrlc_handler( Ok(ReceiverStream::new(ctrlc_rx)) } + +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct CascadingErrorCauses { + caused_by: BTreeMap, +} + +impl CascadingErrorCauses { + pub fn experienced_cascading_error(&self, node: &NodeId) -> bool { + self.caused_by.contains_key(node) + } + + /// Return the ID of the node that caused a cascading error for the given node, if any. + pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> { + self.caused_by.get(node) + } + + pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) { + self.caused_by.entry(affected_node).or_insert(causing_node); + } +} diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index ccba1a56..d1fb1b30 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -9,7 +9,7 @@ use dora_core::{ use eyre::{bail, Context}; use tokio::{net::TcpStream, sync::oneshot}; -use crate::tcp_utils::tcp_send; +use crate::{tcp_utils::tcp_send, CascadingErrorCauses}; pub struct PendingNodes { dataflow_id: DataflowId, @@ -28,7 +28,7 @@ pub struct PendingNodes { /// /// If this list is non-empty, we should not start the dataflow at all. Instead, /// we report an error to the other nodes. - exited_before_subscribe: HashSet, + exited_before_subscribe: Vec, /// Whether the local init result was already reported to the coordinator. 
reported_init_to_coordinator: bool, @@ -42,7 +42,7 @@ impl PendingNodes { local_nodes: HashSet::new(), external_nodes: false, waiting_subscribers: HashMap::new(), - exited_before_subscribe: HashSet::new(), + exited_before_subscribe: Default::default(), reported_init_to_coordinator: false, } } @@ -61,12 +61,13 @@ impl PendingNodes { reply_sender: oneshot::Sender, coordinator_connection: &mut Option, clock: &HLC, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result { self.waiting_subscribers .insert(node_id.clone(), reply_sender); self.local_nodes.remove(&node_id); - self.update_dataflow_status(coordinator_connection, clock) + self.update_dataflow_status(coordinator_connection, clock, cascading_errors) .await } @@ -75,26 +76,28 @@ impl PendingNodes { node_id: &NodeId, coordinator_connection: &mut Option, clock: &HLC, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result<()> { if self.local_nodes.remove(node_id) { tracing::warn!("node `{node_id}` exited before initializing dora connection"); - self.exited_before_subscribe.insert(node_id.clone()); - self.update_dataflow_status(coordinator_connection, clock) + self.exited_before_subscribe.push(node_id.clone()); + self.update_dataflow_status(coordinator_connection, clock, cascading_errors) .await?; } Ok(()) } - pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> { + pub async fn handle_external_all_nodes_ready( + &mut self, + exited_before_subscribe: Vec, + cascading_errors: &mut CascadingErrorCauses, + ) -> eyre::Result<()> { if !self.local_nodes.is_empty() { bail!("received external `all_nodes_ready` event before local nodes were ready"); } - let external_error = if success { - None - } else { - Some("some nodes failed to initialize on remote machines".to_string()) - }; - self.answer_subscribe_requests(external_error).await; + + self.answer_subscribe_requests(exited_before_subscribe, cascading_errors) + .await; Ok(()) } @@ -103,6 +106,7 @@ impl PendingNodes 
{ &mut self, coordinator_connection: &mut Option, clock: &HLC, + cascading_errors: &mut CascadingErrorCauses, ) -> eyre::Result { if self.local_nodes.is_empty() { if self.external_nodes { @@ -113,7 +117,8 @@ impl PendingNodes { } Ok(DataflowStatus::Pending) } else { - self.answer_subscribe_requests(None).await; + self.answer_subscribe_requests(Vec::new(), cascading_errors) + .await; Ok(DataflowStatus::AllNodesReady) } } else { @@ -121,33 +126,34 @@ impl PendingNodes { } } - async fn answer_subscribe_requests(&mut self, external_error: Option) { - let result = if self.exited_before_subscribe.is_empty() { - match external_error { - Some(err) => Err(err), - None => Ok(()), - } - } else { - let node_id_message = if self.exited_before_subscribe.len() == 1 { - self.exited_before_subscribe - .iter() - .next() - .map(|node_id| node_id.to_string()) - .unwrap_or("".to_string()) - } else { - "".to_string() - }; - Err(format!( - "Some nodes exited before subscribing to dora: {:?}\n\n\ - This is typically happens when an initialization error occurs - in the node or operator. To check the output of the failed - nodes, run `dora logs {} {node_id_message}`.", - self.exited_before_subscribe, self.dataflow_id - )) + async fn answer_subscribe_requests( + &mut self, + exited_before_subscribe_external: Vec, + cascading_errors: &mut CascadingErrorCauses, + ) { + let node_exited_before_subscribe = match self.exited_before_subscribe.as_slice() { + [first, ..] => Some(first), + [] => match exited_before_subscribe_external.as_slice() { + [first, ..] => Some(first), + [] => None, + }, }; + + let result = match &node_exited_before_subscribe { + Some(causing_node) => Err(format!( + "Node {causing_node} exited before initializing dora. 
For \ + more information, run `dora logs {} {causing_node}`.", + self.dataflow_id + )), + None => Ok(()), + }; + // answer all subscribe requests let subscribe_replies = std::mem::take(&mut self.waiting_subscribers); - for reply_sender in subscribe_replies.into_values() { + for (node_id, reply_sender) in subscribe_replies.into_iter() { + if let Some(causing_node) = node_exited_before_subscribe { + cascading_errors.report_cascading_error(causing_node.clone(), node_id.clone()); + } let _ = reply_sender.send(DaemonReply::Result(result.clone())); } } @@ -161,15 +167,17 @@ impl PendingNodes { bail!("no coordinator connection to send AllNodesReady"); }; - let success = self.exited_before_subscribe.is_empty(); - tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes"); + tracing::info!( + "all local nodes are ready (exit before subscribe: {:?}), waiting for remote nodes", + self.exited_before_subscribe + ); let msg = serde_json::to_vec(&Timestamped { inner: CoordinatorRequest::Event { machine_id: self.machine_id.clone(), event: DaemonEvent::AllNodesReady { dataflow_id: self.dataflow_id, - success, + exited_before_subscribe: self.exited_before_subscribe.clone(), }, }, timestamp, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 55a67384..b0af2e31 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -3,6 +3,7 @@ use crate::{ OutputId, RunningNode, }; use aligned_vec::{AVec, ConstAlign}; +use crossbeam::queue::ArrayQueue; use dora_arrow_convert::IntoArrow; use dora_core::{ config::DataId, @@ -32,7 +33,7 @@ use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt}, sync::{mpsc, oneshot}, }; -use tracing::{debug, error}; +use tracing::error; /// clock is required for generating timestamps when dropping messages early because queue is full pub async fn spawn_node( @@ -42,6 +43,7 @@ pub async fn spawn_node( daemon_tx: mpsc::Sender>, dataflow_descriptor: Descriptor, clock: Arc, + 
node_stderr_most_recent: Arc>, ) -> eyre::Result { let node_id = node.id.clone(); tracing::debug!("Spawning node `{dataflow_id}/{node_id}`"); @@ -358,18 +360,22 @@ pub async fn spawn_node( } }; - match String::from_utf8(raw) { - Ok(s) => buffer.push_str(&s), + let new = match String::from_utf8(raw) { + Ok(s) => s, Err(err) => { let lossy = String::from_utf8_lossy(err.as_bytes()); tracing::warn!( "stderr not valid UTF-8 string (node {node_id}): {}: {lossy}", err.utf8_error() ); - buffer.push_str(&lossy) + lossy.into_owned() } }; + buffer.push_str(&new); + + node_stderr_most_recent.force_push(new); + if buffer.starts_with("Traceback (most recent call last):") { if !finished { continue; diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 82d01ebb..fa488388 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -208,7 +208,9 @@ pub fn run( metadata.parameters.open_telemetry_context = string_cx; } - let py_event = PyEvent::from(event); + let py_event = PyEvent::from(event) + .to_py_dict(py) + .context("Could not convert event to pydict bound")?; let status_enum = operator .call_method1(py, "on_event", (py_event, send_output.clone())) diff --git a/examples/python-dataflow/example.py b/examples/python-dataflow/example.py new file mode 100644 index 00000000..c9221a3a --- /dev/null +++ b/examples/python-dataflow/example.py @@ -0,0 +1,5 @@ +from dora import Node + +node = Node("plot") + +event = node.next() diff --git a/examples/python-dataflow/requirements.txt b/examples/python-dataflow/requirements.txt index a8f02b0a..9c1fc915 100644 --- a/examples/python-dataflow/requirements.txt +++ b/examples/python-dataflow/requirements.txt @@ -6,7 +6,7 @@ ultralytics gitpython ipython # interactive notebook matplotlib>=3.2.2 -numpy>=1.18.5 +numpy<2.0.0 # See: https://github.com/opencv/opencv-python/issues/997 opencv-python>=4.1.1 Pillow>=7.1.2 psutil # system resources @@ -44,4 +44,4 @@ 
seaborn>=0.11.0 # roboflow opencv-python>=4.1.1 -maturin \ No newline at end of file +maturin diff --git a/examples/python-dataflow/run.rs b/examples/python-dataflow/run.rs index a14b553f..65ae5831 100644 --- a/examples/python-dataflow/run.rs +++ b/examples/python-dataflow/run.rs @@ -1,5 +1,4 @@ use dora_core::{get_pip_path, get_python_path, run}; -use dora_download::download_file; use dora_tracing::set_up_tracing; use eyre::{bail, ContextCompat, WrapErr}; use std::path::Path; @@ -73,12 +72,6 @@ async fn main() -> eyre::Result<()> { ) .await .context("maturin develop failed")?; - download_file( - "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt", - Path::new("yolov8n.pt"), - ) - .await - .context("Could not download weights.")?; let dataflow = Path::new("dataflow.yml"); run_dataflow(dataflow).await?; diff --git a/examples/python-operator-dataflow/requirements.txt b/examples/python-operator-dataflow/requirements.txt index 68020faa..bd15e396 100644 --- a/examples/python-operator-dataflow/requirements.txt +++ b/examples/python-operator-dataflow/requirements.txt @@ -6,7 +6,7 @@ ultralytics gitpython ipython # interactive notebook matplotlib>=3.2.2 -numpy>=1.18.5 +numpy<2.0.0 opencv-python>=4.1.1 Pillow>=7.1.2 psutil # system resources diff --git a/examples/python-ros2-dataflow/random_turtle.py b/examples/python-ros2-dataflow/random_turtle.py index 1e690d07..3b25ac21 100755 --- a/examples/python-ros2-dataflow/random_turtle.py +++ b/examples/python-ros2-dataflow/random_turtle.py @@ -55,11 +55,11 @@ for i in range(500): # ROS2 Event elif event_kind == "external": - pose = event.inner()[0].as_py() + pose = event["value"][0].as_py() min_x = min([min_x, pose["x"]]) max_x = max([max_x, pose["x"]]) min_y = min([min_y, pose["y"]]) max_y = max([max_y, pose["y"]]) - dora_node.send_output("turtle_pose", event.inner()) + dora_node.send_output("turtle_pose", event["value"]) assert max_x - min_x > 1 or max_y - min_y > 1, "no turtle movement" diff --git 
a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index 38e9eae2..5a4a1db9 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -1,4 +1,4 @@ -use crate::daemon_messages::DataflowId; +use crate::{config::NodeId, daemon_messages::DataflowId, topics::DataflowDaemonResult}; use eyre::eyre; #[derive(Debug, serde::Serialize, serde::Deserialize)] @@ -18,11 +18,11 @@ pub enum CoordinatorRequest { pub enum DaemonEvent { AllNodesReady { dataflow_id: DataflowId, - success: bool, + exited_before_subscribe: Vec, }, AllNodesFinished { dataflow_id: DataflowId, - result: Result<(), String>, + result: DataflowDaemonResult, }, Heartbeat, } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index ae6fc262..ce6c459a 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -234,7 +234,7 @@ pub enum DaemonCoordinatorEvent { Spawn(SpawnDataflowNodes), AllNodesReady { dataflow_id: DataflowId, - success: bool, + exited_before_subscribe: Vec, }, StopDataflow { dataflow_id: DataflowId, diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 9677ec6b..18a48008 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,5 +1,7 @@ +use dora_message::uhlc; use std::{ - collections::BTreeSet, + borrow::Cow, + collections::{BTreeMap, BTreeSet}, fmt::Display, net::{IpAddr, Ipv4Addr}, path::PathBuf, @@ -85,17 +87,9 @@ pub enum DataflowStatus { pub enum ControlRequestReply { Error(String), CoordinatorStopped, - DataflowStarted { - uuid: Uuid, - }, - DataflowReloaded { - uuid: Uuid, - }, - DataflowStopped { - uuid: Uuid, - result: Result<(), String>, - }, - + DataflowStarted { uuid: Uuid }, + DataflowReloaded { uuid: Uuid }, + DataflowStopped { uuid: Uuid, result: DataflowResult }, DataflowList(DataflowList), DestroyOk, DaemonConnected(bool), @@ -118,3 +112,132 @@ impl Display for 
DataflowId { } } } + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowResult { + pub uuid: Uuid, + pub timestamp: uhlc::Timestamp, + pub node_results: BTreeMap>, +} + +impl DataflowResult { + pub fn ok_empty(uuid: Uuid, timestamp: uhlc::Timestamp) -> Self { + Self { + uuid, + timestamp, + node_results: Default::default(), + } + } + + pub fn is_ok(&self) -> bool { + self.node_results.values().all(|r| r.is_ok()) + } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct DataflowDaemonResult { + pub timestamp: uhlc::Timestamp, + pub node_results: BTreeMap>, +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct NodeError { + pub timestamp: uhlc::Timestamp, + pub cause: NodeErrorCause, + pub exit_status: NodeExitStatus, +} + +impl std::fmt::Display for NodeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self.exit_status { + NodeExitStatus::Success => write!(f, ""), + NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"), + NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"), + NodeExitStatus::Signal(signal) => { + let signal_str: Cow<_> = match signal { + 1 => "SIGHUP".into(), + 2 => "SIGINT".into(), + 3 => "SIGQUIT".into(), + 4 => "SIGILL".into(), + 6 => "SIGABRT".into(), + 8 => "SIGFPE".into(), + 9 => "SIGKILL".into(), + 11 => "SIGSEGV".into(), + 13 => "SIGPIPE".into(), + 14 => "SIGALRM".into(), + 15 => "SIGTERM".into(), + 22 => "SIGABRT".into(), + 23 => "NSIG".into(), + other => other.to_string().into(), + }; + if matches!(self.cause, NodeErrorCause::GraceDuration) { + write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})") + } else { + write!(f, "exited because of signal {signal_str}") + } + } + NodeExitStatus::Unknown => write!(f, "unknown exit status"), + }?; + + match &self.cause { + NodeErrorCause::GraceDuration => {}, // handled above + 
NodeErrorCause::Cascading { caused_by_node } => write!( + f, + "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora." + )?, + NodeErrorCause::Other { stderr } if stderr.is_empty() => {} + NodeErrorCause::Other { stderr } => { + let line: &str = "---------------------------------------------------------------------------------\n"; + write!(f, " with stderr output:\n{line}{stderr}{line}")? + }, + } + + Ok(()) + } +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub enum NodeErrorCause { + /// Node was killed because it didn't react to a stop message in time. + GraceDuration, + /// Node failed because another node failed before, + Cascading { + caused_by_node: NodeId, + }, + Other { + stderr: String, + }, +} + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub enum NodeExitStatus { + Success, + IoError(String), + ExitCode(i32), + Signal(i32), + Unknown, +} + +impl From> for NodeExitStatus { + fn from(result: Result) -> Self { + match result { + Ok(status) => { + if status.success() { + NodeExitStatus::Success + } else if let Some(code) = status.code() { + Self::ExitCode(code) + } else { + #[cfg(unix)] + { + use std::os::unix::process::ExitStatusExt; + if let Some(signal) = status.signal() { + return Self::Signal(signal); + } + } + Self::Unknown + } + } + Err(err) => Self::IoError(err.to_string()), + } + } +}