Create C++ example dataflowtags/v0.0.0-test.4
| @@ -50,6 +50,11 @@ jobs: | |||
| with: | |||
| command: run | |||
| args: --example c-dataflow | |||
| - name: "C++ Dataflow example" | |||
| uses: actions-rs/cargo@v1 | |||
| with: | |||
| command: run | |||
| args: --example cxx-dataflow | |||
| clippy: | |||
| name: "Clippy" | |||
| @@ -452,6 +452,16 @@ dependencies = [ | |||
| "os_str_bytes", | |||
| ] | |||
| [[package]] | |||
| name = "codespan-reporting" | |||
| version = "0.11.1" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" | |||
| dependencies = [ | |||
| "termcolor", | |||
| "unicode-width", | |||
| ] | |||
| [[package]] | |||
| name = "concurrent-queue" | |||
| version = "1.2.2" | |||
| @@ -568,6 +578,76 @@ dependencies = [ | |||
| "syn", | |||
| ] | |||
| [[package]] | |||
| name = "cxx" | |||
| version = "1.0.73" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "873c2e83af70859af2aaecd1f5d862f3790b747b1f4f50fb45a931d000ac0422" | |||
| dependencies = [ | |||
| "cc", | |||
| "cxxbridge-flags", | |||
| "cxxbridge-macro", | |||
| "link-cplusplus", | |||
| ] | |||
| [[package]] | |||
| name = "cxx-build" | |||
| version = "1.0.73" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "b49edea7163bbc7a39e3d829b4b0b66a9d30486973152842b7413f2c7b5632bf" | |||
| dependencies = [ | |||
| "cc", | |||
| "codespan-reporting", | |||
| "once_cell", | |||
| "proc-macro2", | |||
| "quote", | |||
| "scratch", | |||
| "syn", | |||
| ] | |||
| [[package]] | |||
| name = "cxx-dataflow-example-node-rust-api" | |||
| version = "0.1.0" | |||
| dependencies = [ | |||
| "cxx", | |||
| "cxx-build", | |||
| "dora-node-api", | |||
| "eyre", | |||
| "futures", | |||
| "rand", | |||
| "tokio", | |||
| ] | |||
| [[package]] | |||
| name = "cxx-dataflow-example-operator-rust-api" | |||
| version = "0.1.0" | |||
| dependencies = [ | |||
| "cxx", | |||
| "cxx-build", | |||
| "dora-operator-api", | |||
| "eyre", | |||
| "futures", | |||
| "rand", | |||
| "tokio", | |||
| ] | |||
| [[package]] | |||
| name = "cxxbridge-flags" | |||
| version = "1.0.73" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "f46b787c15af80277db5c88c6ac6c502ae545e622f010e06f95e540d34931acf" | |||
| [[package]] | |||
| name = "cxxbridge-macro" | |||
| version = "1.0.73" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "2ba3f3a7efa46626878fb5d324fabca4d19d2956b6ae97ce43044ef4515f5abc" | |||
| dependencies = [ | |||
| "proc-macro2", | |||
| "quote", | |||
| "syn", | |||
| ] | |||
| [[package]] | |||
| name = "dashmap" | |||
| version = "4.0.2" | |||
| @@ -1345,6 +1425,15 @@ version = "0.2.2" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "33a33a362ce288760ec6a508b94caaec573ae7d3bbbd91b87aa0bad4456839db" | |||
| [[package]] | |||
| name = "link-cplusplus" | |||
| version = "1.0.6" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "f8cae2cd7ba2f3f63938b9c724475dfb7b9861b545a90324476324ed21dbc8c8" | |||
| dependencies = [ | |||
| "cc", | |||
| ] | |||
| [[package]] | |||
| name = "linked-hash-map" | |||
| version = "0.5.4" | |||
| @@ -1954,11 +2043,11 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" | |||
| [[package]] | |||
| name = "proc-macro2" | |||
| version = "1.0.36" | |||
| version = "1.0.43" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" | |||
| checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" | |||
| dependencies = [ | |||
| "unicode-xid", | |||
| "unicode-ident", | |||
| ] | |||
| [[package]] | |||
| @@ -2395,6 +2484,12 @@ version = "1.1.0" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" | |||
| [[package]] | |||
| name = "scratch" | |||
| version = "1.0.2" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898" | |||
| [[package]] | |||
| name = "sct" | |||
| version = "0.6.1" | |||
| @@ -2630,13 +2725,13 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" | |||
| [[package]] | |||
| name = "syn" | |||
| version = "1.0.94" | |||
| version = "1.0.99" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "a07e33e919ebcd69113d5be0e4d70c5707004ff45188910106854f38b960df4a" | |||
| checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13" | |||
| dependencies = [ | |||
| "proc-macro2", | |||
| "quote", | |||
| "unicode-xid", | |||
| "unicode-ident", | |||
| ] | |||
| [[package]] | |||
| @@ -3008,6 +3103,12 @@ dependencies = [ | |||
| "uuid", | |||
| ] | |||
| [[package]] | |||
| name = "unicode-ident" | |||
| version = "1.0.3" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf" | |||
| [[package]] | |||
| name = "unicode-segmentation" | |||
| version = "1.9.0" | |||
| @@ -5,9 +5,8 @@ members = [ | |||
| "apis/rust/*", | |||
| "apis/rust/operator/macros", | |||
| "binaries/*", | |||
| "examples/rust-dataflow/operator", | |||
| "examples/rust-dataflow/node", | |||
| "examples/rust-dataflow/sink", | |||
| "examples/rust-dataflow/*", | |||
| "examples/c++-dataflow/*-rust-*", | |||
| "libraries/core", | |||
| "libraries/extensions/message", | |||
| "libraries/extensions/telemetry/*", | |||
| @@ -33,3 +32,7 @@ path = "examples/c-dataflow/run.rs" | |||
| [[example]] | |||
| name = "rust-dataflow" | |||
| path = "examples/rust-dataflow/run.rs" | |||
| [[example]] | |||
| name = "cxx-dataflow" | |||
| path = "examples/c++-dataflow/run.rs" | |||
| @@ -66,6 +66,12 @@ impl std::fmt::Display for OperatorId { | |||
| #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] | |||
| pub struct DataId(String); | |||
| impl From<DataId> for String { | |||
| fn from(id: DataId) -> Self { | |||
| id.0 | |||
| } | |||
| } | |||
| impl From<String> for DataId { | |||
| fn from(id: String) -> Self { | |||
| Self(id) | |||
| @@ -0,0 +1,5 @@ | |||
| # Dora C++ Dataflow Example | |||
| This example shows how to create dora operators and custom nodes with C++. | |||
| Dora does not provide a C++ API yet, but we can create adapters for either the C or Rust API. The `operator-rust-api` and `node-rust-api` folders implement an example operator and node based on dora's Rust API, using the `cxx` crate for bridging. The `operator-c-api` and `node-c-api` show how to create operators and nodes based on dora's C API. Both approaches work, so you can choose the API that fits your application better. | |||
| @@ -0,0 +1,35 @@ | |||
| communication: | |||
| zenoh: | |||
| prefix: /example-cxx-dataflow | |||
| nodes: | |||
| - id: cxx-node-rust-api | |||
| custom: | |||
| run: ../../target/release/cxx-dataflow-example-node-rust-api | |||
| inputs: | |||
| tick: dora/timer/millis/300 | |||
| outputs: | |||
| - counter | |||
| - id: cxx-node-c-api | |||
| custom: | |||
| run: build/node_c_api | |||
| inputs: | |||
| tick: dora/timer/millis/300 | |||
| outputs: | |||
| - counter | |||
| - id: runtime-node | |||
| operators: | |||
| - id: operator-rust-api | |||
| shared-library: ../../target/release/libcxx_dataflow_example_operator_rust_api.so | |||
| inputs: | |||
| counter_1: cxx-node-c-api/counter | |||
| counter_2: cxx-node-rust-api/counter | |||
| outputs: | |||
| - status | |||
| - id: operator-c-api | |||
| shared-library: build/operator_c_api.so | |||
| inputs: | |||
| op_status: runtime-node/operator-rust-api/status | |||
| outputs: | |||
| - half-status | |||
| @@ -0,0 +1,71 @@ | |||
| extern "C" | |||
| { | |||
| #include "../../../apis/c/node/node_api.h" | |||
| } | |||
| #include <iostream> | |||
| #include <vector> | |||
| int run(void *dora_context) | |||
| { | |||
| unsigned char counter = 0; | |||
| for (int i = 0; i < 20; i++) | |||
| { | |||
| auto input = dora_next_input(dora_context); | |||
| if (input == NULL) | |||
| { | |||
| return 0; // end of input | |||
| } | |||
| counter += 1; | |||
| char *id_ptr; | |||
| size_t id_len; | |||
| read_dora_input_id(input, &id_ptr, &id_len); | |||
| std::string id(id_ptr, id_len); | |||
| char *data_ptr; | |||
| size_t data_len; | |||
| read_dora_input_data(input, &data_ptr, &data_len); | |||
| std::vector<unsigned char> data; | |||
| for (size_t i = 0; i < data_len; i++) | |||
| { | |||
| data.push_back(*(data_ptr + i)); | |||
| } | |||
| std::cout | |||
| << "Received input " | |||
| << " (counter: " << (unsigned int)counter << ") data: ["; | |||
| for (unsigned char &v : data) | |||
| { | |||
| std::cout << (unsigned int)v << ", "; | |||
| } | |||
| std::cout << "]" << std::endl; | |||
| free_dora_input(input); | |||
| std::vector<unsigned char> out_vec{counter}; | |||
| std::string out_id = "counter"; | |||
| int result = dora_send_output(dora_context, &out_id[0], out_id.length(), (char *)&counter, 1); | |||
| if (result != 0) | |||
| { | |||
| std::cerr << "failed to send output" << std::endl; | |||
| return 1; | |||
| } | |||
| } | |||
| return 0; | |||
| } | |||
| int main() | |||
| { | |||
| std::cout << "HELLO FROM C++ (using C API)" << std::endl; | |||
| auto dora_context = init_dora_context_from_env(); | |||
| auto ret = run(dora_context); | |||
| free_dora_context(dora_context); | |||
| return ret; | |||
| } | |||
| @@ -0,0 +1,17 @@ | |||
| [package] | |||
| name = "cxx-dataflow-example-node-rust-api" | |||
| version = "0.1.0" | |||
| edition = "2021" | |||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |||
| [dependencies] | |||
| cxx = "1.0.73" | |||
| dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } | |||
| eyre = "0.6.8" | |||
| futures = "0.3.21" | |||
| rand = "0.8.5" | |||
| tokio = { version = "1.20.1", features = ["rt", "macros"] } | |||
| [build-dependencies] | |||
| cxx-build = "1.0.73" | |||
| @@ -0,0 +1,10 @@ | |||
| fn main() { | |||
| cxx_build::bridge("src/main.rs") // returns a cc::Build | |||
| .file("src/main.cc") | |||
| .flag_if_supported("-std=c++11") | |||
| .compile("cxx-example-dataflow-node"); | |||
| println!("cargo:rerun-if-changed=src/main.rs"); | |||
| println!("cargo:rerun-if-changed=src/main.cc"); | |||
| println!("cargo:rerun-if-changed=src/main.h"); | |||
| } | |||
| @@ -0,0 +1,33 @@ | |||
| #include "main.h" | |||
| #include <iostream> | |||
| #include <vector> | |||
| void cxx_main(Inputs &inputs, const OutputSender &output_sender) | |||
| { | |||
| std::cout << "HELLO FROM C++" << std::endl; | |||
| unsigned char counter = 0; | |||
| for (int i = 0; i < 20; i++) | |||
| { | |||
| auto input = next_input(inputs); | |||
| if (input.end_of_input) | |||
| { | |||
| return; | |||
| } | |||
| counter += 1; | |||
| std::cout << "Received input " << std::string(input.id) << " (counter: " << (unsigned int)counter << ")" << std::endl; | |||
| std::vector<unsigned char> out_vec{counter}; | |||
| rust::Slice<const uint8_t> out_slice{out_vec.data(), out_vec.size()}; | |||
| auto result = send_output(output_sender, "counter", out_slice); | |||
| auto error = std::string(result.error); | |||
| if (!error.empty()) | |||
| { | |||
| std::cerr << "Error: " << error << std::endl; | |||
| return; | |||
| } | |||
| } | |||
| } | |||
| @@ -0,0 +1,4 @@ | |||
| #pragma once | |||
| #include "cxx-dataflow-example-node-rust-api/src/main.rs.h" | |||
| void cxx_main(Inputs &inputs, const OutputSender &output_sender); | |||
| @@ -0,0 +1,68 @@ | |||
| use dora_node_api::{self, DoraNode, Input}; | |||
| use futures::{executor::block_on, Stream, StreamExt}; | |||
| use std::pin::Pin; | |||
| #[cxx::bridge] | |||
| mod ffi { | |||
| struct DoraInput { | |||
| end_of_input: bool, | |||
| id: String, | |||
| data: Vec<u8>, | |||
| } | |||
| struct DoraResult { | |||
| error: String, | |||
| } | |||
| extern "Rust" { | |||
| type Inputs<'a>; | |||
| type OutputSender<'a>; | |||
| fn next_input(inputs: &mut Inputs) -> DoraInput; | |||
| fn send_output(output_sender: &OutputSender, id: String, data: &[u8]) -> DoraResult; | |||
| } | |||
| unsafe extern "C++" { | |||
| include!("cxx-dataflow-example-node-rust-api/src/main.h"); | |||
| fn cxx_main(inputs: &mut Inputs, output_sender: &OutputSender); | |||
| } | |||
| } | |||
| pub struct Inputs<'a>(Pin<Box<dyn Stream<Item = Input> + 'a>>); | |||
| fn next_input(inputs: &mut Inputs) -> ffi::DoraInput { | |||
| match block_on(inputs.0.next()) { | |||
| Some(input) => ffi::DoraInput { | |||
| end_of_input: false, | |||
| id: input.id.into(), | |||
| data: input.data, | |||
| }, | |||
| None => ffi::DoraInput { | |||
| end_of_input: true, | |||
| id: String::new(), | |||
| data: Vec::new(), | |||
| }, | |||
| } | |||
| } | |||
| pub struct OutputSender<'a>(&'a DoraNode); | |||
| fn send_output(sender: &OutputSender, id: String, data: &[u8]) -> ffi::DoraResult { | |||
| let error = match block_on(sender.0.send_output(&id.into(), data)) { | |||
| Ok(()) => String::new(), | |||
| Err(err) => format!("{err:?}"), | |||
| }; | |||
| ffi::DoraResult { error } | |||
| } | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| let node = DoraNode::init_from_env().await?; | |||
| let input_stream = node.inputs().await?; | |||
| let mut inputs = Inputs(Box::pin(input_stream)); | |||
| let outputs = OutputSender(&node); | |||
| ffi::cxx_main(&mut inputs, &outputs); | |||
| Ok(()) | |||
| } | |||
| @@ -0,0 +1,71 @@ | |||
| extern "C" | |||
| { | |||
| #include "../../../apis/c/operator/operator_api.h" | |||
| } | |||
| #include <memory> | |||
| #include <iostream> | |||
| #include <vector> | |||
| class Operator | |||
| { | |||
| public: | |||
| Operator(); | |||
| }; | |||
| Operator::Operator() {} | |||
| extern "C" int dora_init_operator(void **operator_context) | |||
| { | |||
| Operator *op = std::make_unique<Operator>().release(); | |||
| *operator_context = (void *)op; | |||
| return 0; | |||
| } | |||
| extern "C" void dora_drop_operator(void *operator_context) | |||
| { | |||
| delete (Operator *)operator_context; | |||
| } | |||
| extern "C" int dora_on_input( | |||
| const char *id_start, | |||
| size_t id_len, | |||
| const char *data_start, | |||
| size_t data_len, | |||
| const int (*output_fn_raw)(const char *id_start, | |||
| size_t id_len, | |||
| const char *data_start, | |||
| size_t data_len, | |||
| const void *output_context), | |||
| void *output_context, | |||
| const void *operator_context) | |||
| { | |||
| std::string id(id_start, id_len); | |||
| std::vector<unsigned char> data; | |||
| for (size_t i = 0; i < data_len; i++) | |||
| { | |||
| data.push_back(*(data_start + i)); | |||
| } | |||
| std::cout | |||
| << "C++ Operator (C-API) received input `" << id << "` with data: ["; | |||
| for (unsigned char &v : data) | |||
| { | |||
| std::cout << (unsigned int)v << ", "; | |||
| } | |||
| std::cout << "]" << std::endl; | |||
| char out = data[0] / 2; | |||
| std::string out_id = "half-status"; | |||
| int result = output_fn_raw(&out_id[0], out_id.length(), &out, 1, output_context); | |||
| if (result != 0) | |||
| { | |||
| std::cerr << "failed to send output" << std::endl; | |||
| return 1; | |||
| } | |||
| return 0; | |||
| } | |||
| @@ -0,0 +1,18 @@ | |||
| [package] | |||
| name = "cxx-dataflow-example-operator-rust-api" | |||
| version = "0.1.0" | |||
| edition = "2021" | |||
| [lib] | |||
| crate-type = ["cdylib"] | |||
| [dependencies] | |||
| cxx = "1.0.73" | |||
| dora-operator-api = { version = "0.1.0", path = "../../../apis/rust/operator" } | |||
| eyre = "0.6.8" | |||
| futures = "0.3.21" | |||
| rand = "0.8.5" | |||
| tokio = { version = "1.20.1", features = ["rt", "macros"] } | |||
| [build-dependencies] | |||
| cxx-build = "1.0.73" | |||
| @@ -0,0 +1,10 @@ | |||
| fn main() { | |||
| cxx_build::bridge("src/lib.rs") // returns a cc::Build | |||
| .file("src/operator.cc") | |||
| .flag_if_supported("-std=c++14") | |||
| .compile("cxx-example-dataflow-operator"); | |||
| println!("cargo:rerun-if-changed=src/lib.rs"); | |||
| println!("cargo:rerun-if-changed=src/operator.cc"); | |||
| println!("cargo:rerun-if-changed=src/operator.h"); | |||
| } | |||
| @@ -0,0 +1,77 @@ | |||
| #![warn(unsafe_op_in_unsafe_fn)] | |||
| use dora_operator_api::{self, register_operator, DoraOperator, DoraOutputSender, DoraStatus}; | |||
| #[cxx::bridge] | |||
| #[allow(unsafe_op_in_unsafe_fn)] | |||
| mod ffi { | |||
| struct OnInputResult { | |||
| error: String, | |||
| stop: bool, | |||
| } | |||
| extern "Rust" { | |||
| type OutputSender<'a>; | |||
| fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> i32; | |||
| } | |||
| unsafe extern "C++" { | |||
| include!("cxx-dataflow-example-operator-rust-api/src/operator.h"); | |||
| type Operator; | |||
| fn new_operator() -> UniquePtr<Operator>; | |||
| fn on_input( | |||
| op: Pin<&mut Operator>, | |||
| id: &str, | |||
| data: &[u8], | |||
| output_sender: &mut OutputSender, | |||
| ) -> OnInputResult; | |||
| } | |||
| } | |||
| pub struct OutputSender<'a>(&'a mut DoraOutputSender); | |||
| fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> i32 { | |||
| match sender.0.send(id, data) { | |||
| Ok(()) => 0, | |||
| Err(err_code) => err_code.try_into().unwrap(), | |||
| } | |||
| } | |||
| register_operator!(OperatorWrapper); | |||
| struct OperatorWrapper { | |||
| operator: cxx::UniquePtr<ffi::Operator>, | |||
| } | |||
| impl Default for OperatorWrapper { | |||
| fn default() -> Self { | |||
| Self { | |||
| operator: ffi::new_operator(), | |||
| } | |||
| } | |||
| } | |||
| impl DoraOperator for OperatorWrapper { | |||
| fn on_input( | |||
| &mut self, | |||
| id: &str, | |||
| data: &[u8], | |||
| output_sender: &mut DoraOutputSender, | |||
| ) -> Result<DoraStatus, ()> { | |||
| let operator = self.operator.as_mut().unwrap(); | |||
| let mut output_sender = OutputSender(output_sender); | |||
| let result = ffi::on_input(operator, id, data, &mut output_sender); | |||
| if result.error.is_empty() { | |||
| Ok(match result.stop { | |||
| false => DoraStatus::Continue, | |||
| true => DoraStatus::Stop, | |||
| }) | |||
| } else { | |||
| Err(()) | |||
| } | |||
| } | |||
| } | |||
| @@ -0,0 +1,30 @@ | |||
| #include "operator.h" | |||
| #include "cxx-dataflow-example-operator-rust-api/src/lib.rs.h" | |||
| #include <iostream> | |||
| Operator::Operator() {} | |||
| std::unique_ptr<Operator> new_operator() | |||
| { | |||
| return std::make_unique<Operator>(); | |||
| } | |||
| OnInputResult on_input(Operator &op, rust::Str id, rust::Slice<const uint8_t> data, OutputSender &output_sender) | |||
| { | |||
| op.counter += 1; | |||
| std::cout << "Rust API operator received input `" << id << "` with data `" << (unsigned int)data[0] << "` (internal counter: " << (unsigned int)op.counter << ")" << std::endl; | |||
| std::vector<unsigned char> out_vec{op.counter}; | |||
| rust::Slice<const uint8_t> out_slice{out_vec.data(), out_vec.size()}; | |||
| auto result_code = send_output(output_sender, rust::Str("status"), out_slice); | |||
| if (result_code == 0) | |||
| { | |||
| OnInputResult result = {rust::String(), false}; | |||
| return result; | |||
| } | |||
| else | |||
| { | |||
| OnInputResult result = {rust::String("error"), false}; | |||
| return result; | |||
| } | |||
| } | |||
| @@ -0,0 +1,17 @@ | |||
| #pragma once | |||
| #include "rust/cxx.h" | |||
| #include <memory> | |||
| class Operator | |||
| { | |||
| public: | |||
| Operator(); | |||
| unsigned char counter; | |||
| }; | |||
| std::unique_ptr<Operator> new_operator(); | |||
| struct OnInputResult; | |||
| struct OutputSender; | |||
| OnInputResult on_input(Operator &op, rust::Str, rust::Slice<const uint8_t>, OutputSender &output_sender); | |||
| @@ -0,0 +1,100 @@ | |||
| use eyre::{bail, Context}; | |||
| use std::path::Path; | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| tokio::fs::create_dir_all("build").await?; | |||
| build_package("cxx-dataflow-example-node-rust-api").await?; | |||
| build_package("cxx-dataflow-example-operator-rust-api").await?; | |||
| build_package("dora-node-api-c").await?; | |||
| build_cxx_node( | |||
| root, | |||
| &Path::new("node-c-api").join("main.cc").canonicalize()?, | |||
| "node_c_api", | |||
| ) | |||
| .await?; | |||
| build_cxx_operator( | |||
| &Path::new("operator-c-api") | |||
| .join("operator.cc") | |||
| .canonicalize()?, | |||
| "operator_c_api", | |||
| ) | |||
| .await?; | |||
| build_package("dora-runtime").await?; | |||
| dora_coordinator::run(dora_coordinator::Command::Run { | |||
| dataflow: Path::new("dataflow.yml").to_owned(), | |||
| runtime: Some(root.join("target").join("release").join("dora-runtime")), | |||
| }) | |||
| .await?; | |||
| Ok(()) | |||
| } | |||
| async fn build_package(package: &str) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("build").arg("--release"); | |||
| cmd.arg("--package").arg(package); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build {package}"); | |||
| }; | |||
| Ok(()) | |||
| } | |||
| async fn build_cxx_node(root: &Path, path: &Path, out_name: &str) -> eyre::Result<()> { | |||
| let mut clang = tokio::process::Command::new("clang++"); | |||
| clang.arg(path); | |||
| clang.arg("-l").arg("dora_node_api_c"); | |||
| clang.arg("-l").arg("m"); | |||
| clang.arg("-l").arg("rt"); | |||
| clang.arg("-l").arg("dl"); | |||
| clang.arg("-pthread"); | |||
| clang.arg("-L").arg(root.join("target").join("release")); | |||
| clang | |||
| .arg("--output") | |||
| .arg(Path::new("../build").join(out_name)); | |||
| if let Some(parent) = path.parent() { | |||
| clang.current_dir(parent); | |||
| } | |||
| if !clang.status().await?.success() { | |||
| bail!("failed to compile c++ node"); | |||
| }; | |||
| Ok(()) | |||
| } | |||
| async fn build_cxx_operator(path: &Path, out_name: &str) -> eyre::Result<()> { | |||
| let object_file_path = Path::new("../build").join(out_name).with_extension("o"); | |||
| let mut compile = tokio::process::Command::new("clang++"); | |||
| compile.arg("-c").arg(path); | |||
| compile.arg("-o").arg(&object_file_path); | |||
| compile.arg("-fPIC"); | |||
| if let Some(parent) = path.parent() { | |||
| compile.current_dir(parent); | |||
| } | |||
| if !compile.status().await?.success() { | |||
| bail!("failed to compile cxx operator"); | |||
| }; | |||
| let mut link = tokio::process::Command::new("clang++"); | |||
| link.arg("-shared").arg(&object_file_path); | |||
| link.arg("-o") | |||
| .arg(Path::new("../build").join(out_name).with_extension("so")); | |||
| if let Some(parent) = path.parent() { | |||
| link.current_dir(parent); | |||
| } | |||
| if !link.status().await?.success() { | |||
| bail!("failed to create shared library from cxx operator (c api)"); | |||
| }; | |||
| Ok(()) | |||
| } | |||