Browse Source

Provide higher-level function to send output from C operator

tags/v0.3.0-rc
Philipp Oppermann 2 years ago
parent
commit
94c0da61de
Failed to extract signature
2 changed files with 44 additions and 2 deletions
  1. +8
    -0
      apis/c/operator/operator_types.h
  2. +36
    -2
      apis/rust/operator/types/src/lib.rs

+ 8
- 0
apis/c/operator/operator_types.h View File

@@ -164,6 +164,14 @@ char *
dora_read_input_id (
Input_t const * input);

/** <No documentation available> */
DoraResult_t
dora_send_output (
SendOutput_t const * send_output,
char const * id,
uint8_t const * data_ptr,
size_t data_len);


#ifdef __cplusplus
} /* extern \"C\" */


+ 36
- 2
apis/rust/operator/types/src/lib.rs View File

@@ -1,10 +1,14 @@
#![deny(elided_lifetimes_in_paths)] // required for safer-ffi

pub use arrow;
use dora_arrow_convert::ArrowData;
use dora_arrow_convert::{ArrowData, IntoArrow};
pub use safer_ffi;

use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use arrow::{
array::Array,
ffi::{FFI_ArrowArray, FFI_ArrowSchema},
};
use core::slice;
use safer_ffi::{
char_p::{self, char_p_boxed},
closure::ArcDynFn1,
@@ -145,6 +149,36 @@ pub fn dora_read_data(input: &mut Input) -> Option<safer_ffi::Vec<u8>> {
#[ffi_export]
pub fn dora_free_data(_data: safer_ffi::Vec<u8>) {}

#[ffi_export]
pub unsafe fn dora_send_output(
send_output: &SendOutput,
id: safer_ffi::char_p::char_p_ref<'_>,
data_ptr: *const u8,
data_len: usize,
) -> DoraResult {
let result = || {
let data = unsafe { slice::from_raw_parts(data_ptr, data_len) };
let arrow_data = data.to_owned().into_arrow();
let (data_array, schema) =
arrow::ffi::to_ffi(&arrow_data.into_data()).map_err(|err| err.to_string())?;
let output = Output {
id: id.to_str().to_owned().into(),
data_array,
schema,
metadata: Metadata {
open_telemetry_context: String::new().into(), // TODO
},
};
Result::<_, String>::Ok(output)
};
match result() {
Ok(output) => send_output.send_output.call(output),
Err(error) => DoraResult {
error: Some(error.into()),
},
}
}

pub fn generate_headers(target_file: &Path) -> ::std::io::Result<()> {
::safer_ffi::headers::builder()
.to_file(target_file)?


Loading…
Cancel
Save