Browse Source

Merge pull request #19 from futurewei-tech/rust-client

Initial implementation of API crates, coordinator and runtime
tags/v0.0.0-test.4
Philipp Oppermann GitHub 3 years ago
parent
commit
8dc4fae91a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 2348 additions and 167 deletions
  1. +5
    -5
      .github/workflows/ci.yml
  2. +260
    -15
      Cargo.lock
  3. +20
    -4
      Cargo.toml
  4. +24
    -0
      api/node/Cargo.toml
  5. +108
    -0
      api/node/src/communication.rs
  6. +151
    -0
      api/node/src/config.rs
  7. +200
    -0
      api/node/src/lib.rs
  8. +9
    -0
      api/operator/Cargo.toml
  9. +17
    -0
      api/operator/macros/Cargo.toml
  10. +70
    -0
      api/operator/macros/src/lib.rs
  11. +42
    -0
      api/operator/src/lib.rs
  12. +51
    -0
      api/operator/src/raw.rs
  13. +11
    -0
      common/Cargo.toml
  14. +91
    -0
      common/src/descriptor/mod.rs
  15. +137
    -0
      common/src/descriptor/visualize.rs
  16. +21
    -0
      common/src/lib.rs
  17. +23
    -0
      coordinator/Cargo.toml
  18. +34
    -0
      coordinator/README.md
  19. +52
    -0
      coordinator/examples/dataflow-example.yml
  20. +49
    -0
      coordinator/examples/example_sink_logger.rs
  21. +19
    -0
      coordinator/examples/example_source_timer.rs
  22. +44
    -0
      coordinator/examples/mini-dataflow.yml
  23. +32
    -0
      coordinator/examples/random_number.rs
  24. +50
    -0
      coordinator/examples/rate_limit.rs
  25. +193
    -0
      coordinator/src/main.rs
  26. +0
    -30
      examples/dataflow-example.yml
  27. +22
    -0
      runtime/Cargo.toml
  28. +12
    -0
      runtime/examples/example-operator/Cargo.toml
  29. +40
    -0
      runtime/examples/example-operator/src/lib.rs
  30. +273
    -0
      runtime/src/main.rs
  31. +68
    -0
      runtime/src/operator/mod.rs
  32. +196
    -0
      runtime/src/operator/shared_lib.rs
  33. +0
    -92
      src/descriptor.rs
  34. +0
    -1
      src/lib.rs
  35. +2
    -19
      src/main.rs
  36. +1
    -1
      src/zenoh_client/mod.rs
  37. +10
    -0
      zenoh-logger/Cargo.toml
  38. +11
    -0
      zenoh-logger/src/main.rs

+ 5
- 5
.github/workflows/ci.yml View File

@@ -12,11 +12,11 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: "Check"
run: cargo check
run: cargo check --all
- name: "Build"
run: cargo build
run: cargo build --all
- name: "Test"
run: cargo test
run: cargo test --all

clippy:
name: "Clippy"
@@ -24,7 +24,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: "Clippy"
run: cargo clippy
run: cargo clippy --all

rustfmt:
name: "Formatting"
@@ -32,4 +32,4 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: "rustfmt"
run: cargo fmt -- --check
run: cargo fmt --all -- --check

+ 260
- 15
Cargo.lock View File

@@ -371,12 +371,51 @@ dependencies = [
"ansi_term",
"atty",
"bitflags",
"strsim",
"textwrap",
"strsim 0.8.0",
"textwrap 0.11.0",
"unicode-width",
"vec_map",
]

[[package]]
name = "clap"
version = "3.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex",
"indexmap",
"lazy_static",
"strsim 0.10.0",
"termcolor",
"textwrap 0.15.0",
]

[[package]]
name = "clap_derive"
version = "3.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3aab4734e083b809aaf5794e14e756d1c798d2c69c7f7de7a09a2f5214993c1"
dependencies = [
"heck 0.4.0",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "clap_lex"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669"
dependencies = [
"os_str_bytes",
]

[[package]]
name = "concurrent-queue"
version = "1.2.2"
@@ -542,14 +581,83 @@ dependencies = [
"winapi",
]

[[package]]
name = "dora-common"
version = "0.1.0"
dependencies = [
"dora-node-api",
"eyre",
"serde",
]

[[package]]
name = "dora-coordinator"
version = "0.1.0"
dependencies = [
"bincode",
"clap 3.1.12",
"dora-common",
"dora-node-api",
"eyre",
"futures",
"futures-concurrency",
"rand",
"serde",
"serde_yaml",
"time",
"tokio",
"tokio-stream",
"tokio-util",
"uuid",
]

[[package]]
name = "dora-node-api"
version = "0.1.0"
dependencies = [
"async-trait",
"eyre",
"futures",
"futures-concurrency",
"futures-time",
"serde",
"serde_yaml",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
"uuid",
"zenoh",
"zenoh-config",
]

[[package]]
name = "dora-operator-api"
version = "0.1.0"
dependencies = [
"dora-operator-api-macros",
]

[[package]]
name = "dora-operator-api-macros"
version = "0.1.0"
dependencies = [
"dora-operator-api",
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "dora-rs"
version = "0.1.0"
dependencies = [
"bincode",
"env_logger",
"envy",
"eyre",
"futures",
"futures-concurrency",
"log",
"pyo3",
"rayon",
@@ -557,7 +665,29 @@ dependencies = [
"serde_yaml",
"structopt",
"tokio",
"tokio-stream",
"tokio-util",
"zenoh",
]

[[package]]
name = "dora-runtime"
version = "0.1.0"
dependencies = [
"clap 3.1.12",
"dora-common",
"dora-node-api",
"eyre",
"fern",
"futures",
"futures-concurrency",
"libloading",
"log",
"serde_yaml",
"tokio",
"tokio-stream",
"zenoh",
"zenoh-config",
]

[[package]]
@@ -594,11 +724,18 @@ version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"

[[package]]
name = "example-operator"
version = "0.1.0"
dependencies = [
"dora-operator-api",
]

[[package]]
name = "eyre"
version = "0.6.7"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9289ed2c0440a6536e65119725cf91fc2c6b5e513bfd2e36e1134d7cca6ca12f"
checksum = "4c2b6b5a29c02cdc822728b7d7b8ae1bab3e3b05d44522770ddd49722eeac7eb"
dependencies = [
"indenter",
"once_cell",
@@ -619,6 +756,15 @@ dependencies = [
"instant",
]

[[package]]
name = "fern"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bdd7b0849075e79ee9a1836df22c717d1eba30451796fdc631b04565dd11e2a"
dependencies = [
"log",
]

[[package]]
name = "fixedbitset"
version = "0.4.1"
@@ -663,6 +809,17 @@ dependencies = [
"futures-sink",
]

[[package]]
name = "futures-concurrency"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48e98b7b5aedee7c34a5cfb1ee1681af8faf46e2f30c0b8af5ea08eba517d61c"
dependencies = [
"async-trait",
"futures-core",
"pin-project",
]

[[package]]
name = "futures-core"
version = "0.3.21"
@@ -724,6 +881,18 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"

[[package]]
name = "futures-time"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "553673e17c187f65e79ed63a9e58b148560fd5982e1d739f37913f320139edb0"
dependencies = [
"async-channel",
"async-io",
"futures-core",
"pin-project-lite",
]

[[package]]
name = "futures-util"
version = "0.3.21"
@@ -838,6 +1007,12 @@ dependencies = [
"unicode-segmentation",
]

[[package]]
name = "heck"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"

[[package]]
name = "hermit-abi"
version = "0.1.19"
@@ -1010,9 +1185,9 @@ dependencies = [

[[package]]
name = "log"
version = "0.4.16"
version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if",
"value-bag",
@@ -1178,11 +1353,20 @@ dependencies = [
"libc",
]

[[package]]
name = "num_threads"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aba1801fb138d8e85e11d0fc70baf4fe1cdfffda7c6cd34a854905df588e5ed0"
dependencies = [
"libc",
]

[[package]]
name = "once_cell"
version = "1.10.0"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225"

[[package]]
name = "opaque-debug"
@@ -1211,6 +1395,12 @@ dependencies = [
"num-traits",
]

[[package]]
name = "os_str_bytes"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"

[[package]]
name = "parking"
version = "2.0.0"
@@ -2091,13 +2281,19 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"

[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"

[[package]]
name = "structopt"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10"
dependencies = [
"clap",
"clap 2.34.0",
"lazy_static",
"structopt-derive",
]
@@ -2108,7 +2304,7 @@ version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0"
dependencies = [
"heck",
"heck 0.3.3",
"proc-macro-error",
"proc-macro2",
"quote",
@@ -2162,6 +2358,12 @@ dependencies = [
"unicode-width",
]

[[package]]
name = "textwrap"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"

[[package]]
name = "thiserror"
version = "1.0.30"
@@ -2182,6 +2384,16 @@ dependencies = [
"syn",
]

[[package]]
name = "time"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd"
dependencies = [
"libc",
"num_threads",
]

[[package]]
name = "tinyvec"
version = "1.5.1"
@@ -2228,11 +2440,36 @@ dependencies = [
"syn",
]

[[package]]
name = "tokio-stream"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]

[[package]]
name = "tokio-util"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0edfdeb067411dba2044da6d1cb2df793dd35add7888d73c16e3381ded401764"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
"tracing",
]

[[package]]
name = "tracing"
version = "0.1.32"
version = "0.1.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f"
checksum = "80b9fa4360528139bc96100c160b7ae879f5567f49f1782b0b02035b0358ebf3"
dependencies = [
"cfg-if",
"pin-project-lite",
@@ -2362,9 +2599,9 @@ dependencies = [

[[package]]
name = "value-bag"
version = "1.0.0-alpha.8"
version = "1.0.0-alpha.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79923f7731dc61ebfba3633098bf3ac533bbd35ccd8c57e7088d9a5eebe0263f"
checksum = "2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55"
dependencies = [
"ctor",
"version_check",
@@ -2831,6 +3068,14 @@ dependencies = [
"zenoh-sync",
]

[[package]]
name = "zenoh-logger"
version = "0.1.0"
dependencies = [
"zenoh",
"zenoh-config",
]

[[package]]
name = "zenoh-macros"
version = "0.6.0-dev.0"
@@ -2926,7 +3171,7 @@ version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-std",
"clap",
"clap 2.34.0",
"futures",
"hex",
"home",


+ 20
- 4
Cargo.toml View File

@@ -5,16 +5,32 @@ edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[workspace]
members = [
"api/node",
"api/operator",
"api/operator/macros",
"coordinator",
"common",
"runtime",
"runtime/examples/example-operator",
"zenoh-logger",
]

[dependencies]
eyre = "0.6.7"
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.8.23"
structopt = "0.3.26"
zenoh = { git="https://github.com/eclipse-zenoh/zenoh.git" }
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
env_logger = "0.9.0"
tokio = { version="1.17.0", features=["full"]}
tokio = { version = "1.17.0", features = ["full"] }
pyo3 = "0.16.1"
futures = "0.3.12"
futures = "0.3.21"
envy = "0.4.2"
log = "0.4.16"
tokio-stream = { version = "0.1.8", features = ["net"] }
rayon = "1.5.1"
log = "0.4.16"
futures-concurrency = "2.0.3"
tokio-util = { version = "0.7.0", features = ["codec"] }
bincode = "1.3.3"

+ 24
- 0
api/node/Cargo.toml View File

@@ -0,0 +1,24 @@
[package]
name = "dora-node-api"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.53"
eyre = "0.6.7"
futures = "0.3.21"
futures-concurrency = "2.0.3"
futures-time = "1.0.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.8.23"
thiserror = "1.0.30"
tokio-stream = "0.1.8"
tracing = "0.1.33"
uuid = "0.8.2"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" }

[dev-dependencies]
tokio = { version = "1.17.0", features = ["rt"] }

+ 108
- 0
api/node/src/communication.rs View File

@@ -0,0 +1,108 @@
use async_trait::async_trait;
use eyre::Context;
use futures::StreamExt;
use futures_time::future::FutureExt;
use std::pin::Pin;
use zenoh::{
prelude::{Priority, SplitBuffer, ZFuture},
publication::CongestionControl,
};

use crate::{config::CommunicationConfig, BoxError};

/// Creates the communication layer described by `communication_config`.
///
/// Currently only a zenoh backend exists: this opens a zenoh session from
/// the given config and wraps it together with the configured topic prefix.
///
/// # Errors
/// Fails if the zenoh session cannot be opened.
pub async fn init(
    communication_config: &CommunicationConfig,
) -> eyre::Result<Box<dyn CommunicationLayer>> {
    match communication_config {
        CommunicationConfig::Zenoh {
            config: zenoh_config,
            prefix: zenoh_prefix,
        } => {
            let zenoh = zenoh::open(zenoh_config.clone())
                .await
                .map_err(BoxError)
                .wrap_err("failed to create zenoh session")?;
            let layer = ZenohCommunicationLayer {
                zenoh,
                topic_prefix: zenoh_prefix.clone(),
            };
            Ok(Box::new(layer))
        }
    }
}

/// Abstraction over the pub/sub transport used by dora nodes.
///
/// Instances are created through [`init`] based on the `CommunicationConfig`.
#[async_trait]
pub trait CommunicationLayer {
    /// Subscribes to `topic` and returns a stream of raw message payloads.
    async fn subscribe<'a>(
        &'a self,
        topic: &str,
    ) -> Result<Pin<Box<dyn futures::Stream<Item = Vec<u8>> + 'a>>, BoxError>;

    /// Publishes `data` on `topic`.
    async fn publish(&self, topic: &str, data: &[u8]) -> Result<(), BoxError>;

    /// Blocking variant of `publish`, usable outside of async contexts
    /// (e.g. from `Drop` implementations).
    fn publish_sync(&self, topic: &str, data: &[u8]) -> Result<(), BoxError>;

    /// Shuts the layer down, giving still-pending messages a chance to be
    /// sent out (see the zenoh implementation below).
    async fn close(self: Box<Self>) -> Result<(), BoxError>;
}

/// Zenoh-backed implementation of [`CommunicationLayer`].
struct ZenohCommunicationLayer {
    // open zenoh session used for all publishes/subscriptions
    zenoh: zenoh::Session,
    // prefix prepended to every topic (dataflow scoping)
    topic_prefix: String,
}

impl ZenohCommunicationLayer {
    /// Builds the full zenoh resource name for `topic` by prepending the
    /// layer's topic prefix, separated by a `/`.
    fn prefixed(&self, topic: &str) -> String {
        let mut full = self.topic_prefix.clone();
        full.push('/');
        full.push_str(topic);
        full
    }
}

#[async_trait]
impl CommunicationLayer for ZenohCommunicationLayer {
    /// Subscribes with reliable delivery and maps each zenoh sample to its
    /// owned payload bytes.
    async fn subscribe<'a>(
        &'a self,
        topic: &str,
    ) -> Result<Pin<Box<dyn futures::Stream<Item = Vec<u8>> + 'a>>, BoxError> {
        zenoh::Session::subscribe(&self.zenoh, self.prefixed(topic))
            .reliable()
            .await
            .map(|s| {
                // erase the concrete zenoh stream type behind a trait object
                let trait_object: Pin<Box<dyn futures::Stream<Item = Vec<u8>> + 'a>> =
                    Box::pin(s.map(|s| s.value.payload.contiguous().into_owned()));
                trait_object
            })
            .map_err(BoxError)
    }

    /// Publishes with `Block` congestion control and real-time priority so
    /// messages are not silently dropped under load.
    async fn publish(&self, topic: &str, data: &[u8]) -> Result<(), BoxError> {
        let writer = self
            .zenoh
            .put(self.prefixed(topic), data)
            .congestion_control(CongestionControl::Block)
            .priority(Priority::RealTime);

        let result = writer.await.map_err(BoxError);
        result
    }

    /// Same as `publish`, but blocks the current thread via `ZFuture::wait`.
    fn publish_sync(&self, topic: &str, data: &[u8]) -> Result<(), BoxError> {
        let writer = self
            .zenoh
            .put(self.prefixed(topic), data)
            .congestion_control(CongestionControl::Block)
            .priority(Priority::RealTime);

        writer.wait().map_err(BoxError)
    }

    async fn close(self: Box<Self>) -> Result<(), BoxError> {
        zenoh::Session::close(self.zenoh)
            // wait a bit before closing to ensure that remaining published
            // messages are sent out
            //
            // TODO: create a minimal example to reproduce the dropped messages
            // and report this issue in the zenoh repo
            .delay(futures_time::time::Duration::from_secs_f32(0.5))
            .await
            .map_err(BoxError)
    }
}

+ 151
- 0
api/node/src/config.rs View File

@@ -0,0 +1,151 @@
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, BTreeSet},
convert::Infallible,
fmt::Write as _,
str::FromStr,
};

/// Input/output configuration of a single node, as embedded in the dataflow
/// descriptor.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct NodeRunConfig {
    /// Inputs of the node, keyed by local input ID.
    #[serde(default)]
    pub inputs: BTreeMap<DataId, InputMapping>,
    /// Output IDs the node is allowed to publish.
    #[serde(default)]
    pub outputs: BTreeSet<DataId>,
}

/// Unique identifier of a node within a dataflow.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeId(String);

impl FromStr for NodeId {
    type Err = Infallible;

    /// Any string is a valid node ID, so parsing never fails.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(NodeId(String::from(s)))
    }
}

impl From<String> for NodeId {
    fn from(id: String) -> Self {
        NodeId(id)
    }
}

impl std::fmt::Display for NodeId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `f.pad` matches `str`'s own Display impl, honoring width/fill flags
        f.pad(&self.0)
    }
}

