diff --git a/Cargo.toml b/Cargo.toml index 4c18d1cd..bd234270 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ members = [ "apis/rust/operator/macros", "binaries/*", "examples/rust-dataflow/*", - "examples/c++-dataflow/*", + "examples/c++-dataflow/*-rust-*", "libraries/core", "libraries/extensions/message", "libraries/extensions/telemetry/*", diff --git a/examples/c++-dataflow/dataflow.yml b/examples/c++-dataflow/dataflow.yml index 151e3e74..8e4914bd 100644 --- a/examples/c++-dataflow/dataflow.yml +++ b/examples/c++-dataflow/dataflow.yml @@ -3,9 +3,16 @@ communication: prefix: /example-cxx-dataflow nodes: - - id: cxx-node + - id: cxx-node-rust-api custom: - run: ../../target/release/cxx-dataflow-example-node + run: ../../target/release/cxx-dataflow-example-node-rust-api + inputs: + tick: dora/timer/millis/300 + outputs: + - counter + - id: cxx-node-c-api + custom: + run: build/node_c_api inputs: tick: dora/timer/millis/300 outputs: diff --git a/examples/c++-dataflow/node-c-api/main.cc b/examples/c++-dataflow/node-c-api/main.cc new file mode 100644 index 00000000..d4fb7cad --- /dev/null +++ b/examples/c++-dataflow/node-c-api/main.cc @@ -0,0 +1,71 @@ +extern "C" +{ +#include "../../../apis/c/node/node_api.h" +} + +#include +#include + +int run(void *dora_context) +{ + unsigned char counter = 0; + + for (int i = 0; i < 20; i++) + { + + auto input = dora_next_input(dora_context); + if (input == NULL) + { + return 0; // end of input + } + counter += 1; + + char *id_ptr; + size_t id_len; + read_dora_input_id(input, &id_ptr, &id_len); + std::string id(id_ptr, id_len); + + char *data_ptr; + size_t data_len; + read_dora_input_data(input, &data_ptr, &data_len); + std::vector data; + for (size_t i = 0; i < data_len; i++) + { + data.push_back(*(data_ptr + i)); + } + + std::cout + << "Received input " + << " (counter: " << (unsigned int)counter << ") data: ["; + for (unsigned char &v : data) + { + std::cout << (unsigned int)v << ", "; + } + std::cout << "]" << std::endl; + + free_dora_input(input); + + std::vector out_vec{counter}; + + std::string out_id = "counter"; + + int result = dora_send_output(dora_context, &out_id[0], out_id.length(), (char *)&counter, 1); + if (result != 0) + { + std::cerr << "failed to send output" << std::endl; + return 1; + } + } + return 0; +} + +int main() +{ + std::cout << "HELLO FROM C++ (using C API)" << std::endl; + + auto dora_context = init_dora_context_from_env(); + auto ret = run(dora_context); + free_dora_context(dora_context); + + return ret; +} diff --git a/examples/c++-dataflow/run.rs b/examples/c++-dataflow/run.rs index 004ebd1c..865ff54e 100644 --- a/examples/c++-dataflow/run.rs +++ b/examples/c++-dataflow/run.rs @@ -7,7 +7,16 @@ async fn main() -> eyre::Result<()> { std::env::set_current_dir(root.join(file!()).parent().unwrap()) .wrap_err("failed to set working dir")?; + tokio::fs::create_dir_all(root.join("build")).await?; + build_package("cxx-dataflow-example-node-rust-api").await?; + build_package("dora-node-api-c").await?; + build_cxx_node( + root, + &Path::new("node-c-api").join("main.cc").canonicalize()?, + "node_c_api", + ) + .await?; build_package("dora-runtime").await?; dora_coordinator::run(dora_coordinator::Command::Run { @@ -29,3 +38,21 @@ async fn build_package(package: &str) -> eyre::Result<()> { }; Ok(()) } + +async fn build_cxx_node(root: &Path, path: &Path, out_name: &str) -> eyre::Result<()> { + let mut clang = tokio::process::Command::new("clang++"); + clang.arg(path); + clang.arg("-l").arg("dora_node_api_c"); + clang.arg("-L").arg(root.join("target").join("release")); + clang + .arg("--output") + .arg(Path::new("../build").join(out_name)); + if let Some(parent) = path.parent() { + clang.current_dir(parent); + } + + if !clang.status().await?.success() { + bail!("failed to compile c++ node"); + }; + Ok(()) +}