Browse Source

formatting

tags/v0.3.11-rc1
Mati-ur-rehman-017 haixuantao 10 months ago
parent
commit
8e48debcad
2 changed files with 71 additions and 48 deletions
  1. +43
    -33
      apis/c++/node/src/lib.rs
  2. +28
    -15
      examples/c++-dataflow2/run.rs

+ 43
- 33
apis/c++/node/src/lib.rs View File

@@ -12,7 +12,6 @@ use eyre::bail;
use dora_ros2_bridge::{_core, ros2_client};
use futures_lite::{stream, Stream, StreamExt};


#[cxx::bridge]
#[allow(clippy::needless_lifetimes)]
mod ffi {
@@ -76,13 +75,13 @@ mod ffi {
unsafe fn send_arrow_output(
output_sender: &mut Box<OutputSender>,
id: String,
array_ptr: *mut u8,
schema_ptr: *mut u8,
array_ptr: *mut u8,
schema_ptr: *mut u8,
) -> DoraResult;
unsafe fn event_as_arrow_input(
event: Box<DoraEvent>,
out_array: *mut u8,
event: Box<DoraEvent>,
out_array: *mut u8,
out_schema: *mut u8,
) -> DoraResult;
}
@@ -180,7 +179,7 @@ fn event_as_input(event: Box<DoraEvent>) -> eyre::Result<ffi::DoraInput> {
}

unsafe fn event_as_arrow_input(
event: Box<DoraEvent>,
event: Box<DoraEvent>,
out_array: *mut u8,
out_schema: *mut u8,
) -> ffi::DoraResult {
@@ -188,29 +187,36 @@ unsafe fn event_as_arrow_input(
let out_array = out_array as *mut arrow::ffi::FFI_ArrowArray;
let out_schema = out_schema as *mut arrow::ffi::FFI_ArrowSchema;

let Some(Event::Input { id: _, metadata: _, data }) = event.0 else {
return ffi::DoraResult { error: "Not an input event".to_string() };
let Some(Event::Input {
id: _,
metadata: _,
data,
}) = event.0
else {
return ffi::DoraResult {
error: "Not an input event".to_string(),
};
};
if out_array.is_null() || out_schema.is_null() {
return ffi::DoraResult {
error: "Received null output pointer".to_string()
return ffi::DoraResult {
error: "Received null output pointer".to_string(),
};
}
let array_data = data.to_data();
match arrow::ffi::to_ffi(&array_data.clone()) {
Ok((ffi_array, ffi_schema)) => {
std::ptr::write(out_array, ffi_array);
std::ptr::write(out_schema, ffi_schema);
ffi::DoraResult { error: String::new() }
},
Err(e) => {
ffi::DoraResult {
error: format!("Error exporting Arrow array to C++: {:?}", e)
error: String::new(),
}
}
Err(e) => ffi::DoraResult {
error: format!("Error exporting Arrow array to C++: {:?}", e),
},
}
}

@@ -236,38 +242,42 @@ pub struct MergedEvents {
unsafe fn send_arrow_output(
sender: &mut Box<OutputSender>,
id: String,
array_ptr: *mut u8,
schema_ptr: *mut u8
array_ptr: *mut u8,
schema_ptr: *mut u8,
) -> ffi::DoraResult {
let array_ptr = array_ptr as *mut arrow::ffi::FFI_ArrowArray;
let schema_ptr = schema_ptr as *mut arrow::ffi::FFI_ArrowSchema;

if array_ptr.is_null() || schema_ptr.is_null() {
return ffi::DoraResult {
error: "Received null Arrow array or schema pointer".to_string()
return ffi::DoraResult {
error: "Received null Arrow array or schema pointer".to_string(),
};
}
let array = std::ptr::read(array_ptr);
let schema = std::ptr::read(schema_ptr);
std::ptr::write(array_ptr, std::mem::zeroed());
std::ptr::write(schema_ptr, std::mem::zeroed());
match arrow::ffi::from_ffi(array, &schema) {
Ok(array_data) => {
let arrow_array = arrow::array::make_array(array_data);
let result = sender.0.send_output(id.into(), Default::default(), arrow_array);
let result = sender
.0
.send_output(id.into(), Default::default(), arrow_array);
match result {
Ok(()) => ffi::DoraResult { error: String::new() },
Err(err) => ffi::DoraResult { error: format!("{err:?}") },
}
},
Err(e) => {
ffi::DoraResult {
error: format!("Error importing array from C++: {:?}", e)
Ok(()) => ffi::DoraResult {
error: String::new(),
},
Err(err) => ffi::DoraResult {
error: format!("{err:?}"),
},
}
}
Err(e) => ffi::DoraResult {
error: format!("Error importing array from C++: {:?}", e),
},
}
}



+ 28
- 15
examples/c++-dataflow2/run.rs View File

@@ -3,7 +3,7 @@ use eyre::{bail, Context};
use std::{
env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX},
path::Path,
process::Command,
process::Command,
};

struct ArrowConfig {
@@ -23,7 +23,11 @@ async fn main() -> eyre::Result<()> {
}

let arrow_config = find_arrow_config().wrap_err("Failed to find Arrow configuration")?;
tracing::info!("Found Arrow configuration: cflags={}, libs={}", arrow_config.cflags, arrow_config.libs);
tracing::info!(
"Found Arrow configuration: cflags={}, libs={}",
arrow_config.cflags,
arrow_config.libs
);

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let target = root.join("target");
@@ -79,7 +83,12 @@ async fn main() -> eyre::Result<()> {
&dunce::canonicalize(build_dir.join("node-bridge.cc"))?,
],
"node_rust_api",
&["-l", "dora_node_api_cxx", &arrow_config.cflags, &arrow_config.libs],
&[
"-l",
"dora_node_api_cxx",
&arrow_config.cflags,
&arrow_config.libs,
],
)
.await?;
build_cxx_node(
@@ -98,9 +107,12 @@ async fn main() -> eyre::Result<()> {
],
"operator_rust_api",
&[
"-l", "dora_operator_api_cxx",
"-L", root.join("target").join("debug").to_str().unwrap(),
&arrow_config.cflags, &arrow_config.libs,
"-l",
"dora_operator_api_cxx",
"-L",
root.join("target").join("debug").to_str().unwrap(),
&arrow_config.cflags,
&arrow_config.libs,
],
)
.await?;
@@ -110,8 +122,10 @@ async fn main() -> eyre::Result<()> {
)?],
"operator_c_api",
&[
"-l", "dora_operator_api_c",
"-L", root.join("target").join("debug").to_str().unwrap(),
"-l",
"dora_operator_api_c",
"-L",
root.join("target").join("debug").to_str().unwrap(),
],
)
.await?;
@@ -124,29 +138,28 @@ async fn main() -> eyre::Result<()> {
}

fn find_arrow_config() -> eyre::Result<ArrowConfig> {
let output = Command::new("pkg-config")
.args(&["--cflags", "arrow"])
.output()
.wrap_err("Failed to run pkg-config. Make sure Arrow C++ is installed")?;
if !output.status.success() {
bail!("Arrow C++ not found via pkg-config. Make sure it's installed and in your PKG_CONFIG_PATH");
}
let cflags = String::from_utf8(output.stdout)?.trim().to_string();
let output = Command::new("pkg-config")
.args(&["--libs", "arrow"])
.output()
.wrap_err("Failed to get Arrow library flags")?;
if !output.status.success() {
bail!("Failed to get Arrow library flags");
}
let libs = String::from_utf8(output.stdout)?.trim().to_string();
Ok(ArrowConfig { cflags, libs })
}



Loading…
Cancel
Save