diff --git a/Cargo.lock b/Cargo.lock index 94078b0d..8d6586c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -657,6 +657,15 @@ dependencies = [ "serde", ] +[[package]] +name = "dora-examples" +version = "0.0.0" +dependencies = [ + "dora-coordinator", + "eyre", + "tokio", +] + [[package]] name = "dora-message" version = "0.1.0" @@ -2749,10 +2758,11 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.17.0" +version = "1.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +checksum = "7a8325f63a7d4774dd041e363b2409ed1c5cbbd0f867795e661df066b2b0a581" dependencies = [ + "autocfg 1.1.0", "bytes", "libc", "memchr", diff --git a/Cargo.toml b/Cargo.toml index 0e740451..a11b9489 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,3 +12,18 @@ members = [ "libraries/extensions/telemetry/*", "libraries/extensions/zenoh-logger", ] + +[package] +name = "dora-examples" +version = "0.0.0" +edition = "2021" +license = "Apache-2.0" + +[dev-dependencies] +eyre = "0.6.8" +tokio = "1.20.1" +dora-coordinator = { path = "binaries/coordinator" } + +[[example]] +name = "c-dataflow" +path = "examples/c-dataflow/run.rs" diff --git a/examples/c-dataflow/.gitignore b/examples/c-dataflow/.gitignore new file mode 100644 index 00000000..378eac25 --- /dev/null +++ b/examples/c-dataflow/.gitignore @@ -0,0 +1 @@ +build diff --git a/examples/c-dataflow/dataflow.yml b/examples/c-dataflow/dataflow.yml new file mode 100644 index 00000000..31b4238d --- /dev/null +++ b/examples/c-dataflow/dataflow.yml @@ -0,0 +1,25 @@ +communication: + zenoh: + prefix: /example-c-dataflow + +nodes: + - id: c_node + custom: + run: build/c_node + inputs: + timer: dora/timer/secs/1 + outputs: + - tick + - id: runtime-node + operators: + - id: c_operator + shared-library: build/operator.so + inputs: + tick: c_node/tick + outputs: + - counter + - id: c_sink + custom: + run: build/c_sink + inputs: + counter: runtime-node/c_operator/counter diff --git a/examples/c-dataflow/node.c b/examples/c-dataflow/node.c new file mode 100644 index 00000000..94d9d5f5 --- /dev/null +++ b/examples/c-dataflow/node.c @@ -0,0 +1,41 @@ +#include +#include +#include +#include "build/node_api.h" + +// sleep +#ifdef _WIN32 +#include +#else +#include +#endif + +int main() +{ + void *dora_context = init_dora_context_from_env(); + if (dora_context == NULL) + { + fprintf(stderr, "failed to init dora context\n"); + return -1; + } + + for (char i = 0; i < 10; i++) + { + void *input = dora_next_input(dora_context); + + char *data; + size_t data_len; + read_dora_input_data(input, &data, &data_len); + + assert(data_len == 0); + + char out_id[] = "tick"; + dora_send_output(dora_context, out_id, strlen(out_id), &i, 1); + + free_dora_input(input); + } + + free_dora_context(dora_context); + + return 0; +} diff --git a/examples/c-operator/operator.c b/examples/c-dataflow/operator.c similarity index 55% rename from examples/c-operator/operator.c rename to examples/c-dataflow/operator.c index 17e9661c..0ca98c91 100644 --- a/examples/c-operator/operator.c +++ b/examples/c-dataflow/operator.c @@ -1,4 +1,5 @@ -#include "operator_api.h" +#include "build/operator_api.h" +#include #include #include #include @@ -21,35 +22,39 @@ void dora_drop_operator(void *operator_context) int dora_on_input( const char *id_start, - unsigned int id_len, + size_t id_len, const char *data_start, - unsigned int data_len, + size_t data_len, const int (*output_fn_raw)(const char *id_start, - unsigned int id_len, + size_t id_len, const char *data_start, - unsigned int data_len, + size_t data_len, const void *output_context), void *output_context, const void *operator_context) { - char *context = (char *)operator_context; + char *counter = (char *)operator_context; char id[id_len + 1]; memcpy(id, id_start, id_len); id[id_len] = 0; - if (strcmp(id, "time") == 0) + if (strcmp(id, "tick") == 0) { - char time[data_len + 1]; - memcpy(time, data_start, data_len); - time[data_len] = 0; + char data[data_len + 1]; + memcpy(data, data_start, data_len); + data[data_len] = 0; - printf("C operator received time input %s, context: %i\n", time, *context); - *context += 1; + *counter += 1; + printf("C operator received tick input with data `%s`, counter: %i\n", data, *counter); char *out_id = "counter"; - int res = (output_fn_raw)(out_id, strlen(out_id), context, 1, output_context); + char out_data[100]; + int count = snprintf(out_data, sizeof(out_data), "The current counter value is %d", *counter); + assert(count >= 0 && count < 100); + + int res = (output_fn_raw)(out_id, strlen(out_id), out_data, strlen(out_data), output_context); if (res != 0) { printf("C operator failed to send output\n"); @@ -57,7 +62,7 @@ int dora_on_input( } else { - printf("C operator received unexpected input %s, context: %i\n", id, *context); + printf("C operator received unexpected input %s, context: %i\n", id, *counter); } return 0; diff --git a/examples/c-dataflow/run.rs b/examples/c-dataflow/run.rs new file mode 100644 index 00000000..1f597fa2 --- /dev/null +++ b/examples/c-dataflow/run.rs @@ -0,0 +1,84 @@ +use eyre::{bail, Context}; +use std::path::Path; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + std::env::set_current_dir(Path::new(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + tokio::fs::create_dir_all("build").await?; + + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + build_package("dora-runtime").await?; + build_package("dora-node-api-c").await?; + build_c_node(root, "node.c", "c_node").await?; + build_c_node(root, "sink.c", "c_sink").await?; + build_c_operator(root).await?; + + dora_coordinator::run(dora_coordinator::Command::Run { + dataflow: Path::new("dataflow.yml").to_owned(), + runtime: Some(root.join("target").join("release").join("dora-runtime")), + }) + .await?; + + Ok(()) +} + +async fn build_package(package: &str) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("build").arg("--release"); + cmd.arg("--package").arg(package); + if !cmd.status().await?.success() { + bail!("failed to build {package}"); + }; + Ok(()) +} + +async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<()> { + // copy header file + tokio::fs::copy( + root.join("apis").join("c").join("node").join("node_api.h"), + Path::new("build").join("node_api.h"), + ) + .await?; + + let mut clang = tokio::process::Command::new("clang"); + clang.arg(name); + clang.arg("-l").arg("dora_node_api_c"); + clang.arg("-l").arg("m"); + clang.arg("-L").arg(root.join("target").join("release")); + clang.arg("--output").arg(Path::new("build").join(out_name)); + if !clang.status().await?.success() { + bail!("failed to compile c node"); + }; + Ok(()) +} + +async fn build_c_operator(root: &Path) -> eyre::Result<()> { + // copy header file + tokio::fs::copy( + root.join("apis") + .join("c") + .join("operator") + .join("operator_api.h"), + Path::new("build").join("operator_api.h"), + ) + .await?; + + let mut compile = tokio::process::Command::new("clang"); + compile.arg("-c").arg("operator.c"); + compile.arg("-o").arg("build/operator.o"); + if !compile.status().await?.success() { + bail!("failed to compile c operator"); + }; + + let mut link = tokio::process::Command::new("clang"); + link.arg("-shared").arg("build/operator.o"); + link.arg("-o").arg("build/operator.so"); + if !link.status().await?.success() { + bail!("failed to link c operator"); + }; + + Ok(()) +} diff --git a/examples/c-dataflow/sink.c b/examples/c-dataflow/sink.c new file mode 100644 index 00000000..3f1f5da2 --- /dev/null +++ b/examples/c-dataflow/sink.c @@ -0,0 +1,43 @@ +#include +#include +#include +#include "build/node_api.h" + +int main() +{ + void *dora_context = init_dora_context_from_env(); + if (dora_context == NULL) + { + fprintf(stderr, "failed to init dora context\n"); + return -1; + } + + while (1) + { + void *input = dora_next_input(dora_context); + if (input == NULL) + { + break; + } + + char *id = (char *)0xdeadbeafcafebabe; + size_t id_len; + read_dora_input_id(input, &id, &id_len); + + char *data; + size_t data_len; + read_dora_input_data(input, &data, &data_len); + + printf("sink received input `"); + fwrite(id, id_len, 1, stdout); + printf("` with data: '"); + fwrite(data, data_len, 1, stdout); + printf("'\n"); + + free_dora_input(input); + } + + free_dora_context(dora_context); + + return 0; +} diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 5ae27129..94e5917e 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -166,6 +166,18 @@ pub enum OperatorSource { Wasm(PathBuf), } +impl OperatorSource { + pub fn canonicalize(&mut self) -> std::io::Result<()> { + let path = match self { + OperatorSource::SharedLibrary(path) => path, + OperatorSource::Python(path) => path, + OperatorSource::Wasm(path) => path, + }; + *path = path.canonicalize()?; + Ok(()) + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct PythonOperatorConfig { pub path: PathBuf,