diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index 18df97d7..a7ec233a 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -12,7 +12,6 @@ use eyre::bail; use dora_ros2_bridge::{_core, ros2_client}; use futures_lite::{stream, Stream, StreamExt}; - #[cxx::bridge] #[allow(clippy::needless_lifetimes)] mod ffi { @@ -76,13 +75,13 @@ mod ffi { unsafe fn send_arrow_output( output_sender: &mut Box, id: String, - array_ptr: *mut u8, - schema_ptr: *mut u8, + array_ptr: *mut u8, + schema_ptr: *mut u8, ) -> DoraResult; - + unsafe fn event_as_arrow_input( - event: Box, - out_array: *mut u8, + event: Box, + out_array: *mut u8, out_schema: *mut u8, ) -> DoraResult; } @@ -180,7 +179,7 @@ fn event_as_input(event: Box) -> eyre::Result { } unsafe fn event_as_arrow_input( - event: Box, + event: Box, out_array: *mut u8, out_schema: *mut u8, ) -> ffi::DoraResult { @@ -188,29 +187,36 @@ unsafe fn event_as_arrow_input( let out_array = out_array as *mut arrow::ffi::FFI_ArrowArray; let out_schema = out_schema as *mut arrow::ffi::FFI_ArrowSchema; - let Some(Event::Input { id: _, metadata: _, data }) = event.0 else { - return ffi::DoraResult { error: "Not an input event".to_string() }; + let Some(Event::Input { + id: _, + metadata: _, + data, + }) = event.0 + else { + return ffi::DoraResult { + error: "Not an input event".to_string(), + }; }; - + if out_array.is_null() || out_schema.is_null() { - return ffi::DoraResult { - error: "Received null output pointer".to_string() + return ffi::DoraResult { + error: "Received null output pointer".to_string(), }; } - + let array_data = data.to_data(); - + match arrow::ffi::to_ffi(&array_data.clone()) { Ok((ffi_array, ffi_schema)) => { std::ptr::write(out_array, ffi_array); std::ptr::write(out_schema, ffi_schema); - ffi::DoraResult { error: String::new() } - }, - Err(e) => { ffi::DoraResult { - error: format!("Error exporting Arrow array to C++: {:?}", e) + error: String::new(), } } + Err(e) => ffi::DoraResult { + error: format!("Error exporting Arrow array to C++: {:?}", e), + }, } } @@ -236,38 +242,42 @@ pub struct MergedEvents { unsafe fn send_arrow_output( sender: &mut Box, id: String, - array_ptr: *mut u8, - schema_ptr: *mut u8 + array_ptr: *mut u8, + schema_ptr: *mut u8, ) -> ffi::DoraResult { let array_ptr = array_ptr as *mut arrow::ffi::FFI_ArrowArray; let schema_ptr = schema_ptr as *mut arrow::ffi::FFI_ArrowSchema; if array_ptr.is_null() || schema_ptr.is_null() { - return ffi::DoraResult { - error: "Received null Arrow array or schema pointer".to_string() + return ffi::DoraResult { + error: "Received null Arrow array or schema pointer".to_string(), }; } - + let array = std::ptr::read(array_ptr); let schema = std::ptr::read(schema_ptr); - + std::ptr::write(array_ptr, std::mem::zeroed()); std::ptr::write(schema_ptr, std::mem::zeroed()); - + match arrow::ffi::from_ffi(array, &schema) { Ok(array_data) => { let arrow_array = arrow::array::make_array(array_data); - let result = sender.0.send_output(id.into(), Default::default(), arrow_array); + let result = sender + .0 + .send_output(id.into(), Default::default(), arrow_array); match result { - Ok(()) => ffi::DoraResult { error: String::new() }, - Err(err) => ffi::DoraResult { error: format!("{err:?}") }, - } - }, - Err(e) => { - ffi::DoraResult { - error: format!("Error importing array from C++: {:?}", e) + Ok(()) => ffi::DoraResult { + error: String::new(), + }, + Err(err) => ffi::DoraResult { + error: format!("{err:?}"), + }, } } + Err(e) => ffi::DoraResult { + error: format!("Error importing array from C++: {:?}", e), + }, } } diff --git a/examples/c++-dataflow2/run.rs b/examples/c++-dataflow2/run.rs index 3413500e..b1a9e999 100644 --- a/examples/c++-dataflow2/run.rs +++ b/examples/c++-dataflow2/run.rs @@ -3,7 +3,7 @@ use eyre::{bail, Context}; use std::{ env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, path::Path, - process::Command, + process::Command, }; struct ArrowConfig { @@ -23,7 +23,11 @@ async fn main() -> eyre::Result<()> { } let arrow_config = find_arrow_config().wrap_err("Failed to find Arrow configuration")?; - tracing::info!("Found Arrow configuration: cflags={}, libs={}", arrow_config.cflags, arrow_config.libs); + tracing::info!( + "Found Arrow configuration: cflags={}, libs={}", + arrow_config.cflags, + arrow_config.libs + ); let root = Path::new(env!("CARGO_MANIFEST_DIR")); let target = root.join("target"); @@ -79,7 +83,12 @@ async fn main() -> eyre::Result<()> { &dunce::canonicalize(build_dir.join("node-bridge.cc"))?, ], "node_rust_api", - &["-l", "dora_node_api_cxx", &arrow_config.cflags, &arrow_config.libs], + &[ + "-l", + "dora_node_api_cxx", + &arrow_config.cflags, + &arrow_config.libs, + ], ) .await?; build_cxx_node( @@ -98,9 +107,12 @@ async fn main() -> eyre::Result<()> { ], "operator_rust_api", &[ - "-l", "dora_operator_api_cxx", - "-L", root.join("target").join("debug").to_str().unwrap(), - &arrow_config.cflags, &arrow_config.libs, + "-l", + "dora_operator_api_cxx", + "-L", + root.join("target").join("debug").to_str().unwrap(), + &arrow_config.cflags, + &arrow_config.libs, ], ) .await?; @@ -110,8 +122,10 @@ async fn main() -> eyre::Result<()> { )?], "operator_c_api", &[ - "-l", "dora_operator_api_c", - "-L", root.join("target").join("debug").to_str().unwrap(), + "-l", + "dora_operator_api_c", + "-L", + root.join("target").join("debug").to_str().unwrap(), ], ) .await?; @@ -124,29 +138,28 @@ async fn main() -> eyre::Result<()> { } fn find_arrow_config() -> eyre::Result { - let output = Command::new("pkg-config") .args(&["--cflags", "arrow"]) .output() .wrap_err("Failed to run pkg-config. Make sure Arrow C++ is installed")?; - + if !output.status.success() { bail!("Arrow C++ not found via pkg-config. Make sure it's installed and in your PKG_CONFIG_PATH"); } - + let cflags = String::from_utf8(output.stdout)?.trim().to_string(); - + let output = Command::new("pkg-config") .args(&["--libs", "arrow"]) .output() .wrap_err("Failed to get Arrow library flags")?; - + if !output.status.success() { bail!("Failed to get Arrow library flags"); } - + let libs = String::from_utf8(output.stdout)?.trim().to_string(); - + Ok(ArrowConfig { cflags, libs }) }