/// Unique identifier of an operator within a runtime node.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct OperatorId(String);

impl FromStr for OperatorId {
    type Err = Infallible;

    // Any string is a valid operator ID, so parsing cannot fail.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(Self(s.to_owned()))
    }
}

impl From<String> for OperatorId {
    fn from(id: String) -> Self {
        Self(id)
    }
}

impl std::fmt::Display for OperatorId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}

/// Identifier of a single input or output of a node/operator.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DataId(String);

impl From<String> for DataId {
    fn from(id: String) -> Self {
        DataId(id)
    }
}

impl std::fmt::Display for DataId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `f.pad` matches `str`'s own Display impl, honoring width/fill flags
        f.pad(&self.0)
    }
}

impl std::ops::Deref for DataId {
    type Target = String;

    /// Lets a `DataId` be used wherever a `&String`/`&str` is expected.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

/// Where an input comes from: the `source` node, optionally a specific
/// `operator` on that node, and the `output` ID on that source.
///
/// Uses the compact string form `source/output` or `source/operator/output`
/// on the wire (see the manual `Serialize`/`Deserialize` impls).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InputMapping {
    pub source: NodeId,
    pub operator: Option<OperatorId>,
    pub output: DataId,
}

impl Serialize for InputMapping {
    /// Serializes as the compact descriptor form: `source/output`, or
    /// `source/operator/output` when an operator is specified.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match &self.operator {
            Some(operator) => {
                serializer.collect_str(&format_args!("{}/{operator}/{}", self.source, self.output))
            }
            None => serializer.collect_str(&format_args!("{}/{}", self.source, self.output)),
        }
    }
}

impl<'de> Deserialize<'de> for InputMapping {
    /// Parses the compact `source/output` or `source/operator/output` form.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let string = String::deserialize(deserializer)?;
        // everything before the first `/` is the source node ID
        let (source, rest) = string
            .split_once('/')
            .ok_or_else(|| serde::de::Error::custom("input must start with `<source>/`"))?;

        // a second `/` separates an operator ID from the output ID;
        // without one, the whole rest is the output ID
        let (operator, output) = rest
            .split_once('/')
            .map(|(op, out)| (Some(op), out))
            .unwrap_or((None, rest));

        Ok(Self {
            source: source.to_owned().into(),
            operator: operator.map(|o| o.to_owned().into()),
            output: output.to_owned().into(),
        })
    }
}

/// Configuration of the communication backend used by a dataflow.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub enum CommunicationConfig {
    /// Zenoh pub/sub backend.
    Zenoh {
        /// Zenoh session configuration; falls back to zenoh defaults.
        #[serde(default)]
        config: zenoh_config::Config,
        /// Prefix prepended to every topic of this dataflow.
        prefix: String,
    },
}

impl CommunicationConfig {
    /// Appends `/{prefix}` to the configured topic prefix in place.
    pub fn add_topic_prefix(&mut self, prefix: &str) {
        // the enum currently has a single variant, so this pattern is irrefutable
        let CommunicationConfig::Zenoh {
            prefix: zenoh_prefix,
            ..
        } = self;
        write!(zenoh_prefix, "/{prefix}").unwrap();
    }
}

+ 200
- 0
api/node/src/lib.rs View File

@@ -0,0 +1,200 @@
use communication::CommunicationLayer;
use config::{CommunicationConfig, DataId, NodeId, NodeRunConfig};
use eyre::WrapErr;
use futures::{stream::FuturesUnordered, StreamExt};
use futures_concurrency::Merge;
use std::collections::HashSet;

pub mod communication;
pub mod config;

#[doc(hidden)]
pub const STOP_TOPIC: &str = "__dora_rs_internal__operator_stopped";

/// Handle through which a dora node receives its inputs and publishes its
/// outputs.
pub struct DoraNode {
    // ID of this node within the dataflow
    id: NodeId,
    // configured inputs and outputs of this node
    node_config: NodeRunConfig,
    // pub/sub transport (zenoh-backed, see `communication::init`)
    communication: Box<dyn CommunicationLayer>,
}

impl DoraNode {
    /// Creates a node from the `DORA_NODE_ID`, `DORA_NODE_RUN_CONFIG`, and
    /// `DORA_COMMUNICATION_CONFIG` environment variables (each YAML-encoded),
    /// as set by the coordinator when spawning the node.
    pub async fn init_from_env() -> eyre::Result<Self> {
        let id = {
            let raw =
                std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?;
            // fixed misleading error message (previously said "operator config")
            serde_yaml::from_str(&raw).context("failed to deserialize node id")?
        };
        let node_config = {
            let raw = std::env::var("DORA_NODE_RUN_CONFIG")
                .wrap_err("env variable DORA_NODE_RUN_CONFIG must be set")?;
            // fixed misleading error message (previously said "operator config")
            serde_yaml::from_str(&raw).context("failed to deserialize node run config")?
        };
        let communication_config = {
            let raw = std::env::var("DORA_COMMUNICATION_CONFIG")
                .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?;
            serde_yaml::from_str(&raw).context("failed to deserialize communication config")?
        };
        Self::init(id, node_config, communication_config).await
    }

    /// Creates a node with explicit configuration, initializing the
    /// communication layer.
    pub async fn init(
        id: NodeId,
        node_config: NodeRunConfig,
        communication_config: CommunicationConfig,
    ) -> eyre::Result<Self> {
        let communication = communication::init(&communication_config).await?;
        Ok(Self {
            id,
            node_config,
            communication,
        })
    }

    /// Subscribes to all configured inputs and returns them as one merged
    /// stream.
    ///
    /// The stream ends once every upstream source has published on its stop
    /// topic (see `STOP_TOPIC`), i.e. once no further inputs can arrive.
    pub async fn inputs(&self) -> eyre::Result<impl futures::Stream<Item = Input> + '_> {
        // one subscription per configured input, tagged with the local input ID
        let mut streams = Vec::new();
        for (
            input,
            config::InputMapping {
                source,
                operator,
                output,
            },
        ) in &self.node_config.inputs
        {
            let topic = match operator {
                Some(operator) => format!("{source}/{operator}/{output}"),
                None => format!("{source}/{output}"),
            };
            let sub = self
                .communication
                .subscribe(&topic)
                .await
                .wrap_err_with(|| format!("failed to subscribe on {topic}"))?;
            streams.push(sub.map(|data| Input {
                id: input.clone(),
                data,
            }))
        }

