From 2eeb40b1ffd3ca2a6a59c091b5b4379e5a742125 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 14 Feb 2024 12:14:38 +0100 Subject: [PATCH] Fix operator API: Box optional error string since `Option` is not FFI-safe --- apis/c++/node/src/lib.rs | 7 ++++- apis/c/operator/operator_types.h | 2 +- apis/rust/operator/src/lib.rs | 5 +--- apis/rust/operator/src/raw.rs | 10 +++---- apis/rust/operator/types/src/lib.rs | 30 ++++++++++++++++++--- binaries/runtime/src/operator/shared_lib.rs | 28 +++++++------------ libraries/arrow-convert/src/from_impls.rs | 2 +- 7 files changed, 49 insertions(+), 35 deletions(-) diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index 99a3b1cf..33705e5b 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -123,7 +123,12 @@ fn event_type(event: &DoraEvent) -> ffi::DoraEventType { } fn event_as_input(event: Box) -> eyre::Result { - let Some(Event::Input { id, metadata: _, data }) = event.0 else { + let Some(Event::Input { + id, + metadata: _, + data, + }) = event.0 + else { bail!("not an input event"); }; let data: Option<&BinaryArray> = data.as_binary_opt(); diff --git a/apis/c/operator/operator_types.h b/apis/c/operator/operator_types.h index c6e3c6f8..9cf2f3d2 100644 --- a/apis/c/operator/operator_types.h +++ b/apis/c/operator/operator_types.h @@ -34,7 +34,7 @@ typedef struct Vec_uint8 { /** */ typedef struct DoraResult { /** */ - Vec_uint8_t error; + Vec_uint8_t * error; } DoraResult_t; /** */ diff --git a/apis/rust/operator/src/lib.rs b/apis/rust/operator/src/lib.rs index fd770c78..262fcdd3 100644 --- a/apis/rust/operator/src/lib.rs +++ b/apis/rust/operator/src/lib.rs @@ -64,9 +64,6 @@ impl DoraOutputSender<'_> { open_telemetry_context: String::new().into(), // TODO }, }); - match result.error { - None => Ok(()), - Some(error) => Err(error.into()), - } + result.into_result() } } diff --git a/apis/rust/operator/src/raw.rs b/apis/rust/operator/src/raw.rs index 9a05a5e3..2ff89c64 100644 --- a/apis/rust/operator/src/raw.rs +++ b/apis/rust/operator/src/raw.rs @@ -24,7 +24,7 @@ pub unsafe fn dora_init_operator() -> DoraInitResult { pub unsafe fn dora_drop_operator(operator_context: *mut c_void) -> DoraResult { let raw: *mut O = operator_context.cast(); - unsafe { Box::from_raw(raw) }; + drop(unsafe { Box::from_raw(raw) }); DoraResult { error: None } } @@ -40,10 +40,10 @@ pub unsafe fn dora_on_event( let event_variant = if let Some(input) = &mut event.input { let Some(data_array) = input.data_array.take() else { return OnEventResult { - result: DoraResult { error: Some("data already taken".to_string().into()) }, + result: DoraResult::from_error("data already taken".to_string()), status: DoraStatus::Continue, }; - }; + }; let data = arrow::ffi::from_ffi(data_array, &input.schema); match data { @@ -73,9 +73,7 @@ pub unsafe fn dora_on_event( status, }, Err(error) => OnEventResult { - result: DoraResult { - error: Some(error.into()), - }, + result: DoraResult::from_error(error), status: DoraStatus::Stop, }, } diff --git a/apis/rust/operator/types/src/lib.rs b/apis/rust/operator/types/src/lib.rs index af44c5be..e7f5cb8a 100644 --- a/apis/rust/operator/types/src/lib.rs +++ b/apis/rust/operator/types/src/lib.rs @@ -14,7 +14,7 @@ use safer_ffi::{ closure::ArcDynFn1, derive_ReprC, ffi_export, }; -use std::path::Path; +use std::{ops::Deref, path::Path}; #[derive_ReprC] #[ffi_export] @@ -43,7 +43,31 @@ pub struct DoraDropOperator { #[repr(C)] #[derive(Debug)] pub struct DoraResult { - pub error: Option, + pub error: Option>, +} + +impl DoraResult { + pub const SUCCESS: Self = Self { error: None }; + + pub fn from_error(error: String) -> Self { + Self { + error: Some(Box::new(safer_ffi::String::from(error)).into()), + } + } + + pub fn error(&self) -> Option<&str> { + self.error.as_deref().map(|s| s.deref()) + } + + pub fn into_result(self) -> Result<(), String> { + match self.error { + None => Ok(()), + Some(error) => { + let converted = safer_ffi::boxed::Box_::into(error); + Err((*converted).into()) + } + } + } } #[derive_ReprC] @@ -174,7 +198,7 @@ pub unsafe fn dora_send_operator_output( match result() { Ok(output) => send_output.send_output.call(output), Err(error) => DoraResult { - error: Some(error.into()), + error: Some(Box::new(safer_ffi::String::from(error)).into()), }, } } diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index ca758c4f..495c1b6c 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -98,7 +98,7 @@ impl<'lib> SharedLibraryOperator<'lib> { let raw = match result.error { Some(error) => { let _ = init_done.send(Err(eyre!(error.to_string()))); - bail!("init_operator failed: {}", String::from(error)) + bail!("init_operator failed: {}", *error) } None => operator_context, }; @@ -126,11 +126,7 @@ impl<'lib> SharedLibraryOperator<'lib> { let arrow_array = match arrow::ffi::from_ffi(data_array, &schema) { Ok(a) => a, - Err(err) => { - return DoraResult { - error: Some(err.to_string().into()), - } - } + Err(err) => return DoraResult::from_error(err.to_string()), }; let total_len = required_data_size(&arrow_array); @@ -138,11 +134,7 @@ impl<'lib> SharedLibraryOperator<'lib> { let type_info = match copy_array_into_sample(&mut sample, &arrow_array) { Ok(t) => t, - Err(err) => { - return DoraResult { - error: Some(err.to_string().into()), - } - } + Err(err) => return DoraResult::from_error(err.to_string()), }; let event = OperatorEvent::Output { @@ -157,18 +149,16 @@ impl<'lib> SharedLibraryOperator<'lib> { .blocking_send(event) .map_err(|_| eyre!("failed to send output to runtime")); - let error = match result { - Ok(()) => None, - Err(_) => Some(String::from("runtime process closed unexpectedly").into()), - }; - - DoraResult { error } + match result { + Ok(()) => DoraResult::SUCCESS, + Err(_) => DoraResult::from_error("runtime process closed unexpectedly".into()), + } }); let reason = loop { #[allow(unused_mut)] let Ok(mut event) = self.incoming_events.recv() else { - break StopReason::InputsClosed + break StopReason::InputsClosed; }; let span = span!(tracing::Level::TRACE, "on_event", input_id = field::Empty); @@ -261,7 +251,7 @@ impl<'lib> SharedLibraryOperator<'lib> { ) }; match error { - Some(error) => bail!("on_input failed: {}", String::from(error)), + Some(error) => bail!("on_input failed: {}", *error), None => match status { DoraStatus::Continue => {} DoraStatus::Stop => break StopReason::ExplicitStop, diff --git a/libraries/arrow-convert/src/from_impls.rs b/libraries/arrow-convert/src/from_impls.rs index 8d918a3a..db484c6b 100644 --- a/libraries/arrow-convert/src/from_impls.rs +++ b/libraries/arrow-convert/src/from_impls.rs @@ -1,5 +1,5 @@ use arrow::{ - array::{make_array, Array, AsArray, PrimitiveArray, StringArray}, + array::{Array, AsArray, PrimitiveArray, StringArray}, datatypes::ArrowPrimitiveType, }; use eyre::ContextCompat;