Browse Source

Create an example C++ node based on Rust node API and `cxx` crate

tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
09866cbb2b
Failed to extract signature
10 changed files with 281 additions and 9 deletions
  1. +94
    -6
      Cargo.lock
  2. +6
    -3
      Cargo.toml
  3. +6
    -0
      apis/rust/node/src/config.rs
  4. +12
    -0
      examples/c++-dataflow/dataflow.yml
  5. +17
    -0
      examples/c++-dataflow/node/Cargo.toml
  6. +10
    -0
      examples/c++-dataflow/node/build.rs
  7. +33
    -0
      examples/c++-dataflow/node/src/main.cc
  8. +4
    -0
      examples/c++-dataflow/node/src/main.h
  9. +68
    -0
      examples/c++-dataflow/node/src/main.rs
  10. +31
    -0
      examples/c++-dataflow/run.rs

+ 94
- 6
Cargo.lock View File

@@ -452,6 +452,16 @@ dependencies = [
"os_str_bytes",
]

[[package]]
name = "codespan-reporting"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e"
dependencies = [
"termcolor",
"unicode-width",
]

[[package]]
name = "concurrent-queue"
version = "1.2.2"
@@ -568,6 +578,63 @@ dependencies = [
"syn",
]

[[package]]
name = "cxx"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "873c2e83af70859af2aaecd1f5d862f3790b747b1f4f50fb45a931d000ac0422"
dependencies = [
"cc",
"cxxbridge-flags",
"cxxbridge-macro",
"link-cplusplus",
]

[[package]]
name = "cxx-build"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b49edea7163bbc7a39e3d829b4b0b66a9d30486973152842b7413f2c7b5632bf"
dependencies = [
"cc",
"codespan-reporting",
"once_cell",
"proc-macro2",
"quote",
"scratch",
"syn",
]

[[package]]
name = "cxx-dataflow-example-node"
version = "0.1.0"
dependencies = [
"cxx",
"cxx-build",
"dora-node-api",
"eyre",
"futures",
"rand",
"tokio",
]

[[package]]
name = "cxxbridge-flags"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46b787c15af80277db5c88c6ac6c502ae545e622f010e06f95e540d34931acf"

[[package]]
name = "cxxbridge-macro"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ba3f3a7efa46626878fb5d324fabca4d19d2956b6ae97ce43044ef4515f5abc"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "dashmap"
version = "4.0.2"
@@ -1345,6 +1412,15 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33a33a362ce288760ec6a508b94caaec573ae7d3bbbd91b87aa0bad4456839db"

[[package]]
name = "link-cplusplus"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8cae2cd7ba2f3f63938b9c724475dfb7b9861b545a90324476324ed21dbc8c8"
dependencies = [
"cc",
]

[[package]]
name = "linked-hash-map"
version = "0.5.4"
@@ -1954,11 +2030,11 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"

[[package]]
name = "proc-macro2"
version = "1.0.36"
version = "1.0.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029"
checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab"
dependencies = [
"unicode-xid",
"unicode-ident",
]

[[package]]
@@ -2395,6 +2471,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"

[[package]]
name = "scratch"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898"

[[package]]
name = "sct"
version = "0.6.1"
@@ -2630,13 +2712,13 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"

[[package]]
name = "syn"
version = "1.0.94"
version = "1.0.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a07e33e919ebcd69113d5be0e4d70c5707004ff45188910106854f38b960df4a"
checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
"unicode-ident",
]

[[package]]
@@ -3008,6 +3090,12 @@ dependencies = [
"uuid",
]

[[package]]
name = "unicode-ident"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf"

[[package]]
name = "unicode-segmentation"
version = "1.9.0"


+ 6
- 3
Cargo.toml View File

