From eb69afe89053b8e237298992937ea40da2438dee Mon Sep 17 00:00:00 2001 From: Mati-ur-rehman-017 Date: Mon, 24 Mar 2025 15:33:08 +0500 Subject: [PATCH 01/12] Functions for sending and recieving data using Arrow::FFI --- Cargo.lock | 1 + Cargo.toml | 4 + apis/c++/node/Cargo.toml | 1 + apis/c++/node/src/lib.rs | 90 +++++ examples/c++-dataflow2/.gitignore | 1 + examples/c++-dataflow2/README.md | 20 ++ examples/c++-dataflow2/dataflow.yml | 14 + examples/c++-dataflow2/node-c-api/main.cc | 85 +++++ examples/c++-dataflow2/node-rust-api/main.cc | 112 ++++++ .../c++-dataflow2/operator-c-api/operator.cc | 79 +++++ .../operator-rust-api/operator.cc | 23 ++ .../operator-rust-api/operator.h | 16 + examples/c++-dataflow2/run.rs | 332 ++++++++++++++++++ 13 files changed, 778 insertions(+) create mode 100644 examples/c++-dataflow2/.gitignore create mode 100644 examples/c++-dataflow2/README.md create mode 100644 examples/c++-dataflow2/dataflow.yml create mode 100644 examples/c++-dataflow2/node-c-api/main.cc create mode 100644 examples/c++-dataflow2/node-rust-api/main.cc create mode 100644 examples/c++-dataflow2/operator-c-api/operator.cc create mode 100644 examples/c++-dataflow2/operator-rust-api/operator.cc create mode 100644 examples/c++-dataflow2/operator-rust-api/operator.h create mode 100644 examples/c++-dataflow2/run.rs diff --git a/Cargo.lock b/Cargo.lock index 5b4e2397..c5187e08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3173,6 +3173,7 @@ dependencies = [ name = "dora-node-api-cxx" version = "0.3.10" dependencies = [ + "arrow 54.2.1", "cxx", "cxx-build", "dora-node-api", diff --git a/Cargo.toml b/Cargo.toml index 2b448807..5812d49d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -146,6 +146,10 @@ path = "examples/rust-dataflow-url/run.rs" name = "cxx-dataflow" path = "examples/c++-dataflow/run.rs" +[[example]] +name = "cxx-dataflow2" +path = "examples/c++-dataflow2/run.rs" + [[example]] name = "python-dataflow" path = "examples/python-dataflow/run.rs" diff --git 
a/apis/c++/node/Cargo.toml b/apis/c++/node/Cargo.toml index 562ffa37..4084d42e 100644 --- a/apis/c++/node/Cargo.toml +++ b/apis/c++/node/Cargo.toml @@ -32,6 +32,7 @@ dora-ros2-bridge = { workspace = true, optional = true } futures-lite = { version = "2.2" } serde = { version = "1.0.164", features = ["derive"], optional = true } serde-big-array = { version = "0.5.1", optional = true } +arrow = { workspace = true, features = ["ffi"] } [build-dependencies] cxx-build = "1.0.73" diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index fea50fba..18df97d7 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -12,6 +12,7 @@ use eyre::bail; use dora_ros2_bridge::{_core, ros2_client}; use futures_lite::{stream, Stream, StreamExt}; + #[cxx::bridge] #[allow(clippy::needless_lifetimes)] mod ffi { @@ -71,9 +72,26 @@ mod ffi { fn is_dora(self: &CombinedEvent) -> bool; fn downcast_dora(event: CombinedEvent) -> Result>; + + unsafe fn send_arrow_output( + output_sender: &mut Box, + id: String, + array_ptr: *mut u8, + schema_ptr: *mut u8, + ) -> DoraResult; + + unsafe fn event_as_arrow_input( + event: Box, + out_array: *mut u8, + out_schema: *mut u8, + ) -> DoraResult; } } +mod arrow_ffi { + pub use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; +} + #[cfg(feature = "ros2-bridge")] pub mod ros2 { pub use dora_ros2_bridge::*; @@ -161,6 +179,41 @@ fn event_as_input(event: Box) -> eyre::Result { }) } +unsafe fn event_as_arrow_input( + event: Box, + out_array: *mut u8, + out_schema: *mut u8, +) -> ffi::DoraResult { + // Cast to Arrow FFI types + let out_array = out_array as *mut arrow::ffi::FFI_ArrowArray; + let out_schema = out_schema as *mut arrow::ffi::FFI_ArrowSchema; + + let Some(Event::Input { id: _, metadata: _, data }) = event.0 else { + return ffi::DoraResult { error: "Not an input event".to_string() }; + }; + + if out_array.is_null() || out_schema.is_null() { + return ffi::DoraResult { + error: "Received null output pointer".to_string() + }; + } 
+ + let array_data = data.to_data(); + + match arrow::ffi::to_ffi(&array_data.clone()) { + Ok((ffi_array, ffi_schema)) => { + std::ptr::write(out_array, ffi_array); + std::ptr::write(out_schema, ffi_schema); + ffi::DoraResult { error: String::new() } + }, + Err(e) => { + ffi::DoraResult { + error: format!("Error exporting Arrow array to C++: {:?}", e) + } + } + } +} + pub struct OutputSender(dora_node_api::DoraNode); fn send_output(sender: &mut Box, id: String, data: &[u8]) -> ffi::DoraResult { @@ -180,6 +233,43 @@ pub struct MergedEvents { events: Option> + Unpin>>, next_id: u32, } +unsafe fn send_arrow_output( + sender: &mut Box, + id: String, + array_ptr: *mut u8, + schema_ptr: *mut u8 +) -> ffi::DoraResult { + let array_ptr = array_ptr as *mut arrow::ffi::FFI_ArrowArray; + let schema_ptr = schema_ptr as *mut arrow::ffi::FFI_ArrowSchema; + + if array_ptr.is_null() || schema_ptr.is_null() { + return ffi::DoraResult { + error: "Received null Arrow array or schema pointer".to_string() + }; + } + + let array = std::ptr::read(array_ptr); + let schema = std::ptr::read(schema_ptr); + + std::ptr::write(array_ptr, std::mem::zeroed()); + std::ptr::write(schema_ptr, std::mem::zeroed()); + + match arrow::ffi::from_ffi(array, &schema) { + Ok(array_data) => { + let arrow_array = arrow::array::make_array(array_data); + let result = sender.0.send_output(id.into(), Default::default(), arrow_array); + match result { + Ok(()) => ffi::DoraResult { error: String::new() }, + Err(err) => ffi::DoraResult { error: format!("{err:?}") }, + } + }, + Err(e) => { + ffi::DoraResult { + error: format!("Error importing array from C++: {:?}", e) + } + } + } +} impl MergedEvents { fn next(&mut self) -> MergedDoraEvent { diff --git a/examples/c++-dataflow2/.gitignore b/examples/c++-dataflow2/.gitignore new file mode 100644 index 00000000..5761abcf --- /dev/null +++ b/examples/c++-dataflow2/.gitignore @@ -0,0 +1 @@ +*.o diff --git a/examples/c++-dataflow2/README.md 
b/examples/c++-dataflow2/README.md new file mode 100644 index 00000000..bcbe9312 --- /dev/null +++ b/examples/c++-dataflow2/README.md @@ -0,0 +1,20 @@ +# Dora C++ Dataflow Example + +This example shows usage of event_as_arrow_input() and send_arrow_output(). We can send and recieve arrow arrays using these functions which can be serialized and deserialized on either files easliy. These functions are implemented in rust and are provided through dora-node-api.h. Currently this requires to have arrow installed on user sytsem as required during build process. + +## Compile and Run + +To try it out, you can use the [`run.rs`](./run.rs) binary. It performs all required build steps and then starts the dataflow. Use the following command to run it: `cargo run --example cxx-dataflow2`. + +**Build the dora coordinator and runtime:** + +- Build the `dora-coordinator` executable using `cargo build -p dora-coordinator --release` +- Build the `dora-runtime` executable using `cargo build -p dora-runtime --release` + +**Run the dataflow:** + +- Start the `dora-coordinator`, passing the paths to the dataflow file and the `dora-runtime` as arguments: + + ``` + ../../target/release/dora-daemon --run-dataflow dataflow.yml ../../target/release/dora-runtime + ``` diff --git a/examples/c++-dataflow2/dataflow.yml b/examples/c++-dataflow2/dataflow.yml new file mode 100644 index 00000000..fd32a014 --- /dev/null +++ b/examples/c++-dataflow2/dataflow.yml @@ -0,0 +1,14 @@ +nodes: + - id: cxx-node-rust-api + path: build/node_rust_api + inputs: + tick: dora/timer/millis/300 + outputs: + - counter + + - id: cxx-node-rust2-api + path: build/node_rust_api + inputs: + tick: cxx-node-rust-api/counter + outputs: + - counter diff --git a/examples/c++-dataflow2/node-c-api/main.cc b/examples/c++-dataflow2/node-c-api/main.cc new file mode 100644 index 00000000..8148bf19 --- /dev/null +++ b/examples/c++-dataflow2/node-c-api/main.cc @@ -0,0 +1,85 @@ +extern "C" +{ +#include "../../../apis/c/node/node_api.h" 
+} + +#include +#include + +int run(void *dora_context) +{ + unsigned char counter = 0; + + for (int i = 0; i < 20; i++) + { + void *event = dora_next_event(dora_context); + if (event == NULL) + { + printf("[c node] ERROR: unexpected end of event\n"); + return -1; + } + + enum DoraEventType ty = read_dora_event_type(event); + + if (ty == DoraEventType_Input) + { + counter += 1; + + char *id_ptr; + size_t id_len; + read_dora_input_id(event, &id_ptr, &id_len); + std::string id(id_ptr, id_len); + + char *data_ptr; + size_t data_len; + read_dora_input_data(event, &data_ptr, &data_len); + std::vector data; + for (size_t i = 0; i < data_len; i++) + { + data.push_back(*(data_ptr + i)); + } + + std::cout + << "Received input " + << " (counter: " << (unsigned int)counter << ") data: ["; + for (unsigned char &v : data) + { + std::cout << (unsigned int)v << ", "; + } + std::cout << "]" << std::endl; + + std::vector out_vec{counter}; + std::string out_id = "counter"; + int result = dora_send_output(dora_context, &out_id[0], out_id.length(), (char *)&counter, 1); + if (result != 0) + { + std::cerr << "failed to send output" << std::endl; + return 1; + } + } + else if (ty == DoraEventType_Stop) + { + printf("[c node] received stop event\n"); + } + else + { + printf("[c node] received unexpected event: %d\n", ty); + } + + free_dora_event(event); + } + return 0; +} + +int main() +{ + std::cout << "HELLO FROM C++ (using C API)" << std::endl; + + auto dora_context = init_dora_context_from_env(); + auto ret = run(dora_context); + free_dora_context(dora_context); + + std::cout << "GOODBYE FROM C++ node (using C API)" << std::endl; + + return ret; +} diff --git a/examples/c++-dataflow2/node-rust-api/main.cc b/examples/c++-dataflow2/node-rust-api/main.cc new file mode 100644 index 00000000..bae31917 --- /dev/null +++ b/examples/c++-dataflow2/node-rust-api/main.cc @@ -0,0 +1,112 @@ +#include "../build/dora-node-api.h" +#include +#include +#include +#include +#include +#include + 
+std::shared_ptr receive_and_print_input(rust::cxxbridge1::Box event) { + std::cout << "Received input event" << std::endl; + + struct ArrowArray c_array; + struct ArrowSchema c_schema; + + auto result = event_as_arrow_input( + std::move(event), + reinterpret_cast(&c_array), + reinterpret_cast(&c_schema) + ); + + if (!result.error.empty()) { + std::cerr << "Error getting Arrow array: " << std::endl; + return nullptr; + } + + auto result2 = arrow::ImportArray(&c_array, &c_schema); + std::shared_ptr input_array = result2.ValueOrDie(); + std::cout << "Received Arrow array: " << input_array->ToString() << std::endl; + + std::cout << "Array details: type=" << input_array->type()->ToString() + << ", length=" << input_array->length() << std::endl; + + return input_array; +} + +// To send output +bool send_output(DoraNode& dora_node, std::shared_ptr output_array) { + if (!output_array) { + std::cerr << "Error: Attempted to send a null Arrow array" << std::endl; + return false; + } + struct ArrowArray out_c_array; + struct ArrowSchema out_c_schema; + + arrow::ExportArray(*output_array, &out_c_array, &out_c_schema); + + auto send_result = send_arrow_output( + dora_node.send_output, + "counter", + reinterpret_cast(&out_c_array), + reinterpret_cast(&out_c_schema) + ); + + if (!send_result.error.empty()) { + std::string error_message(send_result.error); + std::cerr << "Error sending Arrow array: " << error_message << std::endl; + return false; + } + + return true; +} + +int main() { + try { + auto dora_node = init_dora_node(); + std::cout << "Dora node initialized successfully" << std::endl; + int counter=0; + while (counter<10) { + counter++; + auto event = dora_node.events->next(); + auto type = event_type(event); + + if (type == DoraEventType::Stop) { + std::cout << "Received stop event, exiting" << std::endl; + break; + } + else if (type == DoraEventType::AllInputsClosed) { + std::cout << "All inputs closed, exiting" << std::endl; + break; + } + else if (type == 
DoraEventType::Input) { + std::shared_ptr input_array = receive_and_print_input(std::move(event)); + + std::shared_ptr output_array; + + arrow::Int32Builder builder; + builder.Append(10); + builder.Append(100); + builder.Append(1000); + builder.Finish(&output_array); + std::cout << "Created new string array: " << output_array->ToString() << std::endl; + + //Printing Before sending + auto str_array = std::static_pointer_cast(output_array); + std::cout << "Values: ["; + for (int i = 0; i < str_array->length(); i++) { + if (i > 0) std::cout << ", "; + std::cout << str_array->Value(i); + } + std::cout << "]" << std::endl; + + send_output(dora_node, output_array); + } + } + + return 0; + } + catch (const std::exception& e) { + std::cerr << "Error: " << e.what() << std::endl; + return 1; + } +} \ No newline at end of file diff --git a/examples/c++-dataflow2/operator-c-api/operator.cc b/examples/c++-dataflow2/operator-c-api/operator.cc new file mode 100644 index 00000000..f15492dd --- /dev/null +++ b/examples/c++-dataflow2/operator-c-api/operator.cc @@ -0,0 +1,79 @@ +extern "C" +{ +#include "../../../apis/c/operator/operator_api.h" +} + +#include +#include +#include +#include +#include + +class Operator +{ +public: + Operator(); +}; + +Operator::Operator() {} + +extern "C" DoraInitResult_t dora_init_operator() +{ + Operator *op = std::make_unique().release(); + + DoraInitResult_t result = {.operator_context = (void *)op}; + return result; +} + +extern "C" DoraResult_t dora_drop_operator(void *operator_context) +{ + delete (Operator *)operator_context; + return {}; +} + +extern "C" OnEventResult_t dora_on_event( + RawEvent_t *event, + const SendOutput_t *send_output, + void *operator_context) +{ + if (event->input != NULL) + { + // input event + Input_t *input = event->input; + char *id = dora_read_input_id(input); + + Vec_uint8_t data = dora_read_data(input); + assert(data.ptr != NULL); + + std::cout + << "C++ Operator (C-API) received input `" << id << "` with data: ["; 
+ for (int i = 0; i < data.len; i++) + { + std::cout << (unsigned int)data.ptr[i] << ", "; + } + std::cout << "]" << std::endl; + + const char *out_id = "half-status"; + char *out_id_heap = strdup(out_id); + + size_t out_data_len = 1; + uint8_t *out_data_heap = (uint8_t *)malloc(out_data_len); + *out_data_heap = *data.ptr / 2; + + DoraResult_t send_result = dora_send_operator_output(send_output, out_id_heap, out_data_heap, out_data_len); + + OnEventResult_t result = {.result = send_result, .status = DORA_STATUS_CONTINUE}; + + dora_free_data(data); + dora_free_input_id(id); + + return result; + } + if (event->stop) + { + printf("C operator received stop event\n"); + } + + OnEventResult_t result = {.status = DORA_STATUS_CONTINUE}; + return result; +} diff --git a/examples/c++-dataflow2/operator-rust-api/operator.cc b/examples/c++-dataflow2/operator-rust-api/operator.cc new file mode 100644 index 00000000..2b812188 --- /dev/null +++ b/examples/c++-dataflow2/operator-rust-api/operator.cc @@ -0,0 +1,23 @@ +#include "operator.h" +#include +#include +#include "../build/dora-operator-api.h" + +Operator::Operator() {} + +std::unique_ptr new_operator() +{ + return std::make_unique(); +} + +DoraOnInputResult on_input(Operator &op, rust::Str id, rust::Slice data, OutputSender &output_sender) +{ + op.counter += 1; + std::cout << "Rust API operator received input `" << id.data() << "` with data `" << (unsigned int)data[0] << "` (internal counter: " << (unsigned int)op.counter << ")" << std::endl; + + std::vector out_vec{op.counter}; + rust::Slice out_slice{out_vec.data(), out_vec.size()}; + auto send_result = send_output(output_sender, rust::Str("status"), out_slice); + DoraOnInputResult result = {send_result.error, false}; + return result; +} diff --git a/examples/c++-dataflow2/operator-rust-api/operator.h b/examples/c++-dataflow2/operator-rust-api/operator.h new file mode 100644 index 00000000..9b5e3ab2 --- /dev/null +++ b/examples/c++-dataflow2/operator-rust-api/operator.h @@ 
-0,0 +1,16 @@ +#pragma once +#include +#include "../../../apis/c/operator/operator_api.h" + +class Operator +{ +public: + Operator(); + unsigned char counter; +}; + +#include "../build/dora-operator-api.h" + +std::unique_ptr new_operator(); + +DoraOnInputResult on_input(Operator &op, rust::Str id, rust::Slice data, OutputSender &output_sender); diff --git a/examples/c++-dataflow2/run.rs b/examples/c++-dataflow2/run.rs new file mode 100644 index 00000000..3413500e --- /dev/null +++ b/examples/c++-dataflow2/run.rs @@ -0,0 +1,332 @@ +use dora_tracing::set_up_tracing; +use eyre::{bail, Context}; +use std::{ + env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, + path::Path, + process::Command, +}; + +struct ArrowConfig { + cflags: String, + libs: String, +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + set_up_tracing("c++-dataflow-runner").wrap_err("failed to set up tracing")?; + + if cfg!(windows) { + tracing::error!( + "The c++ example does not work on Windows currently because of a linker error" + ); + return Ok(()); + } + + let arrow_config = find_arrow_config().wrap_err("Failed to find Arrow configuration")?; + tracing::info!("Found Arrow configuration: cflags={}, libs={}", arrow_config.cflags, arrow_config.libs); + + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let target = root.join("target"); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + tokio::fs::create_dir_all("build").await?; + let build_dir = Path::new("build"); + + build_package("dora-node-api-cxx").await?; + let node_cxxbridge = target + .join("cxxbridge") + .join("dora-node-api-cxx") + .join("src"); + tokio::fs::copy( + node_cxxbridge.join("lib.rs.cc"), + build_dir.join("node-bridge.cc"), + ) + .await?; + tokio::fs::copy( + node_cxxbridge.join("lib.rs.h"), + build_dir.join("dora-node-api.h"), + ) + .await?; + tokio::fs::write( + build_dir.join("operator.h"), + r###"#include "../operator-rust-api/operator.h""###, + ) + 
.await?; + + build_package("dora-operator-api-cxx").await?; + let operator_cxxbridge = target + .join("cxxbridge") + .join("dora-operator-api-cxx") + .join("src"); + tokio::fs::copy( + operator_cxxbridge.join("lib.rs.cc"), + build_dir.join("operator-bridge.cc"), + ) + .await?; + tokio::fs::copy( + operator_cxxbridge.join("lib.rs.h"), + build_dir.join("dora-operator-api.h"), + ) + .await?; + + build_package("dora-node-api-c").await?; + build_package("dora-operator-api-c").await?; + build_cxx_node( + root, + &[ + &dunce::canonicalize(Path::new("node-rust-api").join("main.cc"))?, + &dunce::canonicalize(build_dir.join("node-bridge.cc"))?, + ], + "node_rust_api", + &["-l", "dora_node_api_cxx", &arrow_config.cflags, &arrow_config.libs], + ) + .await?; + build_cxx_node( + root, + &[&dunce::canonicalize( + Path::new("node-c-api").join("main.cc"), + )?], + "node_c_api", + &["-l", "dora_node_api_c"], + ) + .await?; + build_cxx_operator( + &[ + &dunce::canonicalize(Path::new("operator-rust-api").join("operator.cc"))?, + &dunce::canonicalize(build_dir.join("operator-bridge.cc"))?, + ], + "operator_rust_api", + &[ + "-l", "dora_operator_api_cxx", + "-L", root.join("target").join("debug").to_str().unwrap(), + &arrow_config.cflags, &arrow_config.libs, + ], + ) + .await?; + build_cxx_operator( + &[&dunce::canonicalize( + Path::new("operator-c-api").join("operator.cc"), + )?], + "operator_c_api", + &[ + "-l", "dora_operator_api_c", + "-L", root.join("target").join("debug").to_str().unwrap(), + ], + ) + .await?; + + let dataflow = Path::new("dataflow.yml").to_owned(); + build_package("dora-runtime").await?; + run_dataflow(&dataflow).await?; + + Ok(()) +} + +fn find_arrow_config() -> eyre::Result { + + let output = Command::new("pkg-config") + .args(&["--cflags", "arrow"]) + .output() + .wrap_err("Failed to run pkg-config. Make sure Arrow C++ is installed")?; + + if !output.status.success() { + bail!("Arrow C++ not found via pkg-config. 
Make sure it's installed and in your PKG_CONFIG_PATH"); + } + + let cflags = String::from_utf8(output.stdout)?.trim().to_string(); + + let output = Command::new("pkg-config") + .args(&["--libs", "arrow"]) + .output() + .wrap_err("Failed to get Arrow library flags")?; + + if !output.status.success() { + bail!("Failed to get Arrow library flags"); + } + + let libs = String::from_utf8(output.stdout)?.trim().to_string(); + + Ok(ArrowConfig { cflags, libs }) +} + +async fn build_package(package: &str) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("build"); + cmd.arg("--package").arg(package); + if !cmd.status().await?.success() { + bail!("failed to build {package}"); + }; + Ok(()) +} + +async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--") + .arg("daemon") + .arg("--run-dataflow") + .arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to run dataflow"); + }; + Ok(()) +} + +async fn build_cxx_node( + root: &Path, + paths: &[&Path], + out_name: &str, + args: &[&str], +) -> eyre::Result<()> { + let mut clang = tokio::process::Command::new("clang++"); + clang.args(paths); + clang.arg("-std=c++17"); + #[cfg(target_os = "linux")] + { + clang.arg("-l").arg("m"); + clang.arg("-l").arg("rt"); + clang.arg("-l").arg("dl"); + clang.arg("-pthread"); + } + #[cfg(target_os = "windows")] + { + clang.arg("-ladvapi32"); + clang.arg("-luserenv"); + clang.arg("-lkernel32"); + clang.arg("-lws2_32"); + clang.arg("-lbcrypt"); + clang.arg("-lncrypt"); + clang.arg("-lschannel"); + clang.arg("-lntdll"); + clang.arg("-liphlpapi"); + + clang.arg("-lcfgmgr32"); + clang.arg("-lcredui"); + clang.arg("-lcrypt32"); + clang.arg("-lcryptnet"); + clang.arg("-lfwpuclnt"); + clang.arg("-lgdi32"); + clang.arg("-lmsimg32"); + 
clang.arg("-lmswsock"); + clang.arg("-lole32"); + clang.arg("-lopengl32"); + clang.arg("-lsecur32"); + clang.arg("-lshell32"); + clang.arg("-lsynchronization"); + clang.arg("-luser32"); + clang.arg("-lwinspool"); + + clang.arg("-Wl,-nodefaultlib:libcmt"); + clang.arg("-D_DLL"); + clang.arg("-lmsvcrt"); + } + #[cfg(target_os = "macos")] + { + clang.arg("-framework").arg("CoreServices"); + clang.arg("-framework").arg("Security"); + clang.arg("-l").arg("System"); + clang.arg("-l").arg("resolv"); + clang.arg("-l").arg("pthread"); + clang.arg("-l").arg("c"); + clang.arg("-l").arg("m"); + } + clang.args(args); + clang.arg("-L").arg(root.join("target").join("debug")); + clang + .arg("--output") + .arg(Path::new("../build").join(format!("{out_name}{EXE_SUFFIX}"))); + if let Some(parent) = paths[0].parent() { + clang.current_dir(parent); + } + + if !clang.status().await?.success() { + bail!("failed to compile c++ node"); + }; + Ok(()) +} + +async fn build_cxx_operator( + paths: &[&Path], + out_name: &str, + link_args: &[&str], +) -> eyre::Result<()> { + let mut object_file_paths = Vec::new(); + + for path in paths { + let mut compile = tokio::process::Command::new("clang++"); + compile.arg("-c").arg(path); + compile.arg("-std=c++17"); + let object_file_path = path.with_extension("o"); + compile.arg("-o").arg(&object_file_path); + #[cfg(unix)] + compile.arg("-fPIC"); + if let Some(parent) = path.parent() { + compile.current_dir(parent); + } + if !compile.status().await?.success() { + bail!("failed to compile cxx operator"); + }; + object_file_paths.push(object_file_path); + } + + let mut link = tokio::process::Command::new("clang++"); + link.arg("-shared").args(&object_file_paths); + link.args(link_args); + #[cfg(target_os = "windows")] + { + link.arg("-ladvapi32"); + link.arg("-luserenv"); + link.arg("-lkernel32"); + link.arg("-lws2_32"); + link.arg("-lbcrypt"); + link.arg("-lncrypt"); + link.arg("-lschannel"); + link.arg("-lntdll"); + link.arg("-liphlpapi"); + + 
link.arg("-lcfgmgr32"); + link.arg("-lcredui"); + link.arg("-lcrypt32"); + link.arg("-lcryptnet"); + link.arg("-lfwpuclnt"); + link.arg("-lgdi32"); + link.arg("-lmsimg32"); + link.arg("-lmswsock"); + link.arg("-lole32"); + link.arg("-lopengl32"); + link.arg("-lsecur32"); + link.arg("-lshell32"); + link.arg("-lsynchronization"); + link.arg("-luser32"); + link.arg("-lwinspool"); + + link.arg("-Wl,-nodefaultlib:libcmt"); + link.arg("-D_DLL"); + link.arg("-lmsvcrt"); + link.arg("-fms-runtime-lib=static"); + } + #[cfg(target_os = "macos")] + { + link.arg("-framework").arg("CoreServices"); + link.arg("-framework").arg("Security"); + link.arg("-l").arg("System"); + link.arg("-l").arg("resolv"); + link.arg("-l").arg("pthread"); + link.arg("-l").arg("c"); + link.arg("-l").arg("m"); + } + link.arg("-o") + .arg(Path::new("../build").join(format!("{DLL_PREFIX}{out_name}{DLL_SUFFIX}"))); + if let Some(parent) = paths[0].parent() { + link.current_dir(parent); + } + if !link.status().await?.success() { + bail!("failed to create shared library from cxx operator (c api)"); + }; + + Ok(()) +} From 8e48debcaddcba8bf4860bebc04bacf1609be22c Mon Sep 17 00:00:00 2001 From: Mati-ur-rehman-017 Date: Mon, 24 Mar 2025 16:33:05 +0500 Subject: [PATCH 02/12] formatting --- apis/c++/node/src/lib.rs | 76 ++++++++++++++++++++--------------- examples/c++-dataflow2/run.rs | 43 +++++++++++++------- 2 files changed, 71 insertions(+), 48 deletions(-) diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index 18df97d7..a7ec233a 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -12,7 +12,6 @@ use eyre::bail; use dora_ros2_bridge::{_core, ros2_client}; use futures_lite::{stream, Stream, StreamExt}; - #[cxx::bridge] #[allow(clippy::needless_lifetimes)] mod ffi { @@ -76,13 +75,13 @@ mod ffi { unsafe fn send_arrow_output( output_sender: &mut Box, id: String, - array_ptr: *mut u8, - schema_ptr: *mut u8, + array_ptr: *mut u8, + schema_ptr: *mut u8, ) -> DoraResult; - + unsafe 
fn event_as_arrow_input( - event: Box, - out_array: *mut u8, + event: Box, + out_array: *mut u8, out_schema: *mut u8, ) -> DoraResult; } @@ -180,7 +179,7 @@ fn event_as_input(event: Box) -> eyre::Result { } unsafe fn event_as_arrow_input( - event: Box, + event: Box, out_array: *mut u8, out_schema: *mut u8, ) -> ffi::DoraResult { @@ -188,29 +187,36 @@ unsafe fn event_as_arrow_input( let out_array = out_array as *mut arrow::ffi::FFI_ArrowArray; let out_schema = out_schema as *mut arrow::ffi::FFI_ArrowSchema; - let Some(Event::Input { id: _, metadata: _, data }) = event.0 else { - return ffi::DoraResult { error: "Not an input event".to_string() }; + let Some(Event::Input { + id: _, + metadata: _, + data, + }) = event.0 + else { + return ffi::DoraResult { + error: "Not an input event".to_string(), + }; }; - + if out_array.is_null() || out_schema.is_null() { - return ffi::DoraResult { - error: "Received null output pointer".to_string() + return ffi::DoraResult { + error: "Received null output pointer".to_string(), }; } - + let array_data = data.to_data(); - + match arrow::ffi::to_ffi(&array_data.clone()) { Ok((ffi_array, ffi_schema)) => { std::ptr::write(out_array, ffi_array); std::ptr::write(out_schema, ffi_schema); - ffi::DoraResult { error: String::new() } - }, - Err(e) => { ffi::DoraResult { - error: format!("Error exporting Arrow array to C++: {:?}", e) + error: String::new(), } } + Err(e) => ffi::DoraResult { + error: format!("Error exporting Arrow array to C++: {:?}", e), + }, } } @@ -236,38 +242,42 @@ pub struct MergedEvents { unsafe fn send_arrow_output( sender: &mut Box, id: String, - array_ptr: *mut u8, - schema_ptr: *mut u8 + array_ptr: *mut u8, + schema_ptr: *mut u8, ) -> ffi::DoraResult { let array_ptr = array_ptr as *mut arrow::ffi::FFI_ArrowArray; let schema_ptr = schema_ptr as *mut arrow::ffi::FFI_ArrowSchema; if array_ptr.is_null() || schema_ptr.is_null() { - return ffi::DoraResult { - error: "Received null Arrow array or schema pointer".to_string() + 
return ffi::DoraResult { + error: "Received null Arrow array or schema pointer".to_string(), }; } - + let array = std::ptr::read(array_ptr); let schema = std::ptr::read(schema_ptr); - + std::ptr::write(array_ptr, std::mem::zeroed()); std::ptr::write(schema_ptr, std::mem::zeroed()); - + match arrow::ffi::from_ffi(array, &schema) { Ok(array_data) => { let arrow_array = arrow::array::make_array(array_data); - let result = sender.0.send_output(id.into(), Default::default(), arrow_array); + let result = sender + .0 + .send_output(id.into(), Default::default(), arrow_array); match result { - Ok(()) => ffi::DoraResult { error: String::new() }, - Err(err) => ffi::DoraResult { error: format!("{err:?}") }, - } - }, - Err(e) => { - ffi::DoraResult { - error: format!("Error importing array from C++: {:?}", e) + Ok(()) => ffi::DoraResult { + error: String::new(), + }, + Err(err) => ffi::DoraResult { + error: format!("{err:?}"), + }, } } + Err(e) => ffi::DoraResult { + error: format!("Error importing array from C++: {:?}", e), + }, } } diff --git a/examples/c++-dataflow2/run.rs b/examples/c++-dataflow2/run.rs index 3413500e..b1a9e999 100644 --- a/examples/c++-dataflow2/run.rs +++ b/examples/c++-dataflow2/run.rs @@ -3,7 +3,7 @@ use eyre::{bail, Context}; use std::{ env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, path::Path, - process::Command, + process::Command, }; struct ArrowConfig { @@ -23,7 +23,11 @@ async fn main() -> eyre::Result<()> { } let arrow_config = find_arrow_config().wrap_err("Failed to find Arrow configuration")?; - tracing::info!("Found Arrow configuration: cflags={}, libs={}", arrow_config.cflags, arrow_config.libs); + tracing::info!( + "Found Arrow configuration: cflags={}, libs={}", + arrow_config.cflags, + arrow_config.libs + ); let root = Path::new(env!("CARGO_MANIFEST_DIR")); let target = root.join("target"); @@ -79,7 +83,12 @@ async fn main() -> eyre::Result<()> { &dunce::canonicalize(build_dir.join("node-bridge.cc"))?, ], "node_rust_api", - &["-l", 
"dora_node_api_cxx", &arrow_config.cflags, &arrow_config.libs], + &[ + "-l", + "dora_node_api_cxx", + &arrow_config.cflags, + &arrow_config.libs, + ], ) .await?; build_cxx_node( @@ -98,9 +107,12 @@ async fn main() -> eyre::Result<()> { ], "operator_rust_api", &[ - "-l", "dora_operator_api_cxx", - "-L", root.join("target").join("debug").to_str().unwrap(), - &arrow_config.cflags, &arrow_config.libs, + "-l", + "dora_operator_api_cxx", + "-L", + root.join("target").join("debug").to_str().unwrap(), + &arrow_config.cflags, + &arrow_config.libs, ], ) .await?; @@ -110,8 +122,10 @@ async fn main() -> eyre::Result<()> { )?], "operator_c_api", &[ - "-l", "dora_operator_api_c", - "-L", root.join("target").join("debug").to_str().unwrap(), + "-l", + "dora_operator_api_c", + "-L", + root.join("target").join("debug").to_str().unwrap(), ], ) .await?; @@ -124,29 +138,28 @@ async fn main() -> eyre::Result<()> { } fn find_arrow_config() -> eyre::Result { - let output = Command::new("pkg-config") .args(&["--cflags", "arrow"]) .output() .wrap_err("Failed to run pkg-config. Make sure Arrow C++ is installed")?; - + if !output.status.success() { bail!("Arrow C++ not found via pkg-config. 
Make sure it's installed and in your PKG_CONFIG_PATH"); } - + let cflags = String::from_utf8(output.stdout)?.trim().to_string(); - + let output = Command::new("pkg-config") .args(&["--libs", "arrow"]) .output() .wrap_err("Failed to get Arrow library flags")?; - + if !output.status.success() { bail!("Failed to get Arrow library flags"); } - + let libs = String::from_utf8(output.stdout)?.trim().to_string(); - + Ok(ArrowConfig { cflags, libs }) } From 619fc7c5e6fabc0b36d7941b18bac07cfda62353 Mon Sep 17 00:00:00 2001 From: Mati-ur-rehman-017 Date: Mon, 24 Mar 2025 16:52:44 +0500 Subject: [PATCH 03/12] readme fixed --- examples/c++-dataflow2/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/c++-dataflow2/README.md b/examples/c++-dataflow2/README.md index bcbe9312..c7e5c411 100644 --- a/examples/c++-dataflow2/README.md +++ b/examples/c++-dataflow2/README.md @@ -1,6 +1,6 @@ # Dora C++ Dataflow Example -This example shows usage of event_as_arrow_input() and send_arrow_output(). We can send and recieve arrow arrays using these functions which can be serialized and deserialized on either files easliy. These functions are implemented in rust and are provided through dora-node-api.h. Currently this requires to have arrow installed on user sytsem as required during build process. +This example shows usage of event_as_arrow_input() and send_arrow_output(). We can send and receive arrow arrays using these functions which can be serialized and deserialized on either files easily. These functions are implemented in rust and are provided through dora-node-api.h. Currently this requires to have arrow installed on user system as required during build process. 
## Compile and Run From b739f10888222f664afc40bb8871c9f0dec801b6 Mon Sep 17 00:00:00 2001 From: Mati-ur-rehman-017 Date: Mon, 24 Mar 2025 18:47:28 +0500 Subject: [PATCH 04/12] Updated readme --- examples/c++-dataflow2/README.md | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/examples/c++-dataflow2/README.md b/examples/c++-dataflow2/README.md index c7e5c411..9c1c5c54 100644 --- a/examples/c++-dataflow2/README.md +++ b/examples/c++-dataflow2/README.md @@ -1,20 +1,13 @@ # Dora C++ Dataflow Example -This example shows usage of event_as_arrow_input() and send_arrow_output(). We can send and receive arrow arrays using these functions which can be serialized and deserialized on either files easily. These functions are implemented in rust and are provided through dora-node-api.h. Currently this requires to have arrow installed on user system as required during build process. +This example demonstrates how to exchange data between Dora's Rust-based runtime and C++ using Apache Arrow arrays. Through the event_as_arrow_input() and send_arrow_output() functions exposed in the dora-node-api.h header, your C++ nodes can efficiently receive and send structured data within the Dora dataflow system. These functions leverage Apache Arrow's memory-efficient serialization format, allowing data to move seamlessly across language boundaries. -## Compile and Run - -To try it out, you can use the [`run.rs`](./run.rs) binary. It performs all required build steps and then starts the dataflow. Use the following command to run it: `cargo run --example cxx-dataflow2`. 
+## Required System Dependencies -**Build the dora coordinator and runtime:** +- **Apache Arrow C++ Library**: Version 19.0.1 or later -- Build the `dora-coordinator` executable using `cargo build -p dora-coordinator --release` -- Build the `dora-runtime` executable using `cargo build -p dora-runtime --release` - -**Run the dataflow:** +## Compile and Run -- Start the `dora-coordinator`, passing the paths to the dataflow file and the `dora-runtime` as arguments: +To try it out, you can use the [`run.rs`](./run.rs) binary. It performs all required build steps and then starts the dataflow. Use the following command to run it: `cargo run --example cxx-dataflow2`. For manaul build, check build system for +`cxx-dataflow`. - ``` - ../../target/release/dora-daemon --run-dataflow dataflow.yml ../../target/release/dora-runtime - ``` From f54575ae6c0058a65d004533b6dc1dda11bc0809 Mon Sep 17 00:00:00 2001 From: Mati-ur-rehman-017 Date: Mon, 24 Mar 2025 19:09:32 +0500 Subject: [PATCH 05/12] typo fix --- examples/c++-dataflow2/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/c++-dataflow2/README.md b/examples/c++-dataflow2/README.md index 9c1c5c54..7c1a56f7 100644 --- a/examples/c++-dataflow2/README.md +++ b/examples/c++-dataflow2/README.md @@ -1,4 +1,4 @@ -# Dora C++ Dataflow Example +# Dora C++ Dataflow Example 2 This example demonstrates how to exchange data between Dora's Rust-based runtime and C++ using Apache Arrow arrays. Through the event_as_arrow_input() and send_arrow_output() functions exposed in the dora-node-api.h header, your C++ nodes can efficiently receive and send structured data within the Dora dataflow system. These functions leverage Apache Arrow's memory-efficient serialization format, allowing data to move seamlessly across language boundaries. 
@@ -8,6 +8,6 @@ This example demonstrates how to exchange data between Dora's Rust-based runtime ## Compile and Run -To try it out, you can use the [`run.rs`](./run.rs) binary. It performs all required build steps and then starts the dataflow. Use the following command to run it: `cargo run --example cxx-dataflow2`. For manaul build, check build system for -`cxx-dataflow`. +To try it out, you can use the [`run.rs`](./run.rs) binary. It performs all required build steps and then starts the dataflow. Use the following command to run it: `cargo run --example cxx-dataflow2`. For manual build, check build system for +`cxx-dataflow` example. From c854cb188ad1532df18d4922f66482bc2a033e83 Mon Sep 17 00:00:00 2001 From: Mati-ur-rehman-017 Date: Tue, 25 Mar 2025 19:46:16 +0500 Subject: [PATCH 06/12] Added example in CI --- .github/workflows/ci.yml | 20 ++++++++++++++++++++ examples/c++-dataflow2/run.rs | 22 +++++++++++++++++++--- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8bfa2b68..771bab37 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -127,6 +127,26 @@ jobs: - name: "C++ Dataflow example" timeout-minutes: 15 run: cargo run --example cxx-dataflow + - name: "Install Arrow C++ Library" + timeout-minutes: 10 + shell: bash + run: | + if [ "$RUNNER_OS" == "Linux" ]; then + # For Ubuntu + sudo apt-get update + sudo apt-get install -y -V ca-certificates lsb-release wget + wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + sudo apt-get update + sudo apt-get install -y -V libarrow-dev libarrow-glib-dev + elif [ "$RUNNER_OS" == "macOS" ]; then + # For macOS + brew update + brew install apache-arrow + fi + - name: "C++ Dataflow2 example" + timeout-minutes: 15 + run : cargo 
run --example cxx-dataflow2 - name: "Cmake example" if: runner.os == 'Linux' timeout-minutes: 30 diff --git a/examples/c++-dataflow2/run.rs b/examples/c++-dataflow2/run.rs index b1a9e999..5e1f2185 100644 --- a/examples/c++-dataflow2/run.rs +++ b/examples/c++-dataflow2/run.rs @@ -247,7 +247,15 @@ async fn build_cxx_node( clang.arg("-l").arg("c"); clang.arg("-l").arg("m"); } - clang.args(args); + for arg in args { + if arg.contains(" ") { + for part in arg.split_whitespace() { + clang.arg(part); + } + } else { + clang.arg(arg); + } + } clang.arg("-L").arg(root.join("target").join("debug")); clang .arg("--output") @@ -288,7 +296,15 @@ async fn build_cxx_operator( let mut link = tokio::process::Command::new("clang++"); link.arg("-shared").args(&object_file_paths); - link.args(link_args); + for arg in link_args { + if arg.contains(" ") { + for part in arg.split_whitespace() { + link.arg(part); + } + } else { + link.arg(arg); + } + } #[cfg(target_os = "windows")] { link.arg("-ladvapi32"); @@ -342,4 +358,4 @@ async fn build_cxx_operator( }; Ok(()) -} +} \ No newline at end of file From 194e4c7d3d398019ce82a111058bd6651e891847 Mon Sep 17 00:00:00 2001 From: Mati-ur-rehman-017 Date: Tue, 25 Mar 2025 20:11:29 +0500 Subject: [PATCH 07/12] added in ci --- examples/c++-dataflow2/run.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/c++-dataflow2/run.rs b/examples/c++-dataflow2/run.rs index 5e1f2185..998ca585 100644 --- a/examples/c++-dataflow2/run.rs +++ b/examples/c++-dataflow2/run.rs @@ -358,4 +358,4 @@ async fn build_cxx_operator( }; Ok(()) -} \ No newline at end of file +} From 85731cc23c65d037311d3576d22bc399a883bc1e Mon Sep 17 00:00:00 2001 From: haixuantao Date: Tue, 25 Mar 2025 18:15:34 +0100 Subject: [PATCH 08/12] Improve readme and improve name of the example --- Cargo.toml | 4 +- .../.gitignore | 0 .../README.md | 26 ++- .../dataflow.yml | 0 .../node-rust-api/main.cc | 0 .../run.rs | 159 +----------------- 
examples/c++-dataflow2/node-c-api/main.cc | 85 ---------- .../c++-dataflow2/operator-c-api/operator.cc | 79 --------- .../operator-rust-api/operator.cc | 23 --- .../operator-rust-api/operator.h | 16 -- 10 files changed, 26 insertions(+), 366 deletions(-) rename examples/{c++-dataflow2 => c++-arrow-dataflow}/.gitignore (100%) rename examples/{c++-dataflow2 => c++-arrow-dataflow}/README.md (56%) rename examples/{c++-dataflow2 => c++-arrow-dataflow}/dataflow.yml (100%) rename examples/{c++-dataflow2 => c++-arrow-dataflow}/node-rust-api/main.cc (100%) rename examples/{c++-dataflow2 => c++-arrow-dataflow}/run.rs (55%) delete mode 100644 examples/c++-dataflow2/node-c-api/main.cc delete mode 100644 examples/c++-dataflow2/operator-c-api/operator.cc delete mode 100644 examples/c++-dataflow2/operator-rust-api/operator.cc delete mode 100644 examples/c++-dataflow2/operator-rust-api/operator.h diff --git a/Cargo.toml b/Cargo.toml index 5812d49d..f67b0fcb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,8 +147,8 @@ name = "cxx-dataflow" path = "examples/c++-dataflow/run.rs" [[example]] -name = "cxx-dataflow2" -path = "examples/c++-dataflow2/run.rs" +name = "cxx-arrow-dataflow" +path = "examples/c++-arrow-dataflow/run.rs" [[example]] name = "python-dataflow" diff --git a/examples/c++-dataflow2/.gitignore b/examples/c++-arrow-dataflow/.gitignore similarity index 100% rename from examples/c++-dataflow2/.gitignore rename to examples/c++-arrow-dataflow/.gitignore diff --git a/examples/c++-dataflow2/README.md b/examples/c++-arrow-dataflow/README.md similarity index 56% rename from examples/c++-dataflow2/README.md rename to examples/c++-arrow-dataflow/README.md index 7c1a56f7..1227c00a 100644 --- a/examples/c++-dataflow2/README.md +++ b/examples/c++-arrow-dataflow/README.md @@ -1,13 +1,33 @@ # Dora C++ Dataflow Example 2 -This example demonstrates how to exchange data between Dora's Rust-based runtime and C++ using Apache Arrow arrays. 
Through the event_as_arrow_input() and send_arrow_output() functions exposed in the dora-node-api.h header, your C++ nodes can efficiently receive and send structured data within the Dora dataflow system. These functions leverage Apache Arrow's memory-efficient serialization format, allowing data to move seamlessly across language boundaries. +This example demonstrates how to exchange data between Dora's Rust-based runtime and C++ using Apache Arrow arrays. Through the event_as_arrow_input() and send_arrow_output() functions exposed in the dora-node-api.h header, your C++ nodes can efficiently receive and send structured data within the Dora dataflow system. These functions leverage Apache Arrow's memory-efficient serialization format, allowing data to move seamlessly across language boundaries. ## Required System Dependencies - **Apache Arrow C++ Library**: Version 19.0.1 or later +Installing it should look like this: + +### For Ubuntu + +```bash +sudo apt-get update +sudo apt-get install -y -V ca-certificates lsb-release wget +wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb +sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb +sudo apt-get update +sudo apt-get install -y -V libarrow-dev libarrow-glib-dev +``` + +### For macOS + +```bash +brew update +brew install apache-arrow +fi +``` + ## Compile and Run -To try it out, you can use the [`run.rs`](./run.rs) binary. It performs all required build steps and then starts the dataflow. Use the following command to run it: `cargo run --example cxx-dataflow2`. For manual build, check build system for +To try it out, you can use the [`run.rs`](./run.rs) binary. It performs all required build steps and then starts the dataflow. Use the following command to run it: `cargo run --example cxx-dataflow2`. For manual build, check build system for `cxx-dataflow` example. 
- diff --git a/examples/c++-dataflow2/dataflow.yml b/examples/c++-arrow-dataflow/dataflow.yml similarity index 100% rename from examples/c++-dataflow2/dataflow.yml rename to examples/c++-arrow-dataflow/dataflow.yml diff --git a/examples/c++-dataflow2/node-rust-api/main.cc b/examples/c++-arrow-dataflow/node-rust-api/main.cc similarity index 100% rename from examples/c++-dataflow2/node-rust-api/main.cc rename to examples/c++-arrow-dataflow/node-rust-api/main.cc diff --git a/examples/c++-dataflow2/run.rs b/examples/c++-arrow-dataflow/run.rs similarity index 55% rename from examples/c++-dataflow2/run.rs rename to examples/c++-arrow-dataflow/run.rs index 998ca585..399a73b1 100644 --- a/examples/c++-dataflow2/run.rs +++ b/examples/c++-arrow-dataflow/run.rs @@ -1,10 +1,6 @@ use dora_tracing::set_up_tracing; use eyre::{bail, Context}; -use std::{ - env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, - path::Path, - process::Command, -}; +use std::{env::consts::EXE_SUFFIX, path::Path, process::Command}; struct ArrowConfig { cflags: String, @@ -52,30 +48,7 @@ async fn main() -> eyre::Result<()> { build_dir.join("dora-node-api.h"), ) .await?; - tokio::fs::write( - build_dir.join("operator.h"), - r###"#include "../operator-rust-api/operator.h""###, - ) - .await?; - build_package("dora-operator-api-cxx").await?; - let operator_cxxbridge = target - .join("cxxbridge") - .join("dora-operator-api-cxx") - .join("src"); - tokio::fs::copy( - operator_cxxbridge.join("lib.rs.cc"), - build_dir.join("operator-bridge.cc"), - ) - .await?; - tokio::fs::copy( - operator_cxxbridge.join("lib.rs.h"), - build_dir.join("dora-operator-api.h"), - ) - .await?; - - build_package("dora-node-api-c").await?; - build_package("dora-operator-api-c").await?; build_cxx_node( root, &[ @@ -91,47 +64,7 @@ async fn main() -> eyre::Result<()> { ], ) .await?; - build_cxx_node( - root, - &[&dunce::canonicalize( - Path::new("node-c-api").join("main.cc"), - )?], - "node_c_api", - &["-l", "dora_node_api_c"], - ) - 
.await?; - build_cxx_operator( - &[ - &dunce::canonicalize(Path::new("operator-rust-api").join("operator.cc"))?, - &dunce::canonicalize(build_dir.join("operator-bridge.cc"))?, - ], - "operator_rust_api", - &[ - "-l", - "dora_operator_api_cxx", - "-L", - root.join("target").join("debug").to_str().unwrap(), - &arrow_config.cflags, - &arrow_config.libs, - ], - ) - .await?; - build_cxx_operator( - &[&dunce::canonicalize( - Path::new("operator-c-api").join("operator.cc"), - )?], - "operator_c_api", - &[ - "-l", - "dora_operator_api_c", - "-L", - root.join("target").join("debug").to_str().unwrap(), - ], - ) - .await?; - let dataflow = Path::new("dataflow.yml").to_owned(); - build_package("dora-runtime").await?; run_dataflow(&dataflow).await?; Ok(()) @@ -269,93 +202,3 @@ async fn build_cxx_node( }; Ok(()) } - -async fn build_cxx_operator( - paths: &[&Path], - out_name: &str, - link_args: &[&str], -) -> eyre::Result<()> { - let mut object_file_paths = Vec::new(); - - for path in paths { - let mut compile = tokio::process::Command::new("clang++"); - compile.arg("-c").arg(path); - compile.arg("-std=c++17"); - let object_file_path = path.with_extension("o"); - compile.arg("-o").arg(&object_file_path); - #[cfg(unix)] - compile.arg("-fPIC"); - if let Some(parent) = path.parent() { - compile.current_dir(parent); - } - if !compile.status().await?.success() { - bail!("failed to compile cxx operator"); - }; - object_file_paths.push(object_file_path); - } - - let mut link = tokio::process::Command::new("clang++"); - link.arg("-shared").args(&object_file_paths); - for arg in link_args { - if arg.contains(" ") { - for part in arg.split_whitespace() { - link.arg(part); - } - } else { - link.arg(arg); - } - } - #[cfg(target_os = "windows")] - { - link.arg("-ladvapi32"); - link.arg("-luserenv"); - link.arg("-lkernel32"); - link.arg("-lws2_32"); - link.arg("-lbcrypt"); - link.arg("-lncrypt"); - link.arg("-lschannel"); - link.arg("-lntdll"); - link.arg("-liphlpapi"); - - 
link.arg("-lcfgmgr32"); - link.arg("-lcredui"); - link.arg("-lcrypt32"); - link.arg("-lcryptnet"); - link.arg("-lfwpuclnt"); - link.arg("-lgdi32"); - link.arg("-lmsimg32"); - link.arg("-lmswsock"); - link.arg("-lole32"); - link.arg("-lopengl32"); - link.arg("-lsecur32"); - link.arg("-lshell32"); - link.arg("-lsynchronization"); - link.arg("-luser32"); - link.arg("-lwinspool"); - - link.arg("-Wl,-nodefaultlib:libcmt"); - link.arg("-D_DLL"); - link.arg("-lmsvcrt"); - link.arg("-fms-runtime-lib=static"); - } - #[cfg(target_os = "macos")] - { - link.arg("-framework").arg("CoreServices"); - link.arg("-framework").arg("Security"); - link.arg("-l").arg("System"); - link.arg("-l").arg("resolv"); - link.arg("-l").arg("pthread"); - link.arg("-l").arg("c"); - link.arg("-l").arg("m"); - } - link.arg("-o") - .arg(Path::new("../build").join(format!("{DLL_PREFIX}{out_name}{DLL_SUFFIX}"))); - if let Some(parent) = paths[0].parent() { - link.current_dir(parent); - } - if !link.status().await?.success() { - bail!("failed to create shared library from cxx operator (c api)"); - }; - - Ok(()) -} diff --git a/examples/c++-dataflow2/node-c-api/main.cc b/examples/c++-dataflow2/node-c-api/main.cc deleted file mode 100644 index 8148bf19..00000000 --- a/examples/c++-dataflow2/node-c-api/main.cc +++ /dev/null @@ -1,85 +0,0 @@ -extern "C" -{ -#include "../../../apis/c/node/node_api.h" -} - -#include -#include - -int run(void *dora_context) -{ - unsigned char counter = 0; - - for (int i = 0; i < 20; i++) - { - void *event = dora_next_event(dora_context); - if (event == NULL) - { - printf("[c node] ERROR: unexpected end of event\n"); - return -1; - } - - enum DoraEventType ty = read_dora_event_type(event); - - if (ty == DoraEventType_Input) - { - counter += 1; - - char *id_ptr; - size_t id_len; - read_dora_input_id(event, &id_ptr, &id_len); - std::string id(id_ptr, id_len); - - char *data_ptr; - size_t data_len; - read_dora_input_data(event, &data_ptr, &data_len); - std::vector data; - for 
(size_t i = 0; i < data_len; i++) - { - data.push_back(*(data_ptr + i)); - } - - std::cout - << "Received input " - << " (counter: " << (unsigned int)counter << ") data: ["; - for (unsigned char &v : data) - { - std::cout << (unsigned int)v << ", "; - } - std::cout << "]" << std::endl; - - std::vector out_vec{counter}; - std::string out_id = "counter"; - int result = dora_send_output(dora_context, &out_id[0], out_id.length(), (char *)&counter, 1); - if (result != 0) - { - std::cerr << "failed to send output" << std::endl; - return 1; - } - } - else if (ty == DoraEventType_Stop) - { - printf("[c node] received stop event\n"); - } - else - { - printf("[c node] received unexpected event: %d\n", ty); - } - - free_dora_event(event); - } - return 0; -} - -int main() -{ - std::cout << "HELLO FROM C++ (using C API)" << std::endl; - - auto dora_context = init_dora_context_from_env(); - auto ret = run(dora_context); - free_dora_context(dora_context); - - std::cout << "GOODBYE FROM C++ node (using C API)" << std::endl; - - return ret; -} diff --git a/examples/c++-dataflow2/operator-c-api/operator.cc b/examples/c++-dataflow2/operator-c-api/operator.cc deleted file mode 100644 index f15492dd..00000000 --- a/examples/c++-dataflow2/operator-c-api/operator.cc +++ /dev/null @@ -1,79 +0,0 @@ -extern "C" -{ -#include "../../../apis/c/operator/operator_api.h" -} - -#include -#include -#include -#include -#include - -class Operator -{ -public: - Operator(); -}; - -Operator::Operator() {} - -extern "C" DoraInitResult_t dora_init_operator() -{ - Operator *op = std::make_unique().release(); - - DoraInitResult_t result = {.operator_context = (void *)op}; - return result; -} - -extern "C" DoraResult_t dora_drop_operator(void *operator_context) -{ - delete (Operator *)operator_context; - return {}; -} - -extern "C" OnEventResult_t dora_on_event( - RawEvent_t *event, - const SendOutput_t *send_output, - void *operator_context) -{ - if (event->input != NULL) - { - // input event - Input_t 
*input = event->input; - char *id = dora_read_input_id(input); - - Vec_uint8_t data = dora_read_data(input); - assert(data.ptr != NULL); - - std::cout - << "C++ Operator (C-API) received input `" << id << "` with data: ["; - for (int i = 0; i < data.len; i++) - { - std::cout << (unsigned int)data.ptr[i] << ", "; - } - std::cout << "]" << std::endl; - - const char *out_id = "half-status"; - char *out_id_heap = strdup(out_id); - - size_t out_data_len = 1; - uint8_t *out_data_heap = (uint8_t *)malloc(out_data_len); - *out_data_heap = *data.ptr / 2; - - DoraResult_t send_result = dora_send_operator_output(send_output, out_id_heap, out_data_heap, out_data_len); - - OnEventResult_t result = {.result = send_result, .status = DORA_STATUS_CONTINUE}; - - dora_free_data(data); - dora_free_input_id(id); - - return result; - } - if (event->stop) - { - printf("C operator received stop event\n"); - } - - OnEventResult_t result = {.status = DORA_STATUS_CONTINUE}; - return result; -} diff --git a/examples/c++-dataflow2/operator-rust-api/operator.cc b/examples/c++-dataflow2/operator-rust-api/operator.cc deleted file mode 100644 index 2b812188..00000000 --- a/examples/c++-dataflow2/operator-rust-api/operator.cc +++ /dev/null @@ -1,23 +0,0 @@ -#include "operator.h" -#include -#include -#include "../build/dora-operator-api.h" - -Operator::Operator() {} - -std::unique_ptr new_operator() -{ - return std::make_unique(); -} - -DoraOnInputResult on_input(Operator &op, rust::Str id, rust::Slice data, OutputSender &output_sender) -{ - op.counter += 1; - std::cout << "Rust API operator received input `" << id.data() << "` with data `" << (unsigned int)data[0] << "` (internal counter: " << (unsigned int)op.counter << ")" << std::endl; - - std::vector out_vec{op.counter}; - rust::Slice out_slice{out_vec.data(), out_vec.size()}; - auto send_result = send_output(output_sender, rust::Str("status"), out_slice); - DoraOnInputResult result = {send_result.error, false}; - return result; -} diff --git 
a/examples/c++-dataflow2/operator-rust-api/operator.h b/examples/c++-dataflow2/operator-rust-api/operator.h deleted file mode 100644 index 9b5e3ab2..00000000 --- a/examples/c++-dataflow2/operator-rust-api/operator.h +++ /dev/null @@ -1,16 +0,0 @@ -#pragma once -#include -#include "../../../apis/c/operator/operator_api.h" - -class Operator -{ -public: - Operator(); - unsigned char counter; -}; - -#include "../build/dora-operator-api.h" - -std::unique_ptr new_operator(); - -DoraOnInputResult on_input(Operator &op, rust::Str id, rust::Slice data, OutputSender &output_sender); From 3f7010ce250c82d3c6070b8761486e710501e3fe Mon Sep 17 00:00:00 2001 From: Enzo Le Van Date: Wed, 26 Mar 2025 10:51:50 +0100 Subject: [PATCH 09/12] Update .github/workflows/ci.yml --- .github/workflows/ci.yml | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 771bab37..abe81a85 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -127,23 +127,23 @@ jobs: - name: "C++ Dataflow example" timeout-minutes: 15 run: cargo run --example cxx-dataflow - - name: "Install Arrow C++ Library" - timeout-minutes: 10 - shell: bash - run: | - if [ "$RUNNER_OS" == "Linux" ]; then - # For Ubuntu - sudo apt-get update - sudo apt-get install -y -V ca-certificates lsb-release wget - wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt-get update - sudo apt-get install -y -V libarrow-dev libarrow-glib-dev - elif [ "$RUNNER_OS" == "macOS" ]; then - # For macOS - brew update - brew install apache-arrow - fi + - name: "Install Arrow C++ Library" + timeout-minutes: 10 + shell: bash + run: | + if [ "$RUNNER_OS" == "Linux" ]; then + # For Ubuntu + sudo apt-get update + sudo apt-get 
install -y -V ca-certificates lsb-release wget + wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + sudo apt-get update + sudo apt-get install -y -V libarrow-dev libarrow-glib-dev + elif [ "$RUNNER_OS" == "macOS" ]; then + # For macOS + brew update + brew install apache-arrow + fi - name: "C++ Dataflow2 example" timeout-minutes: 15 run : cargo run --example cxx-dataflow2 From f79fd079aa1c017572fcdb68fa454bb735249891 Mon Sep 17 00:00:00 2001 From: Enzo Le Van Date: Wed, 26 Mar 2025 10:58:34 +0100 Subject: [PATCH 10/12] Update ci.yml Fix CI indentation --- .github/workflows/ci.yml | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index abe81a85..867dfb1a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -127,26 +127,26 @@ jobs: - name: "C++ Dataflow example" timeout-minutes: 15 run: cargo run --example cxx-dataflow - - name: "Install Arrow C++ Library" - timeout-minutes: 10 - shell: bash - run: | - if [ "$RUNNER_OS" == "Linux" ]; then - # For Ubuntu - sudo apt-get update - sudo apt-get install -y -V ca-certificates lsb-release wget - wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt-get update - sudo apt-get install -y -V libarrow-dev libarrow-glib-dev - elif [ "$RUNNER_OS" == "macOS" ]; then - # For macOS - brew update - brew install apache-arrow - fi + - name: "Install Arrow C++ Library" + timeout-minutes: 10 + shell: bash + run: | + if [ "$RUNNER_OS" == "Linux" ]; then + # For Ubuntu + sudo apt-get update + 
sudo apt-get install -y -V ca-certificates lsb-release wget + wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + sudo apt-get update + sudo apt-get install -y -V libarrow-dev libarrow-glib-dev + elif [ "$RUNNER_OS" == "macOS" ]; then + # For macOS + brew update + brew install apache-arrow + fi - name: "C++ Dataflow2 example" timeout-minutes: 15 - run : cargo run --example cxx-dataflow2 + run: cargo run --example cxx-dataflow2 - name: "Cmake example" if: runner.os == 'Linux' timeout-minutes: 30 @@ -386,7 +386,7 @@ jobs: dora destroy - # Run Python queue latency test + # Run Python queue latency test echo "Running CI Queue Latency Test" dora run tests/queue_size_latest_data_python/dataflow.yaml --uv @@ -394,7 +394,7 @@ jobs: echo "Running CI Queue + Timeout Test" dora run tests/queue_size_and_timeout_python/dataflow.yaml --uv - # Run Rust queue latency test + # Run Rust queue latency test echo "Running CI Queue Size Latest Data Rust Test" dora build tests/queue_size_latest_data_rust/dataflow.yaml --uv dora run tests/queue_size_latest_data_rust/dataflow.yaml --uv From 164b3df49f9242638373be48c361705c48cb08e2 Mon Sep 17 00:00:00 2001 From: Enzo Le Van Date: Wed, 26 Mar 2025 11:07:49 +0100 Subject: [PATCH 11/12] cxx-dataflow2 -> cxx-arrow-dataflow --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 867dfb1a..928b81f6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -146,7 +146,7 @@ jobs: fi - name: "C++ Dataflow2 example" timeout-minutes: 15 - run: cargo run --example cxx-dataflow2 + run: cargo run --example cxx-arrow-dataflow - name: "Cmake example" if: runner.os == 'Linux' timeout-minutes: 30 From 3f1fa6450474f51a43735687327b90dfd3bf9b2c 
Mon Sep 17 00:00:00 2001 From: Haixuan Xavier Tao Date: Wed, 26 Mar 2025 13:24:27 +0100 Subject: [PATCH 12/12] Use cxx-arrow-dataflow name --- examples/c++-arrow-dataflow/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/c++-arrow-dataflow/README.md b/examples/c++-arrow-dataflow/README.md index 1227c00a..4c8bda1f 100644 --- a/examples/c++-arrow-dataflow/README.md +++ b/examples/c++-arrow-dataflow/README.md @@ -29,5 +29,5 @@ fi ## Compile and Run -To try it out, you can use the [`run.rs`](./run.rs) binary. It performs all required build steps and then starts the dataflow. Use the following command to run it: `cargo run --example cxx-dataflow2`. For manual build, check build system for +To try it out, you can use the [`run.rs`](./run.rs) binary. It performs all required build steps and then starts the dataflow. Use the following command to run it: `cargo run --example cxx-arrow-dataflow`. For manual build, check build system for `cxx-dataflow` example.