Browse Source

Merge pull request #121 from dora-rs/c++-operator-api

Implement a C++ operator API
tags/v0.1.0
Philipp Oppermann GitHub 3 years ago
parent
commit
8e64c60105
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 182 additions and 99 deletions
  1. +13
    -13
      Cargo.lock
  2. +0
    -1
      Cargo.toml
  3. +2
    -2
      apis/c++/operator/Cargo.toml
  4. +4
    -0
      apis/c++/operator/build.rs
  5. +10
    -8
      apis/c++/operator/src/lib.rs
  6. +2
    -2
      apis/c/operator/operator_api.h
  7. +1
    -0
      examples/c++-dataflow/.gitignore
  8. +1
    -1
      examples/c++-dataflow/dataflow.yml
  9. +0
    -10
      examples/c++-dataflow/operator-rust-api/build.rs
  10. +23
    -0
      examples/c++-dataflow/operator-rust-api/operator.cc
  11. +16
    -0
      examples/c++-dataflow/operator-rust-api/operator.h
  12. +0
    -22
      examples/c++-dataflow/operator-rust-api/src/operator.cc
  13. +0
    -17
      examples/c++-dataflow/operator-rust-api/src/operator.h
  14. +110
    -23
      examples/c++-dataflow/run.rs

+ 13
- 13
Cargo.lock View File

@@ -746,19 +746,6 @@ dependencies = [
"syn",
]

[[package]]
name = "cxx-dataflow-example-operator-rust-api"
version = "0.1.0"
dependencies = [
"cxx",
"cxx-build",
"dora-operator-api",
"eyre",
"futures",
"rand",
"tokio",
]

[[package]]
name = "cxxbridge-flags"
version = "1.0.73"
@@ -1036,6 +1023,19 @@ dependencies = [
"dora-operator-api-types",
]

[[package]]
name = "dora-operator-api-cxx"
version = "0.1.0"
dependencies = [
"cxx",
"cxx-build",
"dora-operator-api",
"eyre",
"futures",
"rand",
"tokio",
]

[[package]]
name = "dora-operator-api-macros"
version = "0.1.0"


+ 0
- 1
Cargo.toml View File

@@ -9,7 +9,6 @@ members = [
"apis/rust/operator/types",
"binaries/*",
"examples/rust-dataflow/*",
"examples/c++-dataflow/operator-rust-api",
"examples/iceoryx/*",
"libraries/communication-layer",
"libraries/core",


examples/c++-dataflow/operator-rust-api/Cargo.toml → apis/c++/operator/Cargo.toml View File

@@ -1,10 +1,10 @@
[package]
name = "cxx-dataflow-example-operator-rust-api"
name = "dora-operator-api-cxx"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]
crate-type = ["staticlib"]

[dependencies]
cxx = "1.0.73"

+ 4
- 0
apis/c++/operator/build.rs View File

@@ -0,0 +1,4 @@
fn main() {
let _ = cxx_build::bridge("src/lib.rs");
println!("cargo:rerun-if-changed=src/lib.rs");
}

examples/c++-dataflow/operator-rust-api/src/lib.rs → apis/c++/operator/src/lib.rs View File

@@ -1,28 +1,30 @@
#![cfg(not(test))]
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{self, register_operator, DoraOperator, DoraOutputSender, DoraStatus};
use ffi::SendOutputResult;
use ffi::DoraSendOutputResult;

