|
|
|
@@ -2,9 +2,7 @@ |
|
|
|
#![warn(unsafe_op_in_unsafe_fn)] |
|
|
|
|
|
|
|
use dora_operator_api::{ |
|
|
|
self, register_operator, |
|
|
|
types::arrow::array::{AsArray, BinaryArray}, |
|
|
|
DoraOperator, DoraOutputSender, DoraStatus, Event, |
|
|
|
self, register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow, |
|
|
|
}; |
|
|
|
use ffi::DoraSendOutputResult; |
|
|
|
|
|
|
|
@@ -47,7 +45,7 @@ pub struct OutputSender<'a, 'b>(&'a mut DoraOutputSender<'b>); |
|
|
|
fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> DoraSendOutputResult { |
|
|
|
let error = sender |
|
|
|
.0 |
|
|
|
.send(id.into(), data.to_owned().into()) |
|
|
|
.send(id.into(), data.to_owned().into_arrow()) |
|
|
|
.err() |
|
|
|
.unwrap_or_default(); |
|
|
|
DoraSendOutputResult { error } |
|
|
|
@@ -77,7 +75,9 @@ impl DoraOperator for OperatorWrapper { |
|
|
|
Event::Input { id, data } => { |
|
|
|
let operator = self.operator.as_mut().unwrap(); |
|
|
|
let mut output_sender = OutputSender(output_sender); |
|
|
|
let data: &BinaryArray = data.as_binary(); |
|
|
|
let data: &[u8] = data |
|
|
|
.try_into() |
|
|
|
.map_err(|err| format!("expected byte array: {err}"))?; |
|
|
|
|
|
|
|
let result = ffi::on_input(operator, id, data, &mut output_sender); |
|
|
|
if result.error.is_empty() { |
|
|
|
|