Browse Source

Merge branch 'main' into list-failed-dataflows

tags/v0.3.5-rc0
Philipp Oppermann 2 years ago
parent
commit
e90ffd0b7f
Failed to extract signature
47 changed files with 1380 additions and 676 deletions
  1. +43
    -5
      .github/workflows/ci.yml
  2. +117
    -33
      Cargo.lock
  3. +18
    -10
      apis/c++/node/src/lib.rs
  4. +0
    -1
      apis/python/node/dora/__init__.py
  5. +2
    -11
      apis/python/node/dora/__init__.pyi
  6. +17
    -9
      apis/python/node/src/lib.rs
  7. +35
    -57
      apis/python/operator/src/lib.rs
  8. +3
    -3
      binaries/cli/src/attach.rs
  9. +50
    -0
      binaries/cli/src/formatting.rs
  10. +43
    -9
      binaries/cli/src/main.rs
  11. +79
    -0
      binaries/cli/src/template/c/cmake-template.txt
  12. +14
    -16
      binaries/cli/src/template/c/dataflow-template.yml
  13. +69
    -0
      binaries/cli/src/template/c/listener/listener-template.c
  14. +43
    -49
      binaries/cli/src/template/c/mod.rs
  15. +71
    -0
      binaries/cli/src/template/c/talker/talker-template.c
  16. +85
    -0
      binaries/cli/src/template/cxx/cmake-template.txt
  17. +18
    -19
      binaries/cli/src/template/cxx/dataflow-template.yml
  18. +35
    -0
      binaries/cli/src/template/cxx/listener-template.cc
  19. +43
    -43
      binaries/cli/src/template/cxx/mod.rs
  20. +39
    -0
      binaries/cli/src/template/cxx/talker-template.cc
  21. +2
    -2
      binaries/cli/src/template/mod.rs
  22. +12
    -13
      binaries/cli/src/template/python/dataflow-template.yml
  23. +11
    -0
      binaries/cli/src/template/python/listener/listener-template.py
  24. +18
    -34
      binaries/cli/src/template/python/mod.rs
  25. +14
    -0
      binaries/cli/src/template/python/talker/talker-template.py
  26. +1
    -1
      binaries/cli/src/template/rust/Cargo-template.toml
  27. +15
    -15
      binaries/cli/src/template/rust/dataflow-template.yml
  28. +9
    -0
      binaries/cli/src/template/rust/listener/Cargo-template.toml
  29. +25
    -0
      binaries/cli/src/template/rust/listener/main-template.rs
  30. +25
    -61
      binaries/cli/src/template/rust/mod.rs
  31. +9
    -0
      binaries/cli/src/template/rust/talker/Cargo-template.toml
  32. +25
    -0
      binaries/cli/src/template/rust/talker/main-template.rs
  33. +46
    -67
      binaries/coordinator/src/lib.rs
  34. +4
    -7
      binaries/coordinator/src/listener.rs
  35. +2
    -0
      binaries/daemon/Cargo.toml
  36. +127
    -137
      binaries/daemon/src/lib.rs
  37. +49
    -41
      binaries/daemon/src/pending.rs
  38. +10
    -4
      binaries/daemon/src/spawn.rs
  39. +3
    -1
      binaries/runtime/src/operator/python.rs
  40. +5
    -0
      examples/python-dataflow/example.py
  41. +2
    -2
      examples/python-dataflow/requirements.txt
  42. +0
    -7
      examples/python-dataflow/run.rs
  43. +1
    -1
      examples/python-operator-dataflow/requirements.txt
  44. +2
    -2
      examples/python-ros2-dataflow/random_turtle.py
  45. +3
    -3
      libraries/core/src/coordinator_messages.rs
  46. +1
    -1
      libraries/core/src/daemon_messages.rs
  47. +135
    -12
      libraries/core/src/topics.rs

+ 43
- 5
.github/workflows/ci.yml View File

@@ -286,12 +286,12 @@ jobs:
cargo build --all
dora up
dora list
dora start dataflow.yml --name ci-rust-test
dora start dataflow.yml --name ci-rust-test --detach
sleep 10
dora stop --name ci-rust-test --grace-duration 5s
cd ..
dora build examples/rust-dataflow/dataflow_dynamic.yml
dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic
dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic --detach
cargo run -p rust-dataflow-example-sink-dynamic
sleep 5
dora stop --name ci-rust-dynamic --grace-duration 5s
@@ -315,16 +315,54 @@ jobs:
cd test_python_project
dora up
dora list
dora start dataflow.yml --name ci-python-test
dora start dataflow.yml --name ci-python-test --detach
sleep 10
dora stop --name ci-python-test --grace-duration 5s
pip install opencv-python
dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic
pip install "numpy<2.0.0" opencv-python
dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic --detach
python ../examples/python-dataflow/plot_dynamic.py
sleep 5
dora stop --name ci-python-test --grace-duration 5s
dora destroy

- name: "Test CLI (C)"
timeout-minutes: 30
# fail-fast by using bash shell explicitly
shell: bash
if: runner.os == 'Linux'
run: |
# Test C template Project
dora new test_c_project --lang c --internal-create-with-path-dependencies
cd test_c_project
dora up
dora list
cmake -B build
cmake --build build
cmake --install build
dora start dataflow.yml --name ci-c-test --detach
sleep 10
dora stop --name ci-c-test --grace-duration 5s
dora destroy
- name: "Test CLI (C++)"
timeout-minutes: 30
# fail-fast by using bash shell explicitly
shell: bash
if: runner.os == 'Linux'
run: |
# Test C++ template Project
dora new test_cxx_project --lang cxx --internal-create-with-path-dependencies
cd test_cxx_project
dora up
dora list
cmake -B build
cmake --build build
cmake --install build
dora start dataflow.yml --name ci-cxx-test --detach
sleep 10
dora stop --name ci-cxx-test --grace-duration 5s
dora destroy

clippy:
name: "Clippy"
runs-on: ubuntu-latest


+ 117
- 33
Cargo.lock View File