        // subscribe to the stop topic of each distinct source; `finished`
        // resolves once all of them have produced a message
        let stop_messages = FuturesUnordered::new();
        let sources: HashSet<_> = self
            .node_config
            .inputs
            .values()
            .map(|v| (&v.source, &v.operator))
            .collect();
        for (source, operator) in &sources {
            let topic = match operator {
                Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"),
                None => format!("{source}/{STOP_TOPIC}"),
            };
            let sub = self
                .communication
                .subscribe(&topic)
                .await
                .wrap_err_with(|| format!("failed to subscribe on {topic}"))?;
            stop_messages.push(sub.into_future());
        }
        let finished = Box::pin(stop_messages.all(|_| async { true }));

        Ok(streams.merge().take_until(finished))
    }

    /// Publishes `data` for the given output of this node.
    ///
    /// # Errors
    /// Fails if `output_id` is not declared in the node's config, or if
    /// publishing fails.
    pub async fn send_output(&self, output_id: &DataId, data: &[u8]) -> eyre::Result<()> {
        if !self.node_config.outputs.contains(output_id) {
            eyre::bail!("unknown output");
        }

        let self_id = &self.id;

        let topic = format!("{self_id}/{output_id}");
        self.communication
            .publish(&topic, data)
            .await
            .wrap_err_with(|| format!("failed to send data for output {output_id}"))?;
        Ok(())
    }

    /// The ID of this node.
    pub fn id(&self) -> &NodeId {
        &self.id
    }

    /// The input/output configuration of this node.
    pub fn node_config(&self) -> &NodeRunConfig {
        &self.node_config
    }
}

impl Drop for DoraNode {
    /// Publishes an empty message on this node's stop topic so downstream
    /// subscribers (see the `take_until` logic in `inputs`) know that no
    /// further outputs will be sent. Uses the sync publish variant because
    /// `drop` cannot be async.
    fn drop(&mut self) {
        let self_id = &self.id;
        let topic = format!("{self_id}/{STOP_TOPIC}");
        let result = self
            .communication
            .publish_sync(&topic, &[])
            .wrap_err_with(|| format!("failed to send stop message for source `{self_id}`"));
        // errors are only logged — panicking in Drop would abort the process
        if let Err(err) = result {
            tracing::error!("{err}")
        }
    }
}

/// A single input message received by a node.
pub struct Input {
    /// The local input ID this message was mapped to in the node config.
    pub id: DataId,
    /// Raw message payload.
    pub data: Vec<u8>,
}

/// Concrete error type wrapping a boxed `dyn Error` trait object, forwarding
/// `Debug`, `Display`, and `source` to the wrapped error.
///
/// Needed because `eyre` requires a sized, `Error`-implementing type.
pub struct BoxError(Box<dyn std::error::Error + Send + Sync + 'static>);

impl std::fmt::Debug for BoxError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Debug::fmt(self.0.as_ref(), f)
    }
}

impl std::fmt::Display for BoxError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(self.0.as_ref(), f)
    }
}

impl std::error::Error for BoxError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        self.0.source()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // Runs `future` to completion on a fresh single-threaded tokio runtime.
    fn run<F, O>(future: F) -> O
    where
        F: std::future::Future<Output = O>,
    {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        rt.block_on(future)
    }

    // A node with no inputs should yield an input stream that finishes
    // immediately: all of its (zero) sources already count as stopped.
    #[test]
    fn no_op_operator() {
        let id = uuid::Uuid::new_v4().to_string().into();
        let node_config = config::NodeRunConfig {
            inputs: Default::default(),
            outputs: Default::default(),
        };
        // random prefix so parallel test runs don't see each other's topics
        let communication_config = config::CommunicationConfig::Zenoh {
            config: Default::default(),
            prefix: format!("/{}", uuid::Uuid::new_v4()),
        };

        run(async {
            let operator = DoraNode::init(id, node_config, communication_config)
                .await
                .unwrap();
            let mut inputs = operator.inputs().await.unwrap();
            assert!(inputs.next().await.is_none());
        });
    }
}

+ 9
- 0
api/operator/Cargo.toml View File

@@ -0,0 +1,9 @@
[package]
name = "dora-operator-api"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-operator-api-macros = { path = "macros" }

+ 17
- 0
api/operator/macros/Cargo.toml View File

@@ -0,0 +1,17 @@
[package]
name = "dora-operator-api-macros"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
proc-macro = true

[dependencies]
syn = { version = "1.0.81", features = ["full"] }
quote = "1.0.10"
proc-macro2 = "1.0.32"

[dev-dependencies]
dora-operator-api = { path = ".." }

+ 70
- 0
api/operator/macros/src/lib.rs View File

@@ -0,0 +1,70 @@
use proc_macro::TokenStream;
use proc_macro2::TokenStream as TokenStream2;
use quote::quote;

extern crate proc_macro;

/// Registers a `DoraOperator` implementation by generating the C-ABI wrapper
/// functions (`dora_init_operator`, `dora_drop_operator`, `dora_on_input`)
/// that the dora runtime resolves in the compiled shared library.
#[proc_macro]
pub fn register_operator(item: TokenStream) -> TokenStream {
    // convert from `TokenStream` to `TokenStream2`, which is used by the
    // `syn` crate
    let item = TokenStream2::from(item);
    // generate the dora wrapper functions
    let generated = register_operator_impl(&item).unwrap_or_else(|err| err.to_compile_error());
    // output the generated functions
    let tokens = quote! {
        #generated
    };
    // convert the type back from `TokenStream2` to `TokenStream`
    tokens.into()
}

/// Generates the wrapper functions for the annotated function.
///
/// Each wrapper is a `#[no_mangle] extern "C"` shim that forwards to the
/// corresponding generic function in `dora_operator_api::raw`, instantiated
/// with the operator type passed to the macro.
fn register_operator_impl(item: &TokenStream2) -> syn::Result<TokenStream2> {
    // parse the type given to the `register_operator` macro
    let operator_ty: syn::TypePath = syn::parse2(item.clone())
        .map_err(|e| syn::Error::new(e.span(), "expected type as argument"))?;

    // constructs the operator instance and hands back a type-erased pointer
    let init = quote! {
        #[no_mangle]
        pub unsafe extern "C" fn dora_init_operator(operator_context: *mut *mut std::ffi::c_void) -> isize {
            dora_operator_api::raw::dora_init_operator::<#operator_ty>(operator_context)
        }
    };

    // destroys the operator instance again
    let drop = quote! {
        #[no_mangle]
        pub unsafe extern "C" fn dora_drop_operator(operator_context: *mut std::ffi::c_void) {
            dora_operator_api::raw::dora_drop_operator::<#operator_ty>(operator_context)
        }
    };

    // forwards a single input message to the operator
    let on_input = quote! {
        #[no_mangle]
        pub unsafe extern "C" fn dora_on_input(
            id_start: *const u8,
            id_len: usize,
            data_start: *const u8,
            data_len: usize,
            output_fn_raw: dora_operator_api::raw::OutputFnRaw,
            output_context: *const std::ffi::c_void,
            operator_context: *mut std::ffi::c_void,
        ) -> isize {
            dora_operator_api::raw::dora_on_input::<#operator_ty>(
                id_start,
                id_len,
                data_start,
                data_len,
                output_fn_raw,
                output_context,
                operator_context,
            )
        }
    };

    Ok(quote! {
        #init
        #drop
        #on_input
    })
}

+ 42
- 0
api/operator/src/lib.rs View File

@@ -0,0 +1,42 @@
#![warn(unsafe_op_in_unsafe_fn)]
#![allow(clippy::missing_safety_doc)]

pub use dora_operator_api_macros::register_operator;
use raw::OutputFnRaw;
use std::ffi::c_void;

pub mod raw;

/// Trait implemented by dora operators loaded by the runtime.
///
/// The operator instance is constructed via `Default` (see
/// `raw::dora_init_operator`).
pub trait DoraOperator: Default {
    /// Handles a single input message.
    ///
    /// `id` names the input, `data` is the raw payload; outputs may be
    /// emitted through `output_sender`. Returning `Err(())` signals a
    /// failure to the runtime.
    #[allow(clippy::result_unit_err)] // we use a () error type only for testing
    fn on_input(
        &mut self,
        id: &str,
        data: &[u8],
        output_sender: &mut DoraOutputSender,
    ) -> Result<(), ()>;
}

/// Allows an operator to emit outputs while handling an input.
pub struct DoraOutputSender {
    // raw output callback provided by the runtime
    output_fn_raw: OutputFnRaw,
    // opaque context pointer passed back to the callback
    output_context: *const c_void,
}

impl DoraOutputSender {
    /// Sends `data` on the output named `id` through the raw output callback
    /// supplied by the runtime.
    ///
    /// # Errors
    /// Returns the callback's nonzero status code if the runtime reports a
    /// failure.
    pub fn send(&mut self, id: &str, data: &[u8]) -> Result<(), isize> {
        // removed stray debug `println!` that polluted stdout on every send
        //
        // SAFETY: the function pointer and context were handed out together
        // by the runtime; the pointer/length pairs come from live slices.
        let result = unsafe {
            (self.output_fn_raw)(
                id.as_ptr(),
                id.len(),
                data.as_ptr(),
                data.len(),
                self.output_context,
            )
        };
        match result {
            0 => Ok(()),
            other => Err(other),
        }
    }
}

+ 51
- 0
api/operator/src/raw.rs View File

@@ -0,0 +1,51 @@
use std::{ffi::c_void, slice};

use crate::{DoraOperator, DoraOutputSender};

/// C-ABI callback through which an operator emits an output message.
///
/// Takes pointer/length pairs for the output ID and the payload plus an
/// opaque context pointer; returns 0 on success, nonzero on failure.
pub type OutputFnRaw = unsafe extern "C" fn(
    id_start: *const u8,
    id_len: usize,
    data_start: *const u8,
    data_len: usize,
    output_context: *const c_void,
) -> isize;

/// Instantiates operator `O` via `Default` and writes a type-erased pointer
/// to it into `operator_context`. Always returns 0 (success).
pub unsafe fn dora_init_operator<O: DoraOperator>(operator_context: *mut *mut c_void) -> isize {
    let boxed = Box::new(O::default());
    let type_erased = Box::into_raw(boxed).cast::<c_void>();
    // SAFETY: the caller must pass a pointer that is valid for writes.
    unsafe { *operator_context = type_erased };
    0
}

/// Reclaims and drops the operator instance previously created by
/// `dora_init_operator`.
pub unsafe fn dora_drop_operator<O>(operator_context: *mut c_void) {
    // SAFETY: the caller must pass a context obtained from
    // `dora_init_operator::<O>`, i.e. a pointer originating from `Box<O>`.
    drop(unsafe { Box::from_raw(operator_context.cast::<O>()) });
}

/// Type-erased entry point invoked by the runtime for each input message.
///
/// Reconstructs the `&str` ID and `&[u8]` payload from raw pointer/length
/// pairs and forwards them to `O::on_input`. Returns 0 on success, -1 on
/// failure (including a non-UTF-8 ID).
pub unsafe fn dora_on_input<O: DoraOperator>(
    id_start: *const u8,
    id_len: usize,
    data_start: *const u8,
    data_len: usize,
    output_fn_raw: OutputFnRaw,
    output_context: *const c_void,
    operator_context: *mut c_void,
) -> isize {
    // SAFETY: caller must pass a valid pointer/length pair for the ID.
    let id = match std::str::from_utf8(unsafe { slice::from_raw_parts(id_start, id_len) }) {
        Ok(id) => id,
        Err(_) => return -1,
    };
    // SAFETY: same contract for the data buffer.
    let data = unsafe { slice::from_raw_parts(data_start, data_len) };
    let mut output_sender = DoraOutputSender {
        output_fn_raw,
        output_context,
    };

    // SAFETY: caller must pass the context produced by `dora_init_operator::<O>`.
    let operator: &mut O = unsafe { &mut *operator_context.cast() };

    match operator.on_input(id, data, &mut output_sender) {
        Ok(()) => 0,
        Err(_) => -1,
    }
}

