| @@ -12,6 +12,11 @@ Dataflow Oriented Robotic Architecture ⚡ | |||||
| This project is in early development, and many features have yet to be implemented with breaking changes. Please don't take for granted the current design. | This project is in early development, and many features have yet to be implemented with breaking changes. Please don't take for granted the current design. | ||||
| --- | |||||
| ## 📖 Documentation | |||||
| The documentation can be found here: [https://dora-rs.github.io/dora/](https://dora-rs.github.io/dora/) | |||||
| --- | --- | ||||
| ## ✨ Features | ## ✨ Features | ||||
| @@ -22,7 +22,3 @@ time = "0.3.9" | |||||
| futures-concurrency = "2.0.3" | futures-concurrency = "2.0.3" | ||||
| rand = "0.8.5" | rand = "0.8.5" | ||||
| dora-core = { version = "0.1.0", path = "../../libraries/core" } | dora-core = { version = "0.1.0", path = "../../libraries/core" } | ||||
| [[example]] | |||||
| name = "source_timer" | |||||
| path = "examples/nodes/rust/source_timer.rs" | |||||
| @@ -24,23 +24,3 @@ There are drawbacks too, for example: | |||||
| - Operators are always isolated | - Operators are always isolated | ||||
| - No way of using in-memory channels | - No way of using in-memory channels | ||||
| - Local sockets and shared memory should be still possible | - Local sockets and shared memory should be still possible | ||||
| ## Try it out | |||||
| - Compile: the dora runtime, the nodes in `examples`, and the Rust example operator through: | |||||
| ```bash | |||||
| cargo build -p dora-runtime --release | |||||
| cargo build -p dora-coordinator --examples --release | |||||
| cargo build --manifest-path ../examples/example-operator/Cargo.toml --release | |||||
| ``` | |||||
| - Compile the C example operator through: | |||||
| ```bash | |||||
| cd ../../examples/c-operator | |||||
| cp ../../apis/c/operator/api.h . | |||||
| clang -c operator.c | |||||
| clang -shared -v operator.o -o operator.so | |||||
| ``` | |||||
| - Run the `mini-dataflow` example using `cargo run --release -- run examples/mini-dataflow.yml` | |||||
| - This spawns a `timer` source, which sends the current time periodically, and a `logger` sink, which prints the incoming data. | |||||
| - The `timer` will exit after 100 iterations. The other nodes/operators will then exit as well because all sources closed. | |||||
| @@ -1,52 +0,0 @@ | |||||
| communication: | |||||
| zenoh_prefix: /foo | |||||
| nodes: | |||||
| - id: timer | |||||
| custom: | |||||
| run: non-existent | |||||
| outputs: | |||||
| - time | |||||
| - id: camera | |||||
| custom: | |||||
| run: non-existent | |||||
| outputs: | |||||
| - image | |||||
| - id: process | |||||
| operators: | |||||
| - id: operator-1 | |||||
| inputs: | |||||
| image: camera/image | |||||
| outputs: | |||||
| - processed | |||||
| shared_library: non-existent | |||||
| - id: operator-2 | |||||
| inputs: | |||||
| processed: process/operator-1/processed | |||||
| time: timer/time | |||||
| outputs: | |||||
| - processed | |||||
| python: non-existent | |||||
| - id: timestamp | |||||
| custom: | |||||
| run: non-existent | |||||
| inputs: | |||||
| timestamp: timer/time | |||||
| pic: camera/image | |||||
| outputs: | |||||
| - image-with-timestamp | |||||
| - id: printer | |||||
| custom: | |||||
| run: non-existent | |||||
| inputs: | |||||
| data: timestamp/image-with-timestamp | |||||
| processed: process/operator-2/processed | |||||
| - id: logger | |||||
| custom: | |||||
| run: non-existent | |||||
| inputs: | |||||
| data: timestamp/image-with-timestamp | |||||
| time: timer/time | |||||
| @@ -1,68 +0,0 @@ | |||||
| communication: | |||||
| zenoh: | |||||
| prefix: /foo | |||||
| nodes: | |||||
| - id: timer | |||||
| custom: | |||||
| run: cargo run --release --example example_source_timer | |||||
| outputs: | |||||
| - time | |||||
| - id: rate-limited-timer | |||||
| custom: | |||||
| run: cargo run --release --example rate_limit -- --seconds 0.5 | |||||
| inputs: | |||||
| data: timer/time | |||||
| outputs: | |||||
| - rate_limited | |||||
| - id: random | |||||
| custom: | |||||
| run: cargo run --release --example random_number | |||||
| inputs: | |||||
| timestamp: rate-limited-timer/rate_limited | |||||
| outputs: | |||||
| - number | |||||
| - id: logger | |||||
| custom: | |||||
| run: cargo run --release --example example_sink_logger | |||||
| inputs: | |||||
| random: random/number | |||||
| time: timer/time | |||||
| timestamped-random: runtime-node/op-1/timestamped-random | |||||
| c-counter: runtime-node/op-2/counter | |||||
| python-counter: runtime-node/op-3/counter | |||||
| - id: runtime-node | |||||
| operators: | |||||
| - id: op-1 | |||||
| shared-library: ../../target/release/libexample_operator.so | |||||
| inputs: | |||||
| random: random/number | |||||
| time: timer/time | |||||
| outputs: | |||||
| - timestamped-random | |||||
| - id: op-2 | |||||
| shared-library: ../../examples/c-operator/operator.so | |||||
| inputs: | |||||
| time: timer/time | |||||
| outputs: | |||||
| - counter | |||||
| - id: op-3 | |||||
| python: ../../examples/python-operator/op.py | |||||
| inputs: | |||||
| time: timer/time | |||||
| test: python-operator/counter | |||||
| outputs: | |||||
| - counter | |||||
| - id: python-operator | |||||
| operator: | |||||
| python: ../../examples/python-operator/op2.py | |||||
| inputs: | |||||
| time: timer/time | |||||
| dora_time: dora/timer/millis/50 | |||||
| outputs: | |||||
| - counter | |||||
| @@ -1,52 +0,0 @@ | |||||
| use clap::StructOpt; | |||||
| use dora_node_api::{self, config::DataId, DoraNode}; | |||||
| use eyre::bail; | |||||
| use futures::StreamExt; | |||||
| use std::time::{Duration, Instant}; | |||||
| #[derive(Debug, Clone, clap::Parser)] | |||||
| #[clap(about = "Limit the rate of incoming data")] | |||||
| struct Args { | |||||
| /// Minimal interval between two subsequent. | |||||
| /// | |||||
| /// Intermediate messages are ignored. | |||||
| #[clap(long)] | |||||
| seconds: f32, | |||||
| } | |||||
| #[tokio::main] | |||||
| async fn main() -> eyre::Result<()> { | |||||
| let args = Args::parse(); | |||||
| let min_interval = Duration::from_secs_f32(args.seconds); | |||||
| let output = DataId::from("rate_limited".to_owned()); | |||||
| let operator = DoraNode::init_from_env().await?; | |||||
| let mut inputs = operator.inputs().await?; | |||||
| let mut last_message = Instant::now(); | |||||
| loop { | |||||
| let timeout = Duration::from_secs(3); | |||||
| let input = match tokio::time::timeout(timeout, inputs.next()).await { | |||||
| Ok(Some(input)) => input, | |||||
| Ok(None) => break, | |||||
| Err(_) => bail!("timeout while waiting for input"), | |||||
| }; | |||||
| match input.id.as_str() { | |||||
| "data" => { | |||||
| let elapsed = last_message.elapsed(); | |||||
| if elapsed > min_interval { | |||||
| last_message += elapsed; | |||||
| operator.send_output(&output, &input.data).await?; | |||||
| } | |||||
| } | |||||
| other => eprintln!("Ignoring unexpected input `{other}`"), | |||||
| } | |||||
| } | |||||
| println!("rate limit finished"); | |||||
| Ok(()) | |||||
| } | |||||
| @@ -1,19 +0,0 @@ | |||||
| use dora_node_api::{self, config::DataId, DoraNode}; | |||||
| use std::time::Duration; | |||||
| use time::OffsetDateTime; | |||||
| #[tokio::main] | |||||
| async fn main() -> eyre::Result<()> { | |||||
| let operator = DoraNode::init_from_env().await?; | |||||
| let mut interval = tokio::time::interval(Duration::from_millis(20)); | |||||
| let time_output = DataId::from("time".to_owned()); | |||||
| for _ in 0..400 { | |||||
| interval.tick().await; | |||||
| let now = OffsetDateTime::now_utc().to_string(); | |||||
| operator.send_output(&time_output, now.as_bytes()).await?; | |||||
| } | |||||
| Ok(()) | |||||
| } | |||||
| @@ -55,7 +55,7 @@ int dora_on_input( | |||||
| - Link it in your graph as: | - Link it in your graph as: | ||||
| ```yaml | ```yaml | ||||
| {{#include ../../binaries/coordinator/examples/mini-dataflow.yml:47:52}} | |||||
| {{#include ../../examples/c-dataflow/dataflow.yml:13:20}} | |||||
| ``` | ``` | ||||
| ## Custom Node | ## Custom Node | ||||
| @@ -105,3 +105,8 @@ dora_send_output(dora_context, out_id, strlen(out_id), &out_data, sizeof out_dat | |||||
| ``` | ``` | ||||
| {{#include ../../examples/c-dataflow/README.md:26:35}} | {{#include ../../examples/c-dataflow/README.md:26:35}} | ||||
| - Link it in your graph as: | |||||
| ```yaml | |||||
| {{#include ../../examples/c-dataflow/dataflow.yml:6:12}} | |||||
| ``` | |||||
| @@ -97,7 +97,7 @@ Each operator must specify exactly one implementation. The implementation must f | |||||
| ## Example | ## Example | ||||
| ```yaml | ```yaml | ||||
| {{#include ../../binaries/coordinator/examples/mini-dataflow.yml}} | |||||
| {{#include ../../examples/rust-dataflow/dataflow.yml}} | |||||
| ``` | ``` | ||||
| @@ -14,7 +14,7 @@ cd my_first_dataflow | |||||
| [workspace] | [workspace] | ||||
| members = [ | members = [ | ||||
| "source_timer", | |||||
| "rust-dataflow-example-node", | |||||
| ] | ] | ||||
| ``` | ``` | ||||
| @@ -25,28 +25,50 @@ Let's write a node which sends the current time periodically. Let's make it afte | |||||
| - Generate a new Rust binary (application): | - Generate a new Rust binary (application): | ||||
| ```bash | ```bash | ||||
| cargo new source_timer | |||||
| cargo new rust-dataflow-example-node | |||||
| ``` | ``` | ||||
| with `Cargo.toml`: | with `Cargo.toml`: | ||||
| ```toml | ```toml | ||||
| [package] | |||||
| name = "rust-node" | |||||
| version = "0.1.0" | |||||
| edition = "2021" | |||||
| license = "Apache-2.0" | |||||
| [dependencies] | |||||
| dora-node-api = { git = "https://github.com/dora-rs/dora" } | |||||
| time = "0.3.9" | |||||
| {{#include ../../examples/rust-dataflow/node/Cargo.toml}} | |||||
| ``` | ``` | ||||
| with `src/main.rs`: | with `src/main.rs`: | ||||
| ```rust | ```rust | ||||
| {{#include ../../binaries/coordinator/examples/nodes/rust/source_timer.rs}} | |||||
| {{#include ../../examples/rust-dataflow/node/src/main.rs}} | |||||
| ``` | ``` | ||||
| ### Write your second node | |||||
| ### Write your first operator | |||||
| - Generate a new Rust library: | |||||
| ```bash | |||||
| cargo new rust-dataflow-example-operator --lib | |||||
| ``` | |||||
| with `Cargo.toml`: | |||||
| ```toml | |||||
| {{#include ../../examples/rust-dataflow/operator/Cargo.toml}} | |||||
| ``` | |||||
| with `src/lib.rs`: | |||||
| ```rust | |||||
| {{#include ../../examples/rust-dataflow/operator/src/lib.rs}} | |||||
| ``` | |||||
| - And modify the root `Cargo.toml`: | |||||
| ```toml= | |||||
| [workspace] | |||||
| members = [ | |||||
| "rust-dataflow-example-node", | |||||
| "rust-dataflow-example-operator", | |||||
| ] | |||||
| ``` | |||||
| ### Write your sink node | |||||
| Let's write a `logger` which will print incoming data. | Let's write a `logger` which will print incoming data. | ||||
| @@ -58,20 +80,12 @@ cargo new sink_logger | |||||
| with `Cargo.toml`: | with `Cargo.toml`: | ||||
| ```toml | ```toml | ||||
| [package] | |||||
| name = "sink_logger" | |||||
| version = "0.1.0" | |||||
| edition = "2021" | |||||
| license = "Apache-2.0" | |||||
| [dependencies] | |||||
| dora-node-api = { git = "https://github.com/dora-rs/dora" } | |||||
| time = "0.3.9" | |||||
| {{#include ../../examples/rust-dataflow/sink/Cargo.toml}} | |||||
| ``` | ``` | ||||
| with `src/main.rs`: | with `src/main.rs`: | ||||
| ```rust | ```rust | ||||
| {{#include ../../binaries/coordinator/examples/nodes/rust/sink_logger.rs}} | |||||
| {{#include ../../examples/rust-dataflow/sink/src/main.rs}} | |||||
| ``` | ``` | ||||
| - And modify the root `Cargo.toml`: | - And modify the root `Cargo.toml`: | ||||
| @@ -79,39 +93,31 @@ with `src/main.rs`: | |||||
| [workspace] | [workspace] | ||||
| members = [ | members = [ | ||||
| "source_timer", | |||||
| "sink_logger" | |||||
| "rust-dataflow-example-node", | |||||
| "rust-dataflow-example-operator", | |||||
| "rust-dataflow-example-sink" | |||||
| ] | ] | ||||
| ``` | ``` | ||||
| ### Compile everything | |||||
| ```bash | |||||
| cargo build --all --release | |||||
| ``` | |||||
| ### Write a graph definition | ### Write a graph definition | ||||
| Let's write the graph definition so that the nodes know who to communicate with. | Let's write the graph definition so that the nodes know who to communicate with. | ||||
| `mini-dataflow.yml` | |||||
| `dataflow.yml` | |||||
| ```yaml | ```yaml | ||||
| communication: | |||||
| zenoh: | |||||
| prefix: /foo | |||||
| nodes: | |||||
| - id: timer | |||||
| custom: | |||||
| run: cargo run --release --bin source_timer | |||||
| outputs: | |||||
| - time | |||||
| - id: logger | |||||
| custom: | |||||
| run: cargo run --release --bin sink_logger | |||||
| inputs: | |||||
| time: timer/time | |||||
| {{#include ../../examples/rust-dataflow/dataflow.yml}} | |||||
| ``` | ``` | ||||
| ### Run it! | ### Run it! | ||||
| - Run the `mini-dataflow`: | |||||
| - Run the `dataflow`: | |||||
| ```bash | ```bash | ||||
| dora-coordinator run mini-dataflow.yml | |||||
| dora-coordinator run dataflow.yml dora-runtime | |||||
| ``` | ``` | ||||
| @@ -19,14 +19,14 @@ class Operator: | |||||
| > For Python, we recommend to allocate the operator on a single runtime. A runtime will share the same GIL with several operators making those operators run almost sequentially. See: [https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks](https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks) | > For Python, we recommend to allocate the operator on a single runtime. A runtime will share the same GIL with several operators making those operators run almost sequentially. See: [https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks](https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks) | ||||
| ### Try it out! | ### Try it out! | ||||
| - Create an operator python file called `op.py`: | |||||
| - Create an operator python file called `object_detection.py`: | |||||
| ```python | ```python | ||||
| {{#include ../../examples/python-operator/op.py}} | |||||
| {{#include ../../examples/python-dataflow/object_detection.py}} | |||||
| ``` | ``` | ||||
| - Link it in your graph as: | - Link it in your graph as: | ||||
| ```yaml | ```yaml | ||||
| {{#include ../../binaries/coordinator/examples/graphs/mini-dataflow.yml:67:73}} | |||||
| {{#include ../../examples/python-dataflow/dataflow.yml:14:20}} | |||||
| ``` | ``` | ||||
| ## Custom Node | ## Custom Node | ||||
| @@ -74,12 +74,12 @@ pip install maturin | |||||
| maturin develop | maturin develop | ||||
| ``` | ``` | ||||
| - Create a python file called `printer.py`: | |||||
| - Create a python file called `webcam.py`: | |||||
| ```python | ```python | ||||
| {{#include ../../binaries/coordinator/examples/nodes/python/printer.py}} | |||||
| {{#include ../../examples/python-dataflow/webcam.py}} | |||||
| ``` | ``` | ||||
| - Link it in your graph as: | - Link it in your graph as: | ||||
| ```yaml | ```yaml | ||||
| {{#include ../../binaries/coordinator/examples/graphs/python_test.yml:12:17}} | |||||
| {{#include ../../examples/python-dataflow/dataflow.yml:6:12}} | |||||
| ``` | ``` | ||||
| @@ -30,17 +30,17 @@ impl DoraOperator for ExampleOperator { | |||||
| - Generate a new Rust library | - Generate a new Rust library | ||||
| ```bash | ```bash | ||||
| cargo new example-operator --lib | |||||
| cargo new rust-dataflow-example-operator --lib | |||||
| ``` | ``` | ||||
| `Cargo.toml` | `Cargo.toml` | ||||
| ```toml | ```toml | ||||
| {{#include ../../examples/example-operator/Cargo.toml}} | |||||
| {{#include ../../examples/rust-dataflow/operator/Cargo.toml}} | |||||
| ``` | ``` | ||||
| `src/lib.rs` | `src/lib.rs` | ||||
| ```rust | ```rust | ||||
| {{#include ../../examples/example-operator/src/lib.rs}} | |||||
| {{#include ../../examples/rust-dataflow/operator/src/lib.rs}} | |||||
| ``` | ``` | ||||
| - Build it: | - Build it: | ||||
| @@ -50,7 +50,7 @@ cargo build --release | |||||
| - Link it in your graph as: | - Link it in your graph as: | ||||
| ```yaml | ```yaml | ||||
| {{#include ../../binaries/coordinator/examples/mini-dataflow.yml:38:46}} | |||||
| {{#include ../../examples/rust-dataflow/dataflow.yml:13:21}} | |||||
| ``` | ``` | ||||
| This example can be found in `examples`. | This example can be found in `examples`. | ||||
| @@ -87,31 +87,19 @@ node.send_output(&data_id, data.as_bytes()).await?; | |||||
| - Generate a new Rust binary (application): | - Generate a new Rust binary (application): | ||||
| ```bash | ```bash | ||||
| cargo new source_timer | |||||
| cargo new rust-dataflow-example-node | |||||
| ``` | ``` | ||||
| ```toml | ```toml | ||||
| [package] | |||||
| name = "source_timer" | |||||
| version = "0.1.0" | |||||
| edition = "2021" | |||||
| license = "Apache-2.0" | |||||
| [dependencies] | |||||
| dora-node-api = { path = "../../apis/rust/node" } | |||||
| time = "0.3.9" | |||||
| {{#include ../../examples/rust-dataflow/node/Cargo.toml}} | |||||
| ``` | ``` | ||||
| `src/main.rs` | `src/main.rs` | ||||
| ```rust | ```rust | ||||
| {{#include ../../binaries/coordinator/examples/nodes/rust/source_timer.rs}} | |||||
| {{#include ../../examples/rust-dataflow/node/src/main.rs}} | |||||
| ``` | ``` | ||||
| - Link it in your graph as: | - Link it in your graph as: | ||||
| ```yaml | ```yaml | ||||
| - id: timer | |||||
| custom: | |||||
| run: cargo run --release | |||||
| outputs: | |||||
| - time | |||||
| {{#include ../../examples/rust-dataflow/dataflow.yml:6:12}} | |||||
| ``` | ``` | ||||