From 38bc8a82849e4105935f41c21bcdb6906f6a4171 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 9 Jun 2022 16:44:18 +0200 Subject: [PATCH 1/7] Create a C-API for operators and an example operator Adds an `api.h` header file that defines the dora functions in a C-compatible way. To showcase that C-operators can interact with Rust operators and custom nodes, we create a simple C operator and add it to our `mini-dataflow`. The operator reads the periodic timestamps, increments a counter, and sends the counter value out. The counter is kept across calls by storing it behind the context pointer. --- coordinator/examples/example_sink_logger.rs | 3 + coordinator/examples/mini-dataflow.yml | 7 +++ runtime/examples/c-operator/.gitignore | 2 + runtime/examples/c-operator/api.h | 16 ++++++ runtime/examples/c-operator/operator.c | 64 +++++++++++++++++++++ 5 files changed, 92 insertions(+) create mode 100644 runtime/examples/c-operator/.gitignore create mode 100644 runtime/examples/c-operator/api.h create mode 100644 runtime/examples/c-operator/operator.c diff --git a/coordinator/examples/example_sink_logger.rs b/coordinator/examples/example_sink_logger.rs index dc56630f..8fbeed01 100644 --- a/coordinator/examples/example_sink_logger.rs +++ b/coordinator/examples/example_sink_logger.rs @@ -40,6 +40,9 @@ async fn main() -> eyre::Result<()> { let data = String::from_utf8(input.data)?; println!("received timestamped random value: {data}"); } + "counter" => { + println!("received counter value: {:?}", input.data); + } other => eprintln!("Ignoring unexpected input `{other}`"), } diff --git a/coordinator/examples/mini-dataflow.yml b/coordinator/examples/mini-dataflow.yml index c47a1cb0..838d8dc3 100644 --- a/coordinator/examples/mini-dataflow.yml +++ b/coordinator/examples/mini-dataflow.yml @@ -32,6 +32,7 @@ nodes: random: random/number time: timer/time timestamped-random: runtime-node/op-1/timestamped-random + counter: runtime-node/op-2/counter - id: runtime-node 
operators: @@ -42,3 +43,9 @@ nodes: time: timer/time outputs: - timestamped-random + - id: op-2 + shared-library: ../runtime/examples/c-operator/operator.so + inputs: + time: timer/time + outputs: + - counter diff --git a/runtime/examples/c-operator/.gitignore b/runtime/examples/c-operator/.gitignore new file mode 100644 index 00000000..7c547269 --- /dev/null +++ b/runtime/examples/c-operator/.gitignore @@ -0,0 +1,2 @@ +operator.o +operator.so diff --git a/runtime/examples/c-operator/api.h b/runtime/examples/c-operator/api.h new file mode 100644 index 00000000..5ba25954 --- /dev/null +++ b/runtime/examples/c-operator/api.h @@ -0,0 +1,16 @@ +int dora_init_operator(void **operator_context); + +void dora_drop_operator(void *operator_context); + +int dora_on_input( + const char *id_start, + unsigned int id_len, + const char *data_start, + unsigned int data_len, + const int (*output_fn_raw)(const char *id_start, + unsigned int id_len, + const char *data_start, + unsigned int data_len, + const void *output_context), + void *output_context, + const void *operator_context); diff --git a/runtime/examples/c-operator/operator.c b/runtime/examples/c-operator/operator.c new file mode 100644 index 00000000..2e3584a6 --- /dev/null +++ b/runtime/examples/c-operator/operator.c @@ -0,0 +1,64 @@ +#include "api.h" +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +int dora_init_operator(void **operator_context) +{ + void *context = malloc(1); + char *context_char = (char *)context; + *context_char = 0; + + *operator_context = context; + + return 0; +} + +void dora_drop_operator(void *operator_context) +{ + free(operator_context); +} + +int dora_on_input( + const char *id_start, + unsigned int id_len, + const char *data_start, + unsigned int data_len, + const int (*output_fn_raw)(const char *id_start, + unsigned int id_len, + const char *data_start, + unsigned int data_len, + const void *output_context), + void *output_context, + const void *operator_context) +{ + char *context = (char 
*)operator_context; + + char id[id_len + 1]; + memcpy(id, id_start, id_len); + id[id_len] = 0; + + if (strcmp(id, "time") == 0) + { + char time[data_len + 1]; + memcpy(time, data_start, data_len); + time[data_len] = 0; + + printf("C operator received time input %s, context: %i\n", time, *context); + *context += 1; + + char *out_id = "counter"; + + int res = (output_fn_raw)(out_id, strlen(out_id), context, 1, output_context); + if (res != 0) + { + printf("C operator failed to send output\n"); + } + } + else + { + printf("C operator received unexpected input %s, context: %i\n", id, *context); + } + + return 0; +} From ef306a077998698c7916d136fd80e726b8323c20 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 10 Jun 2022 10:42:50 +0200 Subject: [PATCH 2/7] Move the `api.h` header file to `api` subfolder Organize the `api` folder by language. --- Cargo.toml | 6 +++--- {runtime/examples/c-operator => api/c/operator}/api.h | 0 api/{ => rust}/node/Cargo.toml | 0 api/{ => rust}/node/src/communication.rs | 0 api/{ => rust}/node/src/config.rs | 0 api/{ => rust}/node/src/lib.rs | 0 api/{ => rust}/operator/Cargo.toml | 0 api/{ => rust}/operator/macros/Cargo.toml | 0 api/{ => rust}/operator/macros/src/lib.rs | 0 api/{ => rust}/operator/src/lib.rs | 0 api/{ => rust}/operator/src/raw.rs | 0 common/Cargo.toml | 2 +- coordinator/Cargo.toml | 2 +- runtime/Cargo.toml | 2 +- runtime/examples/c-operator/.gitignore | 1 + runtime/examples/example-operator/Cargo.toml | 2 +- 16 files changed, 8 insertions(+), 7 deletions(-) rename {runtime/examples/c-operator => api/c/operator}/api.h (100%) rename api/{ => rust}/node/Cargo.toml (100%) rename api/{ => rust}/node/src/communication.rs (100%) rename api/{ => rust}/node/src/config.rs (100%) rename api/{ => rust}/node/src/lib.rs (100%) rename api/{ => rust}/operator/Cargo.toml (100%) rename api/{ => rust}/operator/macros/Cargo.toml (100%) rename api/{ => rust}/operator/macros/src/lib.rs (100%) rename api/{ => rust}/operator/src/lib.rs 
(100%) rename api/{ => rust}/operator/src/raw.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index bd552f8e..a9210f1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,9 @@ edition = "2021" [workspace] members = [ - "api/node", - "api/operator", - "api/operator/macros", + "api/rust/node", + "api/rust/operator", + "api/rust/operator/macros", "coordinator", "common", "runtime", diff --git a/runtime/examples/c-operator/api.h b/api/c/operator/api.h similarity index 100% rename from runtime/examples/c-operator/api.h rename to api/c/operator/api.h diff --git a/api/node/Cargo.toml b/api/rust/node/Cargo.toml similarity index 100% rename from api/node/Cargo.toml rename to api/rust/node/Cargo.toml diff --git a/api/node/src/communication.rs b/api/rust/node/src/communication.rs similarity index 100% rename from api/node/src/communication.rs rename to api/rust/node/src/communication.rs diff --git a/api/node/src/config.rs b/api/rust/node/src/config.rs similarity index 100% rename from api/node/src/config.rs rename to api/rust/node/src/config.rs diff --git a/api/node/src/lib.rs b/api/rust/node/src/lib.rs similarity index 100% rename from api/node/src/lib.rs rename to api/rust/node/src/lib.rs diff --git a/api/operator/Cargo.toml b/api/rust/operator/Cargo.toml similarity index 100% rename from api/operator/Cargo.toml rename to api/rust/operator/Cargo.toml diff --git a/api/operator/macros/Cargo.toml b/api/rust/operator/macros/Cargo.toml similarity index 100% rename from api/operator/macros/Cargo.toml rename to api/rust/operator/macros/Cargo.toml diff --git a/api/operator/macros/src/lib.rs b/api/rust/operator/macros/src/lib.rs similarity index 100% rename from api/operator/macros/src/lib.rs rename to api/rust/operator/macros/src/lib.rs diff --git a/api/operator/src/lib.rs b/api/rust/operator/src/lib.rs similarity index 100% rename from api/operator/src/lib.rs rename to api/rust/operator/src/lib.rs diff --git a/api/operator/src/raw.rs b/api/rust/operator/src/raw.rs similarity index 
100% rename from api/operator/src/raw.rs rename to api/rust/operator/src/raw.rs diff --git a/common/Cargo.toml b/common/Cargo.toml index 27de99d7..70908f08 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -6,6 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../api/node" } +dora-node-api = { version = "0.1.0", path = "../api/rust/node" } eyre = "0.6.8" serde = { version = "1.0.136", features = ["derive"] } diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 8ce99741..4729583c 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] bincode = "1.3.3" -dora-node-api = { path = "../api/node" } +dora-node-api = { path = "../api/rust/node" } eyre = "0.6.7" futures = "0.3.21" serde = { version = "1.0.136", features = ["derive"] } diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index a324901e..ee68957b 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] clap = { version = "3.1.12", features = ["derive"] } -dora-node-api = { path = "../api/node" } +dora-node-api = { path = "../api/rust/node" } dora-common = { version = "0.1.0", path = "../common" } eyre = "0.6.8" futures = "0.3.21" diff --git a/runtime/examples/c-operator/.gitignore b/runtime/examples/c-operator/.gitignore index 7c547269..4eba6870 100644 --- a/runtime/examples/c-operator/.gitignore +++ b/runtime/examples/c-operator/.gitignore @@ -1,2 +1,3 @@ operator.o operator.so +api.h diff --git a/runtime/examples/example-operator/Cargo.toml b/runtime/examples/example-operator/Cargo.toml index 956da5af..a6f3b372 100644 --- a/runtime/examples/example-operator/Cargo.toml +++ b/runtime/examples/example-operator/Cargo.toml @@ -9,4 +9,4 @@ edition = "2021" crate-type = ["cdylib"] [dependencies] -dora-operator-api = { path = "../../../api/operator" } 
+dora-operator-api = { path = "../../../api/rust/operator" } From a40646d0630653d2d862ea627a34d44feacb970e Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 10 Jun 2022 10:46:30 +0200 Subject: [PATCH 3/7] Use optimized node/operator versions for mini-dataflow example --- coordinator/examples/mini-dataflow.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/coordinator/examples/mini-dataflow.yml b/coordinator/examples/mini-dataflow.yml index 838d8dc3..9e535d99 100644 --- a/coordinator/examples/mini-dataflow.yml +++ b/coordinator/examples/mini-dataflow.yml @@ -5,13 +5,13 @@ communication: nodes: - id: timer custom: - run: ../target/debug/examples/example_source_timer + run: ../target/release/examples/example_source_timer outputs: - time - id: rate-limited-timer custom: - run: ../target/debug/examples/rate_limit --seconds 0.5 + run: ../target/release/examples/rate_limit --seconds 0.5 inputs: data: timer/time outputs: @@ -19,7 +19,7 @@ nodes: - id: random custom: - run: ../target/debug/examples/random_number + run: ../target/release/examples/random_number inputs: timestamp: rate-limited-timer/rate_limited outputs: @@ -27,7 +27,7 @@ nodes: - id: logger custom: - run: ../target/debug/examples/example_sink_logger + run: ../target/release/examples/example_sink_logger inputs: random: random/number time: timer/time @@ -37,7 +37,7 @@ nodes: - id: runtime-node operators: - id: op-1 - shared-library: ../target/debug/libexample_operator.so + shared-library: ../target/release/libexample_operator.so inputs: random: random/number time: timer/time From 9dadaeb831d72e43dd6dc45dcab3daa28feaa735 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 10 Jun 2022 10:57:17 +0200 Subject: [PATCH 4/7] Update the build and run instructions in the README --- coordinator/README.md | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/coordinator/README.md b/coordinator/README.md index 6d0a6262..ae99035f 100644 --- 
a/coordinator/README.md +++ b/coordinator/README.md @@ -27,8 +27,15 @@ There are drawbacks too, for example: ## Try it out -- Compile the `examples` using `cargo build -p dora-rs --examples` -- Run the `mini-dataflow` example using `cargo run -- run examples/mini-dataflow.yml` +- Compile the dora runtime using `cargo build -p dora-runtime --release` +- Compile the nodes in `examples` using `cargo build -p dora-coordinator --examples --release` +- Compile the Rust example operator through `cargo build --manifest-path ../runtime/examples/example-operator/Cargo.toml --release` +- Compile the C example operator through: + - `cd ../runtime/examples/c-operator` + - `cp ../../../api/c/operator/api.h .` + - `clang -c operator.c` + - `clang -shared -v operator.o -o operator.so` +- Run the `mini-dataflow` example using `cargo run --release -- run examples/mini-dataflow.yml` - This spawns a `timer` source, which sends the current time periodically, and a `logger` sink, which prints the incoming data. - - The `timer` will exit after 100 iterations. The `logger` will then exit with a timeout error. + - The `timer` will exit after 100 iterations. The other nodes/operators will then exit as well because all sources closed. From 5c674d786fb37af35ea1fd85d4d435fe4cf43235 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 10 Jun 2022 11:00:15 +0200 Subject: [PATCH 5/7] Fix: Don't stop operator before all of its events have been processed We don't want to stop the operator if there are still events in the operator event queue, e.g. pending outputs. 
--- runtime/src/main.rs | 65 ++++++++++++++++++++----------------- runtime/src/operator/mod.rs | 1 + 2 files changed, 37 insertions(+), 29 deletions(-) diff --git a/runtime/src/main.rs b/runtime/src/main.rs index 96ba50d0..6f91715c 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -14,7 +14,10 @@ use futures::{ }; use futures_concurrency::Merge; use operator::{Operator, OperatorEvent}; -use std::{collections::BTreeMap, mem}; +use std::{ + collections::{BTreeMap, HashMap}, + mem, +}; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamMap}; @@ -42,6 +45,7 @@ async fn main() -> eyre::Result<()> { let mut operator_map = BTreeMap::new(); let mut operator_events = StreamMap::new(); + let mut operator_events_tx = HashMap::new(); for operator_config in &operators { let (events_tx, events) = mpsc::channel(1); let operator = Operator::init(operator_config.clone(), events_tx.clone()) @@ -49,6 +53,7 @@ async fn main() -> eyre::Result<()> { .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; operator_map.insert(&operator_config.id, operator); operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); + operator_events_tx.insert(operator_config.id.clone(), events_tx); } let communication: Box = @@ -86,35 +91,14 @@ async fn main() -> eyre::Result<()> { })?; } SubscribeEvent::InputsStopped { target_operator } => { - // -------------------------------------------------------- - // TODO FIXME: For some reason, these zenoh publish calls - // (and also subsequent ones) are not visible to other - // nodes. This includes the stop command, so the input - // streams of dependent nodes are not closed properly. 
- // -------------------------------------------------------- - - communication - .publish("/HHH", &[]) - .await - .wrap_err("failed to send on /HHH")?; - if operator_map.remove(&target_operator).is_some() { - println!("operator {node_id}/{target_operator} finished"); - // send stopped message - publish( - &node_id, - target_operator.clone(), - STOP_TOPIC.to_owned().into(), - &[], - communication.as_ref(), - ) - .await.with_context(|| { - format!("failed to send stop message for operator `{node_id}/{target_operator}`") - })?; - } + let events_tx = operator_events_tx.get(&target_operator).ok_or_else(|| { + eyre!("failed to get events_tx for operator {target_operator}") + })?; - if operator_map.is_empty() { - break; - } + let events_tx = events_tx.clone(); + tokio::spawn(async move { + let _ = events_tx.send(OperatorEvent::EndOfInput).await; + }); } }, Event::Operator { id, event } => { @@ -134,6 +118,29 @@ async fn main() -> eyre::Result<()> { bail!(err.wrap_err(format!("operator {id} failed"))) } OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), + OperatorEvent::EndOfInput => { + if operator_map.remove(&id).is_some() { + println!("operator {node_id}/{id} finished"); + // send stopped message + publish( + &node_id, + id.clone(), + STOP_TOPIC.to_owned().into(), + &[], + communication.as_ref(), + ) + .await + .with_context(|| { + format!("failed to send stop message for operator `{node_id}/{id}`") + })?; + + operator_events_tx.remove(&id); + } + + if operator_map.is_empty() { + break; + } + } } } } diff --git a/runtime/src/operator/mod.rs b/runtime/src/operator/mod.rs index 732042fd..dbfd1cee 100644 --- a/runtime/src/operator/mod.rs +++ b/runtime/src/operator/mod.rs @@ -60,6 +60,7 @@ pub enum OperatorEvent { Output { id: DataId, value: Vec<u8> }, Error(eyre::Error), Panic(Box<dyn Any + Send>), + EndOfInput, } pub struct OperatorInput { From 66b0976ff1b3fc151792667d01c683a54c4c4cd4 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 10 Jun 2022 11:00:42 +0200 
Subject: [PATCH 6/7] Create a README for the C operator --- runtime/examples/c-operator/README.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 runtime/examples/c-operator/README.md diff --git a/runtime/examples/c-operator/README.md b/runtime/examples/c-operator/README.md new file mode 100644 index 00000000..f246c0c7 --- /dev/null +++ b/runtime/examples/c-operator/README.md @@ -0,0 +1,7 @@ +# C-operator Example + +Build with these steps: + +- `cp ../../../api/c/operator/api.h .` +- `clang -c operator.c` +- `clang -shared -v operator.o -o operator.so` From 7fd1916b32d4f6d3cfd288f27388cae0a5fd08fd Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 22 Jun 2022 13:07:06 +0200 Subject: [PATCH 7/7] Apply README suggestions from code review Co-authored-by: Xavier Tao --- coordinator/README.md | 19 ++++++++++++------- runtime/examples/c-operator/README.md | 8 +++++--- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/coordinator/README.md b/coordinator/README.md index ae99035f..90a3e6c4 100644 --- a/coordinator/README.md +++ b/coordinator/README.md @@ -27,14 +27,19 @@ There are drawbacks too, for example: ## Try it out -- Compile the dora runtime using `cargo build -p dora-runtime --release` -- Compile the nodes in `examples` using `cargo build -p dora-coordinator --examples --release` -- Compile the Rust example operator through `cargo build --manifest-path ../runtime/examples/example-operator/Cargo.toml --release` +- Compile: the dora runtime, the nodes in `examples`, and the Rust example operator through: +```bash +cargo build -p dora-runtime --release +cargo build -p dora-coordinator --examples --release +cargo build --manifest-path ../runtime/examples/example-operator/Cargo.toml --release +``` - Compile the C example operator through: - - `cd ../runtime/examples/c-operator` - - `cp ../../../api/c/operator/api.h .` - - `clang -c operator.c` - - `clang -shared -v operator.o -o operator.so` +```bash +cd 
../runtime/examples/c-operator +cp ../../../api/c/operator/api.h . +clang -c operator.c +clang -shared -v operator.o -o operator.so +``` - Run the `mini-dataflow` example using `cargo run --release -- run examples/mini-dataflow.yml` - This spawns a `timer` source, which sends the current time periodically, and a `logger` sink, which prints the incoming data. - The `timer` will exit after 100 iterations. The other nodes/operators will then exit as well because all sources closed. diff --git a/runtime/examples/c-operator/README.md b/runtime/examples/c-operator/README.md index f246c0c7..6dfe5e0a 100644 --- a/runtime/examples/c-operator/README.md +++ b/runtime/examples/c-operator/README.md @@ -2,6 +2,8 @@ Build with these steps: -- `cp ../../../api/c/operator/api.h .` -- `clang -c operator.c` -- `clang -shared -v operator.o -o operator.so` +```bash +cp ../../../api/c/operator/api.h . +clang -c operator.c +clang -shared -v operator.o -o operator.so +```