+ 11
- 0
common/Cargo.toml View File

@@ -0,0 +1,11 @@
[package]
name = "dora-common"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { version = "0.1.0", path = "../api/node" }
eyre = "0.6.8"
serde = { version = "1.0.136", features = ["derive"] }

+ 91
- 0
common/src/descriptor/mod.rs View File

@@ -0,0 +1,91 @@
use dora_node_api::config::{
CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId,
};
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, BTreeSet},
path::PathBuf,
};

mod visualize;

/// Top-level dataflow descriptor as written in the YAML dataflow files.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Descriptor {
    /// Communication backend settings shared by all nodes.
    pub communication: CommunicationConfig,
    /// The nodes making up the dataflow graph.
    pub nodes: Vec<Node>,
}

impl Descriptor {
    /// Renders the dataflow graph as a mermaid flowchart definition.
    pub fn visualize_as_mermaid(&self) -> eyre::Result<String> {
        let flowchart = visualize::visualize_nodes(&self.nodes);

        Ok(flowchart)
    }
}

/// A single node of the dataflow graph.
#[derive(Debug, Serialize, Deserialize)]
pub struct Node {
    pub id: NodeId,
    /// Optional human-readable name.
    pub name: Option<String>,
    pub description: Option<String>,

    #[serde(flatten)]
    pub kind: NodeKind,
}

/// Distinguishes runtime nodes (hosting operators) from custom executables.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeKind {
    /// Dora runtime node
    #[serde(rename = "operators")]
    Runtime(RuntimeNode),
    Custom(CustomNode),
}

/// The list of operators hosted by a runtime node.
#[derive(Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct RuntimeNode {
    pub operators: Vec<OperatorConfig>,
}

/// Configuration of one operator inside a runtime node.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OperatorConfig {
    pub id: OperatorId,
    pub name: Option<String>,
    pub description: Option<String>,

    /// Inputs of the operator, keyed by local input ID.
    #[serde(default)]
    pub inputs: BTreeMap<DataId, InputMapping>,
    /// Output IDs the operator may publish.
    #[serde(default)]
    pub outputs: BTreeSet<DataId>,

    #[serde(flatten)]
    pub source: OperatorSource,
}

/// Where the operator implementation is loaded from.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub enum OperatorSource {
    SharedLibrary(PathBuf),
    Python(PathBuf),
    Wasm(PathBuf),
}

/// A node started as a custom (non-runtime) executable.
#[derive(Debug, Serialize, Deserialize)]
pub struct CustomNode {
    /// Command line used to start the node.
    pub run: String,
    /// Extra environment variables for the spawned process.
    pub env: Option<BTreeMap<String, EnvValue>>,
    // NOTE(review): a working directory would normally be a single path;
    // this map-of-env-values type looks copy-pasted from `env` — confirm intent.
    pub working_directory: Option<BTreeMap<String, EnvValue>>,

    /// Inputs/outputs of the node.
    #[serde(flatten)]
    pub run_config: NodeRunConfig,
}

/// Environment variable value; untagged so plain YAML scalars map naturally.
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum EnvValue {
    Bool(bool),
    Integer(u64),
    String(String),
}

+ 137
- 0
common/src/descriptor/visualize.rs View File

@@ -0,0 +1,137 @@
use super::{CustomNode, Node, NodeKind, OperatorConfig, RuntimeNode};
use dora_node_api::config::{DataId, InputMapping, NodeId};
use std::{
collections::{BTreeMap, HashMap},
fmt::Write as _,
};

/// Renders all nodes and their input edges as a mermaid flowchart definition.
pub fn visualize_nodes(nodes: &[Node]) -> String {
    let mut flowchart = String::from("flowchart TB\n");

    // index nodes by ID so that the edge pass can look up sources
    let all_nodes: HashMap<_, _> = nodes.iter().map(|node| (&node.id, node)).collect();

    // first pass: declare every node/operator shape
    for node in nodes {
        visualize_node(node, &mut flowchart);
    }

    // second pass: draw the input edges
    for node in nodes {
        visualize_node_inputs(node, &mut flowchart, &all_nodes);
    }

    flowchart
}

/// Emits the declaration for a single node, dispatching on its kind.
fn visualize_node(node: &Node, flowchart: &mut String) {
    let id = &node.id;
    match &node.kind {
        NodeKind::Runtime(RuntimeNode { operators }) => {
            visualize_runtime_node(id, operators, flowchart)
        }
        NodeKind::Custom(custom) => visualize_custom_node(id, custom, flowchart),
    }
}

/// Declares a custom node, choosing the Mermaid shape from its role:
/// trapezoid variants for pure sources/sinks, default box otherwise.
fn visualize_custom_node(node_id: &NodeId, node: &CustomNode, flowchart: &mut String) {
    let has_inputs = !node.run_config.inputs.is_empty();
    let has_outputs = !node.run_config.outputs.is_empty();
    let line = match (has_inputs, has_outputs) {
        // source node: no inputs
        (false, _) => format!(" {node_id}[\\{node_id}/]"),
        // sink node: inputs but no outputs
        (true, false) => format!(" {node_id}[/{node_id}\\]"),
        // normal node
        (true, true) => format!(" {node_id}"),
    };
    writeln!(flowchart, "{line}").unwrap();
}

/// Declares a runtime node as a Mermaid subgraph containing one entry per
/// operator, shaped by the operator's role (source/sink/normal).
fn visualize_runtime_node(node_id: &NodeId, operators: &[OperatorConfig], flowchart: &mut String) {
    writeln!(flowchart, "subgraph {node_id}").unwrap();
    for operator in operators {
        let op = &operator.id;
        let line = if operator.inputs.is_empty() {
            // source operator
            format!(" {node_id}/{op}[\\{op}/]")
        } else if operator.outputs.is_empty() {
            // sink operator
            format!(" {node_id}/{op}[/{op}\\]")
        } else {
            // normal operator
            format!(" {node_id}/{op}[{op}]")
        };
        writeln!(flowchart, "{line}").unwrap();
    }
    writeln!(flowchart, "end").unwrap();
}

/// Draws the incoming edges for `node` (or for each operator of a runtime
/// node, using `node-id/operator-id` as the edge target).
fn visualize_node_inputs(node: &Node, flowchart: &mut String, nodes: &HashMap<&NodeId, &Node>) {
    let node_id = &node.id;
    match &node.kind {
        NodeKind::Custom(custom) => {
            let target = node_id.to_string();
            visualize_inputs(&target, &custom.run_config.inputs, flowchart, nodes)
        }
        NodeKind::Runtime(RuntimeNode { operators }) => {
            for operator in operators {
                let target = format!("{node_id}/{}", operator.id);
                visualize_inputs(&target, &operator.inputs, flowchart, nodes)
            }
        }
    }
}

/// Draws one edge per input of `target`.
///
/// For each input mapping the source node is looked up and the referenced
/// output is verified to exist (on the node itself for custom nodes, or on
/// the referenced operator for runtime nodes). Inputs that cannot be
/// resolved are connected to a shared `missing` node so they stay visible
/// in the graph instead of silently disappearing.
fn visualize_inputs(
    target: &str,
    inputs: &BTreeMap<DataId, InputMapping>,
    flowchart: &mut String,
    nodes: &HashMap<&NodeId, &Node>,
) {
    for (input_id, mapping) in inputs {
        let InputMapping {
            source,
            operator,
            output,
        } = mapping;

        // Edge label: the output name, plus the local alias if renamed.
        // (Hoisted here; it was previously duplicated in both match arms.)
        let data = if output == input_id {
            format!("{output}")
        } else {
            format!("{output} as {input_id}")
        };

        let mut source_found = false;
        if let Some(source_node) = nodes.get(source) {
            match (&source_node.kind, operator) {
                (NodeKind::Custom(custom_node), None) => {
                    if custom_node.run_config.outputs.contains(output) {
                        writeln!(flowchart, " {source} -- {data} --> {target}").unwrap();
                        source_found = true;
                    }
                }
                (NodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => {
                    if let Some(operator) = operators.iter().find(|o| &o.id == operator_id) {
                        if operator.outputs.contains(output) {
                            writeln!(flowchart, " {source}/{operator_id} -- {data} --> {target}")
                                .unwrap();
                            source_found = true;
                        }
                    }
                }
                // Mismatched kinds (e.g. an operator path pointing into a
                // custom node) cannot resolve; fall through to `missing`.
                (NodeKind::Custom(_), Some(_)) | (NodeKind::Runtime(_), None) => {}
            }
        }

        if !source_found {
            writeln!(flowchart, " missing>missing] -- {input_id} --> {target}").unwrap();
        }
    }
}

+ 21
- 0
common/src/lib.rs View File

@@ -0,0 +1,21 @@
pub mod descriptor;

/// A wrapper around a boxed, thread-safe error trait object.
///
/// Forwards `Debug`, `Display`, and `Error::source` to the wrapped error,
/// so it can be used wherever a concrete `std::error::Error` type is
/// required.
pub struct BoxError(pub Box<dyn std::error::Error + Send + Sync + 'static>);

impl std::fmt::Debug for BoxError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Debug::fmt(&self.0, f)
    }
}

impl std::fmt::Display for BoxError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}

impl std::error::Error for BoxError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Delegate to the inner error's source (its cause), not to the
        // inner error itself.
        self.0.source()
    }
}

/// Convenience conversion so boxed errors can be wrapped with `.into()`
/// or `?` instead of an explicit `BoxError(...)` constructor call.
impl From<Box<dyn std::error::Error + Send + Sync + 'static>> for BoxError {
    fn from(inner: Box<dyn std::error::Error + Send + Sync + 'static>) -> Self {
        Self(inner)
    }
}

+ 23
- 0
coordinator/Cargo.toml View File

@@ -0,0 +1,23 @@
[package]
name = "dora-coordinator"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bincode = "1.3.3"
dora-node-api = { path = "../api/node" }
eyre = "0.6.7"
futures = "0.3.21"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.8.23"
tokio = { version = "1.17.0", features = ["full"] }
tokio-stream = { version = "0.1.8", features = ["io-util"] }
tokio-util = { version = "0.7.1", features = ["codec"] }
clap = { version = "3.1.8", features = ["derive"] }
uuid = "0.8.2"
time = "0.3.9"
futures-concurrency = "2.0.3"
rand = "0.8.5"
dora-common = { version = "0.1.0", path = "../common" }

+ 34
- 0
coordinator/README.md View File

@@ -0,0 +1,34 @@
# Coordinator

Prototype for a process/library-based dora-rs implementation, instead of framework-based. The idea is that each operator is compiled as a separate executable. The `dora-coordinator` runtime is responsible for reading the dataflow descriptor file and launching the operators accordingly. The operators use a common library called `dora-node-api`, which implements the communication layer based on zenoh.

This approach has the following advantages:

- Less overhead
- No data transfer between a runtime and the operator
- The compiler can inline and optimize the full process
- More flexibility
- Operators can be sync or async
- They can decide how many threads and which execution model they use
- The OS ensures fair share of resources (e.g. CPU time) -> no need to cooperate with other operators
- Operators get all inputs immediately -> no need for input rules
- Keeping local state is easily possible
- Separate address spaces
- The operators are isolated from each other.

There are drawbacks too, for example:

