| @@ -3173,6 +3173,7 @@ dependencies = [ | |||
| name = "dora-node-api-cxx" | |||
| version = "0.3.10" | |||
| dependencies = [ | |||
| "arrow 54.2.1", | |||
| "cxx", | |||
| "cxx-build", | |||
| "dora-node-api", | |||
| @@ -146,6 +146,10 @@ path = "examples/rust-dataflow-url/run.rs" | |||
| name = "cxx-dataflow" | |||
| path = "examples/c++-dataflow/run.rs" | |||
| [[example]] | |||
| name = "cxx-dataflow2" | |||
| path = "examples/c++-dataflow2/run.rs" | |||
| [[example]] | |||
| name = "python-dataflow" | |||
| path = "examples/python-dataflow/run.rs" | |||
| @@ -32,6 +32,7 @@ dora-ros2-bridge = { workspace = true, optional = true } | |||
| futures-lite = { version = "2.2" } | |||
| serde = { version = "1.0.164", features = ["derive"], optional = true } | |||
| serde-big-array = { version = "0.5.1", optional = true } | |||
| arrow = { workspace = true, features = ["ffi"] } | |||
| [build-dependencies] | |||
| cxx-build = "1.0.73" | |||
| @@ -12,6 +12,7 @@ use eyre::bail; | |||
| use dora_ros2_bridge::{_core, ros2_client}; | |||
| use futures_lite::{stream, Stream, StreamExt}; | |||
| #[cxx::bridge] | |||
| #[allow(clippy::needless_lifetimes)] | |||
| mod ffi { | |||
| @@ -71,9 +72,26 @@ mod ffi { | |||
| fn is_dora(self: &CombinedEvent) -> bool; | |||
| fn downcast_dora(event: CombinedEvent) -> Result<Box<DoraEvent>>; | |||
| unsafe fn send_arrow_output( | |||
| output_sender: &mut Box<OutputSender>, | |||
| id: String, | |||
| array_ptr: *mut u8, | |||
| schema_ptr: *mut u8, | |||
| ) -> DoraResult; | |||
| unsafe fn event_as_arrow_input( | |||
| event: Box<DoraEvent>, | |||
| out_array: *mut u8, | |||
| out_schema: *mut u8, | |||
| ) -> DoraResult; | |||
| } | |||
| } | |||
| mod arrow_ffi { | |||
| pub use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; | |||
| } | |||
| #[cfg(feature = "ros2-bridge")] | |||
| pub mod ros2 { | |||
| pub use dora_ros2_bridge::*; | |||
| @@ -161,6 +179,41 @@ fn event_as_input(event: Box<DoraEvent>) -> eyre::Result<ffi::DoraInput> { | |||
| }) | |||
| } | |||
| unsafe fn event_as_arrow_input( | |||
| event: Box<DoraEvent>, | |||
| out_array: *mut u8, | |||
| out_schema: *mut u8, | |||
| ) -> ffi::DoraResult { | |||
| // Cast to Arrow FFI types | |||
| let out_array = out_array as *mut arrow::ffi::FFI_ArrowArray; | |||
| let out_schema = out_schema as *mut arrow::ffi::FFI_ArrowSchema; | |||
| let Some(Event::Input { id: _, metadata: _, data }) = event.0 else { | |||
| return ffi::DoraResult { error: "Not an input event".to_string() }; | |||
| }; | |||
| if out_array.is_null() || out_schema.is_null() { | |||
| return ffi::DoraResult { | |||
| error: "Received null output pointer".to_string() | |||
| }; | |||
| } | |||
| let array_data = data.to_data(); | |||
| match arrow::ffi::to_ffi(&array_data.clone()) { | |||
| Ok((ffi_array, ffi_schema)) => { | |||
| std::ptr::write(out_array, ffi_array); | |||
| std::ptr::write(out_schema, ffi_schema); | |||
| ffi::DoraResult { error: String::new() } | |||
| }, | |||
| Err(e) => { | |||
| ffi::DoraResult { | |||
| error: format!("Error exporting Arrow array to C++: {:?}", e) | |||
| } | |||
| } | |||
| } | |||
| } | |||
| pub struct OutputSender(dora_node_api::DoraNode); | |||
| fn send_output(sender: &mut Box<OutputSender>, id: String, data: &[u8]) -> ffi::DoraResult { | |||
| @@ -180,6 +233,43 @@ pub struct MergedEvents { | |||
| events: Option<Box<dyn Stream<Item = MergedEvent<ExternalEvent>> + Unpin>>, | |||
| next_id: u32, | |||
| } | |||
| unsafe fn send_arrow_output( | |||
| sender: &mut Box<OutputSender>, | |||
| id: String, | |||
| array_ptr: *mut u8, | |||
| schema_ptr: *mut u8 | |||
| ) -> ffi::DoraResult { | |||
| let array_ptr = array_ptr as *mut arrow::ffi::FFI_ArrowArray; | |||
| let schema_ptr = schema_ptr as *mut arrow::ffi::FFI_ArrowSchema; | |||
| if array_ptr.is_null() || schema_ptr.is_null() { | |||
| return ffi::DoraResult { | |||
| error: "Received null Arrow array or schema pointer".to_string() | |||
| }; | |||
| } | |||
| let array = std::ptr::read(array_ptr); | |||
| let schema = std::ptr::read(schema_ptr); | |||
| std::ptr::write(array_ptr, std::mem::zeroed()); | |||
| std::ptr::write(schema_ptr, std::mem::zeroed()); | |||
| match arrow::ffi::from_ffi(array, &schema) { | |||
| Ok(array_data) => { | |||
| let arrow_array = arrow::array::make_array(array_data); | |||
| let result = sender.0.send_output(id.into(), Default::default(), arrow_array); | |||
| match result { | |||
| Ok(()) => ffi::DoraResult { error: String::new() }, | |||
| Err(err) => ffi::DoraResult { error: format!("{err:?}") }, | |||
| } | |||
| }, | |||
| Err(e) => { | |||
| ffi::DoraResult { | |||
| error: format!("Error importing array from C++: {:?}", e) | |||
| } | |||
| } | |||
| } | |||
| } | |||
| impl MergedEvents { | |||
| fn next(&mut self) -> MergedDoraEvent { | |||
| @@ -0,0 +1 @@ | |||
| *.o | |||
| @@ -0,0 +1,20 @@ | |||
| # Dora C++ Dataflow Example | |||
| This example shows usage of event_as_arrow_input() and send_arrow_output(). We can send and recieve arrow arrays using these functions which can be serialized and deserialized on either files easliy. These functions are implemented in rust and are provided through dora-node-api.h. Currently this requires to have arrow installed on user sytsem as required during build process. | |||
| ## Compile and Run | |||
| To try it out, you can use the [`run.rs`](./run.rs) binary. It performs all required build steps and then starts the dataflow. Use the following command to run it: `cargo run --example cxx-dataflow2`. | |||
| **Build the dora coordinator and runtime:** | |||
| - Build the `dora-coordinator` executable using `cargo build -p dora-coordinator --release` | |||
| - Build the `dora-runtime` executable using `cargo build -p dora-runtime --release` | |||
| **Run the dataflow:** | |||
| - Start the `dora-coordinator`, passing the paths to the dataflow file and the `dora-runtime` as arguments: | |||
| ``` | |||
| ../../target/release/dora-daemon --run-dataflow dataflow.yml ../../target/release/dora-runtime | |||
| ``` | |||
| @@ -0,0 +1,14 @@ | |||
| nodes: | |||
| - id: cxx-node-rust-api | |||
| path: build/node_rust_api | |||
| inputs: | |||
| tick: dora/timer/millis/300 | |||
| outputs: | |||
| - counter | |||
| - id: cxx-node-rust2-api | |||
| path: build/node_rust_api | |||
| inputs: | |||
| tick: cxx-node-rust-api/counter | |||
| outputs: | |||
| - counter | |||
| @@ -0,0 +1,85 @@ | |||
| extern "C" | |||
| { | |||
| #include "../../../apis/c/node/node_api.h" | |||
| } | |||
| #include <iostream> | |||
| #include <vector> | |||
| int run(void *dora_context) | |||
| { | |||
| unsigned char counter = 0; | |||
| for (int i = 0; i < 20; i++) | |||
| { | |||
| void *event = dora_next_event(dora_context); | |||
| if (event == NULL) | |||
| { | |||
| printf("[c node] ERROR: unexpected end of event\n"); | |||
| return -1; | |||
| } | |||
| enum DoraEventType ty = read_dora_event_type(event); | |||
| if (ty == DoraEventType_Input) | |||
| { | |||
| counter += 1; | |||
| char *id_ptr; | |||
| size_t id_len; | |||
| read_dora_input_id(event, &id_ptr, &id_len); | |||
| std::string id(id_ptr, id_len); | |||
| char *data_ptr; | |||
| size_t data_len; | |||
| read_dora_input_data(event, &data_ptr, &data_len); | |||
| std::vector<unsigned char> data; | |||
| for (size_t i = 0; i < data_len; i++) | |||
| { | |||
| data.push_back(*(data_ptr + i)); | |||
| } | |||
| std::cout | |||
| << "Received input " | |||
| << " (counter: " << (unsigned int)counter << ") data: ["; | |||
| for (unsigned char &v : data) | |||
| { | |||
| std::cout << (unsigned int)v << ", "; | |||
| } | |||
| std::cout << "]" << std::endl; | |||
| std::vector<unsigned char> out_vec{counter}; | |||
| std::string out_id = "counter"; | |||
| int result = dora_send_output(dora_context, &out_id[0], out_id.length(), (char *)&counter, 1); | |||
| if (result != 0) | |||
| { | |||
| std::cerr << "failed to send output" << std::endl; | |||
| return 1; | |||
| } | |||
| } | |||
| else if (ty == DoraEventType_Stop) | |||
| { | |||
| printf("[c node] received stop event\n"); | |||
| } | |||
| else | |||
| { | |||
| printf("[c node] received unexpected event: %d\n", ty); | |||
| } | |||
| free_dora_event(event); | |||
| } | |||
| return 0; | |||
| } | |||
| int main() | |||
| { | |||
| std::cout << "HELLO FROM C++ (using C API)" << std::endl; | |||
| auto dora_context = init_dora_context_from_env(); | |||
| auto ret = run(dora_context); | |||
| free_dora_context(dora_context); | |||
| std::cout << "GOODBYE FROM C++ node (using C API)" << std::endl; | |||
| return ret; | |||
| } | |||
| @@ -0,0 +1,112 @@ | |||
| #include "../build/dora-node-api.h" | |||
| #include <arrow/api.h> | |||
| #include <arrow/c/bridge.h> | |||
| #include <iostream> | |||
| #include <memory> | |||
| #include <string> | |||
| #include <vector> | |||
| std::shared_ptr<arrow::Array> receive_and_print_input(rust::cxxbridge1::Box<DoraEvent> event) { | |||
| std::cout << "Received input event" << std::endl; | |||
| struct ArrowArray c_array; | |||
| struct ArrowSchema c_schema; | |||
| auto result = event_as_arrow_input( | |||
| std::move(event), | |||
| reinterpret_cast<uint8_t*>(&c_array), | |||
| reinterpret_cast<uint8_t*>(&c_schema) | |||
| ); | |||
| if (!result.error.empty()) { | |||
| std::cerr << "Error getting Arrow array: " << std::endl; | |||
| return nullptr; | |||
| } | |||
| auto result2 = arrow::ImportArray(&c_array, &c_schema); | |||
| std::shared_ptr<arrow::Array> input_array = result2.ValueOrDie(); | |||
| std::cout << "Received Arrow array: " << input_array->ToString() << std::endl; | |||
| std::cout << "Array details: type=" << input_array->type()->ToString() | |||
| << ", length=" << input_array->length() << std::endl; | |||
| return input_array; | |||
| } | |||
| // To send output | |||
| bool send_output(DoraNode& dora_node, std::shared_ptr<arrow::Array> output_array) { | |||
| if (!output_array) { | |||
| std::cerr << "Error: Attempted to send a null Arrow array" << std::endl; | |||
| return false; | |||
| } | |||
| struct ArrowArray out_c_array; | |||
| struct ArrowSchema out_c_schema; | |||
| arrow::ExportArray(*output_array, &out_c_array, &out_c_schema); | |||
| auto send_result = send_arrow_output( | |||
| dora_node.send_output, | |||
| "counter", | |||
| reinterpret_cast<uint8_t*>(&out_c_array), | |||
| reinterpret_cast<uint8_t*>(&out_c_schema) | |||
| ); | |||
| if (!send_result.error.empty()) { | |||
| std::string error_message(send_result.error); | |||
| std::cerr << "Error sending Arrow array: " << error_message << std::endl; | |||
| return false; | |||
| } | |||
| return true; | |||
| } | |||
| int main() { | |||
| try { | |||
| auto dora_node = init_dora_node(); | |||
| std::cout << "Dora node initialized successfully" << std::endl; | |||
| int counter=0; | |||
| while (counter<10) { | |||
| counter++; | |||
| auto event = dora_node.events->next(); | |||
| auto type = event_type(event); | |||
| if (type == DoraEventType::Stop) { | |||
| std::cout << "Received stop event, exiting" << std::endl; | |||
| break; | |||
| } | |||
| else if (type == DoraEventType::AllInputsClosed) { | |||
| std::cout << "All inputs closed, exiting" << std::endl; | |||
| break; | |||
| } | |||
| else if (type == DoraEventType::Input) { | |||
| std::shared_ptr<arrow::Array> input_array = receive_and_print_input(std::move(event)); | |||
| std::shared_ptr<arrow::Array> output_array; | |||
| arrow::Int32Builder builder; | |||
| builder.Append(10); | |||
| builder.Append(100); | |||
| builder.Append(1000); | |||
| builder.Finish(&output_array); | |||
| std::cout << "Created new string array: " << output_array->ToString() << std::endl; | |||
| //Printing Before sending | |||
| auto str_array = std::static_pointer_cast<arrow::Int32Array>(output_array); | |||
| std::cout << "Values: ["; | |||
| for (int i = 0; i < str_array->length(); i++) { | |||
| if (i > 0) std::cout << ", "; | |||
| std::cout << str_array->Value(i); | |||
| } | |||
| std::cout << "]" << std::endl; | |||
| send_output(dora_node, output_array); | |||
| } | |||
| } | |||
| return 0; | |||
| } | |||
| catch (const std::exception& e) { | |||
| std::cerr << "Error: " << e.what() << std::endl; | |||
| return 1; | |||
| } | |||
| } | |||
| @@ -0,0 +1,79 @@ | |||
| extern "C" | |||
| { | |||
| #include "../../../apis/c/operator/operator_api.h" | |||
| } | |||
| #include <memory> | |||
| #include <iostream> | |||
| #include <vector> | |||
| #include <string.h> | |||
| #include <cassert> | |||
| class Operator | |||
| { | |||
| public: | |||
| Operator(); | |||
| }; | |||
| Operator::Operator() {} | |||
| extern "C" DoraInitResult_t dora_init_operator() | |||
| { | |||
| Operator *op = std::make_unique<Operator>().release(); | |||
| DoraInitResult_t result = {.operator_context = (void *)op}; | |||
| return result; | |||
| } | |||
| extern "C" DoraResult_t dora_drop_operator(void *operator_context) | |||
| { | |||
| delete (Operator *)operator_context; | |||
| return {}; | |||
| } | |||
| extern "C" OnEventResult_t dora_on_event( | |||
| RawEvent_t *event, | |||
| const SendOutput_t *send_output, | |||
| void *operator_context) | |||
| { | |||
| if (event->input != NULL) | |||
| { | |||
| // input event | |||
| Input_t *input = event->input; | |||
| char *id = dora_read_input_id(input); | |||
| Vec_uint8_t data = dora_read_data(input); | |||
| assert(data.ptr != NULL); | |||
| std::cout | |||
| << "C++ Operator (C-API) received input `" << id << "` with data: ["; | |||
| for (int i = 0; i < data.len; i++) | |||
| { | |||
| std::cout << (unsigned int)data.ptr[i] << ", "; | |||
| } | |||
| std::cout << "]" << std::endl; | |||
| const char *out_id = "half-status"; | |||
| char *out_id_heap = strdup(out_id); | |||
| size_t out_data_len = 1; | |||
| uint8_t *out_data_heap = (uint8_t *)malloc(out_data_len); | |||
| *out_data_heap = *data.ptr / 2; | |||
| DoraResult_t send_result = dora_send_operator_output(send_output, out_id_heap, out_data_heap, out_data_len); | |||
| OnEventResult_t result = {.result = send_result, .status = DORA_STATUS_CONTINUE}; | |||
| dora_free_data(data); | |||
| dora_free_input_id(id); | |||
| return result; | |||
| } | |||
| if (event->stop) | |||
| { | |||
| printf("C operator received stop event\n"); | |||
| } | |||
| OnEventResult_t result = {.status = DORA_STATUS_CONTINUE}; | |||
| return result; | |||
| } | |||
| @@ -0,0 +1,23 @@ | |||
| #include "operator.h" | |||
| #include <iostream> | |||
| #include <vector> | |||
| #include "../build/dora-operator-api.h" | |||
| Operator::Operator() {} | |||
| std::unique_ptr<Operator> new_operator() | |||
| { | |||
| return std::make_unique<Operator>(); | |||
| } | |||
| DoraOnInputResult on_input(Operator &op, rust::Str id, rust::Slice<const uint8_t> data, OutputSender &output_sender) | |||
| { | |||
| op.counter += 1; | |||
| std::cout << "Rust API operator received input `" << id.data() << "` with data `" << (unsigned int)data[0] << "` (internal counter: " << (unsigned int)op.counter << ")" << std::endl; | |||
| std::vector<unsigned char> out_vec{op.counter}; | |||
| rust::Slice<const uint8_t> out_slice{out_vec.data(), out_vec.size()}; | |||
| auto send_result = send_output(output_sender, rust::Str("status"), out_slice); | |||
| DoraOnInputResult result = {send_result.error, false}; | |||
| return result; | |||
| } | |||
| @@ -0,0 +1,16 @@ | |||
| #pragma once | |||
| #include <memory> | |||
| #include "../../../apis/c/operator/operator_api.h" | |||
| class Operator | |||
| { | |||
| public: | |||
| Operator(); | |||
| unsigned char counter; | |||
| }; | |||
| #include "../build/dora-operator-api.h" | |||
| std::unique_ptr<Operator> new_operator(); | |||
| DoraOnInputResult on_input(Operator &op, rust::Str id, rust::Slice<const uint8_t> data, OutputSender &output_sender); | |||
| @@ -0,0 +1,332 @@ | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, Context}; | |||
| use std::{ | |||
| env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, | |||
| path::Path, | |||
| process::Command, | |||
| }; | |||
| struct ArrowConfig { | |||
| cflags: String, | |||
| libs: String, | |||
| } | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| set_up_tracing("c++-dataflow-runner").wrap_err("failed to set up tracing")?; | |||
| if cfg!(windows) { | |||
| tracing::error!( | |||
| "The c++ example does not work on Windows currently because of a linker error" | |||
| ); | |||
| return Ok(()); | |||
| } | |||
| let arrow_config = find_arrow_config().wrap_err("Failed to find Arrow configuration")?; | |||
| tracing::info!("Found Arrow configuration: cflags={}, libs={}", arrow_config.cflags, arrow_config.libs); | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let target = root.join("target"); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| tokio::fs::create_dir_all("build").await?; | |||
| let build_dir = Path::new("build"); | |||
| build_package("dora-node-api-cxx").await?; | |||
| let node_cxxbridge = target | |||
| .join("cxxbridge") | |||
| .join("dora-node-api-cxx") | |||
| .join("src"); | |||
| tokio::fs::copy( | |||
| node_cxxbridge.join("lib.rs.cc"), | |||
| build_dir.join("node-bridge.cc"), | |||
| ) | |||
| .await?; | |||
| tokio::fs::copy( | |||
| node_cxxbridge.join("lib.rs.h"), | |||
| build_dir.join("dora-node-api.h"), | |||
| ) | |||
| .await?; | |||
| tokio::fs::write( | |||
| build_dir.join("operator.h"), | |||
| r###"#include "../operator-rust-api/operator.h""###, | |||
| ) | |||
| .await?; | |||
| build_package("dora-operator-api-cxx").await?; | |||
| let operator_cxxbridge = target | |||
| .join("cxxbridge") | |||
| .join("dora-operator-api-cxx") | |||
| .join("src"); | |||
| tokio::fs::copy( | |||
| operator_cxxbridge.join("lib.rs.cc"), | |||
| build_dir.join("operator-bridge.cc"), | |||
| ) | |||
| .await?; | |||
| tokio::fs::copy( | |||
| operator_cxxbridge.join("lib.rs.h"), | |||
| build_dir.join("dora-operator-api.h"), | |||
| ) | |||
| .await?; | |||
| build_package("dora-node-api-c").await?; | |||
| build_package("dora-operator-api-c").await?; | |||
| build_cxx_node( | |||
| root, | |||
| &[ | |||
| &dunce::canonicalize(Path::new("node-rust-api").join("main.cc"))?, | |||
| &dunce::canonicalize(build_dir.join("node-bridge.cc"))?, | |||
| ], | |||
| "node_rust_api", | |||
| &["-l", "dora_node_api_cxx", &arrow_config.cflags, &arrow_config.libs], | |||
| ) | |||
| .await?; | |||
| build_cxx_node( | |||
| root, | |||
| &[&dunce::canonicalize( | |||
| Path::new("node-c-api").join("main.cc"), | |||
| )?], | |||
| "node_c_api", | |||
| &["-l", "dora_node_api_c"], | |||
| ) | |||
| .await?; | |||
| build_cxx_operator( | |||
| &[ | |||
| &dunce::canonicalize(Path::new("operator-rust-api").join("operator.cc"))?, | |||
| &dunce::canonicalize(build_dir.join("operator-bridge.cc"))?, | |||
| ], | |||
| "operator_rust_api", | |||
| &[ | |||
| "-l", "dora_operator_api_cxx", | |||
| "-L", root.join("target").join("debug").to_str().unwrap(), | |||
| &arrow_config.cflags, &arrow_config.libs, | |||
| ], | |||
| ) | |||
| .await?; | |||
| build_cxx_operator( | |||
| &[&dunce::canonicalize( | |||
| Path::new("operator-c-api").join("operator.cc"), | |||
| )?], | |||
| "operator_c_api", | |||
| &[ | |||
| "-l", "dora_operator_api_c", | |||
| "-L", root.join("target").join("debug").to_str().unwrap(), | |||
| ], | |||
| ) | |||
| .await?; | |||
| let dataflow = Path::new("dataflow.yml").to_owned(); | |||
| build_package("dora-runtime").await?; | |||
| run_dataflow(&dataflow).await?; | |||
| Ok(()) | |||
| } | |||
| fn find_arrow_config() -> eyre::Result<ArrowConfig> { | |||
| let output = Command::new("pkg-config") | |||
| .args(&["--cflags", "arrow"]) | |||
| .output() | |||
| .wrap_err("Failed to run pkg-config. Make sure Arrow C++ is installed")?; | |||
| if !output.status.success() { | |||
| bail!("Arrow C++ not found via pkg-config. Make sure it's installed and in your PKG_CONFIG_PATH"); | |||
| } | |||
| let cflags = String::from_utf8(output.stdout)?.trim().to_string(); | |||
| let output = Command::new("pkg-config") | |||
| .args(&["--libs", "arrow"]) | |||
| .output() | |||
| .wrap_err("Failed to get Arrow library flags")?; | |||
| if !output.status.success() { | |||
| bail!("Failed to get Arrow library flags"); | |||
| } | |||
| let libs = String::from_utf8(output.stdout)?.trim().to_string(); | |||
| Ok(ArrowConfig { cflags, libs }) | |||
| } | |||
| async fn build_package(package: &str) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("build"); | |||
| cmd.arg("--package").arg(package); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build {package}"); | |||
| }; | |||
| Ok(()) | |||
| } | |||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--run-dataflow") | |||
| .arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| }; | |||
| Ok(()) | |||
| } | |||
| async fn build_cxx_node( | |||
| root: &Path, | |||
| paths: &[&Path], | |||
| out_name: &str, | |||
| args: &[&str], | |||
| ) -> eyre::Result<()> { | |||
| let mut clang = tokio::process::Command::new("clang++"); | |||
| clang.args(paths); | |||
| clang.arg("-std=c++17"); | |||
| #[cfg(target_os = "linux")] | |||
| { | |||
| clang.arg("-l").arg("m"); | |||
| clang.arg("-l").arg("rt"); | |||
| clang.arg("-l").arg("dl"); | |||
| clang.arg("-pthread"); | |||
| } | |||
| #[cfg(target_os = "windows")] | |||
| { | |||
| clang.arg("-ladvapi32"); | |||
| clang.arg("-luserenv"); | |||
| clang.arg("-lkernel32"); | |||
| clang.arg("-lws2_32"); | |||
| clang.arg("-lbcrypt"); | |||
| clang.arg("-lncrypt"); | |||
| clang.arg("-lschannel"); | |||
| clang.arg("-lntdll"); | |||
| clang.arg("-liphlpapi"); | |||
| clang.arg("-lcfgmgr32"); | |||
| clang.arg("-lcredui"); | |||
| clang.arg("-lcrypt32"); | |||
| clang.arg("-lcryptnet"); | |||
| clang.arg("-lfwpuclnt"); | |||
| clang.arg("-lgdi32"); | |||
| clang.arg("-lmsimg32"); | |||
| clang.arg("-lmswsock"); | |||
| clang.arg("-lole32"); | |||
| clang.arg("-lopengl32"); | |||
| clang.arg("-lsecur32"); | |||
| clang.arg("-lshell32"); | |||
| clang.arg("-lsynchronization"); | |||
| clang.arg("-luser32"); | |||
| clang.arg("-lwinspool"); | |||
| clang.arg("-Wl,-nodefaultlib:libcmt"); | |||
| clang.arg("-D_DLL"); | |||
| clang.arg("-lmsvcrt"); | |||
| } | |||
| #[cfg(target_os = "macos")] | |||
| { | |||
| clang.arg("-framework").arg("CoreServices"); | |||
| clang.arg("-framework").arg("Security"); | |||
| clang.arg("-l").arg("System"); | |||
| clang.arg("-l").arg("resolv"); | |||
| clang.arg("-l").arg("pthread"); | |||
| clang.arg("-l").arg("c"); | |||
| clang.arg("-l").arg("m"); | |||
| } | |||
| clang.args(args); | |||
| clang.arg("-L").arg(root.join("target").join("debug")); | |||
| clang | |||
| .arg("--output") | |||
| .arg(Path::new("../build").join(format!("{out_name}{EXE_SUFFIX}"))); | |||
| if let Some(parent) = paths[0].parent() { | |||
| clang.current_dir(parent); | |||
| } | |||
| if !clang.status().await?.success() { | |||
| bail!("failed to compile c++ node"); | |||
| }; | |||
| Ok(()) | |||
| } | |||
| async fn build_cxx_operator( | |||
| paths: &[&Path], | |||
| out_name: &str, | |||
| link_args: &[&str], | |||
| ) -> eyre::Result<()> { | |||
| let mut object_file_paths = Vec::new(); | |||
| for path in paths { | |||
| let mut compile = tokio::process::Command::new("clang++"); | |||
| compile.arg("-c").arg(path); | |||
| compile.arg("-std=c++17"); | |||
| let object_file_path = path.with_extension("o"); | |||
| compile.arg("-o").arg(&object_file_path); | |||
| #[cfg(unix)] | |||
| compile.arg("-fPIC"); | |||
| if let Some(parent) = path.parent() { | |||
| compile.current_dir(parent); | |||
| } | |||
| if !compile.status().await?.success() { | |||
| bail!("failed to compile cxx operator"); | |||
| }; | |||
| object_file_paths.push(object_file_path); | |||
| } | |||
| let mut link = tokio::process::Command::new("clang++"); | |||
| link.arg("-shared").args(&object_file_paths); | |||
| link.args(link_args); | |||
| #[cfg(target_os = "windows")] | |||
| { | |||
| link.arg("-ladvapi32"); | |||
| link.arg("-luserenv"); | |||
| link.arg("-lkernel32"); | |||
| link.arg("-lws2_32"); | |||
| link.arg("-lbcrypt"); | |||
| link.arg("-lncrypt"); | |||
| link.arg("-lschannel"); | |||
| link.arg("-lntdll"); | |||
| link.arg("-liphlpapi"); | |||
| link.arg("-lcfgmgr32"); | |||
| link.arg("-lcredui"); | |||
| link.arg("-lcrypt32"); | |||
| link.arg("-lcryptnet"); | |||
| link.arg("-lfwpuclnt"); | |||
| link.arg("-lgdi32"); | |||
| link.arg("-lmsimg32"); | |||
| link.arg("-lmswsock"); | |||
| link.arg("-lole32"); | |||
| link.arg("-lopengl32"); | |||
| link.arg("-lsecur32"); | |||
| link.arg("-lshell32"); | |||
| link.arg("-lsynchronization"); | |||
| link.arg("-luser32"); | |||
| link.arg("-lwinspool"); | |||
| link.arg("-Wl,-nodefaultlib:libcmt"); | |||
| link.arg("-D_DLL"); | |||
| link.arg("-lmsvcrt"); | |||
| link.arg("-fms-runtime-lib=static"); | |||
| } | |||
| #[cfg(target_os = "macos")] | |||
| { | |||
| link.arg("-framework").arg("CoreServices"); | |||
| link.arg("-framework").arg("Security"); | |||
| link.arg("-l").arg("System"); | |||
| link.arg("-l").arg("resolv"); | |||
| link.arg("-l").arg("pthread"); | |||
| link.arg("-l").arg("c"); | |||
| link.arg("-l").arg("m"); | |||
| } | |||
| link.arg("-o") | |||
| .arg(Path::new("../build").join(format!("{DLL_PREFIX}{out_name}{DLL_SUFFIX}"))); | |||
| if let Some(parent) = paths[0].parent() { | |||
| link.current_dir(parent); | |||
| } | |||
| if !link.status().await?.success() { | |||
| bail!("failed to create shared library from cxx operator (c api)"); | |||
| }; | |||
| Ok(()) | |||
| } | |||