diff --git a/apis/c++/operator/src/lib.rs b/apis/c++/operator/src/lib.rs index 1d4d776c..4ea7bab3 100644 --- a/apis/c++/operator/src/lib.rs +++ b/apis/c++/operator/src/lib.rs @@ -2,9 +2,7 @@ #![warn(unsafe_op_in_unsafe_fn)] use dora_operator_api::{ - self, register_operator, - types::arrow::array::{AsArray, BinaryArray}, - DoraOperator, DoraOutputSender, DoraStatus, Event, + self, register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow, }; use ffi::DoraSendOutputResult; @@ -47,7 +45,7 @@ pub struct OutputSender<'a, 'b>(&'a mut DoraOutputSender<'b>); fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> DoraSendOutputResult { let error = sender .0 - .send(id.into(), data.to_owned().into()) + .send(id.into(), data.to_owned().into_arrow()) .err() .unwrap_or_default(); DoraSendOutputResult { error } @@ -77,7 +75,9 @@ impl DoraOperator for OperatorWrapper { Event::Input { id, data } => { let operator = self.operator.as_mut().unwrap(); let mut output_sender = OutputSender(output_sender); - let data: &BinaryArray = data.as_binary(); + let data: &[u8] = data + .try_into() + .map_err(|err| format!("expected byte array: {err}"))?; let result = ffi::on_input(operator, id, data, &mut output_sender); if result.error.is_empty() {