- Less control
- Processes run independently -> need to cooperate with the runtime, e.g. on stop signals
- Operator migration is more difficult
- Operators are always isolated
- No way of using in-memory channels
- Local sockets and shared memory should be still possible

## Try it out

- Compile the `examples` using `cargo build -p dora-rs --examples`
- Run the `mini-dataflow` example using `cargo run -- run examples/mini-dataflow.yml`
- This spawns a `timer` source, which sends the current time periodically, and a `logger` sink, which prints the incoming data.
- The `timer` will exit after 400 iterations. The `logger` will then exit with a timeout error.


+ 52
- 0
coordinator/examples/dataflow-example.yml View File

@@ -0,0 +1,52 @@
communication:
zenoh_prefix: /foo

nodes:
- id: timer
custom:
run: non-existent
outputs:
- time
- id: camera
custom:
run: non-existent
outputs:
- image

- id: process
operators:
- id: operator-1
inputs:
image: camera/image
outputs:
- processed
shared_library: non-existent
- id: operator-2
inputs:
processed: process/operator-1/processed
time: timer/time
outputs:
- processed
python: non-existent

- id: timestamp
custom:
run: non-existent
inputs:
timestamp: timer/time
pic: camera/image
outputs:
- image-with-timestamp

- id: printer
custom:
run: non-existent
inputs:
data: timestamp/image-with-timestamp
processed: process/operator-2/processed
- id: logger
custom:
run: non-existent
inputs:
data: timestamp/image-with-timestamp
time: timer/time

+ 49
- 0
coordinator/examples/example_sink_logger.rs View File

@@ -0,0 +1,49 @@
use dora_node_api::{self, DoraNode};
use eyre::bail;
use futures::StreamExt;
use std::time::Duration;

/// Sink example: subscribes to this node's inputs and logs them.
///
/// Exits cleanly when the input stream ends; bails if no input arrives
/// within two seconds.
#[tokio::main]
async fn main() -> eyre::Result<()> {
    // Node config comes from environment variables set by the coordinator.
    let operator = DoraNode::init_from_env().await?;

    let mut inputs = operator.inputs().await?;

    // Most recently seen `time` input; attached to `random` values on print.
    let mut last_timestamp = None;

    loop {
        let timeout = Duration::from_secs(2);
        let input = match tokio::time::timeout(timeout, inputs.next()).await {
            Ok(Some(input)) => input,
            // Stream finished: all upstream nodes stopped.
            Ok(None) => break,
            Err(_) => bail!("timeout while waiting for input"),
        };

        match input.id.as_str() {
            "time" => {
                // only record it, but don't print anything
                last_timestamp = Some(String::from_utf8_lossy(&input.data).into_owned());
            }
            "random" => {
                // Payload is a little-endian u64 (see the random_number
                // example); skip messages with the wrong length.
                let number = match input.data.try_into() {
                    Ok(bytes) => u64::from_le_bytes(bytes),
                    Err(_) => {
                        eprintln!("Malformed `random` message");
                        continue;
                    }
                };
                if let Some(timestamp) = &last_timestamp {
                    println!("random at {}: {}", timestamp, number);
                }
            }
            "timestamped-random" => {
                let data = String::from_utf8(input.data)?;
                println!("received timestamped random value: {data}");
            }

            other => eprintln!("Ignoring unexpected input `{other}`"),
        }
    }

    Ok(())
}

+ 19
- 0
coordinator/examples/example_source_timer.rs View File

@@ -0,0 +1,19 @@
use dora_node_api::{self, config::DataId, DoraNode};
use std::time::Duration;
use time::OffsetDateTime;

/// Source example: publishes the current UTC time every 20 ms, 400 times.
#[tokio::main]
async fn main() -> eyre::Result<()> {
    let node = DoraNode::init_from_env().await?;

    let output_id = DataId::from("time".to_owned());
    let mut ticker = tokio::time::interval(Duration::from_millis(20));

    let mut remaining = 400;
    while remaining > 0 {
        ticker.tick().await;
        let timestamp = OffsetDateTime::now_utc().to_string();
        node.send_output(&output_id, timestamp.as_bytes()).await?;
        remaining -= 1;
    }

    Ok(())
}

+ 44
- 0
coordinator/examples/mini-dataflow.yml View File

@@ -0,0 +1,44 @@
communication:
zenoh:
prefix: /foo

nodes:
- id: timer
custom:
run: ../target/debug/examples/example_source_timer
outputs:
- time

- id: rate-limited-timer
custom:
run: ../target/debug/examples/rate_limit --seconds 0.5
inputs:
data: timer/time
outputs:
- rate_limited

- id: random
custom:
run: ../target/debug/examples/random_number
inputs:
timestamp: rate-limited-timer/rate_limited
outputs:
- number

- id: logger
custom:
run: ../target/debug/examples/example_sink_logger
inputs:
random: random/number
time: timer/time
timestamped-random: runtime-node/op-1/timestamped-random

- id: runtime-node
operators:
- id: op-1
shared-library: ../target/debug/libexample_operator.so
inputs:
random: random/number
time: timer/time
outputs:
- timestamped-random

+ 32
- 0
coordinator/examples/random_number.rs View File

@@ -0,0 +1,32 @@
use dora_node_api::{self, config::DataId, DoraNode};
use eyre::bail;
use futures::StreamExt;
use std::time::Duration;

/// Generates a random `u64` for every incoming `timestamp` input and
/// publishes it on the `number` output.
#[tokio::main]
async fn main() -> eyre::Result<()> {
    let number_output = DataId::from("number".to_owned());

    let node = DoraNode::init_from_env().await?;
    let mut inputs = node.inputs().await?;

    loop {
        // Give up if upstream goes silent for three seconds.
        let next = tokio::time::timeout(Duration::from_secs(3), inputs.next()).await;
        let input = match next {
            Ok(Some(input)) => input,
            Ok(None) => break, // input stream closed: upstream finished
            Err(_) => bail!("timeout while waiting for input"),
        };

        if input.id.as_str() == "timestamp" {
            let value: u64 = rand::random();
            node.send_output(&number_output, &value.to_le_bytes()).await?;
        } else {
            eprintln!("Ignoring unexpected input `{}`", input.id.as_str());
        }
    }

    Ok(())
}

+ 50
- 0
coordinator/examples/rate_limit.rs View File

@@ -0,0 +1,50 @@
use clap::StructOpt;
use dora_node_api::{self, config::DataId, DoraNode};
use eyre::bail;
use futures::StreamExt;
use std::time::{Duration, Instant};

// Command-line arguments for the rate-limit example node. Doc comments on
// the fields become clap help text, so the truncated sentence below
// ("two subsequent.") is completed — this changes `--help` output only.
#[derive(Debug, Clone, clap::Parser)]
#[clap(about = "Limit the rate of incoming data")]
struct Args {
    /// Minimal interval between two subsequent messages.
    ///
    /// Intermediate messages are ignored.
    #[clap(long)]
    seconds: f32,
}

/// Forwards `data` inputs to the `rate_limited` output, dropping any
/// message that arrives less than `--seconds` after the last forwarded one.
#[tokio::main]
async fn main() -> eyre::Result<()> {
    let args = Args::parse();
    let min_interval = Duration::from_secs_f32(args.seconds);
    let output = DataId::from("rate_limited".to_owned());

    let operator = DoraNode::init_from_env().await?;

    let mut inputs = operator.inputs().await?;

    // Time of the last forwarded message. Initialized to startup time, so
    // the first message is forwarded only after `min_interval` has passed.
    let mut last_message = Instant::now();

    loop {
        let timeout = Duration::from_secs(3);
        let input = match tokio::time::timeout(timeout, inputs.next()).await {
            Ok(Some(input)) => input,
            // Input stream closed: all upstream nodes finished.
            Ok(None) => break,
            Err(_) => bail!("timeout while waiting for input"),
        };

        match input.id.as_str() {
            "data" => {
                let elapsed = last_message.elapsed();
                if elapsed > min_interval {
                    // Advance by the measured gap, i.e. to (roughly) "now".
                    last_message += elapsed;
                    operator.send_output(&output, &input.data).await?;
                }
            }
            other => eprintln!("Ignoring unexpected input `{other}`"),
        }
    }

    Ok(())
}

+ 193
- 0
coordinator/src/main.rs View File

@@ -0,0 +1,193 @@
use dora_common::descriptor::{self, Descriptor, NodeKind};
use dora_node_api::config::NodeId;
use eyre::{bail, eyre, WrapErr};
use futures::{stream::FuturesUnordered, StreamExt};
use std::path::{Path, PathBuf};

// Command-line interface of the coordinator.
// (Plain `//` comments on purpose: `///` doc comments would be picked up
// by clap as help text and change the CLI output.)
#[derive(Debug, Clone, clap::Parser)]
#[clap(about = "Dora coordinator")]
enum Command {
    // Prints the dataflow graph as a Mermaid flowchart.
    #[clap(about = "Print Graph")]
    Visualize { dataflow: PathBuf },
    // Spawns all nodes of the given dataflow and waits for them to finish.
    #[clap(about = "Run dataflow pipeline")]
    Run {
        dataflow: PathBuf,
        // Optional path to the `dora-runtime` executable; defaults to a
        // binary next to the coordinator executable (see `main`).
        runtime: Option<PathBuf>,
    },
}

/// Entry point: parses the CLI and dispatches to visualize or run.
#[tokio::main]
async fn main() -> eyre::Result<()> {
    let command = clap::Parser::parse();
    match command {
        Command::Visualize { dataflow: file } => {
            let descriptor = read_descriptor(&file).await?;
            let visualized = descriptor
                .visualize_as_mermaid()
                .context("failed to visualize descriptor")?;
            println!("{visualized}");
            println!(
                "Paste the above output on https://mermaid.live/ or in a \
                ```mermaid code block on GitHub to display it."
            );
        }
        Command::Run { dataflow, runtime } => {
            // Default to a `dora-runtime` binary located next to the
            // coordinator executable (argv[0] with its file name replaced).
            let runtime_path = runtime.unwrap_or_else(|| {
                std::env::args()
                    .next()
                    .map(PathBuf::from)
                    .unwrap_or_default()
                    .with_file_name("dora-runtime")
            });
            run_dataflow(dataflow.clone(), &runtime_path)
                .await
                .wrap_err_with(|| format!("failed to run dataflow at {}", dataflow.display()))?
        }
    }

    Ok(())
}

/// Reads the dataflow descriptor, spawns every node as a child process,
/// and waits for all of them to finish.
///
/// Fails early if the descriptor contains runtime nodes but no runtime
/// executable exists at `runtime`. The communication topic prefix is
/// extended with a fresh UUID so concurrent dataflows don't interfere.
async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> {
    let Descriptor {
        mut communication,
        nodes,
    } = read_descriptor(&dataflow_path).await.wrap_err_with(|| {
        format!(
            "failed to read dataflow descriptor at {}",
            dataflow_path.display()
        )
    })?;

    if nodes.iter().any(|n| matches!(n.kind, NodeKind::Runtime(_))) && !runtime.is_file() {
        bail!(
            "There is no runtime at {}, or it is not a file",
            runtime.display()
        );
    }

    // add uuid as prefix to ensure isolation
    communication.add_topic_prefix(&uuid::Uuid::new_v4().to_string());

    let mut tasks = FuturesUnordered::new();
    for node in nodes {
        let node_id = node.id.clone();

        match node.kind {
            descriptor::NodeKind::Custom(node) => {
                let result = spawn_custom_node(node_id.clone(), &node, &communication)
                    .wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?;
                tasks.push(result);
            }
            descriptor::NodeKind::Runtime(node) => {
                // A runtime process is started only when the node actually
                // hosts at least one operator.
                if !node.operators.is_empty() {
                    let result =
                        spawn_runtime_node(runtime, node_id.clone(), &node, &communication)
                            .wrap_err_with(|| format!("failed to spawn runtime node {node_id}"))?;
                    tasks.push(result);
                }
            }
        }
    }

    // Wait for every spawned process; the first failure aborts the run.
    while let Some(task_result) = tasks.next().await {
        task_result
            .wrap_err("failed to join async task")?
            .wrap_err("custom node failed")?;
    }

    Ok(())
}

