diff --git a/Cargo.lock b/Cargo.lock index 55caa521..df2732a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -452,6 +452,16 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "codespan-reporting" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" +dependencies = [ + "termcolor", + "unicode-width", +] + [[package]] name = "concurrent-queue" version = "1.2.2" @@ -568,6 +578,63 @@ dependencies = [ "syn", ] +[[package]] +name = "cxx" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "873c2e83af70859af2aaecd1f5d862f3790b747b1f4f50fb45a931d000ac0422" +dependencies = [ + "cc", + "cxxbridge-flags", + "cxxbridge-macro", + "link-cplusplus", +] + +[[package]] +name = "cxx-build" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49edea7163bbc7a39e3d829b4b0b66a9d30486973152842b7413f2c7b5632bf" +dependencies = [ + "cc", + "codespan-reporting", + "once_cell", + "proc-macro2", + "quote", + "scratch", + "syn", +] + +[[package]] +name = "cxx-dataflow-example-node" +version = "0.1.0" +dependencies = [ + "cxx", + "cxx-build", + "dora-node-api", + "eyre", + "futures", + "rand", + "tokio", +] + +[[package]] +name = "cxxbridge-flags" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46b787c15af80277db5c88c6ac6c502ae545e622f010e06f95e540d34931acf" + +[[package]] +name = "cxxbridge-macro" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ba3f3a7efa46626878fb5d324fabca4d19d2956b6ae97ce43044ef4515f5abc" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dashmap" version = "4.0.2" @@ -1345,6 +1412,15 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33a33a362ce288760ec6a508b94caaec573ae7d3bbbd91b87aa0bad4456839db" +[[package]] +name = "link-cplusplus" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8cae2cd7ba2f3f63938b9c724475dfb7b9861b545a90324476324ed21dbc8c8" +dependencies = [ + "cc", +] + [[package]] name = "linked-hash-map" version = "0.5.4" @@ -1954,11 +2030,11 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro2" -version = "1.0.36" +version = "1.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" +checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] @@ -2395,6 +2471,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "scratch" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898" + [[package]] name = "sct" version = "0.6.1" @@ -2630,13 +2712,13 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.94" +version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a07e33e919ebcd69113d5be0e4d70c5707004ff45188910106854f38b960df4a" +checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13" dependencies = [ "proc-macro2", "quote", - "unicode-xid", + "unicode-ident", ] [[package]] @@ -3008,6 +3090,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "unicode-ident" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf" + [[package]] name = "unicode-segmentation" version = "1.9.0" diff --git a/Cargo.toml b/Cargo.toml index 2a656c60..4c18d1cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,8 @@ members = [ "apis/rust/*", "apis/rust/operator/macros", "binaries/*", - "examples/rust-dataflow/operator", - "examples/rust-dataflow/node", - "examples/rust-dataflow/sink", + "examples/rust-dataflow/*", + "examples/c++-dataflow/*", "libraries/core", "libraries/extensions/message", "libraries/extensions/telemetry/*", @@ -33,3 +32,7 @@ path = "examples/c-dataflow/run.rs" [[example]] name = "rust-dataflow" path = "examples/rust-dataflow/run.rs" + +[[example]] +name = "cxx-dataflow" +path = "examples/c++-dataflow/run.rs" diff --git a/apis/rust/node/src/config.rs b/apis/rust/node/src/config.rs index a6991235..dccae1d1 100644 --- a/apis/rust/node/src/config.rs +++ b/apis/rust/node/src/config.rs @@ -66,6 +66,12 @@ impl std::fmt::Display for OperatorId { #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] pub struct DataId(String); +impl From for String { + fn from(id: DataId) -> Self { + id.0 + } +} + impl From for DataId { fn from(id: String) -> Self { Self(id) diff --git a/examples/c++-dataflow/dataflow.yml b/examples/c++-dataflow/dataflow.yml new file mode 100644 index 00000000..151e3e74 --- /dev/null +++ b/examples/c++-dataflow/dataflow.yml @@ -0,0 +1,12 @@ +communication: + zenoh: + prefix: /example-cxx-dataflow + +nodes: + - id: cxx-node + custom: + run: ../../target/release/cxx-dataflow-example-node + inputs: + tick: dora/timer/millis/300 + outputs: + - counter diff --git a/examples/c++-dataflow/node/Cargo.toml b/examples/c++-dataflow/node/Cargo.toml new file mode 100644 index 00000000..d118866d --- /dev/null +++ b/examples/c++-dataflow/node/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "cxx-dataflow-example-node" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +cxx = "1.0.73" +dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +eyre = "0.6.8" +futures = "0.3.21" +rand = "0.8.5" +tokio = { version = "1.20.1", features = ["rt", "macros"] } + +[build-dependencies] +cxx-build = "1.0.73" diff --git a/examples/c++-dataflow/node/build.rs b/examples/c++-dataflow/node/build.rs new file mode 100644 index 00000000..12e4875c --- /dev/null +++ b/examples/c++-dataflow/node/build.rs @@ -0,0 +1,10 @@ +fn main() { + cxx_build::bridge("src/main.rs") // returns a cc::Build + .file("src/main.cc") + .flag_if_supported("-std=c++11") + .compile("cxx-example-dataflow-node"); + + println!("cargo:rerun-if-changed=src/main.rs"); + println!("cargo:rerun-if-changed=src/main.cc"); + println!("cargo:rerun-if-changed=src/main.h"); +} diff --git a/examples/c++-dataflow/node/src/main.cc b/examples/c++-dataflow/node/src/main.cc new file mode 100644 index 00000000..2d52e539 --- /dev/null +++ b/examples/c++-dataflow/node/src/main.cc @@ -0,0 +1,33 @@ +#include "main.h" + +#include +#include + +void cxx_main(Inputs &inputs, const OutputSender &output_sender) +{ + std::cout << "HELLO FROM C++" << std::endl; + unsigned char counter = 0; + + for (int i = 0; i < 20; i++) + { + + auto input = next_input(inputs); + if (input.end_of_input) + { + return; + } + counter += 1; + + std::cout << "Received input " << std::string(input.id) << " (counter: " << (unsigned int)counter << ")" << std::endl; + + std::vector out_vec{counter}; + rust::Slice out_slice{out_vec.data(), out_vec.size()}; + auto result = send_output(output_sender, "counter", out_slice); + auto error = std::string(result.error); + if (!error.empty()) + { + std::cerr << "Error: " << error << std::endl; + return; + } + } +} diff --git a/examples/c++-dataflow/node/src/main.h b/examples/c++-dataflow/node/src/main.h new file mode 100644 index 00000000..ac430d20 --- /dev/null +++ b/examples/c++-dataflow/node/src/main.h @@ -0,0 +1,4 @@ +#pragma once +#include "cxx-dataflow-example-node/src/main.rs.h" + +void cxx_main(Inputs &inputs, const OutputSender &output_sender); diff --git a/examples/c++-dataflow/node/src/main.rs b/examples/c++-dataflow/node/src/main.rs new file mode 100644 index 00000000..2d1535a9 --- /dev/null +++ b/examples/c++-dataflow/node/src/main.rs @@ -0,0 +1,68 @@ +use dora_node_api::{self, DoraNode, Input}; +use futures::{executor::block_on, Stream, StreamExt}; +use std::pin::Pin; + +#[cxx::bridge] +mod ffi { + struct DoraInput { + end_of_input: bool, + id: String, + data: Vec, + } + + struct DoraResult { + error: String, + } + + extern "Rust" { + type Inputs<'a>; + type OutputSender<'a>; + + fn next_input(inputs: &mut Inputs) -> DoraInput; + fn send_output(output_sender: &OutputSender, id: String, data: &[u8]) -> DoraResult; + } + + unsafe extern "C++" { + include!("cxx-dataflow-example-node/src/main.h"); + + fn cxx_main(inputs: &mut Inputs, output_sender: &OutputSender); + } +} + +pub struct Inputs<'a>(Pin + 'a>>); + +fn next_input(inputs: &mut Inputs) -> ffi::DoraInput { + match block_on(inputs.0.next()) { + Some(input) => ffi::DoraInput { + end_of_input: false, + id: input.id.into(), + data: input.data, + }, + None => ffi::DoraInput { + end_of_input: true, + id: String::new(), + data: Vec::new(), + }, + } +} + +pub struct OutputSender<'a>(&'a DoraNode); + +fn send_output(sender: &OutputSender, id: String, data: &[u8]) -> ffi::DoraResult { + let error = match block_on(sender.0.send_output(&id.into(), data)) { + Ok(()) => String::new(), + Err(err) => format!("{err:?}"), + }; + ffi::DoraResult { error } +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let node = DoraNode::init_from_env().await?; + let input_stream = node.inputs().await?; + let mut inputs = Inputs(Box::pin(input_stream)); + let outputs = OutputSender(&node); + ffi::cxx_main(&mut inputs, &outputs); + + Ok(()) +} diff --git a/examples/c++-dataflow/run.rs b/examples/c++-dataflow/run.rs new file mode 100644 index 00000000..cf0a3a13 --- /dev/null +++ b/examples/c++-dataflow/run.rs @@ -0,0 +1,31 @@ +use eyre::{bail, Context}; +use std::path::Path; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + build_package("cxx-dataflow-example-node").await?; + build_package("dora-runtime").await?; + + dora_coordinator::run(dora_coordinator::Command::Run { + dataflow: Path::new("dataflow.yml").to_owned(), + runtime: Some(root.join("target").join("release").join("dora-runtime")), + }) + .await?; + + Ok(()) +} + +async fn build_package(package: &str) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("build").arg("--release"); + cmd.arg("--package").arg(package); + if !cmd.status().await?.success() { + bail!("failed to build {package}"); + }; + Ok(()) +}