From 331e521f5d88216abec81ffb506bf7020797ad34 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 11 Aug 2022 18:13:49 +0200 Subject: [PATCH] Add an example C++ operator based on the Rust API --- Cargo.lock | 13 ++++ examples/c++-dataflow/dataflow.yml | 10 +++ .../c++-dataflow/operator-rust-api/Cargo.toml | 18 +++++ .../c++-dataflow/operator-rust-api/build.rs | 10 +++ .../c++-dataflow/operator-rust-api/src/lib.rs | 77 +++++++++++++++++++ .../operator-rust-api/src/operator.cc | 30 ++++++++ .../operator-rust-api/src/operator.h | 17 ++++ examples/c++-dataflow/run.rs | 2 + 8 files changed, 177 insertions(+) create mode 100644 examples/c++-dataflow/operator-rust-api/Cargo.toml create mode 100644 examples/c++-dataflow/operator-rust-api/build.rs create mode 100644 examples/c++-dataflow/operator-rust-api/src/lib.rs create mode 100644 examples/c++-dataflow/operator-rust-api/src/operator.cc create mode 100644 examples/c++-dataflow/operator-rust-api/src/operator.h diff --git a/Cargo.lock b/Cargo.lock index cd961fdf..20fc4300 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -618,6 +618,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "cxx-dataflow-example-operator-rust-api" +version = "0.1.0" +dependencies = [ + "cxx", + "cxx-build", + "dora-operator-api", + "eyre", + "futures", + "rand", + "tokio", +] + [[package]] name = "cxxbridge-flags" version = "1.0.73" diff --git a/examples/c++-dataflow/dataflow.yml b/examples/c++-dataflow/dataflow.yml index 8e4914bd..7228ae4c 100644 --- a/examples/c++-dataflow/dataflow.yml +++ b/examples/c++-dataflow/dataflow.yml @@ -17,3 +17,13 @@ nodes: tick: dora/timer/millis/300 outputs: - counter + + - id: runtime-node + operators: + - id: operator-rust-api + shared-library: ../../target/release/libcxx_dataflow_example_operator_rust_api.so + inputs: + counter_1: cxx-node-c-api/counter + counter_2: cxx-node-rust-api/counter + outputs: + - status diff --git a/examples/c++-dataflow/operator-rust-api/Cargo.toml b/examples/c++-dataflow/operator-rust-api/Cargo.toml new file mode 100644 index 00000000..cfc6bec4 --- /dev/null +++ b/examples/c++-dataflow/operator-rust-api/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "cxx-dataflow-example-operator-rust-api" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +cxx = "1.0.73" +dora-operator-api = { version = "0.1.0", path = "../../../apis/rust/operator" } +eyre = "0.6.8" +futures = "0.3.21" +rand = "0.8.5" +tokio = { version = "1.20.1", features = ["rt", "macros"] } + +[build-dependencies] +cxx-build = "1.0.73" diff --git a/examples/c++-dataflow/operator-rust-api/build.rs b/examples/c++-dataflow/operator-rust-api/build.rs new file mode 100644 index 00000000..c8b861a4 --- /dev/null +++ b/examples/c++-dataflow/operator-rust-api/build.rs @@ -0,0 +1,10 @@ +fn main() { + cxx_build::bridge("src/lib.rs") // returns a cc::Build + .file("src/operator.cc") + .flag_if_supported("-std=c++14") + .compile("cxx-example-dataflow-operator"); + + println!("cargo:rerun-if-changed=src/lib.rs"); + println!("cargo:rerun-if-changed=src/operator.cc"); + println!("cargo:rerun-if-changed=src/operator.h"); +} diff --git a/examples/c++-dataflow/operator-rust-api/src/lib.rs b/examples/c++-dataflow/operator-rust-api/src/lib.rs new file mode 100644 index 00000000..7d94cf9e --- /dev/null +++ b/examples/c++-dataflow/operator-rust-api/src/lib.rs @@ -0,0 +1,77 @@ +#![warn(unsafe_op_in_unsafe_fn)] + +use dora_operator_api::{self, register_operator, DoraOperator, DoraOutputSender, DoraStatus}; + +#[cxx::bridge] +#[allow(unsafe_op_in_unsafe_fn)] +mod ffi { + struct OnInputResult { + error: String, + stop: bool, + } + + extern "Rust" { + type OutputSender<'a>; + + fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> i32; + } + + unsafe extern "C++" { + include!("cxx-dataflow-example-operator-rust-api/src/operator.h"); + type Operator; + + fn new_operator() -> UniquePtr; + + fn on_input( + op: Pin<&mut Operator>, + id: &str, + data: &[u8], + output_sender: &mut OutputSender, + ) -> OnInputResult; + } +} + +pub struct OutputSender<'a>(&'a mut DoraOutputSender); + +fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> i32 { + match sender.0.send(id, data) { + Ok(()) => 0, + Err(err_code) => err_code.try_into().unwrap(), + } +} + +register_operator!(OperatorWrapper); + +struct OperatorWrapper { + operator: cxx::UniquePtr, +} + +impl Default for OperatorWrapper { + fn default() -> Self { + Self { + operator: ffi::new_operator(), + } + } +} + +impl DoraOperator for OperatorWrapper { + fn on_input( + &mut self, + id: &str, + data: &[u8], + output_sender: &mut DoraOutputSender, + ) -> Result { + let operator = self.operator.as_mut().unwrap(); + let mut output_sender = OutputSender(output_sender); + + let result = ffi::on_input(operator, id, data, &mut output_sender); + if result.error.is_empty() { + Ok(match result.stop { + false => DoraStatus::Continue, + true => DoraStatus::Stop, + }) + } else { + Err(()) + } + } +} diff --git a/examples/c++-dataflow/operator-rust-api/src/operator.cc b/examples/c++-dataflow/operator-rust-api/src/operator.cc new file mode 100644 index 00000000..e92dbe38 --- /dev/null +++ b/examples/c++-dataflow/operator-rust-api/src/operator.cc @@ -0,0 +1,30 @@ +#include "operator.h" +#include "cxx-dataflow-example-operator-rust-api/src/lib.rs.h" +#include + +Operator::Operator() {} + +std::unique_ptr new_operator() +{ + return std::make_unique(); +} + +OnInputResult on_input(Operator &op, rust::Str id, rust::Slice data, OutputSender &output_sender) +{ + op.counter += 1; + std::cout << "Rust API operator received input `" << id << "` with data `" << (unsigned int)data[0] << "` (internal counter: " << (unsigned int)op.counter << ")" << std::endl; + + std::vector out_vec{op.counter}; + rust::Slice out_slice{out_vec.data(), out_vec.size()}; + auto result_code = send_output(output_sender, rust::Str("status"), out_slice); + if (result_code == 0) + { + OnInputResult result = {rust::String(), false}; + return result; + } + else + { + OnInputResult result = {rust::String("error"), false}; + return result; + } +} diff --git a/examples/c++-dataflow/operator-rust-api/src/operator.h b/examples/c++-dataflow/operator-rust-api/src/operator.h new file mode 100644 index 00000000..bdaecbf4 --- /dev/null +++ b/examples/c++-dataflow/operator-rust-api/src/operator.h @@ -0,0 +1,17 @@ +#pragma once +#include "rust/cxx.h" +#include + +class Operator +{ +public: + Operator(); + unsigned char counter; +}; + +std::unique_ptr new_operator(); + +struct OnInputResult; +struct OutputSender; + +OnInputResult on_input(Operator &op, rust::Str, rust::Slice, OutputSender &output_sender); diff --git a/examples/c++-dataflow/run.rs b/examples/c++-dataflow/run.rs index 865ff54e..1e6cd32a 100644 --- a/examples/c++-dataflow/run.rs +++ b/examples/c++-dataflow/run.rs @@ -10,6 +10,8 @@ async fn main() -> eyre::Result<()> { tokio::fs::create_dir_all(root.join("build")).await?; build_package("cxx-dataflow-example-node-rust-api").await?; + build_package("cxx-dataflow-example-operator-rust-api").await?; + build_package("dora-node-api-c").await?; build_cxx_node( root,