/// Spawns a custom node as a child process and returns a task that
/// resolves with the node's exit status.
///
/// The `run` string is split on ASCII whitespace into program + arguments.
// NOTE(review): whitespace splitting cannot express quoted arguments or
// paths containing spaces — confirm this limitation is acceptable.
fn spawn_custom_node(
    node_id: NodeId,
    node: &descriptor::CustomNode,
    communication: &dora_node_api::config::CommunicationConfig,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
    let mut args = node.run.split_ascii_whitespace();
    let cmd = args
        .next()
        .ok_or_else(|| eyre!("`run` field must not be empty"))?;
    let mut command = tokio::process::Command::new(cmd);
    command.args(args);
    // Node ID and communication config are passed via environment variables.
    command_init_common_env(&mut command, &node_id, communication)?;
    command.env(
        "DORA_NODE_RUN_CONFIG",
        serde_yaml::to_string(&node.run_config)
            .wrap_err("failed to serialize custom node run config")?,
    );
    let mut child = command
        .spawn()
        .wrap_err_with(|| format!("failed to run command `{}`", &node.run))?;
    // Turn the child's exit status into a Result on a background task.
    let result = tokio::spawn(async move {
        let status = child.wait().await.context("child process failed")?;
        if status.success() {
            println!("node {node_id} finished");
            Ok(())
        } else if let Some(code) = status.code() {
            Err(eyre!("node {node_id} failed with exit code: {code}"))
        } else {
            // No exit code available (e.g. terminated by a signal on Unix).
            Err(eyre!("node {node_id} failed (unknown exit code)"))
        }
    });
    Ok(result)
}

/// Spawns a `dora-runtime` process for the given runtime node and returns
/// a task that resolves with the process's exit status.
///
/// The operator list is passed to the runtime via the `DORA_OPERATORS`
/// environment variable (YAML-serialized).
fn spawn_runtime_node(
    runtime: &Path,
    node_id: NodeId,
    node: &descriptor::RuntimeNode,
    communication: &dora_node_api::config::CommunicationConfig,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
    let mut command = tokio::process::Command::new(runtime);
    command_init_common_env(&mut command, &node_id, communication)?;
    command.env(
        "DORA_OPERATORS",
        serde_yaml::to_string(&node.operators)
            // Fixed copy-pasted message (said "custom node run config").
            .wrap_err("failed to serialize operator config")?,
    );

    let mut child = command
        .spawn()
        .wrap_err_with(|| format!("failed to run runtime at `{}`", runtime.display()))?;
    // Turn the child's exit status into a Result on a background task.
    let result = tokio::spawn(async move {
        let status = child.wait().await.context("child process failed")?;
        if status.success() {
            println!("runtime node {node_id} finished");
            Ok(())
        } else if let Some(code) = status.code() {
            Err(eyre!(
                "runtime node {node_id} failed with exit code: {code}"
            ))
        } else {
            // No exit code available (e.g. terminated by a signal on Unix).
            Err(eyre!("runtime node {node_id} failed (unknown exit code)"))
        }
    });
    Ok(result)
}

/// Sets the environment variables that every spawned node (custom or
/// runtime) needs: its node ID and the shared communication config,
/// both YAML-serialized.
fn command_init_common_env(
    command: &mut tokio::process::Command,
    node_id: &NodeId,
    communication: &dora_node_api::config::CommunicationConfig,
) -> Result<(), eyre::Error> {
    let id = serde_yaml::to_string(&node_id).wrap_err("failed to serialize custom node ID")?;
    command.env("DORA_NODE_ID", id);

    let config = serde_yaml::to_string(communication)
        .wrap_err("failed to serialize communication config")?;
    command.env("DORA_COMMUNICATION_CONFIG", config);

    Ok(())
}

/// Reads and parses a YAML dataflow descriptor from `file`.
async fn read_descriptor(file: &Path) -> Result<Descriptor, eyre::Error> {
    let raw = tokio::fs::read(file)
        .await
        .context("failed to open given file")?;
    serde_yaml::from_slice(&raw).context("failed to parse given descriptor")
}

+ 0
- 30
examples/dataflow-example.yml View File

@@ -1,30 +0,0 @@
sinks:
- id: sink-1
input: A
- id: sink-2
input: B
sources:
- id: source-1
output: C
- id: source-2
output: G
operators:
- id: op-1
inputs:
- C
- E
- B
outputs:
- A
- id: op-2
inputs:
- C
- F
outputs:
- E
- id: op-3
inputs:
- C
- G
outputs:
- B

+ 22
- 0
runtime/Cargo.toml View File

@@ -0,0 +1,22 @@
[package]
name = "dora-runtime"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clap = { version = "3.1.12", features = ["derive"] }
dora-node-api = { path = "../api/node" }
dora-common = { version = "0.1.0", path = "../common" }
eyre = "0.6.8"
futures = "0.3.21"
futures-concurrency = "2.0.3"
libloading = "0.7.3"
serde_yaml = "0.8.23"
tokio = { version = "1.17.0", features = ["full"] }
tokio-stream = "0.1.8"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
log = "0.4.17"
fern = "0.6.1"

+ 12
- 0
runtime/examples/example-operator/Cargo.toml View File

@@ -0,0 +1,12 @@
[package]
name = "example-operator"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
crate-type = ["cdylib"]

[dependencies]
dora-operator-api = { path = "../../../api/operator" }

+ 40
- 0
runtime/examples/example-operator/src/lib.rs View File

@@ -0,0 +1,40 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender};

register_operator!(ExampleOperator);

/// Example operator: remembers the last `time` input and, for every
/// `random` input, emits a `timestamped-random` string combining both.
#[derive(Debug, Default)]
struct ExampleOperator {
    // Last received `time` value, if any.
    time: Option<String>,
}

impl DoraOperator for ExampleOperator {
    fn on_input(
        &mut self,
        id: &str,
        data: &[u8],
        output_sender: &mut DoraOutputSender,
    ) -> Result<(), ()> {
        match id {
            "time" => {
                // Payload must be valid UTF-8 (a timestamp string).
                let parsed = std::str::from_utf8(data).map_err(|_| ())?;
                self.time = Some(parsed.to_owned());
            }
            "random" => {
                // Payload is a little-endian u64; reject wrong lengths.
                let parsed = {
                    let data: [u8; 8] = data.try_into().map_err(|_| ())?;
                    u64::from_le_bytes(data)
                };
                // Only emit once a timestamp has been seen.
                if let Some(time) = &self.time {
                    let output = format!("state operator random value {parsed} at {time}");
                    output_sender
                        .send("timestamped-random", output.as_bytes())
                        .map_err(|_| ())?;
                }
            }
            other => eprintln!("ignoring unexpected input {other}"),
        }
        Ok(())
    }
}

+ 273
- 0
runtime/src/main.rs View File

@@ -0,0 +1,273 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_common::descriptor::OperatorConfig;
use dora_node_api::{
self,
communication::{self, CommunicationLayer},
config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId},
STOP_TOPIC,
};
use eyre::{bail, eyre, Context};
use futures::{
stream::{self, FuturesUnordered},
FutureExt, StreamExt,
};
use futures_concurrency::Merge;
use operator::{Operator, OperatorEvent};
use std::{collections::BTreeMap, mem};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamMap};

mod operator;

/// Runtime entry point: hosts all operators of one runtime node.
///
/// Expects three YAML-serialized environment variables set by the
/// coordinator: `DORA_NODE_ID`, `DORA_COMMUNICATION_CONFIG`, and
/// `DORA_OPERATORS`. Forwards subscribed inputs to the operators and
/// publishes their outputs until every operator's inputs have stopped.
#[tokio::main]
async fn main() -> eyre::Result<()> {
    set_up_logger()?;

    let node_id = {
        let raw =
            std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?;
        // NOTE(review): message says "operator config" but this parses the
        // node ID — looks copy-pasted from the `operators` block below.
        serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
    };
    let communication_config: CommunicationConfig = {
        let raw = std::env::var("DORA_COMMUNICATION_CONFIG")
            .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?;
        serde_yaml::from_str(&raw).context("failed to deserialize communication config")?
    };
    let operators: Vec<OperatorConfig> = {
        let raw =
            std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?;
        serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
    };

    // Start every operator; each reports back through its own event channel,
    // keyed by operator ID in `operator_events`.
    let mut operator_map = BTreeMap::new();
    let mut operator_events = StreamMap::new();
    for operator_config in &operators {
        let (events_tx, events) = mpsc::channel(1);
        let operator = Operator::init(operator_config.clone(), events_tx.clone())
            .await
            .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?;
        operator_map.insert(&operator_config.id, operator);
        operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events));
    }

    let communication: Box<dyn CommunicationLayer> =
        communication::init(&communication_config).await?;

    let inputs = subscribe(&operators, communication.as_ref())
        .await
        .context("failed to subscribe")?;

    // Merge external input events and operator-generated events into one
    // stream so both can be handled in a single loop below.
    let input_events = inputs.map(Event::External);
    let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event });
    let mut events = (input_events, operator_events).merge();

    while let Some(event) = events.next().await {
        match event {
            Event::External(event) => match event {
                SubscribeEvent::Input(input) => {
                    // Route the input to its target operator.
                    let operator =
                        operator_map
                            .get_mut(&input.target_operator)
                            .ok_or_else(|| {
                                eyre!(
                                    "received input for unexpected operator `{}`",
                                    input.target_operator
                                )
                            })?;

                    operator
                        .handle_input(input.id.clone(), input.data)
                        .wrap_err_with(|| {
                            format!(
                                "operator {} failed to handle input {}",
                                input.target_operator, input.id
                            )
                        })?;
                }
                SubscribeEvent::InputsStopped { target_operator } => {
                    // --------------------------------------------------------
                    // TODO FIXME: For some reason, these zenoh publish calls
                    // (and also subsequent ones) are not visible to other
                    // nodes. This includes the stop command, so the input
                    // streams of dependent nodes are not closed properly.
                    // --------------------------------------------------------

                    // NOTE(review): debugging probe related to the FIXME
                    // above; should be removed once the issue is resolved.
                    communication
                        .publish("/HHH", &[])
                        .await
                        .wrap_err("failed to send on /HHH")?;
                    // `remove` returns Some only for the first stop marker of
                    // this operator, so the stop message is sent exactly once.
                    if operator_map.remove(&target_operator).is_some() {
                        println!("operator {node_id}/{target_operator} finished");
                        // send stopped message
                        publish(
                            &node_id,
                            target_operator.clone(),
                            STOP_TOPIC.to_owned().into(),
                            &[],
                            communication.as_ref(),
                        )
                        .await.with_context(|| {
                            format!("failed to send stop message for operator `{node_id}/{target_operator}`")
                        })?;
                    }

                    // All operators finished -> shut the runtime down.
                    if operator_map.is_empty() {
                        break;
                    }
                }
            },
            Event::Operator { id, event } => {
                let operator = operator_map
                    .get(&id)
                    .ok_or_else(|| eyre!("received event from unknown operator {id}"))?;
                match event {
                    OperatorEvent::Output { id: data_id, value } => {
                        // Only outputs declared in the config may be published.
                        if !operator.config().outputs.contains(&data_id) {
                            eyre::bail!("unknown output {data_id} for operator {id}");
                        }
                        publish(&node_id, id, data_id, &value, communication.as_ref())
                            .await
                            .context("failed to publish operator output")?;
                    }
                    OperatorEvent::Error(err) => {
                        bail!(err.wrap_err(format!("operator {id} failed")))
                    }
                    // Re-raise panics from operator threads on the main task.
                    OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload),
                }
            }
        }
    }

    // Drop all subscriptions before closing the communication layer.
    mem::drop(events);

    communication.close().await?;

    Ok(())
}