#[cxx::bridge]
#[allow(unsafe_op_in_unsafe_fn)]
mod ffi {
struct OnInputResult {
struct DoraOnInputResult {
error: String,
stop: bool,
}

struct SendOutputResult {
struct DoraSendOutputResult {
error: String,
}

extern "Rust" {
type OutputSender<'a, 'b>;

fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> SendOutputResult;
fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> DoraSendOutputResult;
}

unsafe extern "C++" {
include!("cxx-dataflow-example-operator-rust-api/src/operator.h");
include!("operator.h");

type Operator;

fn new_operator() -> UniquePtr<Operator>;
@@ -32,19 +34,19 @@ mod ffi {
id: &str,
data: &[u8],
output_sender: &mut OutputSender,
) -> OnInputResult;
) -> DoraOnInputResult;
}
}

pub struct OutputSender<'a, 'b>(&'a mut DoraOutputSender<'b>);

fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> SendOutputResult {
fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> DoraSendOutputResult {
let error = sender
.0
.send(id.into(), data.to_owned())
.err()
.unwrap_or_default();
SendOutputResult { error }
DoraSendOutputResult { error }
}

register_operator!(OperatorWrapper);

+ 2
- 2
apis/c/operator/operator_api.h View File

@@ -11,7 +11,7 @@ extern "C"
#ifdef _WIN32
#define EXPORT __declspec(dllexport)
#else
#define EXPORT
#define EXPORT __attribute__((visibility("default")))
#endif

EXPORT DoraInitResult_t dora_init_operator(void);
@@ -23,7 +23,7 @@ extern "C"
const SendOutput_t *send_output,
void *operator_context);

void __dora_type_assertions()
static void __dora_type_assertions()
{
DoraInitOperator_t __dora_init_operator = {.init_operator = dora_init_operator};
DoraDropOperator_t __dora_drop_operator = {.drop_operator = dora_drop_operator};


+ 1
- 0
examples/c++-dataflow/.gitignore View File

@@ -0,0 +1 @@
*.o

+ 1
- 1
examples/c++-dataflow/dataflow.yml View File

@@ -21,7 +21,7 @@ nodes:
- id: runtime-node
operators:
- id: operator-rust-api
shared-library: ../../target/debug/cxx_dataflow_example_operator_rust_api
shared-library: build/operator_rust_api
inputs:
counter_1: cxx-node-c-api/counter
counter_2: cxx-node-rust-api/counter


+ 0
- 10
examples/c++-dataflow/operator-rust-api/build.rs View File

@@ -1,10 +0,0 @@
fn main() {
cxx_build::bridge("src/lib.rs") // returns a cc::Build
.file("src/operator.cc")
.flag_if_supported("-std=c++14")
.compile("cxx-example-dataflow-operator");

println!("cargo:rerun-if-changed=src/lib.rs");
println!("cargo:rerun-if-changed=src/operator.cc");
println!("cargo:rerun-if-changed=src/operator.h");
}

+ 23
- 0
examples/c++-dataflow/operator-rust-api/operator.cc View File

@@ -0,0 +1,23 @@
#include "operator.h"
#include <iostream>
#include <vector>
#include "../build/dora-operator-api.h"

Operator::Operator() {}

std::unique_ptr<Operator> new_operator()
{
return std::make_unique<Operator>();
}

DoraOnInputResult on_input(Operator &op, rust::Str id, rust::Slice<const uint8_t> data, OutputSender &output_sender)
{
op.counter += 1;
std::cout << "Rust API operator received input `" << id.data() << "` with data `" << (unsigned int)data[0] << "` (internal counter: " << (unsigned int)op.counter << ")" << std::endl;

std::vector<unsigned char> out_vec{op.counter};
rust::Slice<const uint8_t> out_slice{out_vec.data(), out_vec.size()};
auto send_result = send_output(output_sender, rust::Str("status"), out_slice);
DoraOnInputResult result = {send_result.error, false};
return result;
}

+ 16
- 0
examples/c++-dataflow/operator-rust-api/operator.h View File

@@ -0,0 +1,16 @@
#pragma once
#include <memory>
#include "../../../apis/c/operator/operator_api.h"

class Operator
{
public:
Operator();
unsigned char counter;
};

#include "../build/dora-operator-api.h"

std::unique_ptr<Operator> new_operator();

DoraOnInputResult on_input(Operator &op, rust::Str id, rust::Slice<const uint8_t> data, OutputSender &output_sender);

+ 0
- 22
examples/c++-dataflow/operator-rust-api/src/operator.cc View File

@@ -1,22 +0,0 @@
#include "cxx-dataflow-example-operator-rust-api/src/operator.h"
#include "cxx-dataflow-example-operator-rust-api/src/lib.rs.h"
#include <iostream>

Operator::Operator() {}

std::unique_ptr<Operator> new_operator()
{
return std::make_unique<Operator>();
}

OnInputResult on_input(Operator &op, rust::Str id, rust::Slice<const uint8_t> data, OutputSender &output_sender)
{
op.counter += 1;
std::cout << "Rust API operator received input `" << id << "` with data `" << (unsigned int)data[0] << "` (internal counter: " << (unsigned int)op.counter << ")" << std::endl;

std::vector<unsigned char> out_vec{op.counter};
rust::Slice<const uint8_t> out_slice{out_vec.data(), out_vec.size()};
auto send_result = send_output(output_sender, rust::Str("status"), out_slice);
OnInputResult result = {send_result.error, false};
return result;
}

+ 0
- 17
examples/c++-dataflow/operator-rust-api/src/operator.h View File

@@ -1,17 +0,0 @@
#pragma once
#include "rust/cxx.h"
#include <memory>

class Operator
{
public:
Operator();
unsigned char counter;
};

std::unique_ptr<Operator> new_operator();

struct OnInputResult;
struct OutputSender;

OnInputResult on_input(Operator &op, rust::Str, rust::Slice<const uint8_t>, OutputSender &output_sender);

+ 110
- 23
examples/c++-dataflow/run.rs View File

@@ -7,6 +7,11 @@ use std::{

#[tokio::main]
async fn main() -> eyre::Result<()> {
if cfg!(windows) {
eprintln!("The c++ example does not work on Windows currently because of a linker error");
return Ok(());
}

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let target = root.join("target");
std::env::set_current_dir(root.join(file!()).parent().unwrap())
@@ -15,19 +20,42 @@ async fn main() -> eyre::Result<()> {
tokio::fs::create_dir_all("build").await?;
let build_dir = Path::new("build");

build_package("cxx-dataflow-example-operator-rust-api").await?;
build_package("dora-operator-api-cxx").await?;
let operator_cxxbridge = target
.join("cxxbridge")
.join("dora-operator-api-cxx")
.join("src");
tokio::fs::copy(
operator_cxxbridge.join("lib.rs.cc"),
build_dir.join("operator-bridge.cc"),
)
.await?;
tokio::fs::copy(
operator_cxxbridge.join("lib.rs.h"),
build_dir.join("dora-operator-api.h"),
)
.await?;

build_package("dora-node-api-cxx").await?;
let cxxbridge = target
let node_cxxbridge = target
.join("cxxbridge")
.join("dora-node-api-cxx")
.join("src");
tokio::fs::copy(cxxbridge.join("lib.rs.cc"), build_dir.join("bridge.cc")).await?;
tokio::fs::copy(
cxxbridge.join("lib.rs.h"),
node_cxxbridge.join("lib.rs.cc"),
build_dir.join("node-bridge.cc"),
)
.await?;
tokio::fs::copy(
node_cxxbridge.join("lib.rs.h"),
build_dir.join("dora-node-api.h"),
)
.await?;
tokio::fs::write(
build_dir.join("operator.h"),
r###"#include "../operator-rust-api/operator.h""###,
)
.await?;

build_package("dora-node-api-c").await?;
build_package("dora-operator-api-c").await?;
@@ -35,7 +63,7 @@ async fn main() -> eyre::Result<()> {
root,
&[
&dunce::canonicalize(Path::new("node-rust-api").join("main.cc"))?,
&dunce::canonicalize(build_dir.join("bridge.cc"))?,
&dunce::canonicalize(build_dir.join("node-bridge.cc"))?,
],
"node_rust_api",
&["-l", "dora_node_api_cxx"],
@@ -51,8 +79,25 @@ async fn main() -> eyre::Result<()> {
)
.await?;
build_cxx_operator(
&dunce::canonicalize(Path::new("operator-c-api").join("operator.cc"))?,
&[
&dunce::canonicalize(Path::new("operator-rust-api").join("operator.cc"))?,
&dunce::canonicalize(build_dir.join("operator-bridge.cc"))?,
],
"operator_rust_api",
&[
"-l",
"dora_operator_api_cxx",
"-L",
&root.join("target").join("debug").to_str().unwrap(),
],
)
.await?;
build_cxx_operator(
&[&dunce::canonicalize(
Path::new("operator-c-api").join("operator.cc"),
)?],
"operator_c_api",
&[],
)
.await?;

@@ -87,7 +132,6 @@ async fn build_cxx_node(
let mut clang = tokio::process::Command::new("clang++");
clang.args(paths);
clang.arg("-std=c++17");
clang.args(args);
#[cfg(target_os = "linux")]
{
clang.arg("-l").arg("m");
@@ -137,6 +181,7 @@ async fn build_cxx_node(
clang.arg("-l").arg("c");
clang.arg("-l").arg("m");
}
clang.args(args);
clang.arg("-L").arg(root.join("target").join("debug"));
clang
.arg("--output")
@@ -151,27 +196,69 @@ async fn build_cxx_node(
Ok(())
}

async fn build_cxx_operator(path: &Path, out_name: &str) -> eyre::Result<()> {
let object_file_path = Path::new("../build").join(out_name).with_extension("o");

let mut compile = tokio::process::Command::new("clang++");
compile.arg("-c").arg(path);
compile.arg("-std=c++17");
compile.arg("-o").arg(&object_file_path);
#[cfg(unix)]
compile.arg("-fPIC");
if let Some(parent) = path.parent() {
compile.current_dir(parent);
async fn build_cxx_operator(
paths: &[&Path],
out_name: &str,
link_args: &[&str],
) -> eyre::Result<()> {
let mut object_file_paths = Vec::new();

for path in paths {
let mut compile = tokio::process::Command::new("clang++");
compile.arg("-c").arg(path);
compile.arg("-std=c++17");
let object_file_path = path.with_extension("o");
compile.arg("-o").arg(&object_file_path);
#[cfg(unix)]
compile.arg("-fPIC");
if let Some(parent) = path.parent() {
compile.current_dir(parent);
}
if !compile.status().await?.success() {
bail!("failed to compile cxx operator");
};
object_file_paths.push(object_file_path);
}
if !compile.status().await?.success() {
bail!("failed to compile cxx operator");
};

let mut link = tokio::process::Command::new("clang++");
link.arg("-shared").arg(&object_file_path);
link.arg("-shared").args(&object_file_paths);
link.args(link_args);
#[cfg(target_os = "windows")]
{
link.arg("-ladvapi32");
link.arg("-luserenv");
link.arg("-lkernel32");
link.arg("-lws2_32");
link.arg("-lbcrypt");
link.arg("-lncrypt");
link.arg("-lschannel");
link.arg("-lntdll");
link.arg("-liphlpapi");

link.arg("-lcfgmgr32");
link.arg("-lcredui");
link.arg("-lcrypt32");
link.arg("-lcryptnet");
link.arg("-lfwpuclnt");
link.arg("-lgdi32");
link.arg("-lmsimg32");
link.arg("-lmswsock");
link.arg("-lole32");
link.arg("-lopengl32");
link.arg("-lsecur32");
link.arg("-lshell32");
link.arg("-lsynchronization");
link.arg("-luser32");
link.arg("-lwinspool");

link.arg("-Wl,-nodefaultlib:libcmt");
link.arg("-D_DLL");
link.arg("-lmsvcrt");
link.arg("-fms-runtime-lib=static");
}
link.arg("-o")
.arg(Path::new("../build").join(library_filename(out_name)));
if let Some(parent) = path.parent() {
if let Some(parent) = paths[0].parent() {
link.current_dir(parent);
}
if !link.status().await?.success() {


Loading…
Cancel
Save