Browse Source

Fix operator API: Box optional error string since `Option<String>` is not FFI-safe

tags/v0.3.3-rc1
Philipp Oppermann 1 year ago
parent
commit
2eeb40b1ff
Failed to extract signature
7 changed files with 49 additions and 35 deletions
  1. +6
    -1
      apis/c++/node/src/lib.rs
  2. +1
    -1
      apis/c/operator/operator_types.h
  3. +1
    -4
      apis/rust/operator/src/lib.rs
  4. +4
    -6
      apis/rust/operator/src/raw.rs
  5. +27
    -3
      apis/rust/operator/types/src/lib.rs
  6. +9
    -19
      binaries/runtime/src/operator/shared_lib.rs
  7. +1
    -1
      libraries/arrow-convert/src/from_impls.rs

+ 6
- 1
apis/c++/node/src/lib.rs View File

@@ -123,7 +123,12 @@ fn event_type(event: &DoraEvent) -> ffi::DoraEventType {
}

fn event_as_input(event: Box<DoraEvent>) -> eyre::Result<ffi::DoraInput> {
let Some(Event::Input { id, metadata: _, data }) = event.0 else {
let Some(Event::Input {
id,
metadata: _,
data,
}) = event.0
else {
bail!("not an input event");
};
let data: Option<&BinaryArray> = data.as_binary_opt();


+ 1
- 1
apis/c/operator/operator_types.h View File

@@ -34,7 +34,7 @@ typedef struct Vec_uint8 {
/** <No documentation available> */
typedef struct DoraResult {
/** <No documentation available> */
Vec_uint8_t error;
Vec_uint8_t * error;
} DoraResult_t;

/** <No documentation available> */


+ 1
- 4
apis/rust/operator/src/lib.rs View File

@@ -64,9 +64,6 @@ impl DoraOutputSender<'_> {
open_telemetry_context: String::new().into(), // TODO
},
});
match result.error {
None => Ok(()),
Some(error) => Err(error.into()),
}
result.into_result()
}
}

+ 4
- 6
apis/rust/operator/src/raw.rs View File

@@ -24,7 +24,7 @@ pub unsafe fn dora_init_operator<O: DoraOperator>() -> DoraInitResult {

pub unsafe fn dora_drop_operator<O>(operator_context: *mut c_void) -> DoraResult {
let raw: *mut O = operator_context.cast();
unsafe { Box::from_raw(raw) };
drop(unsafe { Box::from_raw(raw) });
DoraResult { error: None }
}

@@ -40,10 +40,10 @@ pub unsafe fn dora_on_event<O: DoraOperator>(
let event_variant = if let Some(input) = &mut event.input {
let Some(data_array) = input.data_array.take() else {
return OnEventResult {
result: DoraResult { error: Some("data already taken".to_string().into()) },
result: DoraResult::from_error("data already taken".to_string()),
status: DoraStatus::Continue,
};
};
};
let data = arrow::ffi::from_ffi(data_array, &input.schema);

match data {
@@ -73,9 +73,7 @@ pub unsafe fn dora_on_event<O: DoraOperator>(
status,
},
Err(error) => OnEventResult {
result: DoraResult {
error: Some(error.into()),
},
result: DoraResult::from_error(error),
status: DoraStatus::Stop,
},
}


+ 27
- 3
apis/rust/operator/types/src/lib.rs View File

@@ -14,7 +14,7 @@ use safer_ffi::{
closure::ArcDynFn1,
derive_ReprC, ffi_export,
};
use std::path::Path;
use std::{ops::Deref, path::Path};

#[derive_ReprC]
#[ffi_export]
@@ -43,7 +43,31 @@ pub struct DoraDropOperator {
#[repr(C)]
#[derive(Debug)]
pub struct DoraResult {
pub error: Option<safer_ffi::String>,
pub error: Option<safer_ffi::boxed::Box<safer_ffi::String>>,
}

impl DoraResult {
pub const SUCCESS: Self = Self { error: None };

pub fn from_error(error: String) -> Self {
Self {
error: Some(Box::new(safer_ffi::String::from(error)).into()),
}
}

pub fn error(&self) -> Option<&str> {
self.error.as_deref().map(|s| s.deref())
}

pub fn into_result(self) -> Result<(), String> {
match self.error {
None => Ok(()),
Some(error) => {
let converted = safer_ffi::boxed::Box_::into(error);
Err((*converted).into())
}
}
}
}

#[derive_ReprC]
@@ -174,7 +198,7 @@ pub unsafe fn dora_send_operator_output(
match result() {
Ok(output) => send_output.send_output.call(output),
Err(error) => DoraResult {
error: Some(error.into()),
error: Some(Box::new(safer_ffi::String::from(error)).into()),
},
}
}


+ 9
- 19
binaries/runtime/src/operator/shared_lib.rs View File

@@ -98,7 +98,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
let raw = match result.error {
Some(error) => {
let _ = init_done.send(Err(eyre!(error.to_string())));
bail!("init_operator failed: {}", String::from(error))
bail!("init_operator failed: {}", *error)
}
None => operator_context,
};
@@ -126,11 +126,7 @@ impl<'lib> SharedLibraryOperator<'lib> {

let arrow_array = match arrow::ffi::from_ffi(data_array, &schema) {
Ok(a) => a,
Err(err) => {
return DoraResult {
error: Some(err.to_string().into()),
}
}
Err(err) => return DoraResult::from_error(err.to_string()),
};

let total_len = required_data_size(&arrow_array);
@@ -138,11 +134,7 @@ impl<'lib> SharedLibraryOperator<'lib> {

let type_info = match copy_array_into_sample(&mut sample, &arrow_array) {
Ok(t) => t,
Err(err) => {
return DoraResult {
error: Some(err.to_string().into()),
}
}
Err(err) => return DoraResult::from_error(err.to_string()),
};

let event = OperatorEvent::Output {
@@ -157,18 +149,16 @@ impl<'lib> SharedLibraryOperator<'lib> {
.blocking_send(event)
.map_err(|_| eyre!("failed to send output to runtime"));

let error = match result {
Ok(()) => None,
Err(_) => Some(String::from("runtime process closed unexpectedly").into()),
};

DoraResult { error }
match result {
Ok(()) => DoraResult::SUCCESS,
Err(_) => DoraResult::from_error("runtime process closed unexpectedly".into()),
}
});

let reason = loop {
#[allow(unused_mut)]
let Ok(mut event) = self.incoming_events.recv() else {
break StopReason::InputsClosed
break StopReason::InputsClosed;
};

let span = span!(tracing::Level::TRACE, "on_event", input_id = field::Empty);
@@ -261,7 +251,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
)
};
match error {
Some(error) => bail!("on_input failed: {}", String::from(error)),
Some(error) => bail!("on_input failed: {}", *error),
None => match status {
DoraStatus::Continue => {}
DoraStatus::Stop => break StopReason::ExplicitStop,


+ 1
- 1
libraries/arrow-convert/src/from_impls.rs View File

@@ -1,5 +1,5 @@
use arrow::{
array::{make_array, Array, AsArray, PrimitiveArray, StringArray},
array::{Array, AsArray, PrimitiveArray, StringArray},
datatypes::ArrowPrimitiveType,
};
use eyre::ContextCompat;


Loading…
Cancel
Save