/// Creates the input subscriptions for every operator and merges them into
/// a single event stream.
async fn subscribe<'a>(
    operators: &'a [OperatorConfig],
    communication: &'a dyn CommunicationLayer,
) -> eyre::Result<impl futures::Stream<Item = SubscribeEvent> + 'a> {
    let mut per_operator = Vec::with_capacity(operators.len());

    for operator in operators {
        per_operator.push(subscribe_operator(operator, communication).await?);
    }

    Ok(per_operator.merge())
}

/// Subscribes to all inputs of a single operator.
///
/// The returned stream yields a `SubscribeEvent::Input` per incoming
/// message. Each per-input stream is cut off once all upstream sources
/// have announced their stop, and is then followed by one
/// `SubscribeEvent::InputsStopped` marker (so the merged stream can emit
/// one marker per input; the main loop deduplicates them).
async fn subscribe_operator<'a>(
    operator: &'a OperatorConfig,
    communication: &'a dyn CommunicationLayer,
) -> Result<impl futures::Stream<Item = SubscribeEvent> + 'a, eyre::Error> {
    // Subscribe to the stop topic of every upstream source. `finished`
    // resolves once each of these subscriptions yielded its first message,
    // i.e. once every upstream source announced that it stopped.
    let stop_messages = FuturesUnordered::new();
    for input in operator.inputs.values() {
        let InputMapping {
            source, operator, ..
        } = input;
        let topic = match operator {
            Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"),
            None => format!("{source}/{STOP_TOPIC}"),
        };
        let sub = communication
            .subscribe(&topic)
            .await
            .wrap_err_with(|| format!("failed to subscribe on {topic}"))?;
        stop_messages.push(sub.into_future());
    }
    // `shared()` lets every per-input stream below wait on the same future.
    let finished = Box::pin(stop_messages.all(|_| async { true }).shared());

    // One data subscription per input; topic is `<source>[/<operator>]/<output>`.
    let mut streams = Vec::new();
    for (input, mapping) in &operator.inputs {
        let InputMapping {
            source,
            operator: source_operator,
            output,
        } = mapping;
        let topic = match source_operator {
            Some(operator) => format!("{source}/{operator}/{output}"),
            None => format!("{source}/{output}"),
        };
        let sub = communication
            .subscribe(&topic)
            .await
            .wrap_err_with(|| format!("failed to subscribe on {topic}"))?;
        let stream = sub
            .map(|data| OperatorInput {
                target_operator: operator.id.clone(),
                id: input.clone(),
                data,
            })
            .map(SubscribeEvent::Input)
            .take_until(finished.clone())
            .chain(stream::once(async {
                SubscribeEvent::InputsStopped {
                    target_operator: operator.id.clone(),
                }
            }));
        streams.push(stream);
    }

    Ok(streams.merge())
}

/// Publishes an operator output on the topic `<node>/<operator>/<output>`.
async fn publish(
    self_id: &NodeId,
    operator_id: OperatorId,
    output_id: DataId,
    value: &[u8],
    communication: &dyn CommunicationLayer,
) -> eyre::Result<()> {
    let topic = format!("{self_id}/{operator_id}/{output_id}");
    let sent = communication.publish(&topic, value).await;
    sent.wrap_err_with(|| format!("failed to send data for output {output_id}"))?;

    Ok(())
}

/// Events handled by the runtime's main loop.
enum Event {
    /// An event from the communication layer (input data or stop marker).
    External(SubscribeEvent),
    /// An event produced by one of the hosted operators.
    Operator {
        id: OperatorId,
        event: OperatorEvent,
    },
}

/// Events produced by the input subscriptions (see `subscribe_operator`).
enum SubscribeEvent {
    /// New input for an operator
    Input(OperatorInput),
    /// All input streams for an operator are finished.
    InputsStopped {
        /// The operator whose inputs are all finished.
        target_operator: OperatorId,
    },
}

/// A single input message addressed to one operator.
struct OperatorInput {
    /// Operator that should receive this input.
    pub target_operator: OperatorId,
    /// Input name as declared in the operator's config.
    pub id: DataId,
    /// Raw message payload.
    pub data: Vec<u8>,
}

/// Configures fern to log at `Debug` level to both stdout and `runtime.log`.
fn set_up_logger() -> Result<(), fern::InitError> {
    let dispatch = fern::Dispatch::new()
        .format(|out, message, record| {
            let target = record.target();
            let level = record.level();
            out.finish(format_args!(" [{}][{}] {}", target, level, message))
        })
        .level(log::LevelFilter::Debug)
        .chain(std::io::stdout())
        .chain(fern::log_file("runtime.log")?);
    dispatch.apply()?;
    Ok(())
}

+ 68
- 0
runtime/src/operator/mod.rs View File

@@ -0,0 +1,68 @@
use dora_common::descriptor::{OperatorConfig, OperatorSource};
use dora_node_api::config::DataId;
use eyre::{eyre, Context};
use std::any::Any;
use tokio::sync::mpsc::{self, Sender};

mod shared_lib;

/// Handle to a running operator: its config plus the channel used to feed
/// it inputs.
pub struct Operator {
    // Sender side of the operator's bounded input queue (capacity 10).
    operator_task: Sender<OperatorInput>,
    config: OperatorConfig,
}

impl Operator {
    /// Starts the operator described by `operator_config`.
    ///
    /// Shared-library operators are loaded and run on a dedicated thread;
    /// Python and WASM sources are accepted but not executed yet (only a
    /// warning is printed). Events from the operator are delivered through
    /// `events_tx`.
    pub async fn init(
        operator_config: OperatorConfig,
        events_tx: Sender<OperatorEvent>,
    ) -> eyre::Result<Self> {
        let (operator_task, operator_rx) = mpsc::channel(10);

        match &operator_config.source {
            OperatorSource::SharedLibrary(path) => {
                // typo fix: "failed ot spawn" -> "failed to spawn"
                shared_lib::spawn(path, events_tx, operator_rx).wrap_err_with(|| {
                    format!(
                        "failed to spawn shared library operator for {}",
                        operator_config.id
                    )
                })?;
            }
            // `_path` silences the unused-variable warning until these
            // operator kinds are implemented.
            OperatorSource::Python(_path) => {
                eprintln!("WARNING: Python operators are not supported yet");
            }
            OperatorSource::Wasm(_path) => {
                eprintln!("WARNING: WASM operators are not supported yet");
            }
        }
        Ok(Self {
            operator_task,
            config: operator_config,
        })
    }

    /// Queues an input for the operator without blocking.
    ///
    /// Errors if the operator's input queue is full or if the operator
    /// task has shut down (e.g. after a crash).
    pub fn handle_input(&mut self, id: DataId, value: Vec<u8>) -> eyre::Result<()> {
        self.operator_task
            .try_send(OperatorInput { id, value })
            .map_err(|err| match err {
                tokio::sync::mpsc::error::TrySendError::Closed(_) => eyre!("operator crashed"),
                tokio::sync::mpsc::error::TrySendError::Full(_) => eyre!("operator queue full"),
            })
    }

    /// Get a reference to the operator's config.
    #[must_use]
    pub fn config(&self) -> &OperatorConfig {
        &self.config
    }
}

/// Events an operator reports back to the runtime.
pub enum OperatorEvent {
    /// The operator produced an output value.
    Output { id: DataId, value: Vec<u8> },
    /// The operator failed with an error.
    Error(eyre::Error),
    /// The operator panicked; the payload comes from `catch_unwind`.
    Panic(Box<dyn Any + Send>),
}

/// A single input delivered to an operator task.
pub struct OperatorInput {
    id: DataId,
    value: Vec<u8>,
}

+ 196
- 0
runtime/src/operator/shared_lib.rs View File

@@ -0,0 +1,196 @@
use super::{OperatorEvent, OperatorInput};
use eyre::{bail, Context};
use libloading::Symbol;
use std::{
ffi::c_void,
panic::{catch_unwind, AssertUnwindSafe},
path::Path,
ptr, slice, thread,
};
use tokio::sync::mpsc::{Receiver, Sender};

/// Loads the shared library at `path` and runs its operator on a dedicated
/// OS thread.
///
/// The thread receives inputs from `inputs` and reports outputs, errors,
/// and panics through `events_tx`. Panics inside the operator are caught
/// and forwarded as `OperatorEvent::Panic` instead of tearing down the
/// whole runtime.
pub fn spawn(
    path: &Path,
    events_tx: Sender<OperatorEvent>,
    inputs: Receiver<OperatorInput>,
) -> eyre::Result<()> {
    // SAFETY: loading a library runs its initialization routines; the
    // library is trusted to export the dora operator C ABI (see `Bindings`).
    let library = unsafe {
        libloading::Library::new(path)
            .wrap_err_with(|| format!("failed to load shared library at `{}`", path.display()))?
    };

    thread::spawn(move || {
        // AssertUnwindSafe: on panic we only forward the payload and drop
        // all captured state, so broken invariants are not observed later.
        let closure = AssertUnwindSafe(|| {
            let bindings = Bindings::init(&library).context("failed to init operator")?;

            let operator = SharedLibraryOperator {
                events_tx: events_tx.clone(),
                inputs,
                bindings,
            };

            operator.run()
        });
        // Send failures are ignored: the receiver may already be gone
        // during runtime shutdown.
        match catch_unwind(closure) {
            Ok(Ok(())) => {}
            Ok(Err(err)) => {
                let _ = events_tx.blocking_send(OperatorEvent::Error(err));
            }
            Err(panic) => {
                let _ = events_tx.blocking_send(OperatorEvent::Panic(panic));
            }
        }
    });

    Ok(())
}

/// Drives a loaded shared-library operator: pulls inputs from the channel
/// and calls into the library through `bindings`.
struct SharedLibraryOperator<'lib> {
    events_tx: Sender<OperatorEvent>,
    inputs: Receiver<OperatorInput>,

    // Resolved C ABI symbols; borrows the loaded library for 'lib.
    bindings: Bindings<'lib>,
}

impl<'lib> SharedLibraryOperator<'lib> {
    /// Runs the operator loop until the input channel closes.
    ///
    /// Calls `dora_init_operator` once, then `dora_on_input` per input;
    /// the operator state is released via `dora_drop_operator` when the
    /// `OperatorContext` is dropped at the end of this function.
    fn run(mut self) -> eyre::Result<()> {
        let operator_context = {
            let mut raw = ptr::null_mut();
            let result = unsafe { (self.bindings.init_operator)(&mut raw) };
            // Non-zero return codes signal failure across the C ABI.
            if result != 0 {
                bail!("init_operator failed with error code {result}");
            }
            OperatorContext {
                raw,
                drop_fn: self.bindings.drop_operator.clone(),
            }
        };

        while let Some(input) = self.inputs.blocking_recv() {
            // Pass id and data as raw (pointer, length) pairs across the ABI.
            let id_start = input.id.as_bytes().as_ptr();
            let id_len = input.id.as_bytes().len();
            let data_start = input.value.as_slice().as_ptr();
            let data_len = input.value.len();

            // Output callback handed to the operator: forwards outputs to
            // the runtime; returns 0 on success, -1 if the channel closed.
            let output = |id: &str, data: &[u8]| -> isize {
                let result = self.events_tx.blocking_send(OperatorEvent::Output {
                    id: id.to_owned().into(),
                    value: data.to_owned(),
                });
                match result {
                    Ok(()) => 0,
                    Err(_) => -1,
                }
            };
            let (output_fn, output_ctx) = wrap_closure(&output);

            let result = unsafe {
                (self.bindings.on_input)(
                    id_start,
                    id_len,
                    data_start,
                    data_len,
                    output_fn,
                    output_ctx,
                    operator_context.raw,
                )
            };
            if result != 0 {
                bail!("on_input failed with error code {result}");
            }
        }
        Ok(())
    }
}