@@ -885,9 +885,12 @@ dependencies = [

[[package]]
name = "atomic"
version = "0.5.3"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994"
dependencies = [
"bytemuck",
]

[[package]]
name = "atomic-waker"
@@ -982,7 +985,7 @@ dependencies = [
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper",
"sync_wrapper 0.1.2",
"tower",
"tower-layer",
"tower-service",
@@ -1925,6 +1928,16 @@ dependencies = [
"crossbeam-utils",
]

[[package]]
name = "crossbeam-skiplist"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]

[[package]]
name = "crossbeam-utils"
version = "0.8.20"
@@ -2011,9 +2024,9 @@ checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991"

[[package]]
name = "cxx"
version = "1.0.123"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8194f089b6da4751d6c1da1ef37c17255df51f9346cdb160f8b096562ae4a85c"
checksum = "273dcfd3acd4e1e276af13ed2a43eea7001318823e7a726a6b3ed39b4acc0b82"
dependencies = [
"cc",
"cxxbridge-flags",
@@ -2023,9 +2036,9 @@ dependencies = [

[[package]]
name = "cxx-build"
version = "1.0.123"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e8df9a089caae66634d754672d5f909395f30f38af6ff19366980d8a8b57501"
checksum = "d8b2766fbd92be34e9ed143898fce6c572dc009de39506ed6903e5a05b68914e"
dependencies = [
"cc",
"codespan-reporting",
@@ -2038,15 +2051,15 @@ dependencies = [

[[package]]
name = "cxxbridge-flags"
version = "1.0.123"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25290be4751803672a70b98c68b51c1e7d0a640ab5a4377f240f9d2e70054cd1"
checksum = "839fcd5e43464614ffaa989eaf1c139ef1f0c51672a1ed08023307fa1b909ccd"

[[package]]
name = "cxxbridge-macro"
version = "1.0.123"
version = "1.0.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8cb317cb13604b4752416783bb25070381c36e844743e4146b7f8e55de7d140"
checksum = "4b2c1c1776b986979be68bb2285da855f8d8a35851a769fca8740df7c3d07877"
dependencies = [
"proc-macro2",
"quote",
@@ -2321,6 +2334,8 @@ dependencies = [
"aligned-vec",
"async-trait",
"bincode",
"crossbeam",
"crossbeam-skiplist",
"ctrlc",
"dora-arrow-convert",
"dora-core",
@@ -3960,7 +3975,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.5.7",
"socket2 0.4.10",
"tokio",
"tower-service",
"tracing",
@@ -3988,19 +4003,20 @@ dependencies = [

[[package]]
name = "hyper-rustls"
version = "0.26.0"
version = "0.27.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.3.1",
"hyper-util",
"rustls 0.22.4",
"rustls 0.23.10",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tower-service",
"webpki-roots 0.26.1",
]

[[package]]
@@ -4580,7 +4596,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19"
dependencies = [
"cfg-if 1.0.0",
"windows-targets 0.52.5",
"windows-targets 0.48.5",
]

[[package]]
@@ -6388,9 +6404,9 @@ dependencies = [

[[package]]
name = "proc-macro2"
version = "1.0.85"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23"
checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
dependencies = [
"unicode-ident",
]
@@ -6581,8 +6597,8 @@ checksum = "2e8b432585672228923edbbf64b8b12c14e1112f62e88737655b4a083dbcd78e"
dependencies = [
"bytes",
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"quinn-proto 0.9.6",
"quinn-udp 0.3.2",
"rustc-hash",
"rustls 0.20.9",
"thiserror",
@@ -6591,6 +6607,23 @@ dependencies = [
"webpki",
]

[[package]]
name = "quinn"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4ceeeeabace7857413798eb1ffa1e9c905a9946a57d81fb69b4b71c4d8eb3ad"
dependencies = [
"bytes",
"pin-project-lite",
"quinn-proto 0.11.3",
"quinn-udp 0.5.2",
"rustc-hash",
"rustls 0.23.10",
"thiserror",
"tokio",
"tracing",
]

[[package]]
name = "quinn-proto"
version = "0.9.6"
@@ -6610,6 +6643,23 @@ dependencies = [
"webpki",
]

[[package]]
name = "quinn-proto"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe"
dependencies = [
"bytes",
"rand",
"ring 0.17.8",
"rustc-hash",
"rustls 0.23.10",
"slab",
"thiserror",
"tinyvec",
"tracing",
]

[[package]]
name = "quinn-udp"
version = "0.3.2"
@@ -6617,12 +6667,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "641538578b21f5e5c8ea733b736895576d0fe329bb883b937db6f4d163dbaaf4"
dependencies = [
"libc",
"quinn-proto",
"quinn-proto 0.9.6",
"socket2 0.4.10",
"tracing",
"windows-sys 0.42.0",
]

[[package]]
name = "quinn-udp"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46"
dependencies = [
"libc",
"once_cell",
"socket2 0.5.7",
"tracing",
"windows-sys 0.52.0",
]

[[package]]
name = "quote"
version = "1.0.36"
@@ -7830,9 +7893,9 @@ checksum = "19b30a45b0cd0bcca8037f3d0dc3421eaf95327a17cad11964fb8179b4fc4832"

[[package]]
name = "reqwest"
version = "0.12.4"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10"
checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37"
dependencies = [
"base64 0.22.1",
"bytes",
@@ -7851,13 +7914,14 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.22.4",
"quinn 0.11.2",
"rustls 0.23.10",
"rustls-pemfile 2.1.2",
"rustls-pki-types",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"sync_wrapper 1.0.1",
"tokio",
"tokio-rustls",
"tower-service",
@@ -8235,6 +8299,20 @@ dependencies = [
"zeroize",
]

[[package]]
name = "rustls"
version = "0.23.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402"
dependencies = [
"once_cell",
"ring 0.17.8",
"rustls-pki-types",
"rustls-webpki",
"subtle",
"zeroize",
]

[[package]]
name = "rustls-native-certs"
version = "0.6.3"
@@ -9027,6 +9105,12 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"

[[package]]
name = "sync_wrapper"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394"

[[package]]
name = "syntect"
version = "5.2.0"
@@ -9339,11 +9423,11 @@ dependencies = [

[[package]]
name = "tokio-rustls"
version = "0.25.0"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
dependencies = [
"rustls 0.22.4",
"rustls 0.23.10",
"rustls-pki-types",
"tokio",
]
@@ -9812,9 +9896,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"

[[package]]
name = "uuid"
version = "1.8.0"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
checksum = "3ea73390fe27785838dcbf75b91b1d84799e28f1ce71e6f372a5dc2200c80de5"
dependencies = [
"atomic",
"getrandom",
@@ -9826,9 +9910,9 @@ dependencies = [

[[package]]
name = "uuid-macro-internal"
version = "1.8.0"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2"
checksum = "cdb394c528d1cb434c2f0522027e50c0705305caf6d20405c07a6f7e4cf9543c"
dependencies = [
"proc-macro2",
"quote",
@@ -11138,7 +11222,7 @@ dependencies = [
"async-trait",
"futures",
"log",
"quinn",
"quinn 0.9.4",
"rustls 0.20.9",
"rustls-native-certs",
"rustls-pemfile 1.0.4",


+ 18
- 10
apis/c++/node/src/lib.rs View File

@@ -1,8 +1,8 @@
use std::any::Any;
use std::{any::Any, vec};

use dora_node_api::{
self,
arrow::array::{AsArray, BinaryArray},
arrow::array::{AsArray, UInt8Array},
merged::{MergeExternal, MergedEvent},
Event, EventStream,
};
@@ -138,18 +138,26 @@ fn event_type(event: &DoraEvent) -> ffi::DoraEventType {
}

fn event_as_input(event: Box<DoraEvent>) -> eyre::Result<ffi::DoraInput> {
let Some(Event::Input {
id,
metadata: _,
data,
}) = event.0
else {
let Some(Event::Input { id, metadata, data }) = event.0 else {
bail!("not an input event");
};
let data: Option<&BinaryArray> = data.as_binary_opt();
let data = match metadata.type_info.data_type {
dora_node_api::arrow::datatypes::DataType::UInt8 => {
let array: &UInt8Array = data.as_primitive();
array.values().to_vec()
}
dora_node_api::arrow::datatypes::DataType::Null => {
vec![]
}
_ => {
todo!("dora C++ Node does not yet support higher level type of arrow. Only UInt8.
The ultimate solution should be based on arrow FFI interface. Feel free to contribute :)")
}
};

Ok(ffi::DoraInput {
id: id.into(),
data: data.map(|d| d.value(0).to_owned()).unwrap_or_default(),
data,
})
}



+ 0
- 1
apis/python/node/dora/__init__.py View File

@@ -13,7 +13,6 @@ from .dora import *

from .dora import (
Node,
PyEvent,
Ros2Context,
Ros2Node,
Ros2NodeOptions,


+ 2
- 11
apis/python/node/dora/__init__.pyi View File

@@ -22,7 +22,7 @@ from dora import Node
node = Node()
```"""

def __init__(self) -> None:
def __init__(self, node_id: str=None) -> None:
"""The custom node API lets you integrate `dora` into your application.
It allows you to retrieve input and send output in any fashion you want.

@@ -46,7 +46,7 @@ This method returns the parsed dataflow YAML file."""
"""Merge an external event stream with dora main loop.
This currently only work with ROS2."""

def next(self, timeout: float=None) -> dora.PyEvent:
def next(self, timeout: float=None) -> dict:
"""`.next()` gives you the next input that the node has received.
It blocks until the next event becomes available.
You can use timeout in seconds to return if no input is available.
@@ -88,15 +88,6 @@ node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
def __next__(self) -> typing.Any:
"""Implement next(self)."""

@typing.final
class PyEvent:
"""Dora Event"""

def inner(self):...

def __getitem__(self, key: typing.Any) -> typing.Any:
"""Return self[key]."""

@typing.final
class Ros2Context:
"""ROS2 Context holding all messages definition for receiving and sending messages to ROS2.


+ 17
- 9
apis/python/node/src/lib.rs View File

@@ -24,6 +24,7 @@ use pyo3::types::{PyBytes, PyDict};
/// node = Node()
/// ```
///
/// :type node_id: str, optional
#[pyclass]
pub struct Node {
events: Events,
@@ -67,11 +68,18 @@ impl Node {
/// ```
///
/// :type timeout: float, optional
/// :rtype: dora.PyEvent
/// :rtype: dict
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self, py: Python, timeout: Option<f32>) -> PyResult<Option<PyEvent>> {
pub fn next(&mut self, py: Python, timeout: Option<f32>) -> PyResult<Option<Py<PyDict>>> {
let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32)));
Ok(event)
if let Some(event) = event {
let dict = event
.to_py_dict(py)
.context("Could not convert event into a dict")?;
Ok(Some(dict))
} else {
Ok(None)
}
}

/// You can iterate over the event stream with a loop
@@ -84,10 +92,11 @@ impl Node {
/// case "image":
/// ```
///
/// :rtype: dora.PyEvent
pub fn __next__(&mut self, py: Python) -> PyResult<Option<PyEvent>> {
let event = py.allow_threads(|| self.events.recv(None));
Ok(event)
/// Default behaviour is to timeout after 2 seconds.
///
/// :rtype: dict
pub fn __next__(&mut self, py: Python) -> PyResult<Option<Py<PyDict>>> {
self.next(py, None)
}

/// You can iterate over the event stream with a loop
@@ -100,7 +109,7 @@ impl Node {
/// case "image":
/// ```
///
/// :rtype: dora.PyEvent
/// :rtype: dict
fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
@@ -262,7 +271,6 @@ fn dora(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {

m.add_function(wrap_pyfunction!(start_runtime, &m)?)?;
m.add_class::<Node>()?;
m.add_class::<PyEvent>()?;
m.setattr("__version__", env!("CARGO_PKG_VERSION"))?;
m.setattr("__author__", "Dora-rs Authors")?;



+ 35
- 57
apis/python/operator/src/lib.rs View File

@@ -1,69 +1,52 @@
use arrow::{array::ArrayRef, pyarrow::ToPyArrow};
use std::collections::HashMap;

use arrow::pyarrow::ToPyArrow;
use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters};
use eyre::{Context, Result};
use pyo3::{exceptions::PyLookupError, prelude::*, pybacked::PyBackedStr, types::PyDict};
use pyo3::{
prelude::*,
pybacked::PyBackedStr,
types::{IntoPyDict, PyDict},
};

/// Dora Event
#[pyclass]
#[derive(Debug)]
pub struct PyEvent {
event: MergedEvent<PyObject>,
data: Option<ArrayRef>,
}

// Dora Event
#[pymethods]
impl PyEvent {
///
/// :rtype: dora.PyObject
pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult<Option<PyObject>> {
if key == "kind" {
let kind = match &self.event {
MergedEvent::Dora(_) => "dora",
MergedEvent::External(_) => "external",
};
return Ok(Some(kind.to_object(py)));
}
pub fn to_py_dict(self, py: Python<'_>) -> PyResult<Py<PyDict>> {
let mut pydict = HashMap::new();
match &self.event {
MergedEvent::Dora(_) => pydict.insert("kind", "dora".to_object(py)),
MergedEvent::External(_) => pydict.insert("kind", "external".to_object(py)),
};
match &self.event {
MergedEvent::Dora(event) => {
let value = match key {
"type" => Some(Self::ty(event).to_object(py)),
"id" => Self::id(event).map(|v| v.to_object(py)),
"value" => self.value(py)?,
"metadata" => Self::metadata(event, py),
"error" => Self::error(event).map(|v| v.to_object(py)),
other => {
return Err(PyLookupError::new_err(format!(
"event has no property `{other}`"
)))
}
};
Ok(value)
if let Some(id) = Self::id(event) {
pydict.insert("id", id.into_py(py));
}
pydict.insert("type", Self::ty(event).to_object(py));

if let Some(value) = self.value(py)? {
pydict.insert("value", value);
}
if let Some(metadata) = Self::metadata(event, py) {
pydict.insert("metadata", metadata);
}
if let Some(error) = Self::error(event) {
pydict.insert("error", error.to_object(py));
}
}
MergedEvent::External(event) => {
let value = match key {
"value" => event,
_ => todo!(),
};

Ok(Some(value.clone()))
pydict.insert("value", event.clone());
}
}
}

pub fn inner(&mut self) -> Option<&PyObject> {
match &self.event {
MergedEvent::Dora(_) => None,
MergedEvent::External(event) => Some(event),
}
Ok(pydict.into_py_dict_bound(py).unbind())
}

fn __str__(&self) -> PyResult<String> {
Ok(format!("{:#?}", &self.event))
}
}

impl PyEvent {
fn ty(event: &Event) -> &str {
match event {
Event::Stop => "STOP",
@@ -84,9 +67,9 @@ impl PyEvent {

/// Returns the payload of an input event as an arrow array (if any).
fn value(&self, py: Python<'_>) -> PyResult<Option<PyObject>> {
match (&self.event, &self.data) {
(MergedEvent::Dora(Event::Input { .. }), Some(data)) => {
// TODO: Does this call leak data?
match &self.event {
MergedEvent::Dora(Event::Input { data, .. }) => {
// TODO: Does this call leak data?
let array_data = data.to_data().to_pyarrow(py)?;
Ok(Some(array_data))
}
@@ -116,13 +99,8 @@ impl From<Event> for PyEvent {
}

impl From<MergedEvent<PyObject>> for PyEvent {
fn from(mut event: MergedEvent<PyObject>) -> Self {
let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event {
Some(data.clone())
} else {
None
};
Self { event, data }
fn from(event: MergedEvent<PyObject>) -> Self {
Self { event }
}
}



+ 3
- 3
binaries/cli/src/attach.rs View File

@@ -11,6 +11,8 @@ use std::{path::PathBuf, sync::mpsc, time::Duration};
use tracing::{error, info};
use uuid::Uuid;

use crate::handle_dataflow_result;

pub fn attach_dataflow(
dataflow: Descriptor,
dataflow_path: PathBuf,
@@ -133,9 +135,7 @@ pub fn attach_dataflow(
ControlRequestReply::DataflowStarted { uuid: _ } => (),
ControlRequestReply::DataflowStopped { uuid, result } => {
info!("dataflow {uuid} stopped");
break result
.map_err(|err| eyre::eyre!(err))
.wrap_err("dataflow failed");
break handle_dataflow_result(result, Some(uuid));
}
ControlRequestReply::DataflowReloaded { uuid } => {
info!("dataflow {uuid} reloaded")


+ 50
- 0
binaries/cli/src/formatting.rs View File

@@ -0,0 +1,50 @@
use dora_core::topics::{DataflowResult, NodeErrorCause};

pub struct FormatDataflowError<'a>(pub &'a DataflowResult);

impl std::fmt::Display for FormatDataflowError<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f)?;
let failed = self
.0
.node_results
.iter()
.filter_map(|(id, r)| r.as_ref().err().map(|e| (id, e)));
let total_failed = failed.clone().count();

let mut non_cascading: Vec<_> = failed
.clone()
.filter(|(_, e)| !matches!(e.cause, NodeErrorCause::Cascading { .. }))
.collect();
non_cascading.sort_by_key(|(_, e)| e.timestamp);
// try to print earliest non-cascading error
let hidden = if !non_cascading.is_empty() {
let printed = non_cascading.len();
for (id, err) in non_cascading {
writeln!(f, "Node `{id}` failed: {err}")?;
}
total_failed - printed
} else {
// no non-cascading errors -> print earliest cascading
let mut all: Vec<_> = failed.collect();
all.sort_by_key(|(_, e)| e.timestamp);
if let Some((id, err)) = all.first() {
write!(f, "Node `{id}` failed: {err}")?;
total_failed - 1
} else {
write!(f, "unknown error")?;
0
}
};

if hidden > 1 {
write!(
f,
"\n\nThere are {hidden} consequential errors. Check the `out/{}` folder for full details.",
self.0.uuid
)?;
}

Ok(())
}
}

+ 43
- 9
binaries/cli/src/main.rs View File

@@ -15,6 +15,7 @@ use dora_tracing::set_up_tracing;
use dora_tracing::set_up_tracing_opts;
use duration_str::parse;
use eyre::{bail, Context};
use formatting::FormatDataflowError;
use std::{io::Write, net::SocketAddr};
use std::{
net::{IpAddr, Ipv4Addr},
@@ -28,6 +29,7 @@ use uuid::Uuid;
mod attach;
mod build;
mod check;
mod formatting;
mod graph;
mod logs;
mod template;
@@ -76,7 +78,7 @@ enum Command {
#[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: PathBuf,
},
/// Generate a new project, node or operator. Choose the language between Rust, Python, C or C++.
/// Generate a new project or node. Choose the language between Rust, Python, C or C++.
New {
#[clap(flatten)]
args: CommandNew,
@@ -118,6 +120,9 @@ enum Command {
/// Attach to the dataflow and wait for its completion
#[clap(long, action)]
attach: bool,
/// Run the dataflow in background
#[clap(long, action)]
detach: bool,
/// Enable hot reloading (Python only)
#[clap(long, action)]
hot_reload: bool,
@@ -231,7 +236,6 @@ pub struct CommandNew {
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
enum Kind {
Dataflow,
Operator,
CustomNode,
}

@@ -344,6 +348,7 @@ fn run() -> eyre::Result<()> {
coordinator_addr,
coordinator_port,
attach,
detach,
hot_reload,
} => {
let dataflow_descriptor =
@@ -371,6 +376,16 @@ fn run() -> eyre::Result<()> {
&mut *session,
)?;

let attach = match (attach, detach) {
(true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
(true, false) => true,
(false, true) => false,
(false, false) => {
println!("attaching to dataflow (use `--detach` to run in background)");
true
}
};

if attach {
attach_dataflow(
dataflow_descriptor,
@@ -460,7 +475,8 @@ fn run() -> eyre::Result<()> {
);
}

Daemon::run_dataflow(&dataflow_path).await
let result = Daemon::run_dataflow(&dataflow_path).await?;
handle_dataflow_result(result, None)
}
None => {
if coordinator_addr.ip() == LOCALHOST {
@@ -540,14 +556,32 @@ fn stop_dataflow(
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStopped { uuid: _, result } => result
.map_err(|err| eyre::eyre!(err))
.wrap_err("dataflow failed"),
ControlRequestReply::DataflowStopped { uuid, result } => {
handle_dataflow_result(result, Some(uuid))
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected stop dataflow reply: {other:?}"),
}
}

fn handle_dataflow_result(
result: dora_core::topics::DataflowResult,
uuid: Option<Uuid>,
) -> Result<(), eyre::Error> {
if result.is_ok() {
Ok(())
} else {
Err(match uuid {
Some(uuid) => {
eyre::eyre!("Dataflow {uuid} failed:\n{}", FormatDataflowError(&result))
}
None => {
eyre::eyre!("Dataflow failed:\n{}", FormatDataflowError(&result))
}
})
}
}

fn stop_dataflow_by_name(
name: String,
grace_duration: Option<Duration>,
@@ -565,9 +599,9 @@ fn stop_dataflow_by_name(
let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStopped { uuid: _, result } => result
.map_err(|err| eyre::eyre!(err))
.wrap_err("dataflow failed"),
ControlRequestReply::DataflowStopped { uuid, result } => {
handle_dataflow_result(result, Some(uuid))
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected stop dataflow reply: {other:?}"),
}


+ 79
- 0
binaries/cli/src/template/c/cmake-template.txt View File

@@ -0,0 +1,79 @@
cmake_minimum_required(VERSION 3.21)
project(cxx-dataflow LANGUAGES C)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_FLAGS "-fPIC")

set(DORA_ROOT_DIR "__DORA_PATH__" CACHE FILEPATH "Path to the root of dora")

set(dora_c_include_dir "${CMAKE_CURRENT_BINARY_DIR}/include/c")
if(DORA_ROOT_DIR)
include(ExternalProject)
ExternalProject_Add(Dora
SOURCE_DIR ${DORA_ROOT_DIR}
BUILD_IN_SOURCE True
CONFIGURE_COMMAND ""
BUILD_COMMAND
cargo build
--package dora-node-api-c
INSTALL_COMMAND ""
)

add_custom_command(OUTPUT ${dora_c_include_dir}
WORKING_DIRECTORY ${DORA_ROOT_DIR}
DEPENDS Dora
COMMAND
mkdir ${CMAKE_CURRENT_BINARY_DIR}/include/c -p
&&
cp apis/c/node ${CMAKE_CURRENT_BINARY_DIR}/include/c -r
)

add_custom_target(Dora_c DEPENDS ${dora_c_include_dir})
set(dora_link_dirs ${DORA_ROOT_DIR}/target/debug)
else()
include(ExternalProject)
ExternalProject_Add(Dora
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/dora
GIT_REPOSITORY https://github.com/dora-rs/dora.git
GIT_TAG main
BUILD_IN_SOURCE True
CONFIGURE_COMMAND ""
BUILD_COMMAND
cargo build
--package dora-node-api-c
--target-dir ${CMAKE_CURRENT_BINARY_DIR}/dora/src/Dora/target
INSTALL_COMMAND ""
)

add_custom_command(OUTPUT ${dora_c_include_dir}
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/dora/src/Dora/target
DEPENDS Dora
COMMAND
mkdir ${CMAKE_CURRENT_BINARY_DIR}/include/c -p
&&
cp ../apis/c/node ${CMAKE_CURRENT_BINARY_DIR}/include/c -r
)

set(dora_link_dirs ${CMAKE_CURRENT_BINARY_DIR}/dora/src/Dora/target/debug)

add_custom_target(Dora_c DEPENDS ${dora_c_include_dir})
endif()

link_directories(${dora_link_dirs})

add_executable(talker_1 talker_1/node.c)
add_dependencies(talker_1 Dora_c)
target_include_directories(talker_1 PRIVATE ${dora_c_include_dir})
target_link_libraries(talker_1 dora_node_api_c m)

add_executable(talker_2 talker_2/node.c)
add_dependencies(talker_2 Dora_c)
target_include_directories(talker_2 PRIVATE ${dora_c_include_dir})
target_link_libraries(talker_2 dora_node_api_c m)

add_executable(listener_1 listener_1/node.c)
add_dependencies(listener_1 Dora_c)
target_include_directories(listener_1 PRIVATE ${dora_c_include_dir})
target_link_libraries(listener_1 dora_node_api_c m)

install(TARGETS listener_1 talker_1 talker_2 DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin)

+ 14
- 16
binaries/cli/src/template/c/dataflow-template.yml View File

@@ -1,24 +1,22 @@
nodes:
- id: op_1
operator:
shared-library: build/op_1
- id: talker_1
custom:
source: bin/talker_1
inputs:
foo: dora/timer/millis/100
tick: dora/timer/millis/100
outputs:
- bar
- id: op_2
operator:
shared-library: build/op_2
- speech
- id: talker_2
custom:
source: bin/talker_2
inputs:
foo: dora/timer/secs/2
tick: dora/timer/secs/2
outputs:
- bar
- speech

- id: custom-node_1
- id: listener_1
custom:
source: build/node_1
source: bin/listener_1
inputs:
input-1: op_1/bar
input-2: op_2/bar
outputs:
- foo
speech-1: talker_1/speech
speech-2: talker_2/speech

+ 69
- 0
binaries/cli/src/template/c/listener/listener-template.c View File

@@ -0,0 +1,69 @@
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include "node_api.h"

// sleep
#ifdef _WIN32
#include <Windows.h>
#else
#include <unistd.h>
#endif

// Entry point of the listener node template.
//
// Initializes the dora context from the environment and then handles up to
// 20 events: inputs are printed, an input-closed event is logged, and a stop
// event (or any unexpected event type) ends the loop early.
//
// Returns 0 on a regular shutdown and -1 if the context could not be
// initialized or the event stream ended unexpectedly.
int main()
{
    void *dora_context = init_dora_context_from_env();
    if (dora_context == NULL)
    {
        fprintf(stderr, "[c node] init dora context failed\n");
        return -1;
    }

    printf("[c node] dora context initialized\n");

    for (char i = 0; i < 20; i++)
    {
        void *event = dora_next_event(dora_context);
        if (event == NULL)
        {
            printf("[c node] ERROR: unexpected end of event\n");
            return -1;
        }

        enum DoraEventType ty = read_dora_event_type(event);

        if (ty == DoraEventType_Input)
        {
            // Initialize the out-parameters so that they hold defined values
            // even if the API leaves them untouched.
            char *id_ptr = NULL;
            size_t id_len = 0;
            read_dora_input_id(event, &id_ptr, &id_len);

            char *data_ptr = NULL;
            size_t data_len = 0;
            read_dora_input_data(event, &data_ptr, &data_len);

            // The API hands back pointer + length pairs, so nothing here
            // guarantees that the buffers are NUL-terminated. Bound the
            // output by the reported lengths instead of using a plain `%s`,
            // which could read past the end of the buffers.
            printf("I heard %.*s from %.*s\n",
                   (int)data_len, data_ptr != NULL ? data_ptr : "",
                   (int)id_len, id_ptr != NULL ? id_ptr : "");
        }
        else if (ty == DoraEventType_Stop)
        {
            printf("[c node] received stop event\n");
            free_dora_event(event);
            break;
        }
        else if (ty == DoraEventType_InputClosed)
        {
            printf("[c node] received input closed event\n");
        }
        else
        {
            printf("[c node] received unexpected event: %d\n", ty);
            free_dora_event(event);
            break;
        }

        // Reached only by the branches that did not `break` above, so each
        // event is freed exactly once.
        free_dora_event(event);
    }

    free_dora_context(dora_context);

    return 0;
}

+ 43
- 49
binaries/cli/src/template/c/mod.rs View File

@@ -1,12 +1,15 @@
use dora_node_api_c::HEADER_NODE_API;
use dora_operator_api_c::{HEADER_OPERATOR_API, HEADER_OPERATOR_TYPES};
use eyre::{bail, Context};
use eyre::{bail, Context, ContextCompat};
use std::{
fs,
path::{Path, PathBuf},
};

pub fn create(args: crate::CommandNew) -> eyre::Result<()> {
const NODE: &str = include_str!("node/node-template.c");
const TALKER: &str = include_str!("talker/talker-template.c");
const LISTENER: &str = include_str!("listener/listener-template.c");

pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> {
let crate::CommandNew {
kind,
lang: _,
@@ -15,13 +18,16 @@ pub fn create(args: crate::CommandNew) -> eyre::Result<()> {
} = args;

match kind {
crate::Kind::Operator => create_operator(name, path),
crate::Kind::CustomNode => create_custom_node(name, path),
crate::Kind::Dataflow => create_dataflow(name, path),
crate::Kind::CustomNode => create_custom_node(name, path, NODE),
crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps),
}
}

fn create_dataflow(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrReport> {
fn create_dataflow(
name: String,
path: Option<PathBuf>,
use_path_deps: bool,
) -> Result<(), eyre::ErrReport> {
const DATAFLOW_YML: &str = include_str!("dataflow-template.yml");

if name.contains('/') {
@@ -41,9 +47,10 @@ fn create_dataflow(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrR
fs::write(&dataflow_yml_path, dataflow_yml)
.with_context(|| format!("failed to write `{}`", dataflow_yml_path.display()))?;

create_operator("op_1".into(), Some(root.join("op_1")))?;
create_operator("op_2".into(), Some(root.join("op_2")))?;
create_custom_node("node_1".into(), Some(root.join("node_1")))?;
create_custom_node("talker_1".into(), Some(root.join("talker_1")), TALKER)?;
create_custom_node("talker_2".into(), Some(root.join("talker_2")), TALKER)?;
create_custom_node("listener_1".into(), Some(root.join("listener_1")), LISTENER)?;
create_cmakefile(root.to_path_buf(), use_path_deps)?;

println!(
"Created new C dataflow at `{name}` at {}",
@@ -53,47 +60,34 @@ fn create_dataflow(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrR
Ok(())
}

fn create_operator(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrReport> {
const OPERATOR: &str = include_str!("operator/operator-template.c");

if name.contains('/') {
bail!("operator name must not contain `/` separators");
}
if name.contains('-') {
bail!("operator name must not contain `-` separators");
}
if !name.is_ascii() {
bail!("operator name must be ASCII");
}

// create directories
let root = path.as_deref().unwrap_or_else(|| Path::new(&name));
fs::create_dir(root)
.with_context(|| format!("failed to create directory `{}`", root.display()))?;

let operator_path = root.join("operator.c");
fs::write(&operator_path, OPERATOR)
.with_context(|| format!("failed to write `{}`", operator_path.display()))?;
let header_api_path = root.join("operator_api.h");
let header_type_path = root.join("operator_types.h");
fs::write(&header_api_path, HEADER_OPERATOR_API)
.with_context(|| format!("failed to write `{}`", header_api_path.display()))?;
fs::write(&header_type_path, HEADER_OPERATOR_TYPES)
.with_context(|| format!("failed to write `{}`", header_type_path.display()))?;

// TODO: Makefile?

println!(
"Created new C operator `{name}` at {}",
Path::new(".").join(root).display()
);

/// Writes `CMakeLists.txt` for a generated C dataflow into `root`.
///
/// The `__DORA_PATH__` placeholder of the bundled template is replaced with the
/// workspace directory (the grandparent of this crate's manifest directory, as
/// recorded at compile time) when `use_path_deps` is set, and with an empty
/// string otherwise.
///
/// # Errors
///
/// Fails if the workspace directory cannot be derived from the manifest
/// directory or if the file cannot be written.
fn create_cmakefile(root: PathBuf, use_path_deps: bool) -> Result<(), eyre::ErrReport> {
    const CMAKEFILE: &str = include_str!("cmake-template.txt");

    // Decide what the placeholder expands to, then substitute once.
    let dora_path = if use_path_deps {
        let workspace_dir = Path::new(env!("CARGO_MANIFEST_DIR"))
            .parent()
            .context("Could not get manifest parent folder")?
            .parent()
            .context("Could not get manifest grandparent folder")?;
        workspace_dir.to_str().unwrap()
    } else {
        ""
    };
    let cmake_file = CMAKEFILE.replace("__DORA_PATH__", dora_path);

    let cmake_path = root.join("CMakeLists.txt");
    fs::write(&cmake_path, cmake_file)
        .with_context(|| format!("failed to write `{}`", cmake_path.display()))?;

    println!("Created new CMakeLists.txt at {}", cmake_path.display());
    Ok(())
}

fn create_custom_node(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrReport> {
const NODE: &str = include_str!("node/node-template.c");

fn create_custom_node(
name: String,
path: Option<PathBuf>,
template_scripts: &str,
) -> Result<(), eyre::ErrReport> {
if name.contains('/') {
bail!("node name must not contain `/` separators");
}
@@ -107,7 +101,7 @@ fn create_custom_node(name: String, path: Option<PathBuf>) -> Result<(), eyre::E
.with_context(|| format!("failed to create directory `{}`", root.display()))?;

let node_path = root.join("node.c");
fs::write(&node_path, NODE)
fs::write(&node_path, template_scripts)
.with_context(|| format!("failed to write `{}`", node_path.display()))?;
let header_path = root.join("node_api.h");
fs::write(&header_path, HEADER_NODE_API)


+ 71
- 0
binaries/cli/src/template/c/talker/talker-template.c View File

@@ -0,0 +1,71 @@
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include "node_api.h"

// sleep
#ifdef _WIN32
#include <Windows.h>
#else
#include <unistd.h>
#endif

// Talker node: initializes the dora context from the environment and, for up
// to 10 events, answers every input event by sending "Hello World" on the
// `speech` output. A stop event or an unexpected event type ends the loop early.
//
// Returns 0 after a normal shutdown and -1 if the context could not be
// initialized or the event stream ended unexpectedly.
int main()
{
    void *dora_context = init_dora_context_from_env();
    if (!dora_context)
    {
        fprintf(stderr, "[c node] init dora context failed\n");
        return -1;
    }

    printf("[c node] dora context initialized\n");

    int handled = 0;
    int keep_running = 1;
    while (keep_running && handled < 10)
    {
        void *event = dora_next_event(dora_context);
        if (!event)
        {
            printf("[c node] ERROR: unexpected end of event\n");
            return -1;
        }

        enum DoraEventType ty = read_dora_event_type(event);
        switch (ty)
        {
        case DoraEventType_Input:
        {
            char *data;
            size_t data_len;
            read_dora_input_data(event, &data, &data_len);

            // The talker expects its triggering input to carry no payload.
            assert(data_len == 0);

            char out_id[] = "speech";
            char out_data[] = "Hello World";

            // `sizeof - 1` is the string length without the trailing NUL.
            dora_send_output(dora_context, out_id, sizeof(out_id) - 1, out_data, sizeof(out_data) - 1);
            break;
        }
        case DoraEventType_Stop:
            printf("[c node] received stop event\n");
            keep_running = 0;
            break;
        default:
            printf("[c node] received unexpected event: %d\n", ty);
            keep_running = 0;
            break;
        }

        // Every event is released exactly once, whether or not the loop continues.
        free_dora_event(event);
        handled++;
    }

    printf("[c node] talker 10 events\n");

    free_dora_context(dora_context);

    printf("[c node] finished successfully\n");

    return 0;
}

+ 85
- 0
binaries/cli/src/template/cxx/cmake-template.txt View File

@@ -0,0 +1,85 @@
# Build script for the generated C++ dataflow: builds the dora C++ node API with
# cargo, copies the generated cxx bridge sources into the build tree and links
# the three example nodes (talker_1, talker_2, listener_1) against it.
cmake_minimum_required(VERSION 3.21)
project(cxx-dataflow LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_FLAGS "-fPIC")

# Root of a local dora checkout. When left empty, dora is cloned from GitHub instead.
# This is a directory, hence cache type PATH.
set(DORA_ROOT_DIR "__DORA_PATH__" CACHE PATH "Path to the root of dora")

# Locations inside the build tree that receive the generated bridge files.
set(dora_cxx_include_dir "${CMAKE_CURRENT_BINARY_DIR}/include/cxx")
set(node_bridge "${CMAKE_CURRENT_BINARY_DIR}/node_bridge.cc")

if(DORA_ROOT_DIR)
    # Build against the local checkout.
    include(ExternalProject)
    ExternalProject_Add(Dora
        SOURCE_DIR ${DORA_ROOT_DIR}
        BUILD_IN_SOURCE True
        CONFIGURE_COMMAND ""
        BUILD_COMMAND
            cargo build
            --package dora-node-api-cxx
        INSTALL_COMMAND ""
    )

    # Use `cmake -E` instead of shell `mkdir`/`cp` so the commands do not depend
    # on the host shell. Only outputs that are actually defined are listed.
    add_custom_command(OUTPUT ${node_bridge} ${dora_cxx_include_dir}
        WORKING_DIRECTORY ${DORA_ROOT_DIR}
        DEPENDS Dora
        COMMAND ${CMAKE_COMMAND} -E make_directory ${dora_cxx_include_dir}
        COMMAND ${CMAKE_COMMAND} -E copy target/cxxbridge/dora-node-api-cxx/src/lib.rs.cc ${node_bridge}
        COMMAND ${CMAKE_COMMAND} -E copy target/cxxbridge/dora-node-api-cxx/src/lib.rs.h ${dora_cxx_include_dir}/dora-node-api.h
    )
    add_custom_target(Dora_cxx DEPENDS ${node_bridge} ${dora_cxx_include_dir})
    set(dora_link_dirs ${DORA_ROOT_DIR}/target/debug)
else()
    # No local checkout given: fetch dora from GitHub and build it in the build tree.
    include(ExternalProject)
    ExternalProject_Add(Dora
        PREFIX ${CMAKE_CURRENT_BINARY_DIR}/dora
        GIT_REPOSITORY https://github.com/dora-rs/dora.git
        GIT_TAG main
        BUILD_IN_SOURCE True
        CONFIGURE_COMMAND ""
        BUILD_COMMAND
            cargo build
            --package dora-node-api-cxx
            --target-dir ${CMAKE_CURRENT_BINARY_DIR}/dora/src/Dora/target
        INSTALL_COMMAND ""
    )

    add_custom_command(OUTPUT ${node_bridge} ${dora_cxx_include_dir}
        WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/dora/src/Dora/target
        DEPENDS Dora
        COMMAND ${CMAKE_COMMAND} -E make_directory ${dora_cxx_include_dir}
        COMMAND ${CMAKE_COMMAND} -E copy cxxbridge/dora-node-api-cxx/src/lib.rs.cc ${node_bridge}
        COMMAND ${CMAKE_COMMAND} -E copy cxxbridge/dora-node-api-cxx/src/lib.rs.h ${dora_cxx_include_dir}/dora-node-api.h
    )

    set(dora_link_dirs ${CMAKE_CURRENT_BINARY_DIR}/dora/src/Dora/target/debug)

    add_custom_target(Dora_cxx DEPENDS ${node_bridge} ${dora_cxx_include_dir})
endif()

link_directories(${dora_link_dirs})

# Each node compiles its own source together with the generated bridge file.
add_executable(talker_1 talker_1/node.cc ${node_bridge})
add_dependencies(talker_1 Dora_cxx)
target_include_directories(talker_1 PRIVATE ${dora_cxx_include_dir})
target_link_libraries(talker_1 dora_node_api_cxx)

add_executable(talker_2 talker_2/node.cc ${node_bridge})
add_dependencies(talker_2 Dora_cxx)
target_include_directories(talker_2 PRIVATE ${dora_cxx_include_dir})
target_link_libraries(talker_2 dora_node_api_cxx)

add_executable(listener_1 listener_1/node.cc ${node_bridge})
add_dependencies(listener_1 Dora_cxx)
target_include_directories(listener_1 PRIVATE ${dora_cxx_include_dir})
target_link_libraries(listener_1 dora_node_api_cxx)

# Installs into `bin/` next to this file, where the dataflow file expects the binaries.
install(TARGETS listener_1 talker_1 talker_2 DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin)

+ 18
- 19
binaries/cli/src/template/cxx/dataflow-template.yml View File

@@ -1,23 +1,22 @@
nodes:
- id: runtime-node_1
operators:
- id: op_1
shared-library: build/op_1
inputs:
tick: dora/timer/millis/100
outputs:
- some-output
- id: op_2
shared-library: build/op_2
inputs:
tick: dora/timer/secs/2
outputs:
- some-output
- id: talker_1
custom:
source: bin/talker_1
inputs:
tick: dora/timer/millis/100
outputs:
- speech
- id: talker_2
custom:
source: bin/talker_2
inputs:
tick: dora/timer/secs/2
outputs:
- speech

- id: custom-node_1
- id: listener_1
custom:
source: build/node_1
source: bin/listener_1
inputs:
tick: dora/timer/secs/1
input-1: op_1/some-output
input-2: op_2/some-output
speech-1: talker_1/speech
speech-2: talker_2/speech

+ 35
- 0
binaries/cli/src/template/cxx/listener-template.cc View File

@@ -0,0 +1,35 @@
#include "dora-node-api.h" // adjust this path if necessary

#include <iostream>
#include <vector>

// Listener node: prints every received input together with the id of the input
// it arrived on, and exits once all inputs are closed. Any other event type is
// reported on stderr and otherwise ignored.
int main()
{
    std::cout << "HELLO FROM C++" << std::endl;

    auto dora_node = init_dora_node();

    while (true)
    {
        auto event = dora_node.events->next();
        auto ty = event_type(event);

        if (ty == DoraEventType::AllInputsClosed)
        {
            break;
        }
        else if (ty == DoraEventType::Input)
        {
            auto input = event_as_input(std::move(event));
            // The payload is raw bytes with an explicit length; build the string from both.
            auto message = std::string(reinterpret_cast<const char *>(input.data.data()), input.data.size());
            std::cout << "I heard " << message << " from " << std::string(input.id) << std::endl;
        }
        else
        {
            std::cerr << "Unknown event type " << static_cast<int>(ty) << std::endl;
        }
    }

    return 0;
}

+ 43
- 43
binaries/cli/src/template/cxx/mod.rs View File

@@ -1,10 +1,14 @@
use eyre::{bail, Context};
use eyre::{bail, Context, ContextCompat};
use std::{
fs,
path::{Path, PathBuf},
};

pub fn create(args: crate::CommandNew) -> eyre::Result<()> {
const NODE: &str = include_str!("node-template.cc");
const TALKER: &str = include_str!("talker-template.cc");
const LISTENER: &str = include_str!("listener-template.cc");

pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> {
let crate::CommandNew {
kind,
lang: _,
@@ -13,13 +17,16 @@ pub fn create(args: crate::CommandNew) -> eyre::Result<()> {
} = args;

match kind {
crate::Kind::Operator => create_operator(name, path),
crate::Kind::CustomNode => create_custom_node(name, path),
crate::Kind::Dataflow => create_dataflow(name, path),
crate::Kind::CustomNode => create_custom_node(name, path, NODE),
crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps),
}
}

fn create_dataflow(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrReport> {
fn create_dataflow(
name: String,
path: Option<PathBuf>,
use_path_deps: bool,
) -> Result<(), eyre::ErrReport> {
const DATAFLOW_YML: &str = include_str!("dataflow-template.yml");

if name.contains('/') {
@@ -39,9 +46,10 @@ fn create_dataflow(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrR
fs::write(&dataflow_yml_path, dataflow_yml)
.with_context(|| format!("failed to write `{}`", dataflow_yml_path.display()))?;

create_operator("op_1".into(), Some(root.join("op_1")))?;
create_operator("op_2".into(), Some(root.join("op_2")))?;
create_custom_node("node_1".into(), Some(root.join("node_1")))?;
create_custom_node("talker_1".into(), Some(root.join("talker_1")), TALKER)?;
create_custom_node("talker_2".into(), Some(root.join("talker_2")), TALKER)?;
create_custom_node("listener_1".into(), Some(root.join("listener_1")), LISTENER)?;
create_cmakefile(root.to_path_buf(), use_path_deps)?;

println!(
"Created new C++ dataflow at `{name}` at {}",
@@ -51,42 +59,34 @@ fn create_dataflow(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrR
Ok(())
}

fn create_operator(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrReport> {
const OPERATOR: &str = include_str!("operator-template.cc");
const HEADER: &str = include_str!("operator-template.h");

if name.contains('/') {
bail!("operator name must not contain `/` separators");
}
if !name.is_ascii() {
bail!("operator name must be ASCII");
}

// create directories
let root = path.as_deref().unwrap_or_else(|| Path::new(&name));
fs::create_dir(root)
.with_context(|| format!("failed to create directory `{}`", root.display()))?;

let operator_path = root.join("operator.cc");
fs::write(&operator_path, OPERATOR)
.with_context(|| format!("failed to write `{}`", operator_path.display()))?;
let header_path = root.join("operator.h");
fs::write(&header_path, HEADER)
.with_context(|| format!("failed to write `{}`", header_path.display()))?;

// TODO: Makefile?

println!(
"Created new C++ operator `{name}` at {}",
Path::new(".").join(root).display()
);

/// Writes `CMakeLists.txt` for a generated C++ dataflow into `root`.
///
/// The `__DORA_PATH__` placeholder of the bundled template is replaced with the
/// workspace directory (the grandparent of this crate's manifest directory, as
/// recorded at compile time) when `use_path_deps` is set, and with an empty
/// string otherwise.
///
/// # Errors
///
/// Fails if the workspace directory cannot be derived from the manifest
/// directory or if the file cannot be written.
fn create_cmakefile(root: PathBuf, use_path_deps: bool) -> Result<(), eyre::ErrReport> {
    const CMAKEFILE: &str = include_str!("cmake-template.txt");

    // Decide what the placeholder expands to, then substitute once.
    let dora_path = if use_path_deps {
        let workspace_dir = Path::new(env!("CARGO_MANIFEST_DIR"))
            .parent()
            .context("Could not get manifest parent folder")?
            .parent()
            .context("Could not get manifest grandparent folder")?;
        workspace_dir.to_str().unwrap()
    } else {
        ""
    };
    let cmake_file = CMAKEFILE.replace("__DORA_PATH__", dora_path);

    let cmake_path = root.join("CMakeLists.txt");
    fs::write(&cmake_path, cmake_file)
        .with_context(|| format!("failed to write `{}`", cmake_path.display()))?;

    println!("Created new CMakeLists.txt at {}", cmake_path.display());
    Ok(())
}

fn create_custom_node(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrReport> {
const NODE: &str = include_str!("node-template.cc");

fn create_custom_node(
name: String,
path: Option<PathBuf>,
template_scripts: &str,
) -> Result<(), eyre::ErrReport> {
if name.contains('/') {
bail!("node name must not contain `/` separators");
}
@@ -100,7 +100,7 @@ fn create_custom_node(name: String, path: Option<PathBuf>) -> Result<(), eyre::E
.with_context(|| format!("failed to create directory `{}`", root.display()))?;

let node_path = root.join("node.cc");
fs::write(&node_path, NODE)
fs::write(&node_path, template_scripts)
.with_context(|| format!("failed to write `{}`", node_path.display()))?;

// TODO: Makefile?


+ 39
- 0
binaries/cli/src/template/cxx/talker-template.cc View File

@@ -0,0 +1,39 @@
#include "dora-node-api.h" // adjust this path if necessary

#include <iostream>
#include <vector>


// Talker node: for up to 20 events, answers every input event by sending
// "Hello World!" on the `speech` output. Stops early once all inputs are closed;
// other event types are reported on stderr and skipped.
//
// Returns 0 on a normal shutdown and -1 if sending an output reports an error.
int main()
{
    auto dora_node = init_dora_node();

    int remaining = 20;
    while (remaining-- > 0)
    {
        auto event = dora_node.events->next();
        const auto ty = event_type(event);

        if (ty == DoraEventType::AllInputsClosed)
        {
            break;
        }
        if (ty != DoraEventType::Input)
        {
            std::cerr << "Unknown event type " << static_cast<int>(ty) << std::endl;
            continue;
        }

        // Send the message bytes without the trailing NUL.
        std::string message{"Hello World!"};
        const auto *bytes = reinterpret_cast<const uint8_t *>(message.c_str());
        rust::Slice<const uint8_t> message_slice{bytes, message.size()};

        auto result = send_output(dora_node.send_output, "speech", message_slice);
        auto error = std::string(result.error);
        // A non-empty error string is treated as a failed send.
        if (!error.empty())
        {
            std::cerr << "Error: " << error << std::endl;
            return -1;
        }
    }

    return 0;
}

+ 2
- 2
binaries/cli/src/template/mod.rs View File

@@ -7,7 +7,7 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()>
match args.lang {
crate::Lang::Rust => rust::create(args, use_path_deps),
crate::Lang::Python => python::create(args),
crate::Lang::C => c::create(args),
crate::Lang::Cxx => cxx::create(args),
crate::Lang::C => c::create(args, use_path_deps),
crate::Lang::Cxx => cxx::create(args, use_path_deps),
}
}

+ 12
- 13
binaries/cli/src/template/python/dataflow-template.yml View File

@@ -1,23 +1,22 @@
nodes:
- id: op_1
operator:
python: op_1/op_1.py
- id: talker_1
custom:
source: talker_1/talker_1.py
inputs:
tick: dora/timer/millis/100
outputs:
- some-output
- id: op_2
operator:
python: op_2/op_2.py
- speech
- id: talker_2
custom:
source: talker_2/talker_2.py
inputs:
tick: dora/timer/secs/2
outputs:
- some-output
- speech

- id: custom-node_1
- id: listener_1
custom:
source: ./node_1/node_1.py
source: listener_1/listener_1.py
inputs:
tick: dora/timer/secs/1
input-1: op_1/some-output
input-2: op_2/some-output
speech-1: talker_1/speech
speech-2: talker_2/speech

+ 11
- 0
binaries/cli/src/template/python/listener/listener-template.py View File

@@ -0,0 +1,11 @@
from dora import Node
import pyarrow as pa

# Listener node: prints the first element of every received input value
# together with the id of the input it arrived on.
node = Node()

for event in node:
    # Only input events carry a value; skip everything else.
    if event["type"] != "INPUT":
        continue
    message = event["value"][0].as_py()
    print(f"""I heard {message} from {event["id"]}""")

+ 18
- 34
binaries/cli/src/template/python/mod.rs View File

@@ -4,6 +4,10 @@ use std::{
path::{Path, PathBuf},
};

const NODE_PY: &str = include_str!("node/node-template.py");
const TALKER_PY: &str = include_str!("talker/talker-template.py");
const LISTENER_PY: &str = include_str!("listener/listener-template.py");

pub fn create(args: crate::CommandNew) -> eyre::Result<()> {
let crate::CommandNew {
kind,
@@ -13,47 +17,23 @@ pub fn create(args: crate::CommandNew) -> eyre::Result<()> {
} = args;

match kind {
crate::Kind::Operator => create_operator(name, path),
crate::Kind::CustomNode => create_custom_node(name, path),
crate::Kind::CustomNode => create_custom_node(name, path, NODE_PY),
crate::Kind::Dataflow => create_dataflow(name, path),
}
}

fn create_operator(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrReport> {
const OPERATOR_PY: &str = include_str!("operator/operator-template.py");

if name.contains('/') {
bail!("Operator name must not contain `/` separators");
}
if name.contains('.') {
bail!("Operator name must not contain `.` to not be confused for an extension");
}
// create directories
let root = path.as_deref().unwrap_or_else(|| Path::new(&name));
fs::create_dir(root)
.with_context(|| format!("failed to create directory `{}`", root.display()))?;

let operator_path = root.join(format!("{name}.py"));
fs::write(&operator_path, OPERATOR_PY)
.with_context(|| format!("failed to write `{}`", operator_path.display()))?;

println!(
"Created new Python operator `{name}` at {}",
Path::new(".").join(root).display()
);

Ok(())
}
fn create_custom_node(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrReport> {
const NODE_PY: &str = include_str!("node/node-template.py");

fn create_custom_node(
name: String,
path: Option<PathBuf>,
template_scripts: &str,
) -> Result<(), eyre::ErrReport> {
// create directories
let root = path.as_deref().unwrap_or_else(|| Path::new(&name));
fs::create_dir(root)
.with_context(|| format!("failed to create directory `{}`", root.display()))?;

let node_path = root.join(format!("{name}.py"));
fs::write(&node_path, NODE_PY)
fs::write(&node_path, template_scripts)
.with_context(|| format!("failed to write `{}`", node_path.display()))?;

println!(
@@ -84,9 +64,13 @@ fn create_dataflow(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrR
fs::write(&dataflow_yml_path, dataflow_yml)
.with_context(|| format!("failed to write `{}`", dataflow_yml_path.display()))?;

create_operator("op_1".into(), Some(root.join("op_1")))?;
create_operator("op_2".into(), Some(root.join("op_2")))?;
create_custom_node("node_1".into(), Some(root.join("node_1")))?;
create_custom_node("talker_1".into(), Some(root.join("talker_1")), TALKER_PY)?;
create_custom_node("talker_2".into(), Some(root.join("talker_2")), TALKER_PY)?;
create_custom_node(
"listener_1".into(),
Some(root.join("listener_1")),
LISTENER_PY,
)?;

println!(
"Created new yaml dataflow `{name}` at {}",


+ 14
- 0
binaries/cli/src/template/python/talker/talker-template.py View File

@@ -0,0 +1,14 @@
from dora import Node
import pyarrow as pa

# Talker node: logs every received input event and answers it by sending
# "Hello World" on the `speech` output.
node = Node()

for event in node:
    # Only input events trigger an output; skip everything else.
    if event["type"] != "INPUT":
        continue
    print(
        f"""Node received:
id: {event["id"]},
value: {event["value"]},
metadata: {event["metadata"]}"""
    )
    node.send_output("speech", pa.array(["Hello World"]))

+ 1
- 1
binaries/cli/src/template/rust/Cargo-template.toml View File

@@ -1,3 +1,3 @@
[workspace]
resolver = "2"
members = ["op_1", "op_2", "node_1"]
members = ["talker_1", "talker_2", "listener_1"]

+ 15
- 15
binaries/cli/src/template/rust/dataflow-template.yml View File

@@ -1,26 +1,26 @@
nodes:
- id: op_1
operator:
build: cargo build -p op_1
shared-library: target/debug/op_1
- id: talker_1
custom:
build: cargo build -p talker_1
source: target/debug/talker_1
inputs:
tick: dora/timer/millis/100
outputs:
- some-output
- id: op_2
operator:
build: cargo build -p op_2
shared-library: target/debug/op_2
- speech
- id: talker_2
custom:
build: cargo build -p talker_2
source: target/debug/talker_2
inputs:
tick: dora/timer/secs/2
outputs:
- some-output
- speech

- id: custom-node_1
- id: listener_1
custom:
build: cargo build -p node_1
source: target/debug/node_1
build: cargo build -p listener_1
source: target/debug/listener_1
inputs:
tick: dora/timer/secs/1
input-1: op_1/some-output
input-2: op_2/some-output
speech-1: talker_1/speech
speech-2: talker_2/speech

+ 9
- 0
binaries/cli/src/template/rust/listener/Cargo-template.toml View File

@@ -0,0 +1,9 @@
[package]
name = "___name___"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = {}

+ 25
- 0
binaries/cli/src/template/rust/listener/main-template.rs View File

@@ -0,0 +1,25 @@
use dora_node_api::{DoraNode, Event};
use std::error::Error;

/// Listener node: prints every message received on a `speech*` input.
///
/// Inputs with any other id are only reported by name; non-input events are ignored.
///
/// # Errors
///
/// Returns an error if the node cannot be initialized from the environment or
/// if the data of a `speech*` input cannot be converted to a string slice.
fn main() -> Result<(), Box<dyn Error>> {
    // The node handle is not used, but the binding is kept so it is not dropped
    // before the event loop ends.
    let (_node, mut events) = DoraNode::init_from_env()?;

    while let Some(event) = events.recv() {
        match event {
            Event::Input {
                id,
                metadata: _,
                data,
            } => match id.as_str() {
                // The dataflow template wires the talkers to the inputs `speech-1`
                // and `speech-2`, so match on the shared prefix; an exact match on
                // `speech` would never fire.
                input if input.starts_with("speech") => {
                    let message: &str = (&data).try_into()?;
                    println!("I heard: {message} from {id}");
                }
                other => println!("Received input `{other}`"),
            },
            _ => {}
        }
    }

    Ok(())
}

+ 25
- 61
binaries/cli/src/template/rust/mod.rs View File

@@ -4,6 +4,10 @@ use std::{
path::{Path, PathBuf},
};

const MAIN_RS: &str = include_str!("node/main-template.rs");
const TALKER_RS: &str = include_str!("talker/main-template.rs");
const LISTENER_RS: &str = include_str!("listener/main-template.rs");

const VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()> {
let crate::CommandNew {
@@ -14,8 +18,7 @@ pub fn create(args: crate::CommandNew, use_path_deps: bool) -> eyre::Result<()>
} = args;

match kind {
crate::Kind::Operator => create_operator(name, path, use_path_deps),
crate::Kind::CustomNode => create_custom_node(name, path, use_path_deps),
crate::Kind::CustomNode => create_custom_node(name, path, use_path_deps, MAIN_RS),
crate::Kind::Dataflow => create_dataflow(name, path, use_path_deps),
}
}
@@ -49,9 +52,24 @@ fn create_dataflow(
fs::write(&cargo_toml_path, cargo_toml)
.with_context(|| format!("failed to write `{}`", cargo_toml_path.display()))?;

create_operator("op_1".into(), Some(root.join("op_1")), use_path_deps)?;
create_operator("op_2".into(), Some(root.join("op_2")), use_path_deps)?;
create_custom_node("node_1".into(), Some(root.join("node_1")), use_path_deps)?;
create_custom_node(
"talker_1".into(),
Some(root.join("talker_1")),
use_path_deps,
TALKER_RS,
)?;
create_custom_node(
"talker_2".into(),
Some(root.join("talker_2")),
use_path_deps,
TALKER_RS,
)?;
create_custom_node(
"listener_1".into(),
Some(root.join("listener_1")),
use_path_deps,
LISTENER_RS,
)?;

println!(
"Created new Rust dataflow at `{name}` at {}",
@@ -61,67 +79,13 @@ fn create_dataflow(
Ok(())
}

fn create_operator(
name: String,
path: Option<PathBuf>,
use_path_deps: bool,
) -> Result<(), eyre::ErrReport> {
const CARGO_TOML: &str = include_str!("operator/Cargo-template.toml");
const LIB_RS: &str = include_str!("operator/lib-template.rs");

if name.contains('/') {
bail!("operator name must not contain `/` separators");
}
if name.contains('-') {
bail!(
"operator name must not contain `-` separators as
it get replaced by `_` as a static library."
);
}
if !name.is_ascii() {
bail!("operator name must be ASCII");
}

// create directories
let root = path.as_deref().unwrap_or_else(|| Path::new(&name));
fs::create_dir(root)
.with_context(|| format!("failed to create directory `{}`", root.display()))?;
let src = root.join("src");
fs::create_dir(&src)
.with_context(|| format!("failed to create directory `{}`", src.display()))?;

let dep = if use_path_deps {
r#"dora-operator-api = { path = "../../apis/rust/operator" }"#.to_string()
} else {
format!(r#"dora-operator-api = "{VERSION}""#)
};
let cargo_toml = CARGO_TOML
.replace("___name___", &name)
.replace("dora-operator-api = {}", &dep);

let cargo_toml_path = root.join("Cargo.toml");
fs::write(&cargo_toml_path, cargo_toml)
.with_context(|| format!("failed to write `{}`", cargo_toml_path.display()))?;

let lib_rs_path = src.join("lib.rs");
fs::write(&lib_rs_path, LIB_RS)
.with_context(|| format!("failed to write `{}`", lib_rs_path.display()))?;

println!(
"Created new Rust operator `{name}` at {}",
Path::new(".").join(root).display()
);

Ok(())
}

fn create_custom_node(
name: String,
path: Option<PathBuf>,
use_path_deps: bool,
template_scripts: &str,
) -> Result<(), eyre::ErrReport> {
const CARGO_TOML: &str = include_str!("node/Cargo-template.toml");
const MAIN_RS: &str = include_str!("node/main-template.rs");

if name.contains('/') {
bail!("node name must not contain `/` separators");
@@ -151,7 +115,7 @@ fn create_custom_node(
.with_context(|| format!("failed to write `{}`", cargo_toml_path.display()))?;

let main_rs_path = src.join("main.rs");
fs::write(&main_rs_path, MAIN_RS)
fs::write(&main_rs_path, template_scripts)
.with_context(|| format!("failed to write `{}`", main_rs_path.display()))?;

println!(


+ 9
- 0
binaries/cli/src/template/rust/talker/Cargo-template.toml View File

@@ -0,0 +1,9 @@
[package]
name = "___name___"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = {}

+ 25
- 0
binaries/cli/src/template/rust/talker/main-template.rs View File

@@ -0,0 +1,25 @@
use dora_node_api::{dora_core::config::DataId, DoraNode, Event, IntoArrow};
use std::error::Error;

/// Talker node: answers every `tick` input by sending "Hello World!" on the
/// `speech` output, forwarding the metadata parameters of the triggering input.
///
/// All other inputs and events are ignored.
///
/// # Errors
///
/// Returns an error if the node cannot be initialized from the environment or
/// if sending the output fails.
fn main() -> Result<(), Box<dyn Error>> {
    let (mut node, mut events) = DoraNode::init_from_env()?;

    while let Some(event) = events.recv() {
        // Only input events are of interest.
        let Event::Input {
            id,
            metadata,
            data: _,
        } = event
        else {
            continue;
        };

        // React to the timer tick only; every other input is skipped.
        if id.as_str() != "tick" {
            continue;
        }

        let output_id = DataId::from("speech".to_owned());
        let payload = String::from("Hello World!").into_arrow();
        node.send_output(output_id, metadata.parameters, payload)?;
        println!("Node received `{id}`");
    }

    Ok(())
}

+ 46
- 67
binaries/coordinator/src/lib.rs View File

@@ -9,7 +9,10 @@ use dora_core::{
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::{self, HLC},
topics::{ControlRequest, ControlRequestReply, DataflowId, DataflowListEntry},
topics::{
ControlRequest, ControlRequestReply, DataflowDaemonResult, DataflowId, DataflowListEntry,
DataflowResult,
},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
@@ -134,7 +137,8 @@ async fn start_inner(
let mut events = (abortable_events, daemon_events).merge();

let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
let mut dataflow_results: HashMap<Uuid, BTreeMap<String, Result<(), String>>> = HashMap::new();
let mut dataflow_results: HashMap<Uuid, BTreeMap<String, DataflowDaemonResult>> =
HashMap::new();
let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new();

@@ -220,18 +224,22 @@ async fn start_inner(
Event::Dataflow { uuid, event } => match event {
DataflowEvent::ReadyOnMachine {
machine_id,
success,
exited_before_subscribe,
} => {
match running_dataflows.entry(uuid) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
let dataflow = entry.get_mut();
dataflow.pending_machines.remove(&machine_id);
dataflow.init_success &= success;
dataflow
.exited_before_subscribe
.extend(exited_before_subscribe);
if dataflow.pending_machines.is_empty() {
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::AllNodesReady {
dataflow_id: uuid,
success: dataflow.init_success,
exited_before_subscribe: dataflow
.exited_before_subscribe
.clone(),
},
timestamp: clock.new_timestamp(),
})
@@ -271,26 +279,20 @@ async fn start_inner(
.insert(uuid, ArchivedDataflow::from(entry.get()));
}
entry.get_mut().machines.remove(&machine_id);
match &result {
Ok(()) => {
tracing::info!("dataflow `{uuid}` finished successfully on machine `{machine_id}`");
}
Err(err) => {
tracing::error!("{err:?}");
}
}
dataflow_results
.entry(uuid)
.or_default()
.insert(machine_id, result.map_err(|err| format!("{err:?}")));
.insert(machine_id, result);
if entry.get_mut().machines.is_empty() {
let finished_dataflow = entry.remove();
let reply = ControlRequestReply::DataflowStopped {
uuid,
result: dataflow_results
.get(&uuid)
.map(|r| dataflow_result(r, uuid))
.unwrap_or(Ok(())),
.map(|r| dataflow_result(r, uuid, &clock))
.unwrap_or_else(|| {
DataflowResult::ok_empty(uuid, clock.new_timestamp())
}),
};
for sender in finished_dataflow.reply_senders {
let _ = sender.send(Ok(reply.clone()));
@@ -353,8 +355,13 @@ async fn start_inner(
uuid: dataflow_uuid,
result: dataflow_results
.get(&dataflow_uuid)
.map(|r| dataflow_result(r, dataflow_uuid))
.unwrap_or(Ok(())),
.map(|r| dataflow_result(r, dataflow_uuid, &clock))
.unwrap_or_else(|| {
DataflowResult::ok_empty(
dataflow_uuid,
clock.new_timestamp(),
)
}),
},
};
let _ = reply_sender.send(Ok(status));
@@ -396,6 +403,7 @@ async fn start_inner(
reply_sender,
clock.new_timestamp(),
grace_duration,
&clock,
)
.await?;
}
@@ -412,6 +420,7 @@ async fn start_inner(
reply_sender,
clock.new_timestamp(),
grace_duration,
&clock,
)
.await?
}
@@ -561,18 +570,19 @@ async fn start_inner(

async fn stop_dataflow_by_uuid(
running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
dataflow_results: &HashMap<Uuid, BTreeMap<String, Result<(), String>>>,
dataflow_results: &HashMap<Uuid, BTreeMap<String, DataflowDaemonResult>>,
dataflow_uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
reply_sender: tokio::sync::oneshot::Sender<Result<ControlRequestReply, eyre::ErrReport>>,
timestamp: uhlc::Timestamp,
grace_duration: Option<Duration>,
clock: &uhlc::HLC,
) -> Result<(), eyre::ErrReport> {
let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
if let Some(result) = dataflow_results.get(&dataflow_uuid) {
let reply = ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid,
result: dataflow_result(result, dataflow_uuid),
result: dataflow_result(result, dataflow_uuid, clock),
};
let _ = reply_sender.send(Ok(reply));
return Ok(());
@@ -601,36 +611,23 @@ async fn stop_dataflow_by_uuid(
Ok(())
}

fn format_error(machine: &str, err: &str) -> String {
let mut error = err
.lines()
.fold(format!("- machine `{machine}`:\n"), |mut output, line| {
output.push_str(" ");
output.push_str(line);
output.push('\n');
output
});
error.push('\n');
error
}

fn dataflow_result(
results: &BTreeMap<String, Result<(), String>>,
results: &BTreeMap<String, DataflowDaemonResult>,
dataflow_uuid: Uuid,
) -> Result<(), String> {
let mut errors = Vec::new();
for (machine, result) in results {
if let Err(err) = result {
errors.push(format_error(machine, err));
clock: &uhlc::HLC,
) -> DataflowResult {
let mut node_results = BTreeMap::new();
for (_machine, result) in results {
node_results.extend(result.node_results.clone());
if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
}

if errors.is_empty() {
Ok(())
} else {
let mut formatted = format!("errors occurred in dataflow {dataflow_uuid}:\n");
formatted.push_str(&errors.join("\n"));
Err(formatted)
DataflowResult {
uuid: dataflow_uuid,
timestamp: clock.new_timestamp(),
node_results,
}
}

@@ -685,7 +682,7 @@ struct RunningDataflow {
machines: BTreeSet<String>,
/// IDs of machines that are waiting until all nodes are started.
pending_machines: BTreeSet<String>,
init_success: bool,
exited_before_subscribe: Vec<NodeId>,
nodes: Vec<ResolvedNode>,

reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
@@ -884,7 +881,7 @@ async fn start_dataflow(
} else {
BTreeSet::new()
},
init_success: true,
exited_before_subscribe: Default::default(),
machines,
nodes,
reply_senders: Vec::new(),
@@ -951,11 +948,11 @@ impl Event {
pub enum DataflowEvent {
DataflowFinishedOnMachine {
machine_id: String,
result: eyre::Result<()>,
result: DataflowDaemonResult,
},
ReadyOnMachine {
machine_id: String,
success: bool,
exited_before_subscribe: Vec<NodeId>,
},
}

@@ -990,21 +987,3 @@ fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport>

Ok(ReceiverStream::new(ctrlc_rx))
}

// Unit tests for the error-formatting helper of this module.
#[cfg(test)]
mod test {
// Checks that `format_error` yields exactly the same text as the earlier
// `format!`-and-`collect` formulation for a multi-line error message.
#[test]
fn test_format_error() {
let machine = "machine A";
let err = "foo\nbar\nbuzz";

// Reference output, built the old way: format each line separately and collect.
let old_error = {
#[allow(clippy::format_collect)]
let err: String = err.lines().map(|line| format!(" {line}\n")).collect();
format!("- machine `{machine}`:\n{err}\n")
};
// Output of the current helper; both strings must be identical.
let new_error = super::format_error(machine, err);
assert_eq!(old_error, new_error)
}
}

+ 4
- 7
binaries/coordinator/src/listener.rs View File

@@ -1,6 +1,6 @@
use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event};
use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC};
use eyre::{eyre, Context};
use eyre::Context;
use std::{io::ErrorKind, net::SocketAddr, sync::Arc};
use tokio::{
net::{TcpListener, TcpStream},
@@ -66,13 +66,13 @@ pub async fn handle_connection(
coordinator_messages::CoordinatorRequest::Event { machine_id, event } => match event {
coordinator_messages::DaemonEvent::AllNodesReady {
dataflow_id,
success,
exited_before_subscribe,
} => {
let event = Event::Dataflow {
uuid: dataflow_id,
event: DataflowEvent::ReadyOnMachine {
machine_id,
success,
exited_before_subscribe,
},
};
if events_tx.send(event).await.is_err() {
@@ -85,10 +85,7 @@ pub async fn handle_connection(
} => {
let event = Event::Dataflow {
uuid: dataflow_id,
event: DataflowEvent::DataflowFinishedOnMachine {
machine_id,
result: result.map_err(|e| eyre!(e)),
},
event: DataflowEvent::DataflowFinishedOnMachine { machine_id, result },
};
if events_tx.send(event).await.is_err() {
break;


+ 2
- 0
binaries/daemon/Cargo.toml View File

@@ -39,3 +39,5 @@ aligned-vec = "0.5.0"
ctrlc = "3.2.5"
which = "5.0.0"
sysinfo = "0.30.11"
crossbeam = "0.8.4"
crossbeam-skiplist = "0.1.3"

+ 127
- 137
binaries/daemon/src/lib.rs View File

@@ -1,5 +1,6 @@
use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::config::{Input, OperatorId};
use dora_core::coordinator_messages::CoordinatorRequest;
use dora_core::daemon_messages::{
@@ -9,6 +10,9 @@ use dora_core::descriptor::runtime_node_inputs;
use dora_core::message::uhlc::{self, HLC};
use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters};
use dora_core::topics::LOCALHOST;
use dora_core::topics::{
DataflowDaemonResult, DataflowResult, NodeError, NodeErrorCause, NodeExitStatus,
};
use dora_core::{
config::{DataId, InputMapping, NodeId},
coordinator_messages::DaemonEvent,
@@ -29,9 +33,7 @@ use shared_memory_server::ShmemConf;
use std::sync::Arc;
use std::time::Instant;
use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet, HashMap},
io,
net::SocketAddr,
path::{Path, PathBuf},
time::Duration,
@@ -64,6 +66,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::pending::DataflowStatus;

const STDERR_LOG_LINES: usize = 10;

pub struct Daemon {
running: HashMap<DataflowId, RunningDataflow>,
working_dir: HashMap<DataflowId, PathBuf>,
@@ -78,7 +82,7 @@ pub struct Daemon {
/// used for testing and examples
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
/// used to record dataflow results when `exit_when_done` is used
dataflow_errors: BTreeMap<Uuid, BTreeMap<NodeId, eyre::Report>>,
dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>,

clock: Arc<uhlc::HLC>,
}
@@ -148,7 +152,7 @@ impl Daemon {
.map(|_| ())
}

pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<()> {
pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<DataflowResult> {
let working_dir = dataflow_path
.canonicalize()
.context("failed to canoncialize dataflow path")?
@@ -160,8 +164,9 @@ impl Daemon {
descriptor.check(&working_dir)?;
let nodes = descriptor.resolve_aliases_and_set_defaults()?;

let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
let spawn_command = SpawnDataflowNodes {
dataflow_id: Uuid::new_v7(Timestamp::now(NoContext)),
dataflow_id,
working_dir,
nodes,
machine_listen_ports: BTreeMap::new(),
@@ -191,7 +196,7 @@ impl Daemon {
None,
"".to_string(),
Some(exit_when_done),
clock,
clock.clone(),
);

let spawn_result = reply_rx
@@ -205,20 +210,15 @@ impl Daemon {
}
});

let (dataflow_errors, ()) = future::try_join(run_result, spawn_result).await?;
let (mut dataflow_results, ()) = future::try_join(run_result, spawn_result).await?;

if dataflow_errors.is_empty() {
Ok(())
} else {
let mut output = "some nodes failed:".to_owned();
for (dataflow, node_errors) in dataflow_errors {
for (node, error) in node_errors {
use std::fmt::Write;
write!(&mut output, "\n - {dataflow}/{node}: {error}").unwrap();
}
}
bail!("{output}");
}
Ok(DataflowResult {
uuid: dataflow_id,
timestamp: clock.new_timestamp(),
node_results: dataflow_results
.remove(&dataflow_id)
.context("no node results for dataflow_id")?,
})
}

async fn run_general(
@@ -227,7 +227,7 @@ impl Daemon {
machine_id: String,
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
clock: Arc<HLC>,
) -> eyre::Result<BTreeMap<Uuid, BTreeMap<NodeId, eyre::Report>>> {
) -> eyre::Result<BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>> {
let coordinator_connection = match coordinator_addr {
Some(addr) => {
let stream = TcpStream::connect(addr)
@@ -251,7 +251,7 @@ impl Daemon {
inter_daemon_connections: BTreeMap::new(),
machine_id,
exit_when_done,
dataflow_errors: BTreeMap::new(),
dataflow_node_results: BTreeMap::new(),
clock,
};

@@ -272,7 +272,7 @@ impl Daemon {
async fn run_inner(
mut self,
incoming_events: impl Stream<Item = Timestamped<Event>> + Unpin,
) -> eyre::Result<BTreeMap<Uuid, BTreeMap<NodeId, eyre::Report>>> {
) -> eyre::Result<BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>> {
let mut events = incoming_events;

while let Some(event) = events.next().await {
@@ -329,7 +329,7 @@ impl Daemon {
}
}

Ok(self.dataflow_errors)
Ok(self.dataflow_node_results)
}

async fn handle_coordinator_event(
@@ -376,15 +376,19 @@ impl Daemon {
}
DaemonCoordinatorEvent::AllNodesReady {
dataflow_id,
success,
exited_before_subscribe,
} => {
match self.running.get_mut(&dataflow_id) {
Some(dataflow) => {
let ready = exited_before_subscribe.is_empty();
dataflow
.pending_nodes
.handle_external_all_nodes_ready(success)
.handle_external_all_nodes_ready(
exited_before_subscribe,
&mut dataflow.cascading_error_causes,
)
.await?;
if success {
if ready {
tracing::info!("coordinator reported that all nodes are ready, starting dataflow `{dataflow_id}`");
dataflow.start(&self.events_tx, &self.clock).await?;
}
@@ -614,6 +618,11 @@ impl Daemon {
dataflow.pending_nodes.insert(node.id.clone());

let node_id = node.id.clone();
let node_stderr_most_recent = dataflow
.node_stderr_most_recent
.entry(node.id.clone())
.or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES)))
.clone();
match spawn::spawn_node(
dataflow_id,
&working_dir,
@@ -621,6 +630,7 @@ impl Daemon {
self.events_tx.clone(),
dataflow_descriptor.clone(),
self.clock.clone(),
node_stderr_most_recent,
)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
@@ -636,6 +646,7 @@ impl Daemon {
&node_id,
&mut self.coordinator_connection,
&self.clock,
&mut dataflow.cascading_error_causes,
)
.await?;
}
@@ -742,6 +753,7 @@ impl Daemon {
reply_sender,
&mut self.coordinator_connection,
&self.clock,
&mut dataflow.cascading_error_causes,
)
.await?;
match status {
@@ -996,7 +1008,12 @@ impl Daemon {

dataflow
.pending_nodes
.handle_node_stop(node_id, &mut self.coordinator_connection, &self.clock)
.handle_node_stop(
node_id,
&mut self.coordinator_connection,
&self.clock,
&mut dataflow.cascading_error_causes,
)
.await?;

Self::handle_outputs_done(
@@ -1013,17 +1030,15 @@ impl Daemon {
.iter()
.all(|(_id, n)| n.node_config.dynamic)
{
let result = match self.dataflow_errors.get(&dataflow.id) {
None => Ok(()),
Some(errors) => {
let mut output = "some nodes failed:".to_owned();
for (node, error) in errors {
use std::fmt::Write;
write!(&mut output, "\n - {node}: {error}").unwrap();
}
Err(output)
}
let result = DataflowDaemonResult {
timestamp: self.clock.new_timestamp(),
node_results: self
.dataflow_node_results
.get(&dataflow.id)
.context("failed to get dataflow node results")?
.clone(),
};

tracing::info!(
"Dataflow `{dataflow_id}` finished on machine `{}`",
self.machine_id
@@ -1142,80 +1157,57 @@ impl Daemon {
node_id,
exit_status,
} => {
let node_error = match exit_status {
let node_result = match exit_status {
NodeExitStatus::Success => {
tracing::info!("node {dataflow_id}/{node_id} finished successfully");
None
}
NodeExitStatus::IoError(err) => {
let err = eyre!(err).wrap_err(format!(
"
I/O error while waiting for node `{dataflow_id}/{node_id}.

Check logs using: dora logs {dataflow_id} {node_id}
"
));
tracing::error!("{err:?}");
Some(err)
Ok(())
}
NodeExitStatus::ExitCode(code) => {
let err = eyre!(
"
{dataflow_id}/{node_id} failed with exit code {code}.

Check logs using: dora logs {dataflow_id} {node_id}
"
);
tracing::error!("{err}");
Some(err)
}
NodeExitStatus::Signal(signal) => {
let signal: Cow<_> = match signal {
1 => "SIGHUP".into(),
2 => "SIGINT".into(),
3 => "SIGQUIT".into(),
4 => "SIGILL".into(),
6 => "SIGABRT".into(),
8 => "SIGFPE".into(),
9 => "SIGKILL".into(),
11 => "SIGSEGV".into(),
13 => "SIGPIPE".into(),
14 => "SIGALRM".into(),
15 => "SIGTERM".into(),
22 => "SIGABRT".into(),
23 => "NSIG".into(),

other => other.to_string().into(),
exit_status => {
let dataflow = self.running.get(&dataflow_id);
let caused_by_node = dataflow
.and_then(|dataflow| {
dataflow.cascading_error_causes.error_caused_by(&node_id)
})
.cloned();
let grace_duration_kill = dataflow
.map(|d| d.grace_duration_kills.contains(&node_id))
.unwrap_or_default();

let cause = match caused_by_node {
Some(caused_by_node) => {
tracing::info!("marking `{node_id}` as cascading error caused by `{caused_by_node}`");
NodeErrorCause::Cascading { caused_by_node }
}
None if grace_duration_kill => NodeErrorCause::GraceDuration,
None => NodeErrorCause::Other {
stderr: dataflow
.and_then(|d| d.node_stderr_most_recent.get(&node_id))
.map(|queue| {
let mut s = if queue.is_full() {
"[...]".into()
} else {
String::new()
};
while let Some(line) = queue.pop() {
s += &line;
}
s
})
.unwrap_or_default(),
},
};
let err = eyre!(
"
{dataflow_id}/{node_id} failed with signal `{signal}`

Check logs using: dora logs {dataflow_id} {node_id}
"
);
tracing::error!("{err}");
Some(err)
}
NodeExitStatus::Unknown => {
let err = eyre!(
"
{dataflow_id}/{node_id} failed with unknown exit code
Check logs using: dora logs {dataflow_id} {node_id}
"
);
tracing::error!("{err}");
Some(err)
Err(NodeError {
timestamp: self.clock.new_timestamp(),
cause,
exit_status,
})
}
};

if let Some(err) = node_error {
self.dataflow_errors
.entry(dataflow_id)
.or_default()
.insert(node_id.clone(), err);
}
self.dataflow_node_results
.entry(dataflow_id)
.or_default()
.insert(node_id.clone(), node_result);

self.handle_node_stop(dataflow_id, &node_id).await?;

@@ -1423,6 +1415,12 @@ pub struct RunningDataflow {
///
/// TODO: replace this with a constant once `BTreeSet::new` is `const` on stable.
empty_set: BTreeSet<DataId>,

/// Contains the node that caused the error for nodes that experienced a cascading error.
cascading_error_causes: CascadingErrorCauses,
grace_duration_kills: Arc<crossbeam_skiplist::SkipSet<NodeId>>,

node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,
}

impl RunningDataflow {
@@ -1441,6 +1439,9 @@ impl RunningDataflow {
_timer_handles: Vec::new(),
stop_sent: false,
empty_set: BTreeSet::new(),
cascading_error_causes: Default::default(),
grace_duration_kills: Default::default(),
node_stderr_most_recent: BTreeMap::new(),
}
}

@@ -1503,6 +1504,7 @@ impl RunningDataflow {
}

let running_nodes = self.running_nodes.clone();
let grace_duration_kills = self.grace_duration_kills.clone();
tokio::spawn(async move {
let duration = grace_duration.unwrap_or(Duration::from_millis(500));
tokio::time::sleep(duration).await;
@@ -1512,6 +1514,7 @@ impl RunningDataflow {
for (node, node_details) in running_nodes.iter() {
if let Some(pid) = node_details.pid {
if let Some(process) = system.process(Pid::from(pid as usize)) {
grace_duration_kills.insert(node.clone());
process.kill();
warn!(
"{node} was killed due to not stopping within the {:#?} grace period",
@@ -1644,39 +1647,6 @@ pub enum DoraEvent {
},
}

/// Outcome of waiting for a node process to exit.
#[derive(Debug)]
pub enum NodeExitStatus {
/// The process reported a successful exit status.
Success,
/// Waiting for the process failed with an I/O error.
IoError(io::Error),
/// The process exited unsuccessfully with the given exit code.
ExitCode(i32),
/// The process has no exit code but reports the given signal number (Unix only).
Signal(i32),
/// Neither an exit code nor a signal number was available.
Unknown,
}

impl From<Result<std::process::ExitStatus, io::Error>> for NodeExitStatus {
fn from(result: Result<std::process::ExitStatus, io::Error>) -> Self {
match result {
Ok(status) => {
if status.success() {
NodeExitStatus::Success
} else if let Some(code) = status.code() {
Self::ExitCode(code)
} else {
#[cfg(unix)]
{
use std::os::unix::process::ExitStatusExt;
if let Some(signal) = status.signal() {
return Self::Signal(signal);
}
}
Self::Unknown
}
}
Err(err) => Self::IoError(err),
}
}
}

#[must_use]
enum RunStatus {
Continue,
@@ -1723,3 +1693,23 @@ fn set_up_ctrlc_handler(

Ok(ReceiverStream::new(ctrlc_rx))
}

/// Records which node was responsible for the failure of other nodes, for nodes that
/// experienced a cascading error.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct CascadingErrorCauses {
/// Maps each affected node to the node that was recorded as the cause of its error.
caused_by: BTreeMap<NodeId, NodeId>,
}

impl CascadingErrorCauses {
    /// Returns `true` if a causing node has been recorded for `node`.
    pub fn experienced_cascading_error(&self, node: &NodeId) -> bool {
        self.error_caused_by(node).is_some()
    }

    /// Return the ID of the node that caused a cascading error for the given node, if any.
    pub fn error_caused_by(&self, node: &NodeId) -> Option<&NodeId> {
        let Self { caused_by } = self;
        caused_by.get(node)
    }

    /// Records `causing_node` as the cause of the error of `affected_node`.
    ///
    /// Only the first report per affected node is kept; later reports for the same
    /// node leave the recorded cause untouched.
    pub fn report_cascading_error(&mut self, causing_node: NodeId, affected_node: NodeId) {
        use std::collections::btree_map::Entry;

        if let Entry::Vacant(slot) = self.caused_by.entry(affected_node) {
            slot.insert(causing_node);
        }
    }
}

+ 49
- 41
binaries/daemon/src/pending.rs View File

@@ -9,7 +9,7 @@ use dora_core::{
use eyre::{bail, Context};
use tokio::{net::TcpStream, sync::oneshot};

use crate::tcp_utils::tcp_send;
use crate::{tcp_utils::tcp_send, CascadingErrorCauses};

pub struct PendingNodes {
dataflow_id: DataflowId,
@@ -28,7 +28,7 @@ pub struct PendingNodes {
///
/// If this list is non-empty, we should not start the dataflow at all. Instead,
/// we report an error to the other nodes.
exited_before_subscribe: HashSet<NodeId>,
exited_before_subscribe: Vec<NodeId>,

/// Whether the local init result was already reported to the coordinator.
reported_init_to_coordinator: bool,
@@ -42,7 +42,7 @@ impl PendingNodes {
local_nodes: HashSet::new(),
external_nodes: false,
waiting_subscribers: HashMap::new(),
exited_before_subscribe: HashSet::new(),
exited_before_subscribe: Default::default(),
reported_init_to_coordinator: false,
}
}
@@ -61,12 +61,13 @@ impl PendingNodes {
reply_sender: oneshot::Sender<DaemonReply>,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
cascading_errors: &mut CascadingErrorCauses,
) -> eyre::Result<DataflowStatus> {
self.waiting_subscribers
.insert(node_id.clone(), reply_sender);
self.local_nodes.remove(&node_id);

self.update_dataflow_status(coordinator_connection, clock)
self.update_dataflow_status(coordinator_connection, clock, cascading_errors)
.await
}

@@ -75,26 +76,28 @@ impl PendingNodes {
node_id: &NodeId,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
cascading_errors: &mut CascadingErrorCauses,
) -> eyre::Result<()> {
if self.local_nodes.remove(node_id) {
tracing::warn!("node `{node_id}` exited before initializing dora connection");
self.exited_before_subscribe.insert(node_id.clone());
self.update_dataflow_status(coordinator_connection, clock)
self.exited_before_subscribe.push(node_id.clone());
self.update_dataflow_status(coordinator_connection, clock, cascading_errors)
.await?;
}
Ok(())
}

pub async fn handle_external_all_nodes_ready(&mut self, success: bool) -> eyre::Result<()> {
pub async fn handle_external_all_nodes_ready(
&mut self,
exited_before_subscribe: Vec<NodeId>,
cascading_errors: &mut CascadingErrorCauses,
) -> eyre::Result<()> {
if !self.local_nodes.is_empty() {
bail!("received external `all_nodes_ready` event before local nodes were ready");
}
let external_error = if success {
None
} else {
Some("some nodes failed to initialize on remote machines".to_string())
};
self.answer_subscribe_requests(external_error).await;

self.answer_subscribe_requests(exited_before_subscribe, cascading_errors)
.await;

Ok(())
}
@@ -103,6 +106,7 @@ impl PendingNodes {
&mut self,
coordinator_connection: &mut Option<TcpStream>,
clock: &HLC,
cascading_errors: &mut CascadingErrorCauses,
) -> eyre::Result<DataflowStatus> {
if self.local_nodes.is_empty() {
if self.external_nodes {
@@ -113,7 +117,8 @@ impl PendingNodes {
}
Ok(DataflowStatus::Pending)
} else {
self.answer_subscribe_requests(None).await;
self.answer_subscribe_requests(Vec::new(), cascading_errors)
.await;
Ok(DataflowStatus::AllNodesReady)
}
} else {
@@ -121,33 +126,34 @@ impl PendingNodes {
}
}

async fn answer_subscribe_requests(&mut self, external_error: Option<String>) {
let result = if self.exited_before_subscribe.is_empty() {
match external_error {
Some(err) => Err(err),
None => Ok(()),
}
} else {
let node_id_message = if self.exited_before_subscribe.len() == 1 {
self.exited_before_subscribe
.iter()
.next()
.map(|node_id| node_id.to_string())
.unwrap_or("<node_id>".to_string())
} else {
"<node_id>".to_string()
};
Err(format!(
"Some nodes exited before subscribing to dora: {:?}\n\n\
This is typically happens when an initialization error occurs
in the node or operator. To check the output of the failed
nodes, run `dora logs {} {node_id_message}`.",
self.exited_before_subscribe, self.dataflow_id
))
async fn answer_subscribe_requests(
&mut self,
exited_before_subscribe_external: Vec<NodeId>,
cascading_errors: &mut CascadingErrorCauses,
) {
let node_exited_before_subscribe = match self.exited_before_subscribe.as_slice() {
[first, ..] => Some(first),
[] => match exited_before_subscribe_external.as_slice() {
[first, ..] => Some(first),
[] => None,
},
};

let result = match &node_exited_before_subscribe {
Some(causing_node) => Err(format!(
"Node {causing_node} exited before initializing dora. For \
more information, run `dora logs {} {causing_node}`.",
self.dataflow_id
)),
None => Ok(()),
};

// answer all subscribe requests
let subscribe_replies = std::mem::take(&mut self.waiting_subscribers);
for reply_sender in subscribe_replies.into_values() {
for (node_id, reply_sender) in subscribe_replies.into_iter() {
if let Some(causing_node) = node_exited_before_subscribe {
cascading_errors.report_cascading_error(causing_node.clone(), node_id.clone());
}
let _ = reply_sender.send(DaemonReply::Result(result.clone()));
}
}
@@ -161,15 +167,17 @@ impl PendingNodes {
bail!("no coordinator connection to send AllNodesReady");
};

let success = self.exited_before_subscribe.is_empty();
tracing::info!("all local nodes are ready (success = {success}), waiting for remote nodes");
tracing::info!(
"all local nodes are ready (exit before subscribe: {:?}), waiting for remote nodes",
self.exited_before_subscribe
);

let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
machine_id: self.machine_id.clone(),
event: DaemonEvent::AllNodesReady {
dataflow_id: self.dataflow_id,
success,
exited_before_subscribe: self.exited_before_subscribe.clone(),
},
},
timestamp,


+ 10
- 4
binaries/daemon/src/spawn.rs View File

@@ -3,6 +3,7 @@ use crate::{
OutputId, RunningNode,
};
use aligned_vec::{AVec, ConstAlign};
use crossbeam::queue::ArrayQueue;
use dora_arrow_convert::IntoArrow;
use dora_core::{
config::DataId,
@@ -32,7 +33,7 @@ use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt},
sync::{mpsc, oneshot},
};
use tracing::{debug, error};
use tracing::error;

/// clock is required for generating timestamps when dropping messages early because queue is full
pub async fn spawn_node(
@@ -42,6 +43,7 @@ pub async fn spawn_node(
daemon_tx: mpsc::Sender<Timestamped<Event>>,
dataflow_descriptor: Descriptor,
clock: Arc<HLC>,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
) -> eyre::Result<RunningNode> {
let node_id = node.id.clone();
tracing::debug!("Spawning node `{dataflow_id}/{node_id}`");
@@ -358,18 +360,22 @@ pub async fn spawn_node(
}
};

match String::from_utf8(raw) {
Ok(s) => buffer.push_str(&s),
let new = match String::from_utf8(raw) {
Ok(s) => s,
Err(err) => {
let lossy = String::from_utf8_lossy(err.as_bytes());
tracing::warn!(
"stderr not valid UTF-8 string (node {node_id}): {}: {lossy}",
err.utf8_error()
);
buffer.push_str(&lossy)
lossy.into_owned()
}
};

buffer.push_str(&new);

node_stderr_most_recent.force_push(new);

if buffer.starts_with("Traceback (most recent call last):") {
if !finished {
continue;


+ 3
- 1
binaries/runtime/src/operator/python.rs View File

@@ -208,7 +208,9 @@ pub fn run(
metadata.parameters.open_telemetry_context = string_cx;
}

let py_event = PyEvent::from(event);
let py_event = PyEvent::from(event)
.to_py_dict(py)
.context("Could not convert event to pydict bound")?;

let status_enum = operator
.call_method1(py, "on_event", (py_event, send_output.clone()))


+ 5
- 0
examples/python-dataflow/example.py View File

@@ -0,0 +1,5 @@
# Minimal dora node script: creates the node named "plot" and requests one event.
from dora import Node

node = Node("plot")

# Request the next event from the node; the result is not used further here.
event = node.next()

+ 2
- 2
examples/python-dataflow/requirements.txt View File

@@ -6,7 +6,7 @@ ultralytics
gitpython
ipython # interactive notebook
matplotlib>=3.2.2
numpy>=1.18.5
numpy<2.0.0 # See: https://github.com/opencv/opencv-python/issues/997
opencv-python>=4.1.1
Pillow>=7.1.2
psutil # system resources
@@ -44,4 +44,4 @@ seaborn>=0.11.0
# roboflow

opencv-python>=4.1.1
maturin
maturin

+ 0
- 7
examples/python-dataflow/run.rs View File

@@ -1,5 +1,4 @@
use dora_core::{get_pip_path, get_python_path, run};
use dora_download::download_file;
use dora_tracing::set_up_tracing;
use eyre::{bail, ContextCompat, WrapErr};
use std::path::Path;
@@ -73,12 +72,6 @@ async fn main() -> eyre::Result<()> {
)
.await
.context("maturin develop failed")?;
download_file(
"https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt",
Path::new("yolov8n.pt"),
)
.await
.context("Could not download weights.")?;

let dataflow = Path::new("dataflow.yml");
run_dataflow(dataflow).await?;


+ 1
- 1
examples/python-operator-dataflow/requirements.txt View File

@@ -6,7 +6,7 @@ ultralytics
gitpython
ipython # interactive notebook
matplotlib>=3.2.2
numpy>=1.18.5
numpy<2.0.0
opencv-python>=4.1.1
Pillow>=7.1.2
psutil # system resources


+ 2
- 2
examples/python-ros2-dataflow/random_turtle.py View File

@@ -55,11 +55,11 @@ for i in range(500):

# ROS2 Event
elif event_kind == "external":
pose = event.inner()[0].as_py()
pose = event["value"][0].as_py()
min_x = min([min_x, pose["x"]])
max_x = max([max_x, pose["x"]])
min_y = min([min_y, pose["y"]])
max_y = max([max_y, pose["y"]])
dora_node.send_output("turtle_pose", event.inner())
dora_node.send_output("turtle_pose", event["value"])

assert max_x - min_x > 1 or max_y - min_y > 1, "no turtle movement"

+ 3
- 3
libraries/core/src/coordinator_messages.rs View File

@@ -1,4 +1,4 @@
use crate::daemon_messages::DataflowId;
use crate::{config::NodeId, daemon_messages::DataflowId, topics::DataflowDaemonResult};
use eyre::eyre;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
@@ -18,11 +18,11 @@ pub enum CoordinatorRequest {
pub enum DaemonEvent {
AllNodesReady {
dataflow_id: DataflowId,
success: bool,
exited_before_subscribe: Vec<NodeId>,
},
AllNodesFinished {
dataflow_id: DataflowId,
result: Result<(), String>,
result: DataflowDaemonResult,
},
Heartbeat,
}


+ 1
- 1
libraries/core/src/daemon_messages.rs View File

@@ -234,7 +234,7 @@ pub enum DaemonCoordinatorEvent {
Spawn(SpawnDataflowNodes),
AllNodesReady {
dataflow_id: DataflowId,
success: bool,
exited_before_subscribe: Vec<NodeId>,
},
StopDataflow {
dataflow_id: DataflowId,


+ 135
- 12
libraries/core/src/topics.rs View File

@@ -1,5 +1,7 @@
use dora_message::uhlc;
use std::{
collections::BTreeSet,
borrow::Cow,
collections::{BTreeMap, BTreeSet},
fmt::Display,
net::{IpAddr, Ipv4Addr},
path::PathBuf,
@@ -85,17 +87,9 @@ pub enum DataflowStatus {
pub enum ControlRequestReply {
Error(String),
CoordinatorStopped,
DataflowStarted {
uuid: Uuid,
},
DataflowReloaded {
uuid: Uuid,
},
DataflowStopped {
uuid: Uuid,
result: Result<(), String>,
},

DataflowStarted { uuid: Uuid },
DataflowReloaded { uuid: Uuid },
DataflowStopped { uuid: Uuid, result: DataflowResult },
DataflowList(DataflowList),
DestroyOk,
DaemonConnected(bool),
@@ -118,3 +112,132 @@ impl Display for DataflowId {
}
}
}

/// Overall result of a dataflow, holding the result of every reported node.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowResult {
/// ID of the dataflow this result belongs to.
pub uuid: Uuid,
/// Timestamp attached to this result when it was created.
pub timestamp: uhlc::Timestamp,
/// Result of each node, keyed by node ID; `Err` carries the details of the failure.
pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
}

impl DataflowResult {
pub fn ok_empty(uuid: Uuid, timestamp: uhlc::Timestamp) -> Self {
Self {
uuid,
timestamp,
node_results: Default::default(),
}
}

pub fn is_ok(&self) -> bool {
self.node_results.values().all(|r| r.is_ok())
}
}

/// Node results of a dataflow as reported by a single daemon.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowDaemonResult {
/// Timestamp attached to this result when it was created.
pub timestamp: uhlc::Timestamp,
/// Result of each node, keyed by node ID; `Err` carries the details of the failure.
pub node_results: BTreeMap<NodeId, Result<(), NodeError>>,
}

/// Describes the failure of a single node.
///
/// The `Display` implementation renders the exit status followed by details about the cause.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct NodeError {
/// Timestamp attached to this error when it was created.
pub timestamp: uhlc::Timestamp,
/// Why the node failed.
pub cause: NodeErrorCause,
/// How the node process exited.
pub exit_status: NodeExitStatus,
}

impl std::fmt::Display for NodeError {
    /// Writes a human-readable description of the node failure.
    ///
    /// The exit status is written first. Depending on the cause, it is followed by
    /// nothing, by a note naming the node that triggered a cascading error, or by
    /// the captured stderr output framed between two separator rules.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.exit_status {
            NodeExitStatus::Success => write!(f, "<success>"),
            NodeExitStatus::IoError(err) => write!(f, "I/O error while reading exit status: {err}"),
            NodeExitStatus::ExitCode(code) => write!(f, "exited with code {code}"),
            NodeExitStatus::Signal(signal) => {
                // NOTE(review): 22 and 23 look like the Windows CRT values for SIGABRT
                // and NSIG, while the `From` conversion only constructs `Signal` under
                // `cfg(unix)` — confirm whether these two entries are intended.
                let signal_str: Cow<_> = match signal {
                    1 => "SIGHUP".into(),
                    2 => "SIGINT".into(),
                    3 => "SIGQUIT".into(),
                    4 => "SIGILL".into(),
                    6 => "SIGABRT".into(),
                    8 => "SIGFPE".into(),
                    9 => "SIGKILL".into(),
                    11 => "SIGSEGV".into(),
                    13 => "SIGPIPE".into(),
                    14 => "SIGALRM".into(),
                    15 => "SIGTERM".into(),
                    22 => "SIGABRT".into(),
                    23 => "NSIG".into(),
                    // Unknown numbers are shown as-is.
                    other => other.to_string().into(),
                };
                if matches!(self.cause, NodeErrorCause::GraceDuration) {
                    write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})")
                } else {
                    write!(f, "exited because of signal {signal_str}")
                }
            }
            NodeExitStatus::Unknown => write!(f, "unknown exit status"),
        }?;

        match &self.cause {
            NodeErrorCause::GraceDuration => {} // handled above
            NodeErrorCause::Cascading { caused_by_node } => write!(
                f,
                "\n\nThis error occurred because node `{caused_by_node}` exited before connecting to dora."
            )?,
            NodeErrorCause::Other { stderr } if stderr.is_empty() => {}
            NodeErrorCause::Other { stderr } => {
                let line: &str = "---------------------------------------------------------------------------------\n";
                write!(f, " with stderr output:\n{line}{stderr}")?;
                // The captured output may end in the middle of a line (e.g. when the
                // process died while writing); make sure the closing rule still
                // starts on a line of its own.
                if !stderr.ends_with('\n') {
                    f.write_str("\n")?;
                }
                f.write_str(line)?;
            }
        }

        Ok(())
    }
}

/// Reason why a node failed.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeErrorCause {
/// Node was killed because it didn't react to a stop message in time.
GraceDuration,
/// Node failed because another node failed before.
Cascading {
/// ID of the node whose failure caused this node to fail.
caused_by_node: NodeId,
},
/// Any other failure, i.e. neither a grace-duration kill nor a cascading error.
Other {
/// Captured stderr output of the node; may be empty.
stderr: String,
},
}

/// Outcome of waiting for a node process to exit.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum NodeExitStatus {
/// The process reported a successful exit status.
Success,
/// Waiting for the process failed with an I/O error; holds the error's message text.
IoError(String),
/// The process exited unsuccessfully with the given exit code.
ExitCode(i32),
/// The process has no exit code but reports the given signal number (Unix only).
Signal(i32),
/// Neither an exit code nor a signal number was available.
Unknown,
}

impl From<Result<std::process::ExitStatus, std::io::Error>> for NodeExitStatus {
fn from(result: Result<std::process::ExitStatus, std::io::Error>) -> Self {
match result {
Ok(status) => {
if status.success() {
NodeExitStatus::Success
} else if let Some(code) = status.code() {
Self::ExitCode(code)
} else {
#[cfg(unix)]
{
use std::os::unix::process::ExitStatusExt;
if let Some(signal) = status.signal() {
return Self::Signal(signal);
}
}
Self::Unknown
}
}
Err(err) => Self::IoError(err.to_string()),
}
}
}

Loading…
Cancel
Save