Browse Source

Add an example C++ operator based on the Rust API

tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
331e521f5d
Failed to extract signature
8 changed files with 177 additions and 0 deletions
  1. +13
    -0
      Cargo.lock
  2. +10
    -0
      examples/c++-dataflow/dataflow.yml
  3. +18
    -0
      examples/c++-dataflow/operator-rust-api/Cargo.toml
  4. +10
    -0
      examples/c++-dataflow/operator-rust-api/build.rs
  5. +77
    -0
      examples/c++-dataflow/operator-rust-api/src/lib.rs
  6. +30
    -0
      examples/c++-dataflow/operator-rust-api/src/operator.cc
  7. +17
    -0
      examples/c++-dataflow/operator-rust-api/src/operator.h
  8. +2
    -0
      examples/c++-dataflow/run.rs

+ 13
- 0
Cargo.lock View File

@@ -618,6 +618,19 @@ dependencies = [
"tokio",
]

[[package]]
name = "cxx-dataflow-example-operator-rust-api"
version = "0.1.0"
dependencies = [
"cxx",
"cxx-build",
"dora-operator-api",
"eyre",
"futures",
"rand",
"tokio",
]

[[package]]
name = "cxxbridge-flags"
version = "1.0.73"


+ 10
- 0
examples/c++-dataflow/dataflow.yml View File

@@ -17,3 +17,13 @@ nodes:
tick: dora/timer/millis/300
outputs:
- counter

- id: runtime-node
operators:
- id: operator-rust-api
shared-library: ../../target/release/libcxx_dataflow_example_operator_rust_api.so
inputs:
counter_1: cxx-node-c-api/counter
counter_2: cxx-node-rust-api/counter
outputs:
- status

+ 18
- 0
examples/c++-dataflow/operator-rust-api/Cargo.toml View File

@@ -0,0 +1,18 @@
[package]
name = "cxx-dataflow-example-operator-rust-api"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
cxx = "1.0.73"
dora-operator-api = { version = "0.1.0", path = "../../../apis/rust/operator" }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.20.1", features = ["rt", "macros"] }

[build-dependencies]
cxx-build = "1.0.73"

+ 10
- 0
examples/c++-dataflow/operator-rust-api/build.rs View File

@@ -0,0 +1,10 @@
fn main() {
cxx_build::bridge("src/lib.rs") // returns a cc::Build
.file("src/operator.cc")
.flag_if_supported("-std=c++14")
.compile("cxx-example-dataflow-operator");

println!("cargo:rerun-if-changed=src/lib.rs");
println!("cargo:rerun-if-changed=src/operator.cc");
println!("cargo:rerun-if-changed=src/operator.h");
}

+ 77
- 0
examples/c++-dataflow/operator-rust-api/src/lib.rs View File

@@ -0,0 +1,77 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{self, register_operator, DoraOperator, DoraOutputSender, DoraStatus};

#[cxx::bridge]
#[allow(unsafe_op_in_unsafe_fn)]
mod ffi {
struct OnInputResult {
error: String,
stop: bool,
}

extern "Rust" {
type OutputSender<'a>;

fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> i32;
}

unsafe extern "C++" {
include!("cxx-dataflow-example-operator-rust-api/src/operator.h");
type Operator;

fn new_operator() -> UniquePtr<Operator>;

fn on_input(
op: Pin<&mut Operator>,
id: &str,
data: &[u8],
output_sender: &mut OutputSender,
) -> OnInputResult;
}
}

pub struct OutputSender<'a>(&'a mut DoraOutputSender);

fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> i32 {
match sender.0.send(id, data) {
Ok(()) => 0,
Err(err_code) => err_code.try_into().unwrap(),
}
}

register_operator!(OperatorWrapper);

struct OperatorWrapper {
operator: cxx::UniquePtr<ffi::Operator>,
}

impl Default for OperatorWrapper {
fn default() -> Self {
Self {
operator: ffi::new_operator(),
}
}
}

impl DoraOperator for OperatorWrapper {
fn on_input(
&mut self,
id: &str,
data: &[u8],
output_sender: &mut DoraOutputSender,
) -> Result<DoraStatus, ()> {
let operator = self.operator.as_mut().unwrap();
let mut output_sender = OutputSender(output_sender);

let result = ffi::on_input(operator, id, data, &mut output_sender);
if result.error.is_empty() {
Ok(match result.stop {
false => DoraStatus::Continue,
true => DoraStatus::Stop,
})
} else {
Err(())
}
}
}

+ 30
- 0
examples/c++-dataflow/operator-rust-api/src/operator.cc View File

@@ -0,0 +1,30 @@
#include "operator.h"
#include "cxx-dataflow-example-operator-rust-api/src/lib.rs.h"
#include <iostream>

Operator::Operator() {}

std::unique_ptr<Operator> new_operator()
{
return std::make_unique<Operator>();
}

OnInputResult on_input(Operator &op, rust::Str id, rust::Slice<const uint8_t> data, OutputSender &output_sender)
{
op.counter += 1;
std::cout << "Rust API operator received input `" << id << "` with data `" << (unsigned int)data[0] << "` (internal counter: " << (unsigned int)op.counter << ")" << std::endl;

std::vector<unsigned char> out_vec{op.counter};
rust::Slice<const uint8_t> out_slice{out_vec.data(), out_vec.size()};
auto result_code = send_output(output_sender, rust::Str("status"), out_slice);
if (result_code == 0)
{
OnInputResult result = {rust::String(), false};
return result;
}
else
{
OnInputResult result = {rust::String("error"), false};
return result;
}
}

+ 17
- 0
examples/c++-dataflow/operator-rust-api/src/operator.h View File

@@ -0,0 +1,17 @@
#pragma once
#include "rust/cxx.h"
#include <memory>

class Operator
{
public:
Operator();
unsigned char counter;
};

std::unique_ptr<Operator> new_operator();

struct OnInputResult;
struct OutputSender;

OnInputResult on_input(Operator &op, rust::Str, rust::Slice<const uint8_t>, OutputSender &output_sender);

+ 2
- 0
examples/c++-dataflow/run.rs View File

@@ -10,6 +10,8 @@ async fn main() -> eyre::Result<()> {
tokio::fs::create_dir_all(root.join("build")).await?;

build_package("cxx-dataflow-example-node-rust-api").await?;
build_package("cxx-dataflow-example-operator-rust-api").await?;

build_package("dora-node-api-c").await?;
build_cxx_node(
root,


Loading…
Cancel
Save