Browse Source

Redesign communication layer to be synchronous and add support for iceoryx

tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
03348c5be8
Failed to extract signature
30 changed files with 965 additions and 616 deletions
  1. +103
    -23
      Cargo.lock
  2. +5
    -0
      Cargo.toml
  3. +1
    -1
      apis/c/node/Cargo.toml
  4. +12
    -12
      apis/c/node/src/lib.rs
  5. +25
    -27
      apis/python/node/src/lib.rs
  6. +8
    -7
      apis/rust/node/Cargo.toml
  7. +0
    -108
      apis/rust/node/src/communication.rs
  8. +109
    -0
      apis/rust/node/src/communication/iceoryx.rs
  9. +172
    -0
      apis/rust/node/src/communication/mod.rs
  10. +82
    -0
      apis/rust/node/src/communication/zenoh.rs
  11. +36
    -0
      apis/rust/node/src/config.rs
  12. +25
    -82
      apis/rust/node/src/lib.rs
  13. +15
    -9
      binaries/coordinator/src/lib.rs
  14. +1
    -0
      binaries/runtime/Cargo.toml
  15. +64
    -195
      binaries/runtime/src/main.rs
  16. +51
    -72
      binaries/runtime/src/operator/mod.rs
  17. +19
    -14
      binaries/runtime/src/operator/python.rs
  18. +24
    -20
      binaries/runtime/src/operator/shared_lib.rs
  19. +26
    -0
      examples/iceoryx/dataflow.yml
  20. +11
    -0
      examples/iceoryx/node/Cargo.toml
  21. +26
    -0
      examples/iceoryx/node/src/main.rs
  22. +13
    -0
      examples/iceoryx/operator/Cargo.toml
  23. +47
    -0
      examples/iceoryx/operator/src/lib.rs
  24. +33
    -0
      examples/iceoryx/run.rs
  25. +12
    -0
      examples/iceoryx/sink/Cargo.toml
  26. +32
    -0
      examples/iceoryx/sink/src/main.rs
  27. +7
    -13
      examples/rust-dataflow/node/src/main.rs
  28. +0
    -2
      examples/rust-dataflow/sink/Cargo.toml
  29. +6
    -11
      examples/rust-dataflow/sink/src/main.rs
  30. +0
    -20
      libraries/core/src/lib.rs

+ 103
- 23
Cargo.lock View File

@@ -505,6 +505,56 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"

[[package]]
name = "cpp"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dec5e86d4f6547f0218ad923d9508244a71ef83b763196e6698b4f70f3595185"
dependencies = [
"cpp_macros",
]

[[package]]
name = "cpp_build"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16f4d303b8ec35fb3afd7e963e2c898117f1e49930becb703e4a7ac528ad2dd0"
dependencies = [
"cc",
"cpp_common",
"lazy_static",
"proc-macro2",
"regex",
"syn",
"unicode-xid",
]

[[package]]
name = "cpp_common"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76071bb9c8c4dd2b5eb209907deab7b031323cf1be3dfdc6ec5d37f4f187d8a1"
dependencies = [
"lazy_static",
"proc-macro2",
"syn",
]

[[package]]
name = "cpp_macros"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fdaa01904c12a8989dbfa110b41ef27efc432ac9934f691b9732f01cb64dc01"
dependencies = [
"aho-corasick",
"byteorder",
"cpp_common",
"lazy_static",
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "cpufeatures"
version = "0.2.2"
@@ -791,11 +841,9 @@ dependencies = [
name = "dora-node-api"
version = "0.1.0"
dependencies = [
"async-trait",
"eyre",
"futures",
"futures-concurrency",
"futures-time",
"flume",
"iceoryx-rs",
"once_cell",
"serde",
"serde_yaml",
@@ -813,7 +861,7 @@ version = "0.1.0"
dependencies = [
"dora-node-api",
"eyre",
"futures",
"flume",
]

[[package]]
@@ -871,6 +919,7 @@ dependencies = [
"dora-operator-api-types",
"eyre",
"fern",
"flume",
"futures",
"futures-concurrency",
"libloading",
@@ -975,9 +1024,9 @@ dependencies = [

[[package]]
name = "flume"
version = "0.10.12"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
@@ -1099,18 +1148,6 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"

[[package]]
name = "futures-time"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "553673e17c187f65e79ed63a9e58b148560fd5982e1d739f37913f320139edb0"
dependencies = [
"async-channel",
"async-io",
"futures-core",
"pin-project-lite",
]

[[package]]
name = "futures-util"
version = "0.3.21"
@@ -1371,6 +1408,51 @@ dependencies = [
"tokio-io-timeout",
]

[[package]]
name = "iceoryx-example-node"
version = "0.1.0"
dependencies = [
"dora-node-api",
"eyre",
"rand",
]

[[package]]
name = "iceoryx-example-operator"
version = "0.1.0"
dependencies = [
"dora-operator-api",
]

[[package]]
name = "iceoryx-example-sink"
version = "0.1.0"
dependencies = [
"dora-node-api",
"eyre",
"futures",
"tokio",
]

[[package]]
name = "iceoryx-rs"
version = "0.1.0"
source = "git+https://github.com/eclipse-iceoryx/iceoryx-rs.git#68fbd034a77c6ed98cb75a939d406429ef26b0dd"
dependencies = [
"iceoryx-sys",
"thiserror",
]

[[package]]
name = "iceoryx-sys"
version = "0.1.0"
source = "git+https://github.com/eclipse-iceoryx/iceoryx-rs.git#68fbd034a77c6ed98cb75a939d406429ef26b0dd"
dependencies = [
"cpp",
"cpp_build",
"thiserror",
]

[[package]]
name = "idna"
version = "0.2.3"
@@ -2658,8 +2740,6 @@ version = "0.1.0"
dependencies = [
"dora-node-api",
"eyre",
"futures",
"tokio",
]

[[package]]
@@ -3217,9 +3297,9 @@ dependencies = [

[[package]]
name = "tokio-stream"
version = "0.1.8"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3"
checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9"
dependencies = [
"futures-core",
"pin-project-lite",


+ 5
- 0
Cargo.toml View File

@@ -8,6 +8,7 @@ members = [
"binaries/*",
"examples/rust-dataflow/*",
"examples/c++-dataflow/*-rust-*",
"examples/iceoryx/*",
"libraries/core",
"libraries/extensions/message",
"libraries/extensions/telemetry/*",
@@ -42,3 +43,7 @@ path = "examples/c++-dataflow/run.rs"
[[example]]
name = "python-dataflow"
path = "examples/python-dataflow/run.rs"

[[example]]
name = "iceoryx"
path = "examples/iceoryx/run.rs"

+ 1
- 1
apis/c/node/Cargo.toml View File

@@ -12,4 +12,4 @@ crate-type = ["staticlib"]
[dependencies]
dora-node-api = { path = "../../rust/node" }
eyre = "0.6.8"
futures = "0.3.21"
flume = "0.10.14"

+ 12
- 12
apis/c/node/src/lib.rs View File

@@ -1,12 +1,11 @@
#![deny(unsafe_op_in_unsafe_fn)]

use dora_node_api::{DoraNode, Input};
use futures::{executor::block_on, Stream, StreamExt};
use std::{pin::Pin, ptr, slice};
use std::{ptr, slice};

struct DoraContext {
node: &'static DoraNode,
inputs: Pin<Box<dyn futures::Stream<Item = Input>>>,
node: &'static mut DoraNode,
inputs: flume::Receiver<Input>,
}

/// Initializes a dora context from the environment variables that were set by
@@ -20,12 +19,13 @@ struct DoraContext {
/// On error, a null pointer is returned.
#[no_mangle]
pub extern "C" fn init_dora_context_from_env() -> *mut () {
let context = match block_on(async {
let node = DoraNode::init_from_env().await?;
let context = || {
let node = DoraNode::init_from_env()?;
let node = Box::leak(Box::new(node));
let inputs: Pin<Box<dyn Stream<Item = Input>>> = Box::pin(node.inputs().await?);
let inputs = node.inputs()?;
Ok(DoraContext { node, inputs })
}) {
};
let context = match context() {
Ok(n) => n,
Err(err) => {
let err: eyre::Error = err;
@@ -71,9 +71,9 @@ pub unsafe extern "C" fn free_dora_context(context: *mut ()) {
#[no_mangle]
pub unsafe extern "C" fn dora_next_input(context: *mut ()) -> *mut () {
let context: &mut DoraContext = unsafe { &mut *context.cast() };
match block_on(context.inputs.next()) {
Some(input) => Box::into_raw(Box::new(input)).cast(),
None => ptr::null_mut(),
match context.inputs.recv() {
Ok(input) => Box::into_raw(Box::new(input)).cast(),
Err(flume::RecvError::Disconnected) => ptr::null_mut(),
}
}

@@ -191,5 +191,5 @@ unsafe fn try_send_output(
let id = std::str::from_utf8(unsafe { slice::from_raw_parts(id_ptr, id_len) })?;
let output_id = id.to_owned().into();
let data = unsafe { slice::from_raw_parts(data_ptr, data_len) };
block_on(context.node.send_output(&output_id, data))
context.node.send_output(&output_id, data)
}

+ 25
- 27
apis/python/node/src/lib.rs View File

@@ -1,7 +1,6 @@
use dora_node_api::config::{DataId, NodeId};
use dora_node_api::{DoraNode, Input};
use eyre::{Context, Result};
use futures::StreamExt;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use std::sync::Arc;
@@ -42,32 +41,31 @@ impl Node {
// It would have been difficult to expose the FutureStream of Dora directly.
thread::spawn(move || -> Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread().build()?;
rt.block_on(async move {
let node = Arc::new(DoraNode::init_from_env().await?);
let _node = node.clone();
let receive_handle = tokio::spawn(async move {
let mut inputs = _node.inputs().await.unwrap();
while let Some(input) = inputs.next().await {
tx_input.send(input).await?
}
Result::<_, eyre::Error>::Ok(())
});
let send_handle = tokio::spawn(async move {
while let Some((output_str, data)) = rx_output.recv().await {
let output_id = DataId::from(output_str);
node.send_output(&output_id, data.as_slice()).await?
}
Result::<_, eyre::Error>::Ok(())
});
let (receiver, sender) = tokio::join!(receive_handle, send_handle);
receiver
.wrap_err("Handle to the receiver failed")?
.wrap_err("Receiving messages from receiver channel failed")?;
sender
.wrap_err("Handle to the sender failed")?
.wrap_err("Sending messages using sender channel failed")?;
Ok(())
})

let node = Arc::new(DoraNode::init_from_env()?);
let _node = node.clone();
let receive_handle = tokio::task::spawn_blocking(async move {
let mut inputs = _node.inputs().unwrap();
while let Ok(input) = inputs.recv() {
tx_input.blocking_send(input)?
}
Result::<_, eyre::Error>::Ok(())
});
let send_handle = tokio::task::spawn_blocking(async move {
while let Some((output_str, data)) = rx_output.recv().await {
let output_id = DataId::from(output_str);
node.send_output(&output_id, data.as_slice())?
}
Result::<_, eyre::Error>::Ok(())
});
let (receiver, sender) = tokio::join!(receive_handle, send_handle);
receiver
.wrap_err("Handle to the receiver failed")?
.wrap_err("Receiving messages from receiver channel failed")?;
sender
.wrap_err("Handle to the sender failed")?
.wrap_err("Sending messages using sender channel failed")?;
Ok(())
});

Ok(Node {


+ 8
- 7
apis/rust/node/Cargo.toml View File

@@ -4,22 +4,23 @@ version = "0.1.0"
edition = "2021"
license = "Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["zenoh", "iceoryx"]
zenoh = ["dep:zenoh", "dep:zenoh-config"]
iceoryx = ["dep:iceoryx-rs"]

[dependencies]
async-trait = "0.1.53"
eyre = "0.6.7"
futures = "0.3.21"
futures-concurrency = "2.0.3"
futures-time = "1.0.0"
once_cell = "1.13.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.8.23"
thiserror = "1.0.30"
tracing = "0.1.33"
uuid = "0.8.2"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", optional = true }
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", optional = true }
iceoryx-rs = { git = "https://github.com/eclipse-iceoryx/iceoryx-rs.git", optional = true }
flume = "0.10.14"

[dev-dependencies]
tokio = { version = "1.17.0", features = ["rt"] }

+ 0
- 108
apis/rust/node/src/communication.rs View File

@@ -1,108 +0,0 @@
use async_trait::async_trait;
use eyre::Context;
use futures::StreamExt;
use futures_time::future::FutureExt;
use std::pin::Pin;
use zenoh::{
prelude::{Priority, SplitBuffer, ZFuture},
publication::CongestionControl,
};

use crate::{config::CommunicationConfig, BoxError};

pub async fn init(
communication_config: &CommunicationConfig,
) -> eyre::Result<Box<dyn CommunicationLayer>> {
match communication_config {
CommunicationConfig::Zenoh {
config: zenoh_config,
prefix: zenoh_prefix,
} => {
let zenoh = zenoh::open(zenoh_config.clone())
.await
.map_err(BoxError)
.wrap_err("failed to create zenoh session")?;
let layer = ZenohCommunicationLayer {
zenoh,
topic_prefix: zenoh_prefix.clone(),
};
Ok(Box::new(layer))
}
}
}

#[async_trait]
pub trait CommunicationLayer: Send + Sync {
async fn subscribe<'a>(
&'a self,
topic: &str,
) -> Result<Pin<Box<dyn futures::Stream<Item = Vec<u8>> + Send + 'a>>, BoxError>;

async fn publish(&self, topic: &str, data: &[u8]) -> Result<(), BoxError>;

fn publish_sync(&self, topic: &str, data: &[u8]) -> Result<(), BoxError>;

async fn close(self: Box<Self>) -> Result<(), BoxError>;
}

struct ZenohCommunicationLayer {
zenoh: zenoh::Session,
topic_prefix: String,
}

impl ZenohCommunicationLayer {
fn prefixed(&self, topic: &str) -> String {
format!("{}/{topic}", self.topic_prefix)
}
}

#[async_trait]
impl CommunicationLayer for ZenohCommunicationLayer {
async fn subscribe<'a>(
&'a self,
topic: &str,
) -> Result<Pin<Box<dyn futures::Stream<Item = Vec<u8>> + Send + 'a>>, BoxError> {
zenoh::Session::subscribe(&self.zenoh, self.prefixed(topic))
.reliable()
.await
.map(|s| {
let trait_object: Pin<Box<dyn futures::Stream<Item = Vec<u8>> + Send + 'a>> =
Box::pin(s.map(|s| s.value.payload.contiguous().into_owned()));
trait_object
})
.map_err(BoxError)
}

async fn publish(&self, topic: &str, data: &[u8]) -> Result<(), BoxError> {
let writer = self
.zenoh
.put(self.prefixed(topic), data)
.congestion_control(CongestionControl::Block)
.priority(Priority::RealTime);

let result = writer.await.map_err(BoxError);
result
}

fn publish_sync(&self, topic: &str, data: &[u8]) -> Result<(), BoxError> {
let writer = self
.zenoh
.put(self.prefixed(topic), data)
.congestion_control(CongestionControl::Block)
.priority(Priority::RealTime);

writer.wait().map_err(BoxError)
}

async fn close(self: Box<Self>) -> Result<(), BoxError> {
zenoh::Session::close(self.zenoh)
// wait a bit before closing to ensure that remaining published
// messages are sent out
//
// TODO: create a minimal example to reproduce the dropped messages
// and report this issue in the zenoh repo
.delay(futures_time::time::Duration::from_secs_f32(0.5))
.await
.map_err(BoxError)
}
}

+ 109
- 0
apis/rust/node/src/communication/iceoryx.rs View File

@@ -0,0 +1,109 @@
use eyre::Context;
use std::{collections::HashMap, sync::Arc, time::Duration};
use uuid::Uuid;

use crate::BoxError;

use super::{CommunicationLayer, Publisher, Subscriber};

pub struct IceoryxCommunicationLayer {
topic_prefix: Arc<String>,
publishers: HashMap<String, Arc<iceoryx_rs::Publisher<[u8]>>>,
}

impl IceoryxCommunicationLayer {
pub fn init(app_name_prefix: String, topic_prefix: String) -> eyre::Result<Self> {
let app_name = format!("{app_name_prefix}-{}", Uuid::new_v4());
iceoryx_rs::Runtime::init(&app_name);

Ok(Self {
topic_prefix: Arc::new(topic_prefix.clone()),
publishers: Default::default(),
})
}
}

impl IceoryxCommunicationLayer {
fn get_or_create_publisher(
&mut self,
topic: &str,
) -> eyre::Result<Arc<iceoryx_rs::Publisher<[u8]>>> {
match self.publishers.get(topic) {
Some(p) => Ok(p.clone()),
None => {
let publisher = Self::create_publisher(&self.topic_prefix, topic)
.context("failed to create iceoryx publisher")?;

let publisher = Arc::new(publisher);
self.publishers.insert(topic.to_owned(), publisher.clone());
Ok(publisher)
}
}
}

fn create_publisher(
topic_prefix: &str,
topic: &str,
) -> Result<iceoryx_rs::Publisher<[u8]>, iceoryx_rs::IceoryxError> {
iceoryx_rs::PublisherBuilder::new("dora", &topic_prefix, &topic).create()
}
}

impl CommunicationLayer for IceoryxCommunicationLayer {
fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, crate::BoxError> {
let publisher = self
.get_or_create_publisher(topic)
.map_err(|err| BoxError(err.into()))?;

Ok(Box::new(IceoryxPublisher { publisher }))
}

fn subscribe(&mut self, topic: &str) -> Result<Box<dyn Subscriber>, crate::BoxError> {
let (subscriber, token) =
iceoryx_rs::SubscriberBuilder::new("dora", &self.topic_prefix, topic)
.queue_capacity(5)
.create_mt()
.context("failed to create iceoryx subscriber")
.map_err(|err| BoxError(err.into()))?;
let receiver = subscriber.get_sample_receiver(token);

Ok(Box::new(IceoryxReceiver { receiver }))
}
}

#[derive(Clone)]
pub struct IceoryxPublisher {
publisher: Arc<iceoryx_rs::Publisher<[u8]>>,
}

impl Publisher for IceoryxPublisher {
fn publish(&self, data: &[u8]) -> Result<(), crate::BoxError> {
let mut sample = self
.publisher
.loan_slice(data.len())
.context("failed to loan iceoryx slice for publishing")
.map_err(|err| BoxError(err.into()))?;
sample.copy_from_slice(data);
self.publisher.publish(sample);
Ok(())
}

fn boxed_clone(&self) -> Box<dyn Publisher> {
Box::new(self.clone())
}
}

pub struct IceoryxReceiver {
receiver: iceoryx_rs::mt::SampleReceiver<[u8]>,
}

impl Subscriber for IceoryxReceiver {
fn recv(&mut self) -> Result<Option<Vec<u8>>, crate::BoxError> {
self.receiver
.wait_for_samples(Duration::from_secs(u64::MAX));
match self.receiver.take() {
Some(sample) => Ok(Some(sample.to_owned())),
None => Ok(None),
}
}
}

+ 172
- 0
apis/rust/node/src/communication/mod.rs View File

@@ -0,0 +1,172 @@
use crate::{
config::{CommunicationConfig, DataId, InputMapping},
BoxError,
};
use eyre::{eyre, Context};
use std::{
collections::{BTreeMap, HashMap},
mem, thread,
};

#[doc(hidden)]
pub const STOP_TOPIC: &str = "__dora_rs_internal__operator_stopped";

pub mod iceoryx;
pub mod zenoh;

pub fn init(
communication_config: &CommunicationConfig,
) -> eyre::Result<Box<dyn CommunicationLayer>> {
match communication_config {
CommunicationConfig::Zenoh {
config: zenoh_config,
prefix: zenoh_prefix,
} => {
let layer =
zenoh::ZenohCommunicationLayer::init(zenoh_config.clone(), zenoh_prefix.clone())?;

Ok(Box::new(layer))
}
CommunicationConfig::Iceoryx {
app_name_prefix,
topic_prefix,
} => {
let app_name_prefix = app_name_prefix.clone();
let topic_prefix = topic_prefix.clone();
let layer = iceoryx::IceoryxCommunicationLayer::init(app_name_prefix, topic_prefix)?;

Ok(Box::new(layer))
}
}
}

pub trait CommunicationLayer: Send + Sync {
fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, BoxError>;

fn subscribe(&mut self, topic: &str) -> Result<Box<dyn Subscriber>, BoxError>;

fn subscribe_all(
&mut self,
inputs: &BTreeMap<DataId, InputMapping>,
) -> eyre::Result<flume::Receiver<Input>> {
let (inputs_tx, inputs_rx) = flume::bounded(10);
for (input, mapping) in inputs {
let topic = mapping.to_string();
let mut sub = self
.subscribe(&topic)
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?;

let input_id = input.to_owned();
let sender = inputs_tx.clone();
thread::spawn(move || loop {
match sub.recv().transpose() {
None => break,
Some(value) => {
let input = value.map(|data| Input {
id: input_id.clone(),
data,
});
match sender.send(input) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
}
}
}
});
}
mem::drop(inputs_tx);

let (stop_tx, stop_rx) = flume::bounded(10);
let mut sources: HashMap<_, _> = inputs
.values()
.map(|v| (v.source().to_owned(), v.operator().to_owned()))
.collect();
for (source, operator) in &sources {
let topic = match operator {
Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"),
None => format!("{source}/{STOP_TOPIC}"),
};
let mut sub = self
.subscribe(&topic)
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?;

let source_id = source.to_owned();
let sender = stop_tx.clone();
thread::spawn(move || loop {
match sub.recv().transpose() {
None => break,
Some(value) => {
let input = value.map(|_| source_id.clone());
match sender.send(input) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
}
}
}
});
}
mem::drop(stop_tx);

let (combined_tx, combined) = flume::bounded(1);
thread::spawn(move || loop {
let selector = flume::Selector::new()
.recv(&inputs_rx, |v| match v {
Ok(Ok(value)) => InputEvent::Input(value),
Ok(Err(err)) => InputEvent::Error(err),
Err(flume::RecvError::Disconnected) => InputEvent::Error(BoxError(
eyre!("input stream was disconnected unexpectedly").into(),
)),
})
.recv(&stop_rx, |v| match v {
Ok(Ok(stopped_source)) => {
sources.remove(&stopped_source);
InputEvent::InputClosed {
number_of_remaining_sources: sources.len(),
}
}
Ok(Err(err)) => InputEvent::Error(err),
Err(flume::RecvError::Disconnected) => InputEvent::Error(BoxError(
eyre!("stop stream was disconnected unexpectedly").into(),
)),
});
match selector.wait() {
InputEvent::Input(input) => match combined_tx.send(input) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
},
InputEvent::InputClosed {
number_of_remaining_sources,
} => {
if number_of_remaining_sources == 0 {
break;
}
}
InputEvent::Error(err) => panic!("{err}"),
}
});

Ok(combined)
}
}

pub trait Publisher: Send + Sync {
fn publish(&self, data: &[u8]) -> Result<(), BoxError>;

fn boxed_clone(&self) -> Box<dyn Publisher>;
}

pub trait Subscriber: Send + Sync {
fn recv(&mut self) -> Result<Option<Vec<u8>>, BoxError>;
}

enum InputEvent {
Input(Input),
InputClosed { number_of_remaining_sources: usize },
Error(BoxError),
}

#[derive(Debug)]
pub struct Input {
pub id: DataId,
pub data: Vec<u8>,
}

+ 82
- 0
apis/rust/node/src/communication/zenoh.rs View File

@@ -0,0 +1,82 @@
use super::{CommunicationLayer, Publisher, Subscriber};
use crate::BoxError;
use std::sync::Arc;
use zenoh::{
prelude::{EntityFactory, Priority, Receiver as _, SplitBuffer, ZFuture},
publication::CongestionControl,
};

pub struct ZenohCommunicationLayer {
zenoh: Arc<zenoh::Session>,
topic_prefix: String,
}

impl ZenohCommunicationLayer {
pub fn init(config: zenoh_config::Config, prefix: String) -> eyre::Result<Self> {
let zenoh = ::zenoh::open(config)
.wait()
.map_err(|err| BoxError(err.into()))?
.into_arc();
Ok(Self {
zenoh,
topic_prefix: prefix,
})
}

fn prefixed(&self, topic: &str) -> String {
format!("{}/{topic}", self.topic_prefix)
}
}

impl CommunicationLayer for ZenohCommunicationLayer {
fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, crate::BoxError> {
let publisher = self
.zenoh
.publish(self.prefixed(topic))
.congestion_control(CongestionControl::Block)
.priority(Priority::RealTime)
.wait()
.map_err(|err| BoxError(err.into()))?;

Ok(Box::new(ZenohPublisher { publisher }))
}

fn subscribe(&mut self, topic: &str) -> Result<Box<dyn Subscriber>, BoxError> {
let subscriber = self
.zenoh
.subscribe(self.prefixed(topic))
.reliable()
.wait()
.map_err(|err| BoxError(err.into()))?;

Ok(Box::new(ZenohReceiver(subscriber)))
}
}

#[derive(Clone)]
pub struct ZenohPublisher {
publisher: zenoh::publication::Publisher<'static>,
}

impl Publisher for ZenohPublisher {
fn publish(&self, data: &[u8]) -> Result<(), crate::BoxError> {
self.publisher
.send(data)
.map_err(|err| BoxError(err.into()))
}

fn boxed_clone(&self) -> Box<dyn Publisher> {
Box::new(self.clone())
}
}

pub struct ZenohReceiver(zenoh::subscriber::Subscriber<'static>);

impl Subscriber for ZenohReceiver {
fn recv(&mut self) -> Result<Option<Vec<u8>>, crate::BoxError> {
match self.0.recv() {
Ok(sample) => Ok(Some(sample.value.payload.contiguous().into_owned())),
Err(flume::RecvError::Disconnected) => Ok(None),
}
}
}

+ 36
- 0
apis/rust/node/src/config.rs View File

@@ -1,6 +1,7 @@
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use std::{
borrow::Borrow,
collections::{BTreeMap, BTreeSet},
convert::Infallible,
fmt::{self, Write as _},
@@ -92,6 +93,30 @@ impl std::ops::Deref for DataId {
}
}

impl AsRef<String> for DataId {
fn as_ref(&self) -> &String {
&self.0
}
}

impl AsRef<str> for DataId {
fn as_ref(&self) -> &str {
&self.0
}
}

impl Borrow<String> for DataId {
fn borrow(&self) -> &String {
&self.0
}
}

impl Borrow<str> for DataId {
fn borrow(&self) -> &str {
&self.0
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum InputMapping {
Timer { interval: Duration },
@@ -244,6 +269,11 @@ pub enum CommunicationConfig {
config: zenoh_config::Config,
prefix: String,
},
Iceoryx {
app_name_prefix: String,
#[serde(skip)]
topic_prefix: String,
},
}

impl CommunicationConfig {
@@ -255,6 +285,12 @@ impl CommunicationConfig {
} => {
write!(zenoh_prefix, "/{}", prefix).unwrap();
}
CommunicationConfig::Iceoryx { topic_prefix, .. } => {
if !topic_prefix.is_empty() {
topic_prefix.push_str("-");
}
topic_prefix.push_str(prefix);
}
}
}
}

+ 25
- 82
apis/rust/node/src/lib.rs View File

@@ -1,16 +1,11 @@
use communication::CommunicationLayer;
pub use communication::Input;
use communication::{CommunicationLayer, STOP_TOPIC};
use config::{CommunicationConfig, DataId, NodeId, NodeRunConfig};
use eyre::WrapErr;
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use futures_concurrency::Merge;
use std::collections::HashSet;

pub mod communication;
pub mod config;

#[doc(hidden)]
pub const STOP_TOPIC: &str = "__dora_rs_internal__operator_stopped";

pub struct DoraNode {
id: NodeId,
node_config: NodeRunConfig,
@@ -18,7 +13,7 @@ pub struct DoraNode {
}

impl DoraNode {
pub async fn init_from_env() -> eyre::Result<Self> {
pub fn init_from_env() -> eyre::Result<Self> {
let id = {
let raw =
std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?;
@@ -34,15 +29,15 @@ impl DoraNode {
.wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize communication config")?
};
Self::init(id, node_config, communication_config).await
Self::init(id, node_config, communication_config)
}

pub async fn init(
pub fn init(
id: NodeId,
node_config: NodeRunConfig,
communication_config: CommunicationConfig,
) -> eyre::Result<Self> {
let communication = communication::init(&communication_config).await?;
let communication = communication::init(&communication_config)?;
Ok(Self {
id,
node_config,
@@ -50,51 +45,11 @@ impl DoraNode {
})
}

pub async fn inputs(&self) -> eyre::Result<impl futures::Stream<Item = Input> + '_> {
let mut streams = Vec::new();
for (input, mapping) in &self.node_config.inputs {
let topic = mapping.to_string();
let sub = self
.communication
.subscribe(&topic)
.await
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?;
streams.push(sub.map(|data| Input {
id: input.clone(),
data,
}))
}

let stop_messages = FuturesUnordered::new();
let sources: HashSet<_> = self
.node_config
.inputs
.values()
.map(|v| (v.source(), v.operator()))
.collect();
for (source, operator) in &sources {
let topic = match operator {
Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"),
None => format!("{source}/{STOP_TOPIC}"),
};
let sub = self
.communication
.subscribe(&topic)
.await
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?;
stop_messages.push(sub.into_future());
}
let node_id = self.id.clone();
let finished = Box::pin(
stop_messages
.all(|_| async { true })
.map(move |_| println!("all inputs finished for node {node_id}")),
);

Ok(streams.merge().take_until(finished))
pub fn inputs(&mut self) -> eyre::Result<flume::Receiver<Input>> {
self.communication.subscribe_all(&self.node_config.inputs)
}

pub async fn send_output(&self, output_id: &DataId, data: &[u8]) -> eyre::Result<()> {
pub fn send_output(&mut self, output_id: &DataId, data: &[u8]) -> eyre::Result<()> {
if !self.node_config.outputs.contains(output_id) {
eyre::bail!("unknown output");
}
@@ -103,8 +58,9 @@ impl DoraNode {

let topic = format!("{self_id}/{output_id}");
self.communication
.publish(&topic, data)
.await
.publisher(&topic)
.wrap_err_with(|| format!("failed create publisher for output {output_id}"))?
.publish(data)
.wrap_err_with(|| format!("failed to send data for output {output_id}"))?;
Ok(())
}
@@ -124,8 +80,14 @@ impl Drop for DoraNode {
let topic = format!("{self_id}/{STOP_TOPIC}");
let result = self
.communication
.publish_sync(&topic, &[])
.wrap_err_with(|| format!("failed to send stop message for source `{self_id}`"));
.publisher(&topic)
.wrap_err_with(|| {
format!("failed to create publisher for stop message for node `{self_id}`")
})
.and_then(|p| {
p.publish(&[])
.wrap_err_with(|| format!("failed to send stop message for node `{self_id}`"))
});
match result {
Ok(()) => println!("sent stop message for {self_id}"),
Err(err) => {
@@ -136,13 +98,7 @@ impl Drop for DoraNode {
}
}

#[derive(Debug)]
pub struct Input {
pub id: DataId,
pub data: Vec<u8>,
}

pub struct BoxError(Box<dyn std::error::Error + Send + Sync + 'static>);
pub struct BoxError(pub Box<dyn std::error::Error + Send + Sync + 'static>);

impl std::fmt::Debug for BoxError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -166,16 +122,6 @@ impl std::error::Error for BoxError {
mod tests {
use super::*;

fn run<F, O>(future: F) -> O
where
F: std::future::Future<Output = O>,
{
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(future)
}

#[test]
fn no_op_operator() {
let id = uuid::Uuid::new_v4().to_string().into();
@@ -188,12 +134,9 @@ mod tests {
prefix: format!("/{}", uuid::Uuid::new_v4()),
};

run(async {
let operator = DoraNode::init(id, node_config, communication_config)
.await
.unwrap();
let mut inputs = operator.inputs().await.unwrap();
assert!(inputs.next().await.is_none());
});
let mut node = DoraNode::init(id, node_config, communication_config).unwrap();

let inputs = node.inputs().unwrap();
assert!(inputs.recv().is_err());
}
}

+ 15
- 9
binaries/coordinator/src/lib.rs View File

@@ -64,7 +64,7 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()

let nodes = descriptor.resolve_aliases();
let dora_timers = collect_dora_timers(&nodes);
let mut communication = descriptor.communication;
let mut communication_config = descriptor.communication;

if nodes
.iter()
@@ -78,7 +78,7 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()
}

// add uuid as prefix to ensure isolation
communication.add_topic_prefix(&uuid::Uuid::new_v4().to_string());
communication_config.add_topic_prefix(&uuid::Uuid::new_v4().to_string());

let mut tasks = FuturesUnordered::new();
for node in nodes {
@@ -86,14 +86,14 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()

match node.kind {
descriptor::CoreNodeKind::Custom(node) => {
let result = spawn_custom_node(node_id.clone(), &node, &communication)
let result = spawn_custom_node(node_id.clone(), &node, &communication_config)
.wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?;
tasks.push(result);
}
descriptor::CoreNodeKind::Runtime(node) => {
if !node.operators.is_empty() {
let result =
spawn_runtime_node(&runtime, node_id.clone(), &node, &communication)
spawn_runtime_node(&runtime, node_id.clone(), &node, &communication_config)
.wrap_err_with(|| format!("failed to spawn runtime node {node_id}"))?;
tasks.push(result);
}
@@ -102,9 +102,12 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()
}

for interval in dora_timers {
let communication = communication::init(&communication)
.await
.wrap_err("failed to init communication layer")?;
let communication_config = communication_config.clone();
let mut communication =
tokio::task::spawn_blocking(move || communication::init(&communication_config))
.await
.wrap_err("failed to join communication layer init task")?
.wrap_err("failed to init communication layer")?;
tokio::spawn(async move {
let topic = {
let duration = format_duration(interval);
@@ -112,8 +115,11 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()
};
let mut stream = IntervalStream::new(tokio::time::interval(interval));
while let Some(_) = stream.next().await {
let publish = communication.publish(&topic, &[]);
publish.await.expect("failed to publish timer tick message");
communication
.publisher(&topic)
.unwrap()
.publish(&[])
.expect("failed to publish timer tick message");
}
});
}


+ 1
- 0
binaries/runtime/Cargo.toml View File

@@ -23,3 +23,4 @@ zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
log = "0.4.17"
fern = "0.6.1"
pyo3 = { version = "0.16.5", features = ["auto-initialize", "eyre"] }
flume = "0.10.14"

+ 64
- 195
binaries/runtime/src/main.rs View File

@@ -3,29 +3,22 @@
use dora_core::descriptor::OperatorDefinition;
use dora_node_api::{
self,
communication::{self, CommunicationLayer},
config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId, UserInputMapping},
STOP_TOPIC,
communication::{self, CommunicationLayer, Publisher, STOP_TOPIC},
config::{CommunicationConfig, DataId, NodeId, OperatorId},
};
use eyre::{bail, eyre, Context};
use futures::{
stream::{self, FuturesUnordered},
Future, FutureExt, StreamExt,
};
use futures_concurrency::Merge;
use operator::{Operator, OperatorEvent};
use eyre::{bail, Context};
use futures::{Stream, StreamExt};
use operator::{spawn_operator, OperatorEvent};
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
collections::{BTreeSet, HashMap},
mem,
pin::Pin,
};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamMap};

mod operator;

#[tokio::main]
async fn main() -> eyre::Result<()> {
fn main() -> eyre::Result<()> {
set_up_logger()?;

let node_id = {
@@ -44,112 +37,79 @@ async fn main() -> eyre::Result<()> {
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
};

let mut operator_map = BTreeMap::new();
let mut stopped_operators = BTreeSet::new();
let mut communication: Box<dyn CommunicationLayer> =
communication::init(&communication_config)?;

let mut operator_events = StreamMap::new();
let mut operator_events_tx = HashMap::new();
let mut operator_stop_publishers = HashMap::new();
for operator_config in &operators {
let (events_tx, events) = mpsc::channel(1);
let operator = Operator::init(operator_config.clone(), events_tx.clone())
.await
.wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?;
operator_map.insert(&operator_config.id, operator);
spawn_operator(
&node_id,
operator_config.clone(),
events_tx.clone(),
communication.as_mut(),
)
.wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?;
operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events));
operator_events_tx.insert(operator_config.id.clone(), events_tx);

let stop_publisher = publisher(
&node_id,
operator_config.id.clone(),
STOP_TOPIC.to_owned().into(),
communication.as_mut(),
)
.with_context(|| {
format!(
"failed to create stop publisher for operator {}",
operator_config.id
)
})?;
operator_stop_publishers.insert(operator_config.id.clone(), stop_publisher);
}

let communication: Box<dyn CommunicationLayer> =
communication::init(&communication_config).await?;
let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event });

let inputs = subscribe(&operators, communication.as_ref())
.await
.context("failed to subscribe")?;
tokio::runtime::Runtime::new()?.block_on(run(
node_id,
operator_events,
operator_stop_publishers,
))
}

let input_events = inputs.map(Event::External);
let operator_events = operator_events.map(|(id, event)| Event::Operator { id, event });
let mut events = (input_events, operator_events).merge();
async fn run(
node_id: NodeId,
mut events: impl Stream<Item = Event> + Unpin,
mut operator_stop_publishers: HashMap<OperatorId, Box<dyn Publisher>>,
) -> eyre::Result<()> {
let mut stopped_operators = BTreeSet::new();

while let Some(event) = events.next().await {
match event {
Event::External(event) => match event {
SubscribeEvent::Input(input) => {
let operator = match operator_map.get_mut(&input.target_operator) {
Some(op) => op,
None => {
if stopped_operators.contains(&input.target_operator) {
continue; // operator was stopped already -> ignore input
} else {
bail!(
"received input for unexpected operator `{}`",
input.target_operator
);
}
}
};

operator
.handle_input(input.id.clone(), input.data)
.wrap_err_with(|| {
format!(
"operator {} failed to handle input {}",
input.target_operator, input.id
)
})?;
}
SubscribeEvent::InputsStopped { target_operator } => {
println!("all inputs finished for operator {node_id}/{target_operator}");
match operator_map.get_mut(&target_operator) {
Some(op) => op.close_input_stream(),
None => {
if !stopped_operators.contains(&target_operator) {
bail!(
"received InputsStopped event for unknown operator `{}`",
target_operator
);
}
}
}
}
},
Event::Operator { id, event } => {
let operator = operator_map
.get(&id)
.ok_or_else(|| eyre!("received event from unknown operator {id}"))?;
match event {
OperatorEvent::Output { id: data_id, value } => {
if !operator.definition().config.outputs.contains(&data_id) {
eyre::bail!("unknown output {data_id} for operator {id}");
}
publish(&node_id, id, data_id, &value, communication.as_ref())
.await
.context("failed to publish operator output")?;
}
OperatorEvent::Error(err) => {
bail!(err.wrap_err(format!("operator {id} failed")))
}
OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload),
OperatorEvent::Finished => {
if operator_map.remove(&id).is_some() {
if let Some(stop_publisher) = operator_stop_publishers.remove(&id) {
println!("operator {node_id}/{id} finished");
stopped_operators.insert(id.clone());
// send stopped message
publish(
&node_id,
id.clone(),
STOP_TOPIC.to_owned().into(),
&[],
communication.as_ref(),
)
.await
.with_context(|| {
format!("failed to send stop message for operator `{node_id}/{id}`")
})?;

operator_events_tx.remove(&id);
}

if operator_map.is_empty() {
break;
tokio::task::spawn_blocking(move || stop_publisher.publish(&[]))
.await
.wrap_err("failed to join stop publish task")?
.with_context(|| {
format!(
"failed to send stop message for operator `{node_id}/{id}`"
)
})?;
if operator_stop_publishers.is_empty() {
break;
}
} else {
log::warn!("no stop publisher for {id}");
}
}
}
@@ -159,119 +119,28 @@ async fn main() -> eyre::Result<()> {

mem::drop(events);

communication.close().await?;

Ok(())
}

async fn subscribe<'a>(
operators: &'a [OperatorDefinition],
communication: &'a dyn CommunicationLayer,
) -> eyre::Result<impl futures::Stream<Item = SubscribeEvent> + 'a> {
let mut streams = Vec::new();

for operator in operators {
let events = subscribe_operator(operator, communication).await?;
streams.push(events);
}

Ok(streams.merge())
}

async fn subscribe_operator<'a>(
operator: &'a OperatorDefinition,
communication: &'a dyn CommunicationLayer,
) -> Result<impl futures::Stream<Item = SubscribeEvent> + 'a, eyre::Error> {
let stop_messages: FuturesUnordered<Pin<Box<dyn Future<Output = ()>>>> =
FuturesUnordered::new();
for mapping in operator.config.inputs.values() {
match mapping {
InputMapping::User(UserInputMapping {
source, operator, ..
}) => {
let topic = match operator {
Some(operator) => format!("{source}/{operator}/{STOP_TOPIC}"),
None => format!("{source}/{STOP_TOPIC}"),
};
let sub = communication
.subscribe(&topic)
.await
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?;
stop_messages.push(Box::pin(sub.into_future().map(|_| ())));
}
InputMapping::Timer { .. } => {
// dora timer inputs run forever
stop_messages.push(Box::pin(futures::future::pending()));
}
}
}
let finished = Box::pin(stop_messages.all(|()| async { true }).shared());

let mut streams = Vec::new();
for (input, mapping) in &operator.config.inputs {
let topic = mapping.to_string();
let sub = communication
.subscribe(&topic)
.await
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?;
let stream = sub
.map(|data| OperatorInput {
target_operator: operator.id.clone(),
id: input.clone(),
data,
})
.map(SubscribeEvent::Input)
.take_until(finished.clone());
streams.push(stream);
}

Ok(streams.merge().chain(stream::once(async {
SubscribeEvent::InputsStopped {
target_operator: operator.id.clone(),
}
})))
}

async fn publish(
fn publisher(
self_id: &NodeId,
operator_id: OperatorId,
output_id: DataId,
value: &[u8],
communication: &dyn CommunicationLayer,
) -> eyre::Result<()> {
communication: &mut dyn CommunicationLayer,
) -> eyre::Result<Box<dyn Publisher>> {
let topic = format!("{self_id}/{operator_id}/{output_id}");
communication
.publish(&topic, value)
.await
.wrap_err_with(|| format!("failed to send data for output {output_id}"))?;

Ok(())
.publisher(&topic)
.wrap_err_with(|| format!("failed to create publisher for output {output_id}"))
}

enum Event {
External(SubscribeEvent),
Operator {
id: OperatorId,
event: OperatorEvent,
},
}

enum SubscribeEvent {
/// New input for an operator
Input(OperatorInput),
/// All input streams for an operator are finished.
InputsStopped {
/// The operator whose inputs are all finished.
target_operator: OperatorId,
},
}

struct OperatorInput {
pub target_operator: OperatorId,
pub id: DataId,
pub data: Vec<u8>,
}

fn set_up_logger() -> Result<(), fern::InitError> {
fern::Dispatch::new()
.format(|out, message, record| {


+ 51
- 72
binaries/runtime/src/operator/mod.rs View File

@@ -1,90 +1,69 @@
use dora_core::descriptor::{OperatorDefinition, OperatorSource};
use dora_node_api::config::DataId;
use eyre::{eyre, Context};
use log::warn;
use dora_node_api::{communication::CommunicationLayer, config::NodeId};
use eyre::Context;
use std::any::Any;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::mpsc::Sender;

mod python;
mod shared_lib;

pub struct Operator {
operator_task: Option<Sender<OperatorInput>>,
definition: OperatorDefinition,
}

impl Operator {
pub async fn init(
operator_definition: OperatorDefinition,
events_tx: Sender<OperatorEvent>,
) -> eyre::Result<Self> {
let (operator_task, operator_rx) = mpsc::channel(10);
pub fn spawn_operator(
node_id: &NodeId,
operator_definition: OperatorDefinition,
events_tx: Sender<OperatorEvent>,
communication: &mut dyn CommunicationLayer,
) -> eyre::Result<()> {
let inputs = communication
.subscribe_all(&operator_definition.config.inputs)
.wrap_err_with(|| {
format!(
"failed to subscribe to inputs of operator {}",
operator_definition.id
)
})?;

match &operator_definition.config.source {
OperatorSource::SharedLibrary(path) => {
shared_lib::spawn(path, events_tx, operator_rx).wrap_err_with(|| {
format!(
"failed to spawn shared library operator for {}",
operator_definition.id
)
})?;
}
OperatorSource::Python(path) => {
python::spawn(path, events_tx, operator_rx).wrap_err_with(|| {
format!(
"failed to spawn Python operator for {}",
operator_definition.id
)
})?;
}
OperatorSource::Wasm(_path) => {
eprintln!("WARNING: WASM operators are not supported yet");
}
}
Ok(Self {
operator_task: Some(operator_task),
definition: operator_definition,
let publishers = operator_definition
.config
.outputs
.iter()
.map(|output_id| {
let topic = format!(
"{node_id}/{operator_id}/{output_id}",
operator_id = operator_definition.id
);
communication
.publisher(&topic)
.wrap_err_with(|| format!("failed to create publisher for output {output_id}"))
.map(|p| (output_id.to_owned(), p))
})
}
.collect::<Result<_, _>>()?;

pub fn handle_input(&mut self, id: DataId, value: Vec<u8>) -> eyre::Result<()> {
self.operator_task
.as_mut()
.ok_or_else(|| {
eyre!(
"input channel for {} was already closed",
self.definition.id
match &operator_definition.config.source {
OperatorSource::SharedLibrary(path) => {
shared_lib::spawn(path, events_tx, inputs, publishers).wrap_err_with(|| {
format!(
"failed to spawn shared library operator for {}",
operator_definition.id
)
})?
.try_send(OperatorInput { id, value })
.or_else(|err| match err {
tokio::sync::mpsc::error::TrySendError::Closed(_) => Err(eyre!("operator crashed")),
tokio::sync::mpsc::error::TrySendError::Full(_) => {
warn!("operator queue full");
Ok(())
}
})
}

pub fn close_input_stream(&mut self) {
self.operator_task = None;
}

/// Get a reference to the operator's definition.
#[must_use]
pub fn definition(&self) -> &OperatorDefinition {
&self.definition
})?;
}
OperatorSource::Python(path) => {
python::spawn(path, events_tx, inputs, publishers).wrap_err_with(|| {
format!(
"failed to spawn Python operator for {}",
operator_definition.id
)
})?;
}
OperatorSource::Wasm(_path) => {
eprintln!("WARNING: WASM operators are not supported yet");
}
}
Ok(())
}

pub enum OperatorEvent {
Output { id: DataId, value: Vec<u8> },
Error(eyre::Error),
Panic(Box<dyn Any + Send>),
Finished,
}

pub struct OperatorInput {
id: DataId,
value: Vec<u8>,
}

+ 19
- 14
binaries/runtime/src/operator/python.rs View File

@@ -1,12 +1,15 @@
use super::{OperatorEvent, OperatorInput};
use super::OperatorEvent;
use dora_node_api::{communication::Publisher, config::DataId};
use eyre::{bail, eyre, Context};
use pyo3::{pyclass, types::IntoPyDict, types::PyBytes, Py, Python};
use std::{
collections::HashMap,
panic::{catch_unwind, AssertUnwindSafe},
path::Path,
sync::Arc,
thread,
};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::Sender;

fn traceback(err: pyo3::PyErr) -> eyre::Report {
Python::with_gil(|py| {
@@ -23,7 +26,8 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report {
pub fn spawn(
path: &Path,
events_tx: Sender<OperatorEvent>,
mut inputs: Receiver<OperatorInput>,
inputs: flume::Receiver<dora_node_api::Input>,
publishers: HashMap<DataId, Box<dyn Publisher>>,
) -> eyre::Result<()> {
if !path.exists() {
bail!("No python file exists at {}", path.display());
@@ -34,7 +38,7 @@ pub fn spawn(
let path_cloned = path.clone();

let send_output = SendOutputCallback {
events_tx: events_tx.clone(),
publishers: Arc::new(publishers),
};

let init_operator = move |py: Python| {
@@ -75,7 +79,7 @@ pub fn spawn(
let operator =
Python::with_gil(init_operator).wrap_err("failed to init python operator")?;

while let Some(input) = inputs.blocking_recv() {
while let Ok(input) = inputs.recv() {
let status_enum = Python::with_gil(|py| {
operator
.call_method1(
@@ -83,7 +87,7 @@ pub fn spawn(
"on_input",
(
input.id.to_string(),
PyBytes::new(py, &input.value),
PyBytes::new(py, &input.data),
send_output.clone(),
),
)
@@ -139,24 +143,25 @@ pub fn spawn(
#[pyclass]
#[derive(Clone)]
struct SendOutputCallback {
events_tx: Sender<OperatorEvent>,
publishers: Arc<HashMap<DataId, Box<dyn Publisher>>>,
}

#[allow(unsafe_op_in_unsafe_fn)]
mod callback_impl {
use super::SendOutputCallback;
use crate::operator::OperatorEvent;
use eyre::{eyre, Context};
use pyo3::{pymethods, PyResult};

#[pymethods]
impl SendOutputCallback {
fn __call__(&mut self, output: &str, data: &[u8]) -> PyResult<()> {
let result = self.events_tx.blocking_send(OperatorEvent::Output {
id: output.to_owned().into(),
value: data.to_owned(),
});
result
.map_err(|_| eyre::eyre!("channel to dora runtime was closed unexpectedly").into())
match self.publishers.get(output) {
Some(publisher) => publisher.publish(data).context("publish failed"),
None => Err(eyre!(
"unexpected output {output} (not defined in dataflow config)"
)),
}
.map_err(|err| err.into())
}
}
}

+ 24
- 20
binaries/runtime/src/operator/shared_lib.rs View File

@@ -1,23 +1,28 @@
use super::{OperatorEvent, OperatorInput};
use super::OperatorEvent;
use dora_node_api::{communication::Publisher, config::DataId, BoxError};
use dora_operator_api_types::{
safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput,
DoraResult, DoraStatus, Metadata, OnInputResult, Output, SendOutput,
};
use eyre::{bail, eyre, Context};
use flume::Receiver;
use libloading::Symbol;
use std::{
collections::HashMap,
ffi::c_void,
ops::Deref,
panic::{catch_unwind, AssertUnwindSafe},
path::Path,
sync::Arc,
thread,
};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::Sender;

pub fn spawn(
path: &Path,
events_tx: Sender<OperatorEvent>,
inputs: Receiver<OperatorInput>,
inputs: Receiver<dora_node_api::Input>,
publishers: HashMap<DataId, Box<dyn Publisher>>,
) -> eyre::Result<()> {
let file_name = path
.file_name()
@@ -41,13 +46,9 @@ pub fn spawn(
let closure = AssertUnwindSafe(|| {
let bindings = Bindings::init(&library).context("failed to init operator")?;

let operator = SharedLibraryOperator {
events_tx: events_tx.clone(),
inputs,
bindings,
};
let operator = SharedLibraryOperator { inputs, bindings };

operator.run()
operator.run(publishers)
});
match catch_unwind(closure) {
Ok(Ok(())) => {
@@ -66,14 +67,13 @@ pub fn spawn(
}

struct SharedLibraryOperator<'lib> {
events_tx: Sender<OperatorEvent>,
inputs: Receiver<OperatorInput>,
inputs: Receiver<dora_node_api::Input>,

bindings: Bindings<'lib>,
}

impl<'lib> SharedLibraryOperator<'lib> {
fn run(mut self) -> eyre::Result<()> {
fn run(self, publishers: HashMap<DataId, Box<dyn Publisher>>) -> eyre::Result<()> {
let operator_context = {
let DoraInitResult {
result,
@@ -89,13 +89,17 @@ impl<'lib> SharedLibraryOperator<'lib> {
}
};

let closure_events_tx = self.events_tx.clone();
let send_output_closure = Arc::new(move |output: Output| {
let id: String = output.id.into();
let result = closure_events_tx.blocking_send(OperatorEvent::Output {
id: id.into(),
value: output.data.into(),
});
let result = match publishers.get(output.id.deref()) {
Some(publisher) => publisher.publish(&output.data),
None => Err(BoxError(
eyre!(
"unexpected output {} (not defined in dataflow config)",
output.id.deref()
)
.into(),
)),
};

let error = match result {
Ok(()) => None,
@@ -105,10 +109,10 @@ impl<'lib> SharedLibraryOperator<'lib> {
DoraResult { error }
});

while let Some(input) = self.inputs.blocking_recv() {
while let Ok(input) = self.inputs.recv() {
let operator_input = dora_operator_api_types::Input {
id: String::from(input.id).into(),
data: input.value.into(),
data: input.data.into(),
metadata: Metadata {
open_telemetry_context: String::new().into(),
},


+ 26
- 0
examples/iceoryx/dataflow.yml View File

@@ -0,0 +1,26 @@
communication:
iceoryx:
app_name_prefix: dora-iceoryx-example

nodes:
- id: rust-node
custom:
run: ../../target/debug/iceoryx-example-node
inputs:
tick: dora/timer/millis/300
outputs:
- random
- id: runtime-node
operators:
- id: rust-operator
shared-library: ../../target/debug/iceoryx_example_operator
inputs:
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- status
- id: rust-sink
custom:
run: ../../target/debug/iceoryx-example-sink
inputs:
message: runtime-node/rust-operator/status

+ 11
- 0
examples/iceoryx/node/Cargo.toml View File

@@ -0,0 +1,11 @@
[package]
name = "iceoryx-example-node"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" }
eyre = "0.6.8"
rand = "0.8.5"

+ 26
- 0
examples/iceoryx/node/src/main.rs View File

@@ -0,0 +1,26 @@
use dora_node_api::{self, config::DataId, DoraNode};

fn main() -> eyre::Result<()> {
let output = DataId::from("random".to_owned());

let mut operator = DoraNode::init_from_env()?;

let inputs = operator.inputs()?;

for _ in 0..20 {
let input = match inputs.recv() {
Ok(input) => input,
Err(_) => break,
};

match input.id.as_str() {
"tick" => {
let random: u64 = rand::random();
operator.send_output(&output, &random.to_le_bytes())?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}
}

Ok(())
}

+ 13
- 0
examples/iceoryx/operator/Cargo.toml View File

@@ -0,0 +1,13 @@
[package]
name = "iceoryx-example-operator"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
crate-type = ["cdylib"]

[dependencies]
dora-operator-api = { path = "../../../apis/rust/operator" }

+ 47
- 0
examples/iceoryx/operator/src/lib.rs View File

@@ -0,0 +1,47 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus};
use std::time::{Duration, Instant};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
ticks: usize,
last_random_at: Option<Instant>,
}

impl DoraOperator for ExampleOperator {
fn on_input(
&mut self,
id: &str,
data: &[u8],
output_sender: &mut DoraOutputSender,
) -> Result<DoraStatus, String> {
match id {
"tick" => {
self.ticks += 1;
}
"random" => {
let parsed = {
let data: [u8; 8] = data.try_into().map_err(|_| "unexpected random data")?;
u64::from_le_bytes(data)
};
let output = format!(
"operator received random value {parsed} after {} ticks",
self.ticks
);
output_sender.send("status".into(), output.into_bytes())?;
self.last_random_at = Some(Instant::now());
}
other => eprintln!("ignoring unexpected input {other}"),
}
if let Some(last_random_at) = self.last_random_at {
if last_random_at.elapsed() > Duration::from_secs(1) {
// looks like the node sending the random values finished -> exit too
return Ok(DoraStatus::Stop);
}
}
Ok(DoraStatus::Continue)
}
}

+ 33
- 0
examples/iceoryx/run.rs View File

@@ -0,0 +1,33 @@
use eyre::{bail, Context};
use std::path::Path;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

build_package("iceoryx-example-node").await?;
build_package("iceoryx-example-operator").await?;
build_package("iceoryx-example-sink").await?;
build_package("dora-runtime").await?;

dora_coordinator::run(dora_coordinator::Command::Run {
dataflow: Path::new("dataflow.yml").to_owned(),
runtime: Some(root.join("target").join("debug").join("dora-runtime")),
})
.await?;

Ok(())
}

async fn build_package(package: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("build");
cmd.arg("--package").arg(package);
if !cmd.status().await?.success() {
bail!("failed to build {package}");
};
Ok(())
}

+ 12
- 0
examples/iceoryx/sink/Cargo.toml View File

@@ -0,0 +1,12 @@
[package]
name = "iceoryx-example-sink"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" }
eyre = "0.6.8"
futures = "0.3.21"
tokio = { version = "1.20.1", features = ["macros"] }

+ 32
- 0
examples/iceoryx/sink/src/main.rs View File

@@ -0,0 +1,32 @@
use dora_node_api::{self, DoraNode};
use eyre::{bail, Context};

fn main() -> eyre::Result<()> {
let mut operator = DoraNode::init_from_env()?;

let inputs = operator.inputs()?;

loop {
let input = match inputs.recv() {
Ok(input) => input,
Err(_) => break,
};

match input.id.as_str() {
"message" => {
let received_string = String::from_utf8(input.data)
.wrap_err("received message was not utf8-encoded")?;
println!("received message: {}", received_string);
if !received_string.starts_with("operator received random value ") {
bail!("unexpected message format (should start with 'operator received random value')")
}
if !received_string.ends_with(" ticks") {
bail!("unexpected message format (should end with 'ticks')")
}
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}
}

Ok(())
}

+ 7
- 13
examples/rust-dataflow/node/src/main.rs View File

@@ -1,28 +1,22 @@
use dora_node_api::{self, config::DataId, DoraNode};
use eyre::bail;
use futures::StreamExt;
use std::time::Duration;

#[tokio::main]
async fn main() -> eyre::Result<()> {
fn main() -> eyre::Result<()> {
let output = DataId::from("random".to_owned());

let operator = DoraNode::init_from_env().await?;
let mut operator = DoraNode::init_from_env()?;

let mut inputs = operator.inputs().await?;
let inputs = operator.inputs()?;

for _ in 0..20 {
let timeout = Duration::from_secs(3);
let input = match tokio::time::timeout(timeout, inputs.next()).await {
Ok(Some(input)) => input,
Ok(None) => break,
Err(_) => bail!("timeout while waiting for input"),
let input = match inputs.recv() {
Ok(input) => input,
Err(_) => break,
};

match input.id.as_str() {
"tick" => {
let random: u64 = rand::random();
operator.send_output(&output, &random.to_le_bytes()).await?;
operator.send_output(&output, &random.to_le_bytes())?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}


+ 0
- 2
examples/rust-dataflow/sink/Cargo.toml View File

@@ -8,5 +8,3 @@ edition = "2021"
[dependencies]
dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" }
eyre = "0.6.8"
futures = "0.3.21"
tokio = { version = "1.20.1", features = ["macros"] }

+ 6
- 11
examples/rust-dataflow/sink/src/main.rs View File

@@ -1,20 +1,15 @@
use dora_node_api::{self, DoraNode};
use eyre::{bail, Context};
use futures::StreamExt;
use std::time::Duration;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let operator = DoraNode::init_from_env().await?;
fn main() -> eyre::Result<()> {
let mut operator = DoraNode::init_from_env()?;

let mut inputs = operator.inputs().await?;
let inputs = operator.inputs()?;

loop {
let timeout = Duration::from_secs(5);
let input = match tokio::time::timeout(timeout, inputs.next()).await {
Ok(Some(input)) => input,
Ok(None) => break,
Err(_) => bail!("timeout while waiting for input"),
let input = match inputs.recv() {
Ok(input) => input,
Err(_) => break,
};

match input.id.as_str() {


+ 0
- 20
libraries/core/src/lib.rs View File

@@ -1,21 +1 @@
pub mod descriptor;

pub struct BoxError(pub Box<dyn std::error::Error + Send + Sync + 'static>);

impl std::fmt::Debug for BoxError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(&self.0, f)
}
}

impl std::fmt::Display for BoxError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
}
}

impl std::error::Error for BoxError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.0.source()
}
}

Loading…
Cancel
Save