/// Owns the opaque operator state returned by `dora_init_operator` and
/// releases it with `dora_drop_operator` on drop.
struct OperatorContext<'lib> {
    raw: *mut (),
    drop_fn: Symbol<'lib, OperatorContextDropFn>,
}

impl<'lib> Drop for OperatorContext<'lib> {
    fn drop(&mut self) {
        // The library allocated `raw`, so the library must free it.
        unsafe { (self.drop_fn)(self.raw) };
    }
}

/// Wrap a closure with an FFI-compatible trampoline function.
///
/// Returns a C compatible trampoline function and a data pointer that
/// must be passed as when invoking the trampoline function.
///
/// The returned data pointer borrows `closure`; it must not be used after
/// `closure` goes out of scope.
fn wrap_closure<F>(closure: &F) -> (OutputFn, *const c_void)
where
    F: Fn(&str, &[u8]) -> isize,
{
    /// Rust closures are just compiler-generated structs with a `call` method. This
    /// trampoline function is generic over the closure type, which means that the
    /// compiler's monomorphization step creates a different copy of that function
    /// for each closure type.
    ///
    /// The trampoline function expects the pointer to the corresponding closure
    /// struct as `context` argument. It casts that pointer back to a closure
    /// struct pointer and invokes its call method.
    unsafe extern "C" fn trampoline<F: Fn(&str, &[u8]) -> isize>(
        id_start: *const u8,
        id_len: usize,
        data_start: *const u8,
        data_len: usize,
        context: *const c_void,
    ) -> isize {
        // SAFETY: caller passes pointers/lengths describing valid buffers.
        let id_raw = unsafe { slice::from_raw_parts(id_start, id_len) };
        let data = unsafe { slice::from_raw_parts(data_start, data_len) };
        let id = match std::str::from_utf8(id_raw) {
            Ok(s) => s,
            // Non-UTF-8 ids are rejected with the generic error code -1.
            Err(_) => return -1,
        };
        // SAFETY: `context` was produced from `&F` in `wrap_closure` below.
        unsafe { (*(context as *const F))(id, data) }
    }

    (trampoline::<F>, closure as *const F as *const c_void)
}

/// The C ABI entry points every dora shared-library operator must export.
struct Bindings<'lib> {
    init_operator: Symbol<'lib, InitFn>,
    drop_operator: Symbol<'lib, OperatorContextDropFn>,
    on_input: Symbol<'lib, OnInputFn>,
}

impl<'lib> Bindings<'lib> {
    /// Resolves the three required symbols from the loaded library.
    fn init(library: &'lib libloading::Library) -> Result<Self, eyre::Error> {
        // SAFETY: the declared function types must match the library's
        // actual exports; a mismatch is undefined behavior when called.
        let bindings = unsafe {
            Bindings {
                init_operator: library
                    .get(b"dora_init_operator")
                    .wrap_err("failed to get `dora_init_operator`")?,
                drop_operator: library
                    .get(b"dora_drop_operator")
                    .wrap_err("failed to get `dora_drop_operator`")?,
                on_input: library
                    .get(b"dora_on_input")
                    .wrap_err("failed to get `dora_on_input`")?,
            }
        };
        Ok(bindings)
    }
}

/// `dora_init_operator`: writes an opaque operator context pointer into
/// `operator_context`. NOTE(review): presumably a nonzero return value
/// signals failure, mirroring `dora_on_input` — the call site is not visible
/// here; confirm.
type InitFn = unsafe extern "C" fn(operator_context: *mut *mut ()) -> isize;
/// `dora_drop_operator`: destroys a context previously created via `InitFn`.
type OperatorContextDropFn = unsafe extern "C" fn(operator_context: *mut ());

/// `dora_on_input`: delivers one input to the operator. The input id is the
/// byte range `[id_start, id_start + id_len)` and the payload is
/// `[data_start, data_start + data_len)`. The operator emits outputs by
/// invoking `output` with `output_context`. A nonzero return value is treated
/// as failure by the caller.
type OnInputFn = unsafe extern "C" fn(
    id_start: *const u8,
    id_len: usize,
    data_start: *const u8,
    data_len: usize,
    output: OutputFn,
    output_context: *const c_void,
    operator_context: *mut (),
) -> isize;

/// Output callback handed to the operator: same id/data pointer convention
/// as `OnInputFn`. The trampoline built by `wrap_closure` returns -1 when
/// the id bytes are not valid UTF-8.
type OutputFn = unsafe extern "C" fn(
    id_start: *const u8,
    id_len: usize,
    data_start: *const u8,
    data_len: usize,
    output_context: *const c_void,
) -> isize;

+ 0
- 92
src/descriptor.rs View File

@@ -1,92 +0,0 @@
use serde::{Deserialize, Serialize};
use std::collections::{BTreeSet, HashMap, HashSet};

/// A dataflow graph description: the sources, sinks, and operators that make
/// up the graph. Deserialized (e.g. from a YAML descriptor file) via serde;
/// every collection defaults to empty when absent from the input.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Descriptor {
    /// Nodes that only produce data.
    #[serde(default)]
    sources: HashSet<Source>,
    /// Nodes that only consume data.
    #[serde(default)]
    sinks: HashSet<Sink>,
    /// Nodes that consume inputs and produce outputs.
    #[serde(default)]
    operators: HashSet<Operator>,
}

impl Descriptor {
    /// Renders this dataflow as a Mermaid flowchart (top-to-bottom).
    ///
    /// Sources are drawn as trapezoids (`[\id/]`), sinks as inverted
    /// trapezoids (`[/id\]`), and operators as plain rectangles. Every
    /// declared output is connected to each node that lists it as an input;
    /// inputs that no visible node produces are attached to a shared
    /// `missing` node.
    ///
    /// Nodes and edges are emitted in a deterministic, id-sorted order so
    /// that repeated invocations on the same descriptor produce identical
    /// output (plain `HashSet`/`HashMap` iteration order is unspecified and
    /// would make the chart differ from run to run).
    pub fn visualize_as_mermaid(&self) -> eyre::Result<String> {
        // Sort each node kind by id for stable output.
        let mut sources: Vec<_> = self.sources.iter().collect();
        sources.sort_by(|a, b| a.id.cmp(&b.id));
        let mut operators: Vec<_> = self.operators.iter().collect();
        operators.sort_by(|a, b| a.id.cmp(&b.id));
        let mut sinks: Vec<_> = self.sinks.iter().collect();
        sinks.sort_by(|a, b| a.id.cmp(&b.id));

        let mut flowchart = "flowchart TB\n".to_owned();
        // Node declarations.
        for source in &sources {
            let id = &source.id;
            flowchart.push_str(&format!(" {id}[\\{id}/]\n"));
        }
        for operator in &operators {
            let id = &operator.id;
            flowchart.push_str(&format!(" {id}\n"));
        }
        for sink in &sinks {
            let id = &sink.id;
            flowchart.push_str(&format!(" {id}[/{id}\\]\n"));
        }

        // Map each input name to the set of node ids that consume it.
        let mut expected_inputs: HashMap<_, BTreeSet<_>> = HashMap::new();
        for operator in &operators {
            for input in &operator.inputs {
                expected_inputs
                    .entry(input.to_owned())
                    .or_default()
                    .insert(&operator.id);
            }
        }
        for sink in &sinks {
            expected_inputs
                .entry(sink.input.to_owned())
                .or_default()
                .insert(&sink.id);
        }

        // Edges: connect every produced output to all of its consumers,
        // removing satisfied inputs from the expectation map as we go.
        for source in &sources {
            let targets = expected_inputs.remove(&source.output).unwrap_or_default();
            let id = &source.id;
            let output = &source.output;
            for target in targets {
                flowchart.push_str(&format!(" {id} -- {output} --> {target}\n"));
            }
        }
        for operator in &operators {
            let id = &operator.id;
            for output in &operator.outputs {
                let targets = expected_inputs.remove(output).unwrap_or_default();
                for target in targets {
                    flowchart.push_str(&format!(" {id} -- {output} --> {target}\n"));
                }
            }
        }

        // Anything left over has no producer: attach it to a `missing` node,
        // again in deterministic (sorted) order.
        let mut dangling: Vec<_> = expected_inputs.drain().collect();
        dangling.sort();
        for (output, targets) in dangling {
            for target in targets {
                flowchart.push_str(&format!(" missing>missing] -- {output} --> {target}\n"));
            }
        }

        Ok(flowchart)
    }
}

/// A data producer: a node that emits values under a single named output.
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Source {
    /// Unique node id; used directly as the Mermaid node name.
    id: String,
    /// Name of the output this source produces.
    output: String,
}

/// A data consumer: a node that receives values from a single named input.
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Sink {
    /// Unique node id; used directly as the Mermaid node name.
    id: String,
    /// Name of the input this sink consumes.
    input: String,
}

/// A processing node: consumes any number of inputs and produces any number
/// of outputs. `BTreeSet` keeps the names ordered and deduplicated.
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Operator {
    /// Unique node id; used directly as the Mermaid node name.
    id: String,
    /// Names of the inputs this operator consumes.
    inputs: BTreeSet<String>,
    /// Names of the outputs this operator produces.
    outputs: BTreeSet<String>,
}

+ 0
- 1
src/lib.rs View File

@@ -1,3 +1,2 @@
pub mod descriptor;
pub mod python;
pub mod zenoh_client;

+ 2
- 19
src/main.rs View File

@@ -1,36 +1,19 @@
use dora_rs::descriptor::Descriptor;
use eyre::Context;
use std::{fs::File, path::PathBuf};
use structopt::StructOpt;

#[derive(Debug, Clone, StructOpt)]
#[structopt(about = "Dora control")]
enum Command {
#[structopt(about = "Print Graph")]
Graph { file: PathBuf },
#[structopt(about = "Run Python server")]
StartPython(dora_rs::python::server::PythonCommand),
}

fn main() -> eyre::Result<()> {
#[tokio::main]
async fn main() -> eyre::Result<()> {
env_logger::init();

let command = Command::from_args();
match command {
Command::Graph { file } => {
let descriptor_file = File::open(&file).context("failed to open given file")?;

let descriptor: Descriptor = serde_yaml::from_reader(descriptor_file)
.context("failed to parse given descriptor")?;
let visualized = descriptor
.visualize_as_mermaid()
.context("failed to visualize descriptor")?;
println!("{visualized}");
println!(
"Paste the above output on https://mermaid.live/ or in a \
```mermaid code block on GitHub to display it."
);
}
Command::StartPython(command) => {
dora_rs::python::server::run(command).context("python server failed")?;
}


+ 1
- 1
src/zenoh_client/mod.rs View File

@@ -55,7 +55,7 @@ impl ZenohClient {

pub async fn pull(
&self,
receivers: &mut Vec<&mut SampleReceiver>,
receivers: &mut [&mut SampleReceiver],
) -> Option<BTreeMap<String, Vec<u8>>> {
let fetched_data = join_all(
receivers


+ 10
- 0
zenoh-logger/Cargo.toml View File

@@ -0,0 +1,10 @@
[package]
name = "zenoh-logger"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" }

+ 11
- 0
zenoh-logger/src/main.rs View File

@@ -0,0 +1,11 @@
use zenoh::prelude::{Receiver, ZFuture};

/// Open a zenoh session, subscribe to every key expression, and print the
/// key of each received sample forever.
fn main() {
    let session = zenoh::open(zenoh_config::Config::default()).wait().unwrap();
    let mut subscriber = session.subscribe("/**").reliable().wait().unwrap();

    loop {
        let sample = subscriber.receiver().recv().unwrap();
        println!("{}", sample.key_expr);
    }
}

Loading…
Cancel
Save