@@ -5,9 +5,8 @@ members = [
"apis/rust/*",
"apis/rust/operator/macros",
"binaries/*",
"examples/rust-dataflow/operator",
"examples/rust-dataflow/node",
"examples/rust-dataflow/sink",
"examples/rust-dataflow/*",
"examples/c++-dataflow/*",
"libraries/core",
"libraries/extensions/message",
"libraries/extensions/telemetry/*",
@@ -33,3 +32,7 @@ path = "examples/c-dataflow/run.rs"
[[example]]
name = "rust-dataflow"
path = "examples/rust-dataflow/run.rs"

[[example]]
name = "cxx-dataflow"
path = "examples/c++-dataflow/run.rs"

+ 6
- 0
apis/rust/node/src/config.rs View File

@@ -66,6 +66,12 @@ impl std::fmt::Display for OperatorId {
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DataId(String);

impl From<DataId> for String {
fn from(id: DataId) -> Self {
id.0
}
}

impl From<String> for DataId {
fn from(id: String) -> Self {
Self(id)


+ 12
- 0
examples/c++-dataflow/dataflow.yml View File

@@ -0,0 +1,12 @@
communication:
zenoh:
prefix: /example-cxx-dataflow

nodes:
- id: cxx-node
custom:
run: ../../target/release/cxx-dataflow-example-node
inputs:
tick: dora/timer/millis/300
outputs:
- counter

+ 17
- 0
examples/c++-dataflow/node/Cargo.toml View File

@@ -0,0 +1,17 @@
[package]
name = "cxx-dataflow-example-node"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
cxx = "1.0.73"
dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.20.1", features = ["rt", "macros"] }

[build-dependencies]
cxx-build = "1.0.73"

+ 10
- 0
examples/c++-dataflow/node/build.rs View File

@@ -0,0 +1,10 @@
fn main() {
cxx_build::bridge("src/main.rs") // returns a cc::Build
.file("src/main.cc")
.flag_if_supported("-std=c++11")
.compile("cxx-example-dataflow-node");

println!("cargo:rerun-if-changed=src/main.rs");
println!("cargo:rerun-if-changed=src/main.cc");
println!("cargo:rerun-if-changed=src/main.h");
}

+ 33
- 0
examples/c++-dataflow/node/src/main.cc View File

@@ -0,0 +1,33 @@
#include "main.h"

#include <iostream>
#include <vector>

void cxx_main(Inputs &inputs, const OutputSender &output_sender)
{
std::cout << "HELLO FROM C++" << std::endl;
unsigned char counter = 0;

for (int i = 0; i < 20; i++)
{

auto input = next_input(inputs);
if (input.end_of_input)
{
return;
}
counter += 1;

std::cout << "Received input " << std::string(input.id) << " (counter: " << (unsigned int)counter << ")" << std::endl;

std::vector<unsigned char> out_vec{counter};
rust::Slice<const uint8_t> out_slice{out_vec.data(), out_vec.size()};
auto result = send_output(output_sender, "counter", out_slice);
auto error = std::string(result.error);
if (!error.empty())
{
std::cerr << "Error: " << error << std::endl;
return;
}
}
}

+ 4
- 0
examples/c++-dataflow/node/src/main.h View File

@@ -0,0 +1,4 @@
#pragma once
#include "cxx-dataflow-example-node/src/main.rs.h"

void cxx_main(Inputs &inputs, const OutputSender &output_sender);

+ 68
- 0
examples/c++-dataflow/node/src/main.rs View File

@@ -0,0 +1,68 @@
use dora_node_api::{self, DoraNode, Input};
use futures::{executor::block_on, Stream, StreamExt};
use std::pin::Pin;

#[cxx::bridge]
mod ffi {
struct DoraInput {
end_of_input: bool,
id: String,
data: Vec<u8>,
}

struct DoraResult {
error: String,
}

extern "Rust" {
type Inputs<'a>;
type OutputSender<'a>;

fn next_input(inputs: &mut Inputs) -> DoraInput;
fn send_output(output_sender: &OutputSender, id: String, data: &[u8]) -> DoraResult;
}

unsafe extern "C++" {
include!("cxx-dataflow-example-node/src/main.h");

fn cxx_main(inputs: &mut Inputs, output_sender: &OutputSender);
}
}

pub struct Inputs<'a>(Pin<Box<dyn Stream<Item = Input> + 'a>>);

fn next_input(inputs: &mut Inputs) -> ffi::DoraInput {
match block_on(inputs.0.next()) {
Some(input) => ffi::DoraInput {
end_of_input: false,
id: input.id.into(),
data: input.data,
},
None => ffi::DoraInput {
end_of_input: true,
id: String::new(),
data: Vec::new(),
},
}
}

pub struct OutputSender<'a>(&'a DoraNode);

fn send_output(sender: &OutputSender, id: String, data: &[u8]) -> ffi::DoraResult {
let error = match block_on(sender.0.send_output(&id.into(), data)) {
Ok(()) => String::new(),
Err(err) => format!("{err:?}"),
};
ffi::DoraResult { error }
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
let node = DoraNode::init_from_env().await?;
let input_stream = node.inputs().await?;
let mut inputs = Inputs(Box::pin(input_stream));
let outputs = OutputSender(&node);
ffi::cxx_main(&mut inputs, &outputs);

Ok(())
}

+ 31
- 0
examples/c++-dataflow/run.rs View File

@@ -0,0 +1,31 @@
use eyre::{bail, Context};
use std::path::Path;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

build_package("cxx-dataflow-example-node").await?;
build_package("dora-runtime").await?;

dora_coordinator::run(dora_coordinator::Command::Run {
dataflow: Path::new("dataflow.yml").to_owned(),
runtime: Some(root.join("target").join("release").join("dora-runtime")),
})
.await?;

Ok(())
}

async fn build_package(package: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("build").arg("--release");
cmd.arg("--package").arg(package);
if !cmd.status().await?.success() {
bail!("failed to build {package}");
};
Ok(())
}

Loading…
Cancel
Save