Related to #854. In this, we are using Arrow::FFI module to send and recieve data from rust. Two functions are defined named as event_as_arrow_input() and send_arrow_output(). Both send data in form of [FFI_ArrowArray](https://arrow.apache.org/rust/arrow/ffi/struct.FFI_ArrowArray.html) along with its [FFI_ArrowSchema](https://arrow.apache.org/rust/arrow_schema/ffi/struct.FFI_ArrowSchema.html). These functions provide interface through cxx. One issue i faced was that cxx doesn't support sending opaque types in format so had to use raw pointers for transfer. DataTypes mentioned [here](https://arrow.apache.org/docs/format/CDataInterface.html) are supported to transfer using these functions. Due to my exams i have only tested with INT32 and String. I have added a Simple example using these functions in dora/examples/C++-DATAFLOW2. Currently i am supposing arrow is installed on user system while running above command as build command looks for arrow in user pc. This approach can be changed with downloading package at build and placing it locally in folder for use. We can run example using `cargo run --example cxx-dataflow2 ` Previous api was left unchanged and still works the same. I tested using previous examples.tags/v0.3.11-rc1
| @@ -127,6 +127,26 @@ jobs: | |||
| - name: "C++ Dataflow example" | |||
| timeout-minutes: 15 | |||
| run: cargo run --example cxx-dataflow | |||
| - name: "Install Arrow C++ Library" | |||
| timeout-minutes: 10 | |||
| shell: bash | |||
| run: | | |||
| if [ "$RUNNER_OS" == "Linux" ]; then | |||
| # For Ubuntu | |||
| sudo apt-get update | |||
| sudo apt-get install -y -V ca-certificates lsb-release wget | |||
| wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb | |||
| sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb | |||
| sudo apt-get update | |||
| sudo apt-get install -y -V libarrow-dev libarrow-glib-dev | |||
| elif [ "$RUNNER_OS" == "macOS" ]; then | |||
| # For macOS | |||
| brew update | |||
| brew install apache-arrow | |||
| fi | |||
| - name: "C++ Dataflow2 example" | |||
| timeout-minutes: 15 | |||
| run: cargo run --example cxx-arrow-dataflow | |||
| - name: "Cmake example" | |||
| if: runner.os == 'Linux' | |||
| timeout-minutes: 30 | |||
| @@ -366,7 +386,7 @@ jobs: | |||
| dora destroy | |||
| # Run Python queue latency test | |||
| # Run Python queue latency test | |||
| echo "Running CI Queue Latency Test" | |||
| dora run tests/queue_size_latest_data_python/dataflow.yaml --uv | |||
| @@ -374,7 +394,7 @@ jobs: | |||
| echo "Running CI Queue + Timeout Test" | |||
| dora run tests/queue_size_and_timeout_python/dataflow.yaml --uv | |||
| # Run Rust queue latency test | |||
| # Run Rust queue latency test | |||
| echo "Running CI Queue Size Latest Data Rust Test" | |||
| dora build tests/queue_size_latest_data_rust/dataflow.yaml --uv | |||
| dora run tests/queue_size_latest_data_rust/dataflow.yaml --uv | |||
| @@ -3173,6 +3173,7 @@ dependencies = [ | |||
| name = "dora-node-api-cxx" | |||
| version = "0.3.10" | |||
| dependencies = [ | |||
| "arrow 54.2.1", | |||
| "cxx", | |||
| "cxx-build", | |||
| "dora-node-api", | |||
| @@ -146,6 +146,10 @@ path = "examples/rust-dataflow-url/run.rs" | |||
| name = "cxx-dataflow" | |||
| path = "examples/c++-dataflow/run.rs" | |||
| [[example]] | |||
| name = "cxx-arrow-dataflow" | |||
| path = "examples/c++-arrow-dataflow/run.rs" | |||
| [[example]] | |||
| name = "python-dataflow" | |||
| path = "examples/python-dataflow/run.rs" | |||
| @@ -32,6 +32,7 @@ dora-ros2-bridge = { workspace = true, optional = true } | |||
| futures-lite = { version = "2.2" } | |||
| serde = { version = "1.0.164", features = ["derive"], optional = true } | |||
| serde-big-array = { version = "0.5.1", optional = true } | |||
| arrow = { workspace = true, features = ["ffi"] } | |||
| [build-dependencies] | |||
| cxx-build = "1.0.73" | |||
| @@ -71,9 +71,26 @@ mod ffi { | |||
| fn is_dora(self: &CombinedEvent) -> bool; | |||
| fn downcast_dora(event: CombinedEvent) -> Result<Box<DoraEvent>>; | |||
| unsafe fn send_arrow_output( | |||
| output_sender: &mut Box<OutputSender>, | |||
| id: String, | |||
| array_ptr: *mut u8, | |||
| schema_ptr: *mut u8, | |||
| ) -> DoraResult; | |||
| unsafe fn event_as_arrow_input( | |||
| event: Box<DoraEvent>, | |||
| out_array: *mut u8, | |||
| out_schema: *mut u8, | |||
| ) -> DoraResult; | |||
| } | |||
| } | |||
| mod arrow_ffi { | |||
| pub use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; | |||
| } | |||
| #[cfg(feature = "ros2-bridge")] | |||
| pub mod ros2 { | |||
| pub use dora_ros2_bridge::*; | |||
| @@ -161,6 +178,48 @@ fn event_as_input(event: Box<DoraEvent>) -> eyre::Result<ffi::DoraInput> { | |||
| }) | |||
| } | |||
| unsafe fn event_as_arrow_input( | |||
| event: Box<DoraEvent>, | |||
| out_array: *mut u8, | |||
| out_schema: *mut u8, | |||
| ) -> ffi::DoraResult { | |||
| // Cast to Arrow FFI types | |||
| let out_array = out_array as *mut arrow::ffi::FFI_ArrowArray; | |||
| let out_schema = out_schema as *mut arrow::ffi::FFI_ArrowSchema; | |||
| let Some(Event::Input { | |||
| id: _, | |||
| metadata: _, | |||
| data, | |||
| }) = event.0 | |||
| else { | |||
| return ffi::DoraResult { | |||
| error: "Not an input event".to_string(), | |||
| }; | |||
| }; | |||
| if out_array.is_null() || out_schema.is_null() { | |||
| return ffi::DoraResult { | |||
| error: "Received null output pointer".to_string(), | |||
| }; | |||
| } | |||
| let array_data = data.to_data(); | |||
| match arrow::ffi::to_ffi(&array_data.clone()) { | |||
| Ok((ffi_array, ffi_schema)) => { | |||
| std::ptr::write(out_array, ffi_array); | |||
| std::ptr::write(out_schema, ffi_schema); | |||
| ffi::DoraResult { | |||
| error: String::new(), | |||
| } | |||
| } | |||
| Err(e) => ffi::DoraResult { | |||
| error: format!("Error exporting Arrow array to C++: {:?}", e), | |||
| }, | |||
| } | |||
| } | |||
| pub struct OutputSender(dora_node_api::DoraNode); | |||
| fn send_output(sender: &mut Box<OutputSender>, id: String, data: &[u8]) -> ffi::DoraResult { | |||
| @@ -180,6 +239,47 @@ pub struct MergedEvents { | |||
| events: Option<Box<dyn Stream<Item = MergedEvent<ExternalEvent>> + Unpin>>, | |||
| next_id: u32, | |||
| } | |||
| unsafe fn send_arrow_output( | |||
| sender: &mut Box<OutputSender>, | |||
| id: String, | |||
| array_ptr: *mut u8, | |||
| schema_ptr: *mut u8, | |||
| ) -> ffi::DoraResult { | |||
| let array_ptr = array_ptr as *mut arrow::ffi::FFI_ArrowArray; | |||
| let schema_ptr = schema_ptr as *mut arrow::ffi::FFI_ArrowSchema; | |||
| if array_ptr.is_null() || schema_ptr.is_null() { | |||
| return ffi::DoraResult { | |||
| error: "Received null Arrow array or schema pointer".to_string(), | |||
| }; | |||
| } | |||
| let array = std::ptr::read(array_ptr); | |||
| let schema = std::ptr::read(schema_ptr); | |||
| std::ptr::write(array_ptr, std::mem::zeroed()); | |||
| std::ptr::write(schema_ptr, std::mem::zeroed()); | |||
| match arrow::ffi::from_ffi(array, &schema) { | |||
| Ok(array_data) => { | |||
| let arrow_array = arrow::array::make_array(array_data); | |||
| let result = sender | |||
| .0 | |||
| .send_output(id.into(), Default::default(), arrow_array); | |||
| match result { | |||
| Ok(()) => ffi::DoraResult { | |||
| error: String::new(), | |||
| }, | |||
| Err(err) => ffi::DoraResult { | |||
| error: format!("{err:?}"), | |||
| }, | |||
| } | |||
| } | |||
| Err(e) => ffi::DoraResult { | |||
| error: format!("Error importing array from C++: {:?}", e), | |||
| }, | |||
| } | |||
| } | |||
| impl MergedEvents { | |||
| fn next(&mut self) -> MergedDoraEvent { | |||
| @@ -0,0 +1 @@ | |||
| *.o | |||
| @@ -0,0 +1,33 @@ | |||
| # Dora C++ Dataflow Example 2 | |||
| This example demonstrates how to exchange data between Dora's Rust-based runtime and C++ using Apache Arrow arrays. Through the event_as_arrow_input() and send_arrow_output() functions exposed in the dora-node-api.h header, your C++ nodes can efficiently receive and send structured data within the Dora dataflow system. These functions leverage Apache Arrow's memory-efficient serialization format, allowing data to move seamlessly across language boundaries. | |||
| ## Required System Dependencies | |||
| - **Apache Arrow C++ Library**: Version 19.0.1 or later | |||
| Installing it should look like this: | |||
| ### For Ubuntu | |||
| ```bash | |||
| sudo apt-get update | |||
| sudo apt-get install -y -V ca-certificates lsb-release wget | |||
| wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb | |||
| sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb | |||
| sudo apt-get update | |||
| sudo apt-get install -y -V libarrow-dev libarrow-glib-dev | |||
| ``` | |||
| ### For macOS | |||
| ```bash | |||
| brew update | |||
| brew install apache-arrow | |||
| fi | |||
| ``` | |||
| ## Compile and Run | |||
| To try it out, you can use the [`run.rs`](./run.rs) binary. It performs all required build steps and then starts the dataflow. Use the following command to run it: `cargo run --example cxx-arow-dataflow`. For manual build, check build system for | |||
| `cxx-dataflow` example. | |||
| @@ -0,0 +1,14 @@ | |||
| nodes: | |||
| - id: cxx-node-rust-api | |||
| path: build/node_rust_api | |||
| inputs: | |||
| tick: dora/timer/millis/300 | |||
| outputs: | |||
| - counter | |||
| - id: cxx-node-rust2-api | |||
| path: build/node_rust_api | |||
| inputs: | |||
| tick: cxx-node-rust-api/counter | |||
| outputs: | |||
| - counter | |||
| @@ -0,0 +1,112 @@ | |||
| #include "../build/dora-node-api.h" | |||
| #include <arrow/api.h> | |||
| #include <arrow/c/bridge.h> | |||
| #include <iostream> | |||
| #include <memory> | |||
| #include <string> | |||
| #include <vector> | |||
| std::shared_ptr<arrow::Array> receive_and_print_input(rust::cxxbridge1::Box<DoraEvent> event) { | |||
| std::cout << "Received input event" << std::endl; | |||
| struct ArrowArray c_array; | |||
| struct ArrowSchema c_schema; | |||
| auto result = event_as_arrow_input( | |||
| std::move(event), | |||
| reinterpret_cast<uint8_t*>(&c_array), | |||
| reinterpret_cast<uint8_t*>(&c_schema) | |||
| ); | |||
| if (!result.error.empty()) { | |||
| std::cerr << "Error getting Arrow array: " << std::endl; | |||
| return nullptr; | |||
| } | |||
| auto result2 = arrow::ImportArray(&c_array, &c_schema); | |||
| std::shared_ptr<arrow::Array> input_array = result2.ValueOrDie(); | |||
| std::cout << "Received Arrow array: " << input_array->ToString() << std::endl; | |||
| std::cout << "Array details: type=" << input_array->type()->ToString() | |||
| << ", length=" << input_array->length() << std::endl; | |||
| return input_array; | |||
| } | |||
| // To send output | |||
| bool send_output(DoraNode& dora_node, std::shared_ptr<arrow::Array> output_array) { | |||
| if (!output_array) { | |||
| std::cerr << "Error: Attempted to send a null Arrow array" << std::endl; | |||
| return false; | |||
| } | |||
| struct ArrowArray out_c_array; | |||
| struct ArrowSchema out_c_schema; | |||
| arrow::ExportArray(*output_array, &out_c_array, &out_c_schema); | |||
| auto send_result = send_arrow_output( | |||
| dora_node.send_output, | |||
| "counter", | |||
| reinterpret_cast<uint8_t*>(&out_c_array), | |||
| reinterpret_cast<uint8_t*>(&out_c_schema) | |||
| ); | |||
| if (!send_result.error.empty()) { | |||
| std::string error_message(send_result.error); | |||
| std::cerr << "Error sending Arrow array: " << error_message << std::endl; | |||
| return false; | |||
| } | |||
| return true; | |||
| } | |||
| int main() { | |||
| try { | |||
| auto dora_node = init_dora_node(); | |||
| std::cout << "Dora node initialized successfully" << std::endl; | |||
| int counter=0; | |||
| while (counter<10) { | |||
| counter++; | |||
| auto event = dora_node.events->next(); | |||
| auto type = event_type(event); | |||
| if (type == DoraEventType::Stop) { | |||
| std::cout << "Received stop event, exiting" << std::endl; | |||
| break; | |||
| } | |||
| else if (type == DoraEventType::AllInputsClosed) { | |||
| std::cout << "All inputs closed, exiting" << std::endl; | |||
| break; | |||
| } | |||
| else if (type == DoraEventType::Input) { | |||
| std::shared_ptr<arrow::Array> input_array = receive_and_print_input(std::move(event)); | |||
| std::shared_ptr<arrow::Array> output_array; | |||
| arrow::Int32Builder builder; | |||
| builder.Append(10); | |||
| builder.Append(100); | |||
| builder.Append(1000); | |||
| builder.Finish(&output_array); | |||
| std::cout << "Created new string array: " << output_array->ToString() << std::endl; | |||
| //Printing Before sending | |||
| auto str_array = std::static_pointer_cast<arrow::Int32Array>(output_array); | |||
| std::cout << "Values: ["; | |||
| for (int i = 0; i < str_array->length(); i++) { | |||
| if (i > 0) std::cout << ", "; | |||
| std::cout << str_array->Value(i); | |||
| } | |||
| std::cout << "]" << std::endl; | |||
| send_output(dora_node, output_array); | |||
| } | |||
| } | |||
| return 0; | |||
| } | |||
| catch (const std::exception& e) { | |||
| std::cerr << "Error: " << e.what() << std::endl; | |||
| return 1; | |||
| } | |||
| } | |||
| @@ -0,0 +1,204 @@ | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, Context}; | |||
| use std::{env::consts::EXE_SUFFIX, path::Path, process::Command}; | |||
| struct ArrowConfig { | |||
| cflags: String, | |||
| libs: String, | |||
| } | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| set_up_tracing("c++-dataflow-runner").wrap_err("failed to set up tracing")?; | |||
| if cfg!(windows) { | |||
| tracing::error!( | |||
| "The c++ example does not work on Windows currently because of a linker error" | |||
| ); | |||
| return Ok(()); | |||
| } | |||
| let arrow_config = find_arrow_config().wrap_err("Failed to find Arrow configuration")?; | |||
| tracing::info!( | |||
| "Found Arrow configuration: cflags={}, libs={}", | |||
| arrow_config.cflags, | |||
| arrow_config.libs | |||
| ); | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let target = root.join("target"); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| tokio::fs::create_dir_all("build").await?; | |||
| let build_dir = Path::new("build"); | |||
| build_package("dora-node-api-cxx").await?; | |||
| let node_cxxbridge = target | |||
| .join("cxxbridge") | |||
| .join("dora-node-api-cxx") | |||
| .join("src"); | |||
| tokio::fs::copy( | |||
| node_cxxbridge.join("lib.rs.cc"), | |||
| build_dir.join("node-bridge.cc"), | |||
| ) | |||
| .await?; | |||
| tokio::fs::copy( | |||
| node_cxxbridge.join("lib.rs.h"), | |||
| build_dir.join("dora-node-api.h"), | |||
| ) | |||
| .await?; | |||
| build_cxx_node( | |||
| root, | |||
| &[ | |||
| &dunce::canonicalize(Path::new("node-rust-api").join("main.cc"))?, | |||
| &dunce::canonicalize(build_dir.join("node-bridge.cc"))?, | |||
| ], | |||
| "node_rust_api", | |||
| &[ | |||
| "-l", | |||
| "dora_node_api_cxx", | |||
| &arrow_config.cflags, | |||
| &arrow_config.libs, | |||
| ], | |||
| ) | |||
| .await?; | |||
| let dataflow = Path::new("dataflow.yml").to_owned(); | |||
| run_dataflow(&dataflow).await?; | |||
| Ok(()) | |||
| } | |||
| fn find_arrow_config() -> eyre::Result<ArrowConfig> { | |||
| let output = Command::new("pkg-config") | |||
| .args(&["--cflags", "arrow"]) | |||
| .output() | |||
| .wrap_err("Failed to run pkg-config. Make sure Arrow C++ is installed")?; | |||
| if !output.status.success() { | |||
| bail!("Arrow C++ not found via pkg-config. Make sure it's installed and in your PKG_CONFIG_PATH"); | |||
| } | |||
| let cflags = String::from_utf8(output.stdout)?.trim().to_string(); | |||
| let output = Command::new("pkg-config") | |||
| .args(&["--libs", "arrow"]) | |||
| .output() | |||
| .wrap_err("Failed to get Arrow library flags")?; | |||
| if !output.status.success() { | |||
| bail!("Failed to get Arrow library flags"); | |||
| } | |||
| let libs = String::from_utf8(output.stdout)?.trim().to_string(); | |||
| Ok(ArrowConfig { cflags, libs }) | |||
| } | |||
| async fn build_package(package: &str) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("build"); | |||
| cmd.arg("--package").arg(package); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build {package}"); | |||
| }; | |||
| Ok(()) | |||
| } | |||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--run-dataflow") | |||
| .arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| }; | |||
| Ok(()) | |||
| } | |||
| async fn build_cxx_node( | |||
| root: &Path, | |||
| paths: &[&Path], | |||
| out_name: &str, | |||
| args: &[&str], | |||
| ) -> eyre::Result<()> { | |||
| let mut clang = tokio::process::Command::new("clang++"); | |||
| clang.args(paths); | |||
| clang.arg("-std=c++17"); | |||
| #[cfg(target_os = "linux")] | |||
| { | |||
| clang.arg("-l").arg("m"); | |||
| clang.arg("-l").arg("rt"); | |||
| clang.arg("-l").arg("dl"); | |||
| clang.arg("-pthread"); | |||
| } | |||
| #[cfg(target_os = "windows")] | |||
| { | |||
| clang.arg("-ladvapi32"); | |||
| clang.arg("-luserenv"); | |||
| clang.arg("-lkernel32"); | |||
| clang.arg("-lws2_32"); | |||
| clang.arg("-lbcrypt"); | |||
| clang.arg("-lncrypt"); | |||
| clang.arg("-lschannel"); | |||
| clang.arg("-lntdll"); | |||
| clang.arg("-liphlpapi"); | |||
| clang.arg("-lcfgmgr32"); | |||
| clang.arg("-lcredui"); | |||
| clang.arg("-lcrypt32"); | |||
| clang.arg("-lcryptnet"); | |||
| clang.arg("-lfwpuclnt"); | |||
| clang.arg("-lgdi32"); | |||
| clang.arg("-lmsimg32"); | |||
| clang.arg("-lmswsock"); | |||
| clang.arg("-lole32"); | |||
| clang.arg("-lopengl32"); | |||
| clang.arg("-lsecur32"); | |||
| clang.arg("-lshell32"); | |||
| clang.arg("-lsynchronization"); | |||
| clang.arg("-luser32"); | |||
| clang.arg("-lwinspool"); | |||
| clang.arg("-Wl,-nodefaultlib:libcmt"); | |||
| clang.arg("-D_DLL"); | |||
| clang.arg("-lmsvcrt"); | |||
| } | |||
| #[cfg(target_os = "macos")] | |||
| { | |||
| clang.arg("-framework").arg("CoreServices"); | |||
| clang.arg("-framework").arg("Security"); | |||
| clang.arg("-l").arg("System"); | |||
| clang.arg("-l").arg("resolv"); | |||
| clang.arg("-l").arg("pthread"); | |||
| clang.arg("-l").arg("c"); | |||
| clang.arg("-l").arg("m"); | |||
| } | |||
| for arg in args { | |||
| if arg.contains(" ") { | |||
| for part in arg.split_whitespace() { | |||
| clang.arg(part); | |||
| } | |||
| } else { | |||
| clang.arg(arg); | |||
| } | |||
| } | |||
| clang.arg("-L").arg(root.join("target").join("debug")); | |||
| clang | |||
| .arg("--output") | |||
| .arg(Path::new("../build").join(format!("{out_name}{EXE_SUFFIX}"))); | |||
| if let Some(parent) = paths[0].parent() { | |||
| clang.current_dir(parent); | |||
| } | |||
| if !clang.status().await?.success() { | |||
| bail!("failed to compile c++ node"); | |||
| }; | |||
| Ok(()) | |||
| } | |||