diff --git a/Cargo.toml b/Cargo.toml index b63e4a2b..5ea71bfd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,9 @@ edition = "2021" [workspace] members = [ - "api/node", - "api/operator", - "api/operator/macros", + "api/rust/node", + "api/rust/operator", + "api/rust/operator/macros", "coordinator", "message", "metrics", diff --git a/api/c/operator/api.h b/api/c/operator/api.h new file mode 100644 index 00000000..5ba25954 --- /dev/null +++ b/api/c/operator/api.h @@ -0,0 +1,16 @@ +int dora_init_operator(void **operator_context); + +void dora_drop_operator(void *operator_context); + +int dora_on_input( + const char *id_start, + unsigned int id_len, + const char *data_start, + unsigned int data_len, + const int (*output_fn_raw)(const char *id_start, + unsigned int id_len, + const char *data_start, + unsigned int data_len, + const void *output_context), + void *output_context, + const void *operator_context); diff --git a/api/node/Cargo.toml b/api/rust/node/Cargo.toml similarity index 100% rename from api/node/Cargo.toml rename to api/rust/node/Cargo.toml diff --git a/api/node/src/communication.rs b/api/rust/node/src/communication.rs similarity index 100% rename from api/node/src/communication.rs rename to api/rust/node/src/communication.rs diff --git a/api/node/src/config.rs b/api/rust/node/src/config.rs similarity index 100% rename from api/node/src/config.rs rename to api/rust/node/src/config.rs diff --git a/api/node/src/lib.rs b/api/rust/node/src/lib.rs similarity index 100% rename from api/node/src/lib.rs rename to api/rust/node/src/lib.rs diff --git a/api/operator/Cargo.toml b/api/rust/operator/Cargo.toml similarity index 100% rename from api/operator/Cargo.toml rename to api/rust/operator/Cargo.toml diff --git a/api/operator/macros/Cargo.toml b/api/rust/operator/macros/Cargo.toml similarity index 100% rename from api/operator/macros/Cargo.toml rename to api/rust/operator/macros/Cargo.toml diff --git a/api/operator/macros/src/lib.rs b/api/rust/operator/macros/src/lib.rs similarity index 100% rename from api/operator/macros/src/lib.rs rename to api/rust/operator/macros/src/lib.rs diff --git a/api/operator/src/lib.rs b/api/rust/operator/src/lib.rs similarity index 100% rename from api/operator/src/lib.rs rename to api/rust/operator/src/lib.rs diff --git a/api/operator/src/raw.rs b/api/rust/operator/src/raw.rs similarity index 100% rename from api/operator/src/raw.rs rename to api/rust/operator/src/raw.rs diff --git a/common/Cargo.toml b/common/Cargo.toml index 27de99d7..70908f08 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -6,6 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../api/node" } +dora-node-api = { version = "0.1.0", path = "../api/rust/node" } eyre = "0.6.8" serde = { version = "1.0.136", features = ["derive"] } diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 8ce99741..4729583c 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] bincode = "1.3.3" -dora-node-api = { path = "../api/node" } +dora-node-api = { path = "../api/rust/node" } eyre = "0.6.7" futures = "0.3.21" serde = { version = "1.0.136", features = ["derive"] } diff --git a/coordinator/README.md b/coordinator/README.md index 6d0a6262..90a3e6c4 100644 --- a/coordinator/README.md +++ b/coordinator/README.md @@ -27,8 +27,20 @@ There are drawbacks too, for example: ## Try it out -- Compile the `examples` using `cargo build -p dora-rs --examples` -- Run the `mini-dataflow` example using `cargo run -- run examples/mini-dataflow.yml` +- Compile: the dora runtime, the nodes in `examples`, and the Rust example operator through: +```bash +cargo build -p dora-runtime --release +cargo build -p dora-coordinator --examples --release +cargo build --manifest-path ../runtime/examples/example-operator/Cargo.toml --release +``` +- Compile the C example operator through: +```bash +cd ../runtime/examples/c-operator +cp ../../../api/c/operator/api.h . +clang -c operator.c +clang -shared -v operator.o -o operator.so +``` +- Run the `mini-dataflow` example using `cargo run --release -- run examples/mini-dataflow.yml` - This spawns a `timer` source, which sends the current time periodically, and a `logger` sink, which prints the incoming data. - - The `timer` will exit after 100 iterations. The `logger` will then exit with a timeout error. + - The `timer` will exit after 100 iterations. The other nodes/operators will then exit as well because all sources closed. diff --git a/coordinator/examples/example_sink_logger.rs b/coordinator/examples/example_sink_logger.rs index dc56630f..8fbeed01 100644 --- a/coordinator/examples/example_sink_logger.rs +++ b/coordinator/examples/example_sink_logger.rs @@ -40,6 +40,9 @@ async fn main() -> eyre::Result<()> { let data = String::from_utf8(input.data)?; println!("received timestamped random value: {data}"); } + "counter" => { + println!("received counter value: {:?}", input.data); + } other => eprintln!("Ignoring unexpected input `{other}`"), } diff --git a/coordinator/examples/mini-dataflow.yml b/coordinator/examples/mini-dataflow.yml index c47a1cb0..9e535d99 100644 --- a/coordinator/examples/mini-dataflow.yml +++ b/coordinator/examples/mini-dataflow.yml @@ -5,13 +5,13 @@ communication: nodes: - id: timer custom: - run: ../target/debug/examples/example_source_timer + run: ../target/release/examples/example_source_timer outputs: - time - id: rate-limited-timer custom: - run: ../target/debug/examples/rate_limit --seconds 0.5 + run: ../target/release/examples/rate_limit --seconds 0.5 inputs: data: timer/time outputs: @@ -19,7 +19,7 @@ nodes: - id: random custom: - run: ../target/debug/examples/random_number + run: ../target/release/examples/random_number inputs: timestamp: rate-limited-timer/rate_limited outputs: @@ -27,18 +27,25 @@ nodes: - id: logger custom: - run: ../target/debug/examples/example_sink_logger + run: ../target/release/examples/example_sink_logger inputs: random: random/number time: timer/time timestamped-random: runtime-node/op-1/timestamped-random + counter: runtime-node/op-2/counter - id: runtime-node operators: - id: op-1 - shared-library: ../target/debug/libexample_operator.so + shared-library: ../target/release/libexample_operator.so inputs: random: random/number time: timer/time outputs: - timestamped-random + - id: op-2 + shared-library: ../runtime/examples/c-operator/operator.so + inputs: + time: timer/time + outputs: + - counter diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index a324901e..ee68957b 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] clap = { version = "3.1.12", features = ["derive"] } -dora-node-api = { path = "../api/node" } +dora-node-api = { path = "../api/rust/node" } dora-common = { version = "0.1.0", path = "../common" } eyre = "0.6.8" futures = "0.3.21" diff --git a/runtime/examples/c-operator/.gitignore b/runtime/examples/c-operator/.gitignore new file mode 100644 index 00000000..4eba6870 --- /dev/null +++ b/runtime/examples/c-operator/.gitignore @@ -0,0 +1,3 @@ +operator.o +operator.so +api.h diff --git a/runtime/examples/c-operator/README.md b/runtime/examples/c-operator/README.md new file mode 100644 index 00000000..6dfe5e0a --- /dev/null +++ b/runtime/examples/c-operator/README.md @@ -0,0 +1,9 @@ +# C-operator Example + +Build with these steps: + +```bash +cp ../../../api/c/operator/api.h . +clang -c operator.c +clang -shared -v operator.o -o operator.so +``` diff --git a/runtime/examples/c-operator/operator.c b/runtime/examples/c-operator/operator.c new file mode 100644 index 00000000..2e3584a6 --- /dev/null +++ b/runtime/examples/c-operator/operator.c @@ -0,0 +1,64 @@ +#include "api.h" +#include +#include +#include + +int dora_init_operator(void **operator_context) +{ + void *context = malloc(1); + char *context_char = (char *)context; + *context_char = 0; + + *operator_context = context; + + return 0; +} + +void dora_drop_operator(void *operator_context) +{ + free(operator_context); +} + +int dora_on_input( + const char *id_start, + unsigned int id_len, + const char *data_start, + unsigned int data_len, + const int (*output_fn_raw)(const char *id_start, + unsigned int id_len, + const char *data_start, + unsigned int data_len, + const void *output_context), + void *output_context, + const void *operator_context) +{ + char *context = (char *)operator_context; + + char id[id_len + 1]; + memcpy(id, id_start, id_len); + id[id_len] = 0; + + if (strcmp(id, "time") == 0) + { + char time[data_len + 1]; + memcpy(time, data_start, data_len); + time[data_len] = 0; + + printf("C operator received time input %s, context: %i\n", time, *context); + *context += 1; + + char *out_id = "counter"; + + int res = (output_fn_raw)(out_id, strlen(out_id), context, 1, output_context); + if (res != 0) + { + printf("C operator failed to send output\n"); + } + } + else + { + printf("C operator received unexpected input %s, context: %i\n", id, *context); + } + + return 0; +} diff --git a/runtime/examples/example-operator/Cargo.toml b/runtime/examples/example-operator/Cargo.toml index 956da5af..a6f3b372 100644 --- a/runtime/examples/example-operator/Cargo.toml +++ b/runtime/examples/example-operator/Cargo.toml @@ -9,4 +9,4 @@ edition = "2021" crate-type = ["cdylib"] [dependencies] -dora-operator-api = { path = "../../../api/operator" } +dora-operator-api = { path = "../../../api/rust/operator" } diff --git a/runtime/src/main.rs b/runtime/src/main.rs index 96ba50d0..6f91715c 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -14,7 +14,10 @@ use futures::{ }; use futures_concurrency::Merge; use operator::{Operator, OperatorEvent}; -use std::{collections::BTreeMap, mem}; +use std::{ + collections::{BTreeMap, HashMap}, + mem, +}; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamMap}; @@ -42,6 +45,7 @@ async fn main() -> eyre::Result<()> { let mut operator_map = BTreeMap::new(); let mut operator_events = StreamMap::new(); + let mut operator_events_tx = HashMap::new(); for operator_config in &operators { let (events_tx, events) = mpsc::channel(1); let operator = Operator::init(operator_config.clone(), events_tx.clone()) @@ -49,6 +53,7 @@ async fn main() -> eyre::Result<()> { .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; operator_map.insert(&operator_config.id, operator); operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); + operator_events_tx.insert(operator_config.id.clone(), events_tx); } let communication: Box = @@ -86,35 +91,14 @@ async fn main() -> eyre::Result<()> { })?; } SubscribeEvent::InputsStopped { target_operator } => { - // -------------------------------------------------------- - // TODO FIXME: For some reason, these zenoh publish calls - // (and also subsequent ones) are not visible to other - // nodes. This includes the stop command, so the input - // streams of dependent nodes are not closed properly. - // -------------------------------------------------------- - - communication - .publish("/HHH", &[]) - .await - .wrap_err("failed to send on /HHH")?; - if operator_map.remove(&target_operator).is_some() { - println!("operator {node_id}/{target_operator} finished"); - // send stopped message - publish( - &node_id, - target_operator.clone(), - STOP_TOPIC.to_owned().into(), - &[], - communication.as_ref(), - ) - .await.with_context(|| { - format!("failed to send stop message for operator `{node_id}/{target_operator}`") - })?; - } + let events_tx = operator_events_tx.get(&target_operator).ok_or_else(|| { + eyre!("failed to get events_tx for operator {target_operator}") + })?; - if operator_map.is_empty() { - break; - } + let events_tx = events_tx.clone(); + tokio::spawn(async move { + let _ = events_tx.send(OperatorEvent::EndOfInput).await; + }); } }, Event::Operator { id, event } => { @@ -134,6 +118,29 @@ async fn main() -> eyre::Result<()> { bail!(err.wrap_err(format!("operator {id} failed"))) } OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), + OperatorEvent::EndOfInput => { + if operator_map.remove(&id).is_some() { + println!("operator {node_id}/{id} finished"); + // send stopped message + publish( + &node_id, + id.clone(), + STOP_TOPIC.to_owned().into(), + &[], + communication.as_ref(), + ) + .await + .with_context(|| { + format!("failed to send stop message for operator `{node_id}/{id}`") + })?; + + operator_events_tx.remove(&id); + } + + if operator_map.is_empty() { + break; + } + } } } } diff --git a/runtime/src/operator/mod.rs b/runtime/src/operator/mod.rs index 732042fd..dbfd1cee 100644 --- a/runtime/src/operator/mod.rs +++ b/runtime/src/operator/mod.rs @@ -60,6 +60,7 @@ pub enum OperatorEvent { Output { id: DataId, value: Vec }, Error(eyre::Error), Panic(Box), + EndOfInput, } pub struct OperatorInput {