
Merge branch 'main' into alloc-shmem-in-node

tags/v0.2.0
Philipp Oppermann 2 years ago
commit 4cc9b28953
57 changed files with 712 additions and 1132 deletions
  1. .github/workflows/ci-python.yml (+1, -4)
  2. .github/workflows/ci.yml (+0, -28)
  3. .github/workflows/pip-release.yml (+0, -6)
  4. .github/workflows/release.yml (+5, -8)
  5. Cargo.lock (+444, -541)
  6. README.md (+6, -5)
  7. binaries/cli/Cargo.toml (+1, -1)
  8. binaries/cli/src/main.rs (+7, -12)
  9. binaries/cli/src/template/c/dataflow-template.yml (+19, -18)
  10. binaries/cli/src/template/c/mod.rs (+8, -4)
  11. binaries/cli/src/template/c/node/node-template.c (+18, -14)
  12. binaries/cli/src/template/c/operator/operator-template.c (+32, -28)
  13. binaries/cli/src/template/cxx/dataflow-template.yml (+1, -1)
  14. binaries/cli/src/template/python/dataflow-template.yml (+1, -1)
  15. binaries/cli/src/template/python/node/node-template.py (+8, -3)
  16. binaries/cli/src/template/rust/dataflow-template.yml (+5, -5)
  17. binaries/cli/src/template/rust/mod.rs (+2, -2)
  18. binaries/coordinator/Cargo.toml (+1, -1)
  19. binaries/coordinator/src/lib.rs (+9, -32)
  20. binaries/coordinator/src/main.rs (+1, -2)
  21. binaries/coordinator/src/run/mod.rs (+1, -21)
  22. binaries/daemon/src/lib.rs (+1, -8)
  23. binaries/runtime/src/lib.rs (+6, -6)
  24. docs/src/c-api.md (+6, -41)
  25. docs/src/dataflow-config.md (+34, -47)
  26. docs/src/getting-started.md (+1, -1)
  27. docs/src/installation.md (+2, -8)
  28. docs/src/introduction.md (+7, -5)
  29. docs/src/python-api.md (+2, -7)
  30. docs/src/rust-api.md (+16, -24)
  31. examples/benchmark/dataflow.yml (+1, -1)
  32. examples/c++-dataflow/README.md (+1, -1)
  33. examples/c++-dataflow/dataflow.yml (+1, -1)
  34. examples/c-dataflow/README.md (+1, -1)
  35. examples/c-dataflow/dataflow.yml (+1, -1)
  36. examples/python-dataflow/README.md (+2, -10)
  37. examples/python-dataflow/dataflow.yml (+1, -1)
  38. examples/python-dataflow/dataflow_without_webcam.yml (+1, -1)
  39. examples/python-operator-dataflow/README.md (+2, -2)
  40. examples/python-operator-dataflow/dataflow.yml (+1, -1)
  41. examples/python-operator-dataflow/dataflow_without_webcam.yml (+1, -1)
  42. examples/python-operator-dataflow/no_webcam.py (+1, -1)
  43. examples/python-operator-dataflow/object_detection.py (+2, -6)
  44. examples/python-operator-dataflow/plot.py (+10, -8)
  45. examples/rust-dataflow-url/dataflow.yml (+1, -1)
  46. examples/rust-dataflow/dataflow.yml (+1, -1)
  47. libraries/communication-layer/pub-sub/Cargo.toml (+4, -9)
  48. libraries/communication-layer/pub-sub/src/iceoryx.rs (+0, -158)
  49. libraries/communication-layer/pub-sub/src/lib.rs (+0, -5)
  50. libraries/communication-layer/pub-sub/src/zenoh.rs (+15, -12)
  51. libraries/communication-layer/request-reply/src/tcp.rs (+6, -0)
  52. libraries/core/Cargo.toml (+1, -1)
  53. libraries/core/src/config.rs (+2, -17)
  54. libraries/core/src/descriptor/visualize.rs (+1, -1)
  55. libraries/extensions/download/Cargo.toml (+1, -1)
  56. libraries/extensions/zenoh-logger/Cargo.toml (+1, -2)
  57. libraries/extensions/zenoh-logger/src/main.rs (+8, -4)

.github/workflows/ci-python.yml (+1, -4)

@@ -15,10 +15,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install libacl-dev
run: |
export DEBIAN_FRONTEND=noninteractive
sudo apt-get install -y libacl1-dev

- uses: actions/setup-python@v2
with:
python-version: "3.10"


.github/workflows/ci.yml (+0, -28)

@@ -16,12 +16,6 @@ jobs:
timeout-minutes: 30
steps:
- uses: actions/checkout@v3
- name: Install libacl-dev (Linux)
if: runner.os == 'Linux'
run: |
export DEBIAN_FRONTEND=noninteractive
sudo apt-get install -y libacl1-dev

- uses: r7kamura/rust-problem-matchers@v1.1.0
- run: cargo --version --verbose
- uses: Swatinem/rust-cache@v2
@@ -43,12 +37,6 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: Install libacl-dev (Linux)
if: runner.os == 'Linux'
run: |
export DEBIAN_FRONTEND=noninteractive
sudo apt-get install -y libacl1-dev

- uses: r7kamura/rust-problem-matchers@v1.1.0
- run: cargo --version --verbose
- uses: Swatinem/rust-cache@v2
@@ -82,12 +70,6 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: Install libacl-dev (Linux)
if: runner.os == 'Linux'
run: |
export DEBIAN_FRONTEND=noninteractive
sudo apt-get install -y libacl1-dev

- uses: r7kamura/rust-problem-matchers@v1.1.0
- run: cargo --version --verbose
- uses: Swatinem/rust-cache@v2
@@ -120,12 +102,6 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: Install libacl-dev (Linux)
if: runner.os == 'Linux'
run: |
export DEBIAN_FRONTEND=noninteractive
sudo apt-get install -y libacl1-dev

- uses: r7kamura/rust-problem-matchers@v1.1.0
- run: cargo --version --verbose
- uses: Swatinem/rust-cache@v2
@@ -140,10 +116,6 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install libacl-dev
run: |
export DEBIAN_FRONTEND=noninteractive
sudo apt-get install -y libacl1-dev

- uses: r7kamura/rust-problem-matchers@v1.1.0
- run: cargo --version --verbose


.github/workflows/pip-release.yml (+0, -6)

@@ -21,12 +21,6 @@ jobs:

steps:
- uses: actions/checkout@v3

- name: Install libacl-dev (Linux)
if: runner.os == 'Linux'
run: |
export DEBIAN_FRONTEND=noninteractive
sudo apt-get install -y libacl1-dev
- uses: r7kamura/rust-problem-matchers@v1.1.0
# Publish Dora Node Python API


.github/workflows/release.yml (+5, -8)

@@ -22,17 +22,12 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: Install libacl-dev (Linux)
if: runner.os == 'Linux'
run: |
export DEBIAN_FRONTEND=noninteractive
sudo apt-get install -y libacl1-dev

- uses: r7kamura/rust-problem-matchers@v1.1.0

- name: "Build binaries"
timeout-minutes: 30
run: cargo build --release -p dora-runtime -p dora-coordinator -p dora-cli
timeout-minutes: 60
run: "cargo build --release -p dora-runtime
-p dora-coordinator -p dora-cli -p dora-daemon"

- name: "Create Archive (Unix)"
if: runner.os == 'Linux' || runner.os == 'macOS'
@@ -40,6 +35,7 @@ jobs:
mkdir archive
cp target/release/dora-runtime archive
cp target/release/dora-coordinator archive
cp target/release/dora-daemon archive
cp target/release/dora-cli archive/dora
cd archive
zip -r ../archive.zip .
@@ -52,6 +48,7 @@ jobs:
New-Item -Path archive -ItemType Directory
Copy-Item target/release/dora-runtime.exe -Destination archive
Copy-Item target/release/dora-coordinator.exe -Destination archive
Copy-Item target/release/dora-daemon.exe -Destination archive
Copy-Item target/release/dora-cli.exe -Destination archive/dora.exe
Compress-Archive -Path archive\* -DestinationPath archive.zip



Cargo.lock (+444, -541)
File diff suppressed because it is too large


README.md (+6, -5)

@@ -31,7 +31,7 @@ For linux
wget https://github.com/dora-rs/dora/releases/download/<version>/dora-<version>-x86_64-Linux.zip
unzip dora-<version>-x86_64-Linux.zip
python3 -m pip install dora-rs==<version>
PATH=$PATH:$(pwd):$(pwd)/iceoryx
PATH=$PATH:$(pwd)
dora --help
```

@@ -96,7 +96,7 @@ You need to add the created operators/nodes to your dataflow YAML file.
```yaml
communication:
zenoh:
prefix: /abc_project
prefix: abc_project

nodes:
- id: op_1
@@ -119,7 +119,7 @@ nodes:

Composability as:
- [x] `YAML` declarative programming
- [x] language-agnostic:
- [x] polyglot:
- [x] Rust
- [x] C
- [x] C++
@@ -128,10 +128,11 @@ Composability as:

Low latency as:
- [x] written in <i>...Cough...blazingly fast ...Cough...</i> Rust.
- [x] PubSub communication with [`iceoryx`](https://iceoryx.io/v1.0.1/)
- [x] PubSub communication with shared memory!
- [ ] Zero-copy on read!

Distributed as:
- [x] PubSub communication with [`zenoh`](https://github.com/eclipse-zenoh/zenoh)
- [ ] PubSub communication between machines with [`zenoh`](https://github.com/eclipse-zenoh/zenoh)
- [x] Distributed telemetry with [`opentelemetry`](https://github.com/open-telemetry/opentelemetry-rust)

---


binaries/cli/Cargo.toml (+1, -1)

@@ -15,7 +15,7 @@ eyre = "0.6.8"
dora-core = { path = "../../libraries/core" }
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
tempfile = "3.3.0"
tempfile = "3.4.0"
webbrowser = "0.8.3"
serde_json = "1.0.86"
termcolor = "1.1.3"


binaries/cli/src/main.rs (+7, -12)

@@ -40,7 +40,6 @@ enum Command {
#[clap(flatten)]
args: CommandNew,
},
Dashboard,
Up {
#[clap(long)]
config: Option<PathBuf>,
@@ -63,12 +62,14 @@ enum Command {
#[clap(long)]
name: Option<String>,
},
Logs,
Metrics,
Stats,
List,
Get,
Upgrade,
// Planned for future releases:
// Dashboard,
// Logs,
// Metrics,
// Stats,
// Get,
// Upgrade,
}

#[derive(Debug, clap::Args)]
@@ -120,7 +121,6 @@ fn main() -> eyre::Result<()> {
build::build(&dataflow)?;
}
Command::New { args } => template::create(args)?,
Command::Dashboard => todo!(),
Command::Up {
config,
coordinator_path,
@@ -138,11 +138,6 @@ fn main() -> eyre::Result<()> {
(None, None) => stop_dataflow_interactive(&mut session)?,
},
Command::Destroy { config } => up::destroy(config.as_deref(), &mut session)?,
Command::Logs => todo!(),
Command::Metrics => todo!(),
Command::Stats => todo!(),
Command::Get => todo!(),
Command::Upgrade => todo!(),
}

Ok(())


binaries/cli/src/template/c/dataflow-template.yml (+19, -18)

@@ -1,27 +1,28 @@
communication:
zenoh:
prefix: /___name___
prefix: ___name___

nodes:
- id: runtime-node_1
operators:
- id: op_1
shared-library: build/op_1
inputs:
tick: dora/timer/millis/100
outputs:
- some-output
- id: op_2
shared-library: build/op_2
inputs:
tick: dora/timer/secs/2
outputs:
- some-output
- id: op_1
operator:
shared-library: build/op_1
inputs:
foo: dora/timer/millis/100
outputs:
- bar
- id: op_2
operator:
shared-library: build/op_2
inputs:
foo: dora/timer/secs/2
outputs:
- bar

- id: custom-node_1
custom:
source: build/node_1
inputs:
tick: dora/timer/secs/1
input-1: op_1/some-output
input-2: op_2/some-output
input-1: op_1/bar
input-2: op_2/bar
outputs:
- foo

binaries/cli/src/template/c/mod.rs (+8, -4)

@@ -53,7 +53,8 @@ fn create_dataflow(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrR

fn create_operator(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrReport> {
const OPERATOR: &str = include_str!("operator/operator-template.c");
const HEADER: &str = include_str!("../../../../../apis/c/operator/operator_api.h");
const HEADER_API: &str = include_str!("../../../../../apis/c/operator/operator_api.h");
const HEADER_TYPE: &str = include_str!("../../../../../apis/c/operator/operator_types.h");

if name.contains('/') {
bail!("operator name must not contain `/` separators");
@@ -73,9 +74,12 @@ fn create_operator(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrR
let operator_path = root.join("operator.c");
fs::write(&operator_path, OPERATOR)
.with_context(|| format!("failed to write `{}`", operator_path.display()))?;
let header_path = root.join("operator_api.h");
fs::write(&header_path, HEADER)
.with_context(|| format!("failed to write `{}`", header_path.display()))?;
let header_api_path = root.join("operator_api.h");
let header_type_path = root.join("operator_types.h");
fs::write(&header_api_path, HEADER_API)
.with_context(|| format!("failed to write `{}`", header_api_path.display()))?;
fs::write(&header_type_path, HEADER_TYPE)
.with_context(|| format!("failed to write `{}`", header_type_path.display()))?;

// TODO: Makefile?



binaries/cli/src/template/c/node/node-template.c (+18, -14)

@@ -21,26 +21,30 @@ int main()

while (1)
{
void *input = dora_next_input(dora_context);
if (input == NULL)
void *event = dora_next_event(dora_context);
if (event == NULL)
{
// end of input
break;
printf("[c node] ERROR: unexpected end of event\n");
return -1;
}

char *id;
size_t id_len;
read_dora_input_id(input, &id, &id_len);
enum DoraEventType ty = read_dora_event_type(event);
if (ty == DoraEventType_Input)
{
char *id;
size_t id_len;
read_dora_input_id(event, &id, &id_len);

char *data;
size_t data_len;
read_dora_input_data(input, &data, &data_len);
char *data;
size_t data_len;
read_dora_input_data(event, &data, &data_len);

char out_id[] = "foo";
char out_data[] = "bar";
dora_send_output(dora_context, out_id, strlen(out_id), out_data, strlen(out_data));
char out_id[] = "foo";
char out_data[] = "bar";
dora_send_output(dora_context, out_id, strlen(out_id), out_data, strlen(out_data));

free_dora_input(input); // do not use `id` or `data` pointer after freeing
free_dora_event(event); // do not use `id` or `data` pointer after freeing
}
}

free_dora_context(dora_context);


binaries/cli/src/template/c/operator/operator-template.c (+32, -28)

@@ -22,39 +22,43 @@ DoraResult_t dora_drop_operator(void *operator_context)
return result;
}

OnInputResult_t dora_on_input(
const Input_t *input,
OnEventResult_t dora_on_event(
const RawEvent_t *event,
const SendOutput_t *send_output,
void *operator_context)
{
char id[input->id.len + 1];
memcpy(id, input->id.ptr, input->id.len);
id[input->id.len] = 0;

// example for matching on input name
if (strcmp(id, "foo") == 0)
if (event->input != NULL)
{
char *out_id = "bar";
char *out_id_heap = strdup(out_id);

int data_alloc_size = 10;
void *out_data = malloc(data_alloc_size);
// TODO intialize out_data

Output_t output = {.id = {
.ptr = (uint8_t *)out_id_heap,
.len = strlen(out_id_heap),
.cap = strlen(out_id_heap) + 1,
},
.data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}};
DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output);

OnInputResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE};
return result;
char id[event->input->id.len + 1];
memcpy(id, event->input->id.ptr, event->input->id.len);
id[event->input->id.len] = 0;

// example for matching on input name
if (strcmp(id, "foo") == 0)
{
char *out_id = "bar";
char *out_id_heap = strdup(out_id);

int data_alloc_size = 10;
void *out_data = malloc(data_alloc_size);
// TODO intialize out_data

Output_t output = {.id = {
.ptr = (uint8_t *)out_id_heap,
.len = strlen(out_id_heap),
.cap = strlen(out_id_heap) + 1,
},
.data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}};
DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output);

OnEventResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE};
return result;
}
}
else
if (event->stop)
{
OnInputResult_t result = {.status = DORA_STATUS_CONTINUE};
return result;
printf("C operator received stop event\n");
}
OnEventResult_t result = {.status = DORA_STATUS_CONTINUE};
return result;
}

binaries/cli/src/template/cxx/dataflow-template.yml (+1, -1)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /___name___
prefix: ___name___

nodes:
- id: runtime-node_1


binaries/cli/src/template/python/dataflow-template.yml (+1, -1)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /___name___
prefix: ___name___

nodes:
- id: op_1


binaries/cli/src/template/python/node/node-template.py (+8, -3)

@@ -5,6 +5,11 @@ from dora import Node

node = Node()

input_id, value, metadata = node.next()

print(f"id: {input_id}, value: {value}, metadata: {metadata}")
event = node.next()
if event["type"] == "INPUT":
print(
f"""Node received:
id: {event["id"]},
value: {event["data"]},
metadata: {event["metadata"]}"""
)

binaries/cli/src/template/rust/dataflow-template.yml (+5, -5)

@@ -1,18 +1,18 @@
communication:
zenoh:
prefix: /___name___
prefix: ___name___

nodes:
- id: runtime-node_1
operators:
- id: op_1
- id: op_1
operator:
build: cargo build -p op_1
shared-library: target/debug/op_1
inputs:
tick: dora/timer/millis/100
outputs:
- some-output
- id: op_2
- id: op_2
operator:
build: cargo build -p op_2
shared-library: target/debug/op_2
inputs:


binaries/cli/src/template/rust/mod.rs (+2, -2)

@@ -84,7 +84,7 @@ fn create_operator(name: String, path: Option<PathBuf>) -> Result<(), eyre::ErrR

let cargo_toml = CARGO_TOML
.replace("___name___", &name)
.replace("___version___", &VERSION);
.replace("___version___", VERSION);
let cargo_toml_path = root.join("Cargo.toml");
fs::write(&cargo_toml_path, &cargo_toml)
.with_context(|| format!("failed to write `{}`", cargo_toml_path.display()))?;
@@ -122,7 +122,7 @@ fn create_custom_node(name: String, path: Option<PathBuf>) -> Result<(), eyre::E

let cargo_toml = CARGO_TOML
.replace("___name___", &name)
.replace("___version___", &VERSION);
.replace("___version___", VERSION);
let cargo_toml_path = root.join("Cargo.toml");
fs::write(&cargo_toml_path, &cargo_toml)
.with_context(|| format!("failed to write `{}`", cargo_toml_path.display()))?;


binaries/coordinator/Cargo.toml (+1, -1)

@@ -27,7 +27,7 @@ rand = "0.8.5"
dora-core = { workspace = true }
tracing = "0.1.36"
futures-concurrency = "7.1.0"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" }
zenoh = "0.7.0-rc"
serde_json = "1.0.86"
dora-download = { path = "../../libraries/extensions/download" }
dora-tracing = { workspace = true, optional = true }


binaries/coordinator/src/lib.rs (+9, -32)

@@ -18,7 +18,7 @@ use futures_concurrency::stream::Merge;
use run::SpawnedDataflow;
use std::{
collections::{BTreeSet, HashMap},
path::{Path, PathBuf},
path::Path,
time::Duration,
};
use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle};
@@ -30,28 +30,11 @@ mod listener;
mod run;
mod tcp_utils;

#[derive(Debug, Clone, clap::Parser)]
#[clap(about = "Dora coordinator")]
pub struct Args {
#[clap(long)]
pub runtime: Option<PathBuf>,
}

pub async fn run(args: Args) -> eyre::Result<()> {
let Args { runtime } = args;

let runtime_path = runtime.unwrap_or_else(|| {
std::env::args()
.next()
.map(PathBuf::from)
.unwrap_or_default()
.with_file_name("dora-runtime")
});

pub async fn run() -> eyre::Result<()> {
let mut tasks = FuturesUnordered::new();

// start in daemon mode
start(&runtime_path, &tasks).await?;
start(&tasks).await?;

tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
while let Some(join_result) = tasks.next().await {
@@ -64,7 +47,7 @@ pub async fn run(args: Args) -> eyre::Result<()> {
Ok(())
}

async fn start(runtime_path: &Path, tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
let ctrlc_events = set_up_ctrlc_handler()?;

let listener = listener::create_listener(DORA_COORDINATOR_PORT_DEFAULT).await?;
@@ -191,13 +174,9 @@ async fn start(runtime_path: &Path, tasks: &FuturesUnordered<JoinHandle<()>>) ->
bail!("there is already a running dataflow with name `{name}`");
}
}
let dataflow = start_dataflow(
&dataflow_path,
name,
runtime_path,
&mut daemon_connections,
)
.await?;
let dataflow =
start_dataflow(&dataflow_path, name, &mut daemon_connections)
.await?;
Ok(dataflow)
};
inner.await.map(|dataflow| {
@@ -377,6 +356,7 @@ async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> {
}
}

#[allow(dead_code)] // Keeping the communication layer for later use.
struct RunningDataflow {
name: Option<String>,
uuid: Uuid,
@@ -432,16 +412,13 @@ async fn stop_dataflow(
async fn start_dataflow(
path: &Path,
name: Option<String>,
runtime_path: &Path,
daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<RunningDataflow> {
let runtime_path = runtime_path.to_owned();

let SpawnedDataflow {
uuid,
communication_config,
machines,
} = spawn_dataflow(&runtime_path, path, daemon_connections).await?;
} = spawn_dataflow(path, daemon_connections).await?;
Ok(RunningDataflow {
uuid,
name,


binaries/coordinator/src/main.rs (+1, -2)

@@ -8,6 +8,5 @@ async fn main() -> eyre::Result<()> {
#[cfg(feature = "tracing")]
set_up_tracing().context("failed to set up tracing subscriber")?;

let args = clap::Parser::parse();
dora_coordinator::run(args).await
dora_coordinator::run().await
}

binaries/coordinator/src/run/mod.rs (+1, -21)

@@ -3,23 +3,20 @@ use crate::tcp_utils::{tcp_receive, tcp_send};
use dora_core::{
config::CommunicationConfig,
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, SpawnDataflowNodes},
descriptor::{CoreNodeKind, Descriptor},
descriptor::Descriptor,
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use std::{
collections::{BTreeSet, HashMap},
env::consts::EXE_EXTENSION,
path::Path,
};
use tokio::net::TcpStream;
use uuid::Uuid;

pub async fn spawn_dataflow(
runtime: &Path,
dataflow_path: &Path,
daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<SpawnedDataflow> {
let mut runtime = runtime.with_extension(EXE_EXTENSION);
let descriptor = read_descriptor(dataflow_path).await.wrap_err_with(|| {
format!(
"failed to read dataflow descriptor at {}",
@@ -40,23 +37,6 @@ pub async fn spawn_dataflow(
config.add_topic_prefix(&uuid.to_string());
config
};
if nodes
.iter()
.any(|n| matches!(n.kind, CoreNodeKind::Runtime(_)))
{
match which::which(runtime.as_os_str()) {
Ok(path) => {
runtime = path;
}
Err(err) => {
let err = eyre!(err).wrap_err(format!(
"There is no runtime at {}, or it is not a file",
runtime.display()
));
bail!("{err:?}")
}
}
}

let spawn_command = SpawnDataflowNodes {
dataflow_id: uuid,


binaries/daemon/src/lib.rs (+1, -8)

@@ -20,7 +20,7 @@ use std::{
io,
net::SocketAddr,
path::{Path, PathBuf},
time::{Duration, Instant},
time::Duration,
};
use tcp_utils::tcp_receive;
use tokio::{
@@ -188,8 +188,6 @@ impl Daemon {
let mut events = incoming_events;

while let Some(event) = events.next().await {
let start = Instant::now();

match event {
Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
let (reply, status) = self.handle_coordinator_event(event).await;
@@ -231,11 +229,6 @@ impl Daemon {
}
}
}

let elapsed = start.elapsed();
// if elapsed.as_micros() > 10 {
// tracing::debug!("handled event in {elapsed:?}: {event_debug}");
// }
}

Ok(self.dataflow_errors)


binaries/runtime/src/lib.rs (+6, -6)

@@ -143,17 +143,17 @@ async fn run(
}
OperatorEvent::Finished { reason } => {
if let StopReason::ExplicitStopAll = reason {
let hlc = dora_core::message::uhlc::HLC::default();
let metadata = dora_core::message::Metadata::new(hlc.new_timestamp());
let data = metadata
.serialize()
.wrap_err("failed to serialize stop message")?;
// let hlc = dora_core::message::uhlc::HLC::default();
// let metadata = dora_core::message::Metadata::new(hlc.new_timestamp());
// let data = metadata
// .serialize()
// .wrap_err("failed to serialize stop message")?;
todo!("instruct dora-daemon/dora-coordinator to stop other nodes");
// manual_stop_publisher
// .publish(&data)
// .map_err(|err| eyre::eyre!(err))
// .wrap_err("failed to send stop message")?;
break;
// break;
}

let Some(config) = operators.get(&operator_id) else {


docs/src/c-api.md (+6, -41)

@@ -4,45 +4,10 @@

The operator API is a framework for you to implement. The implemented operator will be managed by `dora`. This framework enables us to apply optimisations and provide advanced features.

The operator definition is composed of 3 functions, `dora_init_operator` that initialise the operator and its context. `dora_drop_operator` that free the memory, and `dora_on_input` that action the logic of the operator on receiving an input.
The operator definition is composed of 3 functions: `dora_init_operator`, which initialises the operator and its context; `dora_drop_operator`, which frees the memory; and `dora_on_event`, which runs the operator's logic when an event (e.g. an input) arrives.

```c
int dora_init_operator(void **operator_context)
{
// allocate a single byte to store a counter
// (the operator context pointer can be used to keep arbitrary data between calls)
void *context = malloc(1);

char *context_char = (char *)context;
*context_char = 0;

*operator_context = context;

return 0;
}

void dora_drop_operator(void *operator_context)
{
free(operator_context);
}

int dora_on_input(
const char *id_start,
size_t id_len,
const char *data_start,
size_t data_len,
const int (*output_fn_raw)(const char *id_start,
size_t id_len,
const char *data_start,
size_t data_len,
const void *output_context),
void *output_context,
const void *operator_context)
{
// handle the input ...
// (sending outputs is possible using `output_fn_raw`)
// (the `operator_context` is the pointer created in `dora_init_operator`, i.e., a counter in our case)
}
{{#include ../../examples/c-dataflow/operator.c:0:29}}
```
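Since the `{{#include}}` above is not rendered in this diff view, here is a condensed sketch of the new event-based shape, based on the C operator template updated in this commit (`dora_init_operator`/`dora_drop_operator` are omitted; `operator_api.h` and `operator_types.h` are the headers that `dora new` now writes next to the operator, as shown in `binaries/cli/src/template/c/mod.rs` above):

```c
#include <string.h>
#include "operator_api.h" // written alongside operator_types.h by `dora new`

OnEventResult_t dora_on_event(
    const RawEvent_t *event,
    const SendOutput_t *send_output,
    void *operator_context)
{
    (void)operator_context;
    if (event->input != NULL)
    {
        // copy the input ID into a null-terminated buffer for strcmp
        char id[event->input->id.len + 1];
        memcpy(id, event->input->id.ptr, event->input->id.len);
        id[event->input->id.len] = 0;

        if (strcmp(id, "foo") == 0)
        {
            // build an Output_t and send it through
            // `send_output->send_output.call` (see the template above)
        }
    }
    if (event->stop)
    {
        // the dataflow is shutting down
    }

    OnEventResult_t result = {.status = DORA_STATUS_CONTINUE};
    return result;
}
```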
### Try it out!

@@ -70,9 +35,9 @@ The custom node API allow you to integrate `dora` into your application. It allo
void *dora_context = init_dora_context_from_env();
```

#### `dora_next_input`
#### `dora_next_event`

`dora_next_input` waits for the next input. To extract the input ID and data, use `read_dora_input_id` and `read_dora_input_data` on the returned pointer.
`dora_next_event` waits for the next event (e.g. an input). Use `read_dora_event_type` to read the event's type. Inputs are of type `DoraEventType_Input`. To extract the ID and data of an input event, use `read_dora_input_id` and `read_dora_input_data` on the returned pointer. It is safe to ignore any events and handle only the events that are relevant to the node.

```c
void *event = dora_next_event(dora_context);
```
@@ -94,8 +59,8 @@ read_dora_input_data(input, &data, &data_len);

```c
char out_id[] = "tick";
char out_data[] = {0, 0, 0};
dora_send_output(dora_context, out_id, strlen(out_id), &out_data, sizeof out_data);
char out_data[50];
dora_send_output(dora_context, out_id, strlen(out_id), out_data, out_data_len);
```
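Putting these calls together, a custom node's main loop under the new event API looks like the updated C node template in this commit; a condensed sketch (the header name is an assumption here, matching the repository's `apis/c/node` layout):

```c
#include <stddef.h>
#include <string.h>
#include "node_api.h" // header name assumed; see apis/c/node in this repository

int main()
{
    void *dora_context = init_dora_context_from_env();

    while (1)
    {
        void *event = dora_next_event(dora_context);
        if (event == NULL)
            break; // event stream closed (the template treats this as an error)

        enum DoraEventType ty = read_dora_event_type(event);
        if (ty == DoraEventType_Input)
        {
            char *id;
            size_t id_len;
            read_dora_input_id(event, &id, &id_len);

            char *data;
            size_t data_len;
            read_dora_input_data(event, &data, &data_len);

            // illustrative: echo the received data back out under the ID `foo`
            char out_id[] = "foo";
            dora_send_output(dora_context, out_id, strlen(out_id), data, data_len);
        }
        free_dora_event(event); // do not use `id` or `data` afterwards
    }

    free_dora_context(dora_context);
    return 0;
}
```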
### Try it out!



docs/src/dataflow-config.md (+34, -47)

@@ -7,16 +7,15 @@ Dataflows are specified through a YAML file. This section presents our current d
Dataflows are specified through the following format:

```yaml
communication:
zenoh:
prefix: /example-python-no-webcam-dataflow

nodes:
- id: foo
# ... (see below)
- id: bar
# ... (see below)
deployment:
# (not specified yet, these fields are just examples)
zenoh_routers:
- 127.0.0.1
kubernetes:
```

### Inputs and Outputs
@@ -30,29 +29,34 @@ Input operands are specified using the <name>: <operator>/<output> syntax, where
Nodes are defined using the following format:

```yaml
- id: some-unique-id
name: Human-Readable Node Name
description: An optional description of the node's purpose.

# EITHER:
operators:
- id: operator-1
# ... (see below)
- id: operator-2
# ... (see below)

# OR:
custom:
run: path/to/timestamp
env:
- ENVIRONMENT_VARIABLE_1: true
working-directory: some/path

inputs:
input_1: operator_2/output_4
input_2: custom_node_2/output_4
outputs:
- output_1
nodes:
- id: some-unique-id
# For nodes with multiple operators
operators:
- id: operator-1
# ... (see below)
- id: operator-2
# ... (see below)



- id: some-unique-id-2
custom:
source: path/to/timestamp
env:
- ENVIRONMENT_VARIABLE_1: true
working-directory: some/path

inputs:
input_1: operator_2/output_4
input_2: custom_node_2/output_4
outputs:
- output_1
# Unique operator
- id: some-unique-id-3
operator:
# ... (see below)
```

Nodes must provide either an `operators` field or a `custom` field, but not both. Nodes with an `operators` field run a dora runtime process, which runs and manages the specified operators. Nodes with a `custom` field run a custom executable.
@@ -108,7 +112,7 @@ The mandatory `communication` key specifies how dora nodes and operators should
```yaml
communication:
zenoh:
prefix: /some-unique-prefix
prefix: some-unique-prefix
```

The specified `prefix` is added to all pub/sub topics. It is useful for filtering messages (e.g. in a logger) when other applications use `zenoh` in parallel. Dora extends the given prefix with a newly generated UUID on each run, to ensure that multiple instances of the same dataflow can run concurrently without interfering with each other.
@@ -116,24 +120,7 @@ The mandatory `communication` key specifies how dora nodes and operators should
Zenoh is quite flexible and can be easily scaled to distributed deployment. It does not require any extra setup since it supports peer-to-peer communication without an external broker. The drawback of zenoh is that it is still in an early stage of development, so it might still have reliability and performance issues.

_Note:_ Dora currently only supports local deployments, so interacting with remote nodes/operators is not possible yet.

- **[Iceoryx](https://iceoryx.io/):** The Eclipse iceoryx™ project provides an IPC middleware based on shared memory. It is very fast, but it only supports local communication. To use iceoryx as the communication backend, set the `communication` field to the following:

```yaml
communication:
iceoryx:
app_name_prefix: dora-iceoryx-example
```

The `app_name_prefix` defines a prefix for the _application name_ that the dataflow will use. An additional UUID will be added to that prefix to ensure that the application name remains unique even if multiple instances of the same dataflow are running.

In order to use iceoryx, you need to start its broker deamon called [_RouDi_](https://iceoryx.io/v2.0.2/getting-started/overview/#roudi). Its executable name is `iox-roudi`. There are two ways to obtain it:

- Follow the [iceoryx installation chapter](https://iceoryx.io/v2.0.2/getting-started/installation/)
- Clone the `dora-rs` project and build its iceoryx example using `cargo build --example iceoryx`. After building, you can find the `iox-roudi` executable inside the `target` directory using the following command: `find target -type f -wholename "*/iox-roudi"`.

Run the `iox-roudi` executable to start the iceoryx broker deamon. Afterwards, you should be able to run your dataflow.

## TODO: Integration with ROS 1/2

To integrate dora-rs operators with ROS1 or ROS2 operators, we plan to provide special _bridge operators_. These operators act as a sink in one dataflow framework and push all messages to a different dataflow framework, where they act as source.


docs/src/getting-started.md (+1, -1)

@@ -119,5 +119,5 @@ Let's write the graph definition so that the nodes know who to communicate with.

- Run the `dataflow`:
```bash
dora-coordinator --run-dataflow dataflow.yml dora-runtime
dora-daemon --run-dataflow dataflow.yml
```

docs/src/installation.md (+2, -8)

@@ -11,7 +11,7 @@ Install `dora` binaries from GitHub releases:
wget https://github.com/dora-rs/dora/releases/download/<version>/dora-<version>-x86_64-Linux.zip
unzip dora-<version>-x86_64-Linux.zip
python3 -m pip install dora-rs==<version> ## For Python API
PATH=$PATH:$(pwd):$(pwd)/iceoryx
PATH=$PATH:$(pwd)
dora --help
```

@@ -21,12 +21,6 @@ Build it using:
```bash
git clone https://github.com/dora-rs/dora.git
cd dora
cargo build -p dora-coordinator -p dora-runtime --release
cargo build --all --release
PATH=$PATH:$(pwd)/target/release
```

If you want to use `Iceoryx`. Add `iox-roudi` to the path.
You can find `iox-roudi` with:
```bash
find target -type f -wholename "*/iceoryx-install/bin/iox-roudi"
```

docs/src/introduction.md (+7, -5)

@@ -4,22 +4,24 @@

By using `dora`, you can define robotic applications as a graph of nodes that can be easily swapped and replaced. Those nodes can be shared and implemented in different languages such as Rust, Python or C. `dora` will then connect those nodes and try to provide as many features as possible to facilitate the dataflow.

## ✨ Features that we want to provide
## ✨ Features

Composability as:
- [x] `YAML` declarative programming
- [x] language-agnostic:
- [x] polyglot:
- [x] Rust
- [x] C
- [x] C++
- [x] Python
- [ ] Isolated operators and nodes that can be reused.
- [x] Isolated operators and custom nodes that can be reused.

Low latency as:
- [x] written in <i>...Cough...blazingly fast ...Cough...</i> Rust.
- [ ] Minimal abstraction, close to the metal.
- [x] PubSub communication with shared memory!
- [ ] Zero-copy on read!

Distributed as:
- [x] PubSub communication with [`zenoh`](https://github.com/eclipse-zenoh/zenoh)
- [ ] PubSub communication between machines with [`zenoh`](https://github.com/eclipse-zenoh/zenoh)
- [x] Distributed telemetry with [`opentelemetry`](https://github.com/open-telemetry/opentelemetry-rust)




docs/src/python-api.md (+2, -7)

@@ -4,15 +4,10 @@

The operator API is a framework for you to implement. The implemented operator will be managed by `dora`. This framework enables us to apply optimisations and provide advanced features. It is the recommended way of using `dora`.

An operator requires an `on_input` method and requires to return a `DoraStatus` of 0 or 1, depending of it needs to continue or stop.
An operator requires an `on_event` method that returns a `DoraStatus`, depending on whether it needs to continue or stop.

```python
class Operator:
def on_input(
self,
dora_input: dict,
send_output: Callable[[str, bytes], None],
) -> DoraStatus:
{{#include ../../examples/python-operator-dataflow/object_detection.py:0:25}}
```
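The included snippet is not rendered in this diff view; as a rough sketch of the shape (the operator signature is an assumption here, and the event-dict keys follow the node template updated in this commit):

```python
from dora import DoraStatus


class Operator:
    def on_event(self, dora_event, send_output) -> DoraStatus:
        # `dora_event` is assumed to be a dict shaped like the node API
        # events in this commit: keys "type", "id", "data", "metadata"
        if dora_event["type"] == "INPUT":
            # illustrative: forward the raw bytes under a new output ID
            send_output("some-output", dora_event["data"])
        return DoraStatus.CONTINUE
```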

> For Python, we recommend allocating the operators on a single runtime. A runtime shares the same GIL between several operators, making those operators run almost sequentially. See: [https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks](https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks)


docs/src/rust-api.md (+16, -24)

@@ -4,25 +4,10 @@

The operator API is a framework for you to implement. The implemented operator will be managed by `dora`. This framework enables us to apply optimisations and provide advanced features. It is the recommended way of using `dora`.

An operator requires to be registered and implement the `DoraOperator` trait. It is composed of an `on_input` method that defines the behaviour of the operator when there is an input.
An operator must be registered and implement the `DoraOperator` trait, which is composed of an `on_event` method that defines the operator's behaviour when an event occurs, such as receiving an input.

```rust
use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
time: Option<String>,
}

impl DoraOperator for ExampleOperator {
fn on_input(
&mut self,
id: &str,
data: &[u8],
output_sender: &mut DoraOutputSender,
) -> Result<DoraStatus, ()> {
{{#include ../../examples/rust-dataflow/operator/src/lib.rs:0:17}}
```
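For orientation (the include is not rendered in this diff view), an event-based operator has roughly the following shape; the `on_event` signature and the `Event` type are assumptions extrapolated from the removed `on_input` version above:

```rust
// `Event` is assumed to be exported by the operator API crate
use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator;

impl DoraOperator for ExampleOperator {
    // assumed signature: the event type replaces the raw
    // `id: &str, data: &[u8]` pair of the old `on_input` method
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, ()> {
        // handle inputs here; outputs go through `output_sender`
        let _ = (event, output_sender);
        Ok(DoraStatus::Continue)
    }
}
```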

### Try it out!
@@ -63,23 +48,30 @@ The custom node API allow you to integrate `dora` into your application. It allo
`DoraNode::init_from_env()` initiates a node from environment variables set by the `dora-coordinator`.

```rust
let node = DoraNode::init_from_env().await?;
let (mut node, mut events) = DoraNode::init_from_env()?;
```

#### `.inputs()`
#### `.recv()`

`.inputs()` gives you a stream of input that you can access using `next()` on the input stream.
`.recv()` waits for the next event on the events stream.

```rust
let mut inputs = node.inputs().await?;
let event = events.recv();
```

#### `.send_output(output_id, data)`
#### `.send_output(...)`

`send_output` send data from the node.
`send_output` sends data from the node to the other nodes.
It takes a closure as an argument to enable zero-copy sending.

```rust
node.send_output(&data_id, data.as_bytes()).await?;
node.send_output(
&data_id,
metadata.parameters,
data.len(),
|out| {
out.copy_from_slice(data);
})?;
```

### Try it out!


examples/benchmark/dataflow.yml (+1, -1)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /benchmark-example
prefix: benchmark-example

nodes:
- id: rust-node


examples/c++-dataflow/README.md (+1, -1)

@@ -53,5 +53,5 @@ For a manual build, follow these steps:
- Start the `dora-coordinator`, passing the paths to the dataflow file and the `dora-runtime` as arguments:

```
../../target/release/dora-coordinator --run-dataflow dataflow.yml ../../target/release/dora-runtime
../../target/release/dora-daemon --run-dataflow dataflow.yml ../../target/release/dora-runtime
```

examples/c++-dataflow/dataflow.yml (+1, -1)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /example-cxx-dataflow
prefix: example-cxx-dataflow

nodes:
- id: cxx-node-rust-api


examples/c-dataflow/README.md (+1, -1)

@@ -65,5 +65,5 @@ For a manual build, follow these steps:
- Start the `dora-coordinator`, passing the paths to the dataflow file and the `dora-runtime` as arguments:

```
../../target/release/dora-coordinator --run-dataflow dataflow.yml ../../target/release/dora-runtime
../../target/release/dora-daemon --run-dataflow dataflow.yml ../../target/release/dora-runtime
```

examples/c-dataflow/dataflow.yml (+1, -1)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /example-c-dataflow
prefix: example-c-dataflow

nodes:
- id: c_node


examples/python-dataflow/README.md (+2, -10)

@@ -16,18 +16,10 @@ The [`dataflow.yml`](./dataflow.yml) defines a simple dataflow graph with the fo
cargo run --example python-dataflow
```

## Installation

To install, you should run the `install.sh` script.

```bash
install.sh
```

## Run the dataflow as a standalone

- Start the `dora-coordinator`, passing the paths to the dataflow file and the `dora-runtime` as arguments:
- Start the `dora-daemon`:

```
../../target/release/dora-coordinator --run-dataflow dataflow.yml ../../target/release/dora-runtime
../../target/release/dora-daemon --run-dataflow dataflow.yml
```

examples/python-dataflow/dataflow.yml (+1, -1)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /example-python-dataflow
prefix: example-python-dataflow

nodes:
- id: webcam


examples/python-dataflow/dataflow_without_webcam.yml (+1, -1)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /example-python-no-webcam-dataflow
prefix: example-python-no-webcam-dataflow

nodes:
- id: no_webcam


examples/python-operator-dataflow/README.md (+2, -2)

@@ -26,8 +26,8 @@ install.sh

## Run the dataflow as a standalone

- Start the `dora-coordinator`, passing the paths to the dataflow file and the `dora-runtime` as arguments:
- Start the `dora-daemon`:

```
../../target/release/dora-coordinator --run-dataflow dataflow.yml ../../target/release/dora-runtime
../../target/release/dora-daemon --run-dataflow dataflow.yml
```

examples/python-operator-dataflow/dataflow.yml (+1, -1)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /example-python-dataflow
prefix: example-python-dataflow

nodes:
- id: webcam


examples/python-operator-dataflow/dataflow_without_webcam.yml (+1, -1)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /example-python-no-webcam-dataflow
prefix: example-python-no-webcam-dataflow

nodes:
- id: no_webcam


examples/python-operator-dataflow/no_webcam.py (+1, -1)

@@ -4,8 +4,8 @@
import time
import urllib.request

import cv2
import numpy as np

from dora import Node

print("Hello from no_webcam.py")


examples/python-operator-dataflow/object_detection.py (+2, -6)

@@ -1,18 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from enum import Enum
from typing import Callable

import cv2
import numpy as np
import torch


class DoraStatus(Enum):
CONTINUE = 0
STOP = 1

from dora import DoraStatus

class Operator:
"""


examples/python-operator-dataflow/plot.py (+10, -8)

@@ -1,22 +1,20 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
from enum import Enum
from typing import Callable

import cv2
import numpy as np

from utils import LABELS

from dora import DoraStatus

CI = os.environ.get("CI")

font = cv2.FONT_HERSHEY_SIMPLEX


class DoraStatus(Enum):
CONTINUE = 0
STOP = 1


class Operator:
"""
Plot image and bounding box
@@ -63,7 +61,11 @@ class Operator:
self.bboxs = np.reshape(bboxs, (-1, 6))

self.bounding_box_messages += 1
print("received " + str(self.bounding_box_messages) + " bounding boxes")
print(
"received "
+ str(self.bounding_box_messages)
+ " bounding boxes"
)

for bbox in self.bboxs:
[


examples/rust-dataflow-url/dataflow.yml (+1, -1)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /example-rust-dataflow
prefix: example-rust-dataflow

nodes:
- id: rust-node


examples/rust-dataflow/dataflow.yml (+1, -1)

@@ -1,6 +1,6 @@
communication:
zenoh:
prefix: /example-rust-dataflow
prefix: example-rust-dataflow

daemon_config: Tcp # or Shmem



libraries/communication-layer/pub-sub/Cargo.toml (+4, -9)

@@ -4,18 +4,13 @@ version.workspace = true
edition = "2021"

[features]
default = ["zenoh", "iceoryx"]
zenoh = ["dep:zenoh", "dep:zenoh-config"]
iceoryx = ["dep:iceoryx-rs", "dep:iceoryx-sys"]
default = ["zenoh"]
zenoh = ["dep:zenoh"]

[dependencies]
zenoh = { version = "0.7.0-rc", optional = true, features = ["transport_tcp"] }
eyre = "0.6.8"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b", optional = true }
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b", optional = true }

[target.'cfg(unix)'.dependencies]
iceoryx-rs = { git = "https://github.com/eclipse-iceoryx/iceoryx-rs.git", optional = true }
iceoryx-sys = { git = "https://github.com/eclipse-iceoryx/iceoryx-rs.git", optional = true }
flume = "0.10"

[package.metadata.docs.rs]
all-features = true


libraries/communication-layer/pub-sub/src/iceoryx.rs (+0, -158)

@@ -1,158 +0,0 @@
//! Provides [`IceoryxCommunicationLayer`] to communicate over `iceoryx`.

use super::{CommunicationLayer, Publisher, Subscriber};
use crate::{BoxError, ReceivedSample};
use eyre::Context;
use std::{borrow::Cow, collections::HashMap, sync::Arc, time::Duration};

/// Enables local communication based on `iceoryx`.
pub struct IceoryxCommunicationLayer {
group_name: String,
instance_name: String,
publishers: HashMap<String, Arc<iceoryx_rs::Publisher<[u8]>>>,
}

impl IceoryxCommunicationLayer {
/// Initializes a new `iceoryx` connection with default configuration.
///
/// The given `app_name` must be unique. The `group_name` and
/// `instance_name` arguments are used to create an `iceoryx`
/// `ServiceDescription` in combination wiith topic names given to the
/// [`publisher`][Self::publisher] and [`subscriber`][Self::subscribe]
/// methods. See the
/// [`iceoryx` documentation](https://iceoryx.io/v2.0.1/getting-started/overview/#creating-service-descriptions-for-topics)
/// for details.
///
/// Note: In order to use iceoryx, you need to start its broker deamon called
/// [_RouDi_](https://iceoryx.io/v2.0.2/getting-started/overview/#roudi) first.
/// Its executable name is `iox-roudi`. See the
/// [`iceoryx` installation chapter](https://iceoryx.io/v2.0.2/getting-started/installation/)
/// for ways to install it.
pub fn init(
app_name: String,
group_name: String,
instance_name: String,
) -> Result<Self, BoxError> {
iceoryx_rs::Runtime::init(&app_name);

Ok(Self {
group_name,
instance_name,
publishers: Default::default(),
})
}
}

impl IceoryxCommunicationLayer {
fn get_or_create_publisher(
&mut self,
topic: &str,
) -> eyre::Result<Arc<iceoryx_rs::Publisher<[u8]>>> {
match self.publishers.get(topic) {
Some(p) => Ok(p.clone()),
None => {
let publisher =
Self::create_publisher(&self.group_name, &self.instance_name, topic)
.context("failed to create iceoryx publisher")?;

let publisher = Arc::new(publisher);
self.publishers.insert(topic.to_owned(), publisher.clone());
Ok(publisher)
}
}
}

fn create_publisher(
group: &str,
instance: &str,
topic: &str,
) -> Result<iceoryx_rs::Publisher<[u8]>, iceoryx_rs::IceoryxError> {
iceoryx_rs::PublisherBuilder::new(group, instance, topic).create()
}
}

impl CommunicationLayer for IceoryxCommunicationLayer {
fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, crate::BoxError> {
let publisher = self
.get_or_create_publisher(topic)
.map_err(BoxError::from)?;

Ok(Box::new(IceoryxPublisher { publisher }))
}

fn subscribe(&mut self, topic: &str) -> Result<Box<dyn Subscriber>, crate::BoxError> {
let (subscriber, token) =
iceoryx_rs::SubscriberBuilder::new(&self.group_name, &self.instance_name, topic)
.queue_capacity(5)
.create_mt()
.context("failed to create iceoryx subscriber")
.map_err(BoxError::from)?;
let receiver = subscriber.get_sample_receiver(token);

Ok(Box::new(IceoryxReceiver { receiver }))
}
}

#[derive(Clone)]
struct IceoryxPublisher {
publisher: Arc<iceoryx_rs::Publisher<[u8]>>,
}

impl Publisher for IceoryxPublisher {
fn prepare(&self, len: usize) -> Result<Box<dyn crate::PublishSample + '_>, BoxError> {
let sample = self
.publisher
.loan_slice(len)
.context("failed to loan iceoryx slice for publishing")
.map_err(BoxError::from)?;
Ok(Box::new(IceoryxPublishSample {
sample,
publisher: self.publisher.clone(),
}))
}

fn dyn_clone(&self) -> Box<dyn Publisher> {
Box::new(self.clone())
}
}

struct IceoryxPublishSample<'a> {
publisher: Arc<iceoryx_rs::Publisher<[u8]>>,
sample: iceoryx_rs::SampleMut<'a, [u8]>,
}

impl<'a> crate::PublishSample<'a> for IceoryxPublishSample<'a> {
fn as_mut_slice(&mut self) -> &mut [u8] {
&mut self.sample
}

fn publish(self: Box<Self>) -> Result<(), BoxError> {
self.publisher.publish(self.sample);
Ok(())
}
}

struct IceoryxReceiver {
receiver: iceoryx_rs::mt::SampleReceiver<[u8]>,
}

impl Subscriber for IceoryxReceiver {
fn recv(&mut self) -> Result<Option<Box<dyn ReceivedSample>>, BoxError> {
self.receiver
.wait_for_samples(Duration::from_secs(u64::MAX));
match self.receiver.take() {
Some(sample) => Ok(Some(Box::new(IceoryxReceivedSample { sample }))),
None => Ok(None),
}
}
}

struct IceoryxReceivedSample {
sample: iceoryx_rs::Sample<[u8], iceoryx_sys::SubscriberArc>,
}

impl ReceivedSample for IceoryxReceivedSample {
fn get(&self) -> Cow<[u8]> {
Cow::Borrowed(&self.sample)
}
}

libraries/communication-layer/pub-sub/src/lib.rs (+0, -5)

@@ -9,14 +9,9 @@
//! - **[Zenoh](https://zenoh.io/):** The zenoh project implements a distributed
//! publisher/subscriber system with automated routing. To use zenoh, use the
//! [`ZenohCommunicationLayer`][zenoh::ZenohCommunicationLayer] struct.
//! - **[Iceoryx](https://iceoryx.io/):** The Eclipse iceoryx™ project provides an IPC middleware
//! based on shared memory. It is very fast, but it only supports local communication. To use
//! iceoryx, use the [`IceoryxCommunicationLayer`][iceoryx::IceoryxCommunicationLayer] struct.

use std::borrow::Cow;

#[cfg(all(unix, feature = "iceoryx"))]
pub mod iceoryx;
#[cfg(feature = "zenoh")]
pub mod zenoh;



libraries/communication-layer/pub-sub/src/zenoh.rs (+15, -12)

@@ -1,12 +1,10 @@
//! Provides [`ZenohCommunicationLayer`] to communicate over `zenoh`.

pub use zenoh_config;

use super::{CommunicationLayer, Publisher, Subscriber};
use crate::{BoxError, ReceivedSample};
use std::{borrow::Cow, sync::Arc, time::Duration};
use zenoh::{
prelude::{EntityFactory, Priority, Receiver as _, SplitBuffer, ZFuture},
prelude::{sync::SyncResolve, Config, Priority, SessionDeclarations, SplitBuffer},
publication::CongestionControl,
};

@@ -22,9 +20,9 @@ impl ZenohCommunicationLayer {
/// The `prefix` is added to all topic names when using the [`publisher`][Self::publisher]
/// and [`subscriber`][Self::subscribe] methods. Pass an empty string if no prefix is
/// desired.
pub fn init(config: zenoh_config::Config, prefix: String) -> Result<Self, BoxError> {
pub fn init(config: Config, prefix: String) -> Result<Self, BoxError> {
let zenoh = ::zenoh::open(config)
.wait()
.res_sync()
.map_err(BoxError::from)?
.into_arc();
Ok(Self {
@@ -42,10 +40,10 @@ impl CommunicationLayer for ZenohCommunicationLayer {
fn publisher(&mut self, topic: &str) -> Result<Box<dyn Publisher>, BoxError> {
let publisher = self
.zenoh
.publish(self.prefixed(topic))
.declare_publisher(self.prefixed(topic))
.congestion_control(CongestionControl::Block)
.priority(Priority::RealTime)
.wait()
.res_sync()
.map_err(BoxError::from)?;

Ok(Box::new(ZenohPublisher { publisher }))
@@ -54,9 +52,9 @@ impl CommunicationLayer for ZenohCommunicationLayer {
fn subscribe(&mut self, topic: &str) -> Result<Box<dyn Subscriber>, BoxError> {
let subscriber = self
.zenoh
.subscribe(self.prefixed(topic))
.declare_subscriber(self.prefixed(topic))
.reliable()
.wait()
.res_sync()
.map_err(BoxError::from)?;

Ok(Box::new(ZenohReceiver(subscriber)))
@@ -104,11 +102,16 @@ impl<'a> crate::PublishSample<'a> for ZenohPublishSample {
}

fn publish(self: Box<Self>) -> Result<(), BoxError> {
self.publisher.send(self.sample).map_err(BoxError::from)
self.publisher
.put(self.sample)
.res_sync()
.map_err(BoxError::from)
}
}

struct ZenohReceiver(zenoh::subscriber::Subscriber<'static>);
struct ZenohReceiver(
zenoh::subscriber::Subscriber<'static, flume::Receiver<zenoh::sample::Sample>>,
);

impl Subscriber for ZenohReceiver {
fn recv(&mut self) -> Result<Option<Box<dyn ReceivedSample>>, BoxError> {
@@ -122,7 +125,7 @@ impl Subscriber for ZenohReceiver {
}

struct ZenohReceivedSample {
sample: zenoh::buf::ZBuf,
sample: zenoh::buffers::ZBuf,
}

impl ReceivedSample for ZenohReceivedSample {


libraries/communication-layer/request-reply/src/tcp.rs (+6, -0)

@@ -16,6 +16,12 @@ impl TcpLayer {
}
}

impl Default for TcpLayer {
fn default() -> Self {
Self::new()
}
}

impl RequestReplyLayer for TcpLayer {
type Address = SocketAddr;
type RequestData = Vec<u8>;


libraries/core/Cargo.toml (+1, -1)

@@ -11,9 +11,9 @@ eyre = "0.6.8"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
once_cell = "1.13.0"
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" }
which = "4.3.0"
uuid = { version = "1.2.1", features = ["serde"] }
dora-message = { path = "../message" }
tracing = "0.1"
serde-with-expand-env = "1.1.0"
zenoh = "0.7.0-rc"

libraries/core/src/config.rs (+2, -17)

@@ -202,11 +202,7 @@ impl<'de> Deserialize<'de> for InputMapping {
"unknown dora input `{other}`"
)))
}
None => {
return Err(serde::de::Error::custom(format!(
"dora input has invalid format"
)))
}
None => return Err(serde::de::Error::custom("dora input has invalid format")),
},
_ => Self::User(UserInputMapping {
source: source.to_owned().into(),
@@ -254,14 +250,9 @@ pub struct NodeRunConfig {
pub enum CommunicationConfig {
Zenoh {
#[serde(default)]
config: Box<zenoh_config::Config>,
config: Box<zenoh::prelude::Config>,
prefix: String,
},
Iceoryx {
app_name_prefix: String,
#[serde(default)]
topic_prefix: String,
},
}

impl CommunicationConfig {
@@ -273,12 +264,6 @@ impl CommunicationConfig {
} => {
write!(zenoh_prefix, "/{}", prefix).unwrap();
}
CommunicationConfig::Iceoryx { topic_prefix, .. } => {
if !topic_prefix.is_empty() {
topic_prefix.push('-');
}
topic_prefix.push_str(prefix);
}
}
}
}

libraries/core/src/descriptor/visualize.rs (+1, -1)

@@ -182,7 +182,7 @@ fn visualize_user_mapping(
if let Some(operator) = operators.iter().find(|o| o.id.as_ref() == operator_id) {
if operator.config.outputs.contains(output) {
let data = if output == input_id.as_str() {
format!("{output}")
output.to_string()
} else {
format!("{output} as {input_id}")
};


libraries/extensions/download/Cargo.toml (+1, -1)

@@ -8,7 +8,7 @@ license = "Apache-2.0"

[dependencies]
eyre = "0.6.8"
tempfile = "3.3.0"
tempfile = "3.4.0"
reqwest = "0.11.12"
tokio = { version = "1.24.2" }
tracing = "0.1.36"

libraries/extensions/zenoh-logger/Cargo.toml (+1, -2)

@@ -7,5 +7,4 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" }
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git", rev = "79a136e4fd90b11ff5d775ced981af53c4f1071b" }
zenoh = "0.7.0-rc"

libraries/extensions/zenoh-logger/src/main.rs (+8, -4)

@@ -1,11 +1,15 @@
use zenoh::prelude::{Receiver, ZFuture};
use zenoh::prelude::{sync::SyncResolve, Config};

fn main() {
let zenoh = zenoh::open(zenoh_config::Config::default()).wait().unwrap();
let mut sub = zenoh.subscribe("/**").reliable().wait().unwrap();
let zenoh = zenoh::open(Config::default()).res_sync().unwrap();
let sub = zenoh
.declare_subscriber("/**")
.reliable()
.res_sync()
.unwrap();

loop {
let msg = sub.receiver().recv().unwrap();
let msg = sub.recv().unwrap();
println!("{}", msg.key_expr);
}
}
