diff --git a/Cargo.lock b/Cargo.lock index e89ca45f..96602de7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1511,6 +1511,7 @@ dependencies = [ name = "dora-node-api-c" version = "0.2.4" dependencies = [ + "arrow-array", "dora-node-api", "eyre", "flume", @@ -1594,6 +1595,7 @@ dependencies = [ name = "dora-operator-api-types" version = "0.2.4" dependencies = [ + "arrow", "safer-ffi", ] @@ -4618,9 +4620,9 @@ checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" [[package]] name = "safer-ffi" -version = "0.1.0" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62fd645a8c0b4a71f0883dce1bc48e358fcc02b99c83613f62ede5660b3572c1" +checksum = "4f47f1d2f33598dab2baa9517fffa1cf722f2e3a30633f2a230f20f9da67c564" dependencies = [ "inventory", "libc", @@ -4635,9 +4637,9 @@ dependencies = [ [[package]] name = "safer_ffi-proc_macros" -version = "0.1.0" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d39be56c8f7e5878594154dd0d2c03e2b12c66fa5a5ba3fe302412bd89bad774" +checksum = "b08f58cf71a58bda5734758eb20051cdb66c06c9243badbc45092ced1be834df" dependencies = [ "macro_rules_attribute", "prettyplease", diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index e953961a..16e27bad 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -92,7 +92,7 @@ pub struct OutputSender(dora_node_api::DoraNode); fn send_output(sender: &mut Box, id: String, data: &[u8]) -> ffi::DoraResult { let result = sender .0 - .send_output(id.into(), Default::default(), data.len(), |out| { + .send_output_raw(id.into(), Default::default(), data.len(), |out| { out.copy_from_slice(data) }); let error = match result { diff --git a/apis/c/node/Cargo.toml b/apis/c/node/Cargo.toml index ec823543..22c03512 100644 --- a/apis/c/node/Cargo.toml +++ b/apis/c/node/Cargo.toml @@ -21,6 +21,7 @@ tracing = ["dora-node-api/tracing"] eyre = "0.6.8" flume = "0.10.14" tracing = "0.1.33" +arrow-array = "45.0.0" [dependencies.dora-node-api] workspace = true diff --git a/apis/c/node/src/lib.rs b/apis/c/node/src/lib.rs index 102d5eac..88a77675 100644 --- a/apis/c/node/src/lib.rs +++ b/apis/c/node/src/lib.rs @@ -1,6 +1,7 @@ #![deny(unsafe_op_in_unsafe_fn)] -use dora_node_api::{DoraNode, Event, EventStream}; +use arrow_array::BinaryArray; +use dora_node_api::{arrow::array::AsArray, DoraNode, Event, EventStream}; use eyre::Context; use std::{ffi::c_void, ptr, slice}; @@ -170,9 +171,10 @@ pub unsafe extern "C" fn read_dora_input_data( let event: &Event = unsafe { &*event.cast() }; match event { Event::Input { data, .. } => { - if let Ok(data) = data.as_byte_slice() { - let ptr = data.as_ptr(); - let len = data.len(); + let data: Option<&BinaryArray> = data.as_binary_opt(); + if let Some(data) = data { + let ptr = data.value(0).as_ptr(); + let len = data.value(0).len(); unsafe { *out_ptr = ptr; *out_len = len; @@ -250,7 +252,7 @@ unsafe fn try_send_output( let data = unsafe { slice::from_raw_parts(data_ptr, data_len) }; context .node - .send_output(output_id, Default::default(), data.len(), |out| { + .send_output_raw(output_id, Default::default(), data.len(), |out| { out.copy_from_slice(data); }) } diff --git a/apis/c/operator/operator_api.h b/apis/c/operator/operator_api.h index 846fd3cb..24f57ab2 100644 --- a/apis/c/operator/operator_api.h +++ b/apis/c/operator/operator_api.h @@ -19,7 +19,7 @@ extern "C" EXPORT DoraResult_t dora_drop_operator(void *operator_context); EXPORT OnEventResult_t dora_on_event( - const RawEvent_t *event, + RawEvent_t *event, const SendOutput_t *send_output, void *operator_context); diff --git a/apis/c/operator/operator_types.h b/apis/c/operator/operator_types.h index 30585ca4..57c9dfd5 100644 --- a/apis/c/operator/operator_types.h +++ b/apis/c/operator/operator_types.h @@ -86,22 +86,7 @@ typedef struct OnEventResult { } OnEventResult_t; /** */ -typedef struct Metadata { - /** */ - Vec_uint8_t open_telemetry_context; -} Metadata_t; - -/** */ -typedef struct Input { - /** */ - Vec_uint8_t id; - - /** */ - Vec_uint8_t data; - - /** */ - Metadata_t metadata; -} Input_t; +typedef struct Input Input_t; #include @@ -122,16 +107,7 @@ typedef struct RawEvent { } RawEvent_t; /** */ -typedef struct Output { - /** */ - Vec_uint8_t id; - - /** */ - Vec_uint8_t data; - - /** */ - Metadata_t metadata; -} Output_t; +typedef struct Output Output_t; /** \brief * `Arc Ret>` @@ -159,9 +135,15 @@ typedef struct SendOutput { /** */ typedef struct DoraOnEvent { /** */ - OnEventResult_t (*on_event)(RawEvent_t const *, SendOutput_t const *, void *); + OnEventResult_t (*on_event)(RawEvent_t *, SendOutput_t const *, void *); } DoraOnEvent_t; +/** */ +typedef struct Metadata { + /** */ + Vec_uint8_t open_telemetry_context; +} Metadata_t; + #ifdef __cplusplus } /* extern \"C\" */ diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 2918527a..19f61264 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -3,9 +3,7 @@ use arrow::pyarrow::{FromPyArrow, ToPyArrow}; use dora_node_api::merged::{MergeExternalSend, MergedEvent}; use dora_node_api::{DoraNode, EventStream}; -use dora_operator_api_python::{ - copy_array_into_sample, pydict_to_metadata, required_data_size, PyEvent, -}; +use dora_operator_api_python::{pydict_to_metadata, PyEvent}; use dora_ros2_bridge_python::Ros2Subscription; use eyre::Context; use futures::{Stream, StreamExt}; @@ -100,14 +98,11 @@ impl Node { .send_output_bytes(output_id.into(), parameters, data.len(), data) .wrap_err("failed to send output")?; } else if let Ok(arrow_array) = arrow::array::ArrayData::from_pyarrow(data.as_ref(py)) { - let total_len = required_data_size(&arrow_array); - - let mut sample = self.node.allocate_data_sample(total_len)?; - let type_info = copy_array_into_sample(&mut sample, &arrow_array)?; - - self.node - .send_output_sample(output_id.into(), type_info, parameters, Some(sample)) - .wrap_err("failed to send output")?; + self.node.send_output( + output_id.into(), + parameters, + arrow::array::make_array(arrow_array), + )?; } else { eyre::bail!("invalid `data` type, must by `PyBytes` or arrow array") } diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index d2e9644f..8858791c 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -1,22 +1,12 @@ -use std::sync::Arc; - -use arrow::{array::ArrayData, pyarrow::ToPyArrow}; -use dora_node_api::{ - dora_core::message::{ArrowTypeInfo, BufferOffset}, - merged::MergedEvent, - Data, Event, Metadata, MetadataParameters, -}; +use arrow::{array::ArrayRef, pyarrow::ToPyArrow}; +use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters}; use eyre::{Context, Result}; -use pyo3::{ - exceptions::PyLookupError, - prelude::*, - types::{PyBytes, PyDict}, -}; +use pyo3::{exceptions::PyLookupError, prelude::*, types::PyDict}; #[pyclass] pub struct PyEvent { event: MergedEvent, - data: Option>, + data: Option, } #[pymethods] @@ -34,7 +24,6 @@ impl PyEvent { let value = match key { "type" => Some(Self::ty(event).to_object(py)), "id" => Self::id(event).map(|v| v.to_object(py)), - "data" => self.data(py), "value" => self.value(py)?, "metadata" => Self::metadata(event, py), "error" => Self::error(event).map(|v| v.to_object(py)), @@ -77,21 +66,12 @@ impl PyEvent { } } - /// Returns the payload of an input event as a `PyBytes` object (if any). - fn data(&self, py: Python<'_>) -> Option { - self.data.as_ref().map(|data| PyBytes::new(py, data).into()) - } - /// Returns the payload of an input event as an arrow array (if any). fn value(&self, py: Python<'_>) -> PyResult> { match (&self.event, &self.data) { - (MergedEvent::Dora(Event::Input { metadata, .. }), Some(data)) => { - let array = data - .clone() - .into_arrow_array(&metadata.type_info) - .context("Could not create arrow array")?; + (MergedEvent::Dora(Event::Input { .. }), Some(data)) => { // TODO: Does this call leak data? - let array_data = array.to_pyarrow(py)?; + let array_data = data.to_data().to_pyarrow(py)?; Ok(Some(array_data)) } _ => Ok(None), @@ -122,7 +102,7 @@ impl From for PyEvent { impl From> for PyEvent { fn from(mut event: MergedEvent) -> Self { let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event { - data.take().map(Arc::new) + Some(data.clone()) } else { None }; @@ -166,61 +146,3 @@ pub fn metadata_to_pydict<'a>(metadata: &'a Metadata, py: Python<'a>) -> &'a PyD .unwrap(); dict } - -pub fn copy_array_into_sample( - target_buffer: &mut [u8], - arrow_array: &ArrayData, -) -> eyre::Result { - let mut next_offset = 0; - copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array) -} - -fn copy_array_into_sample_inner( - target_buffer: &mut [u8], - next_offset: &mut usize, - arrow_array: &ArrayData, -) -> eyre::Result { - let mut buffer_offsets = Vec::new(); - for buffer in arrow_array.buffers().iter() { - let len = buffer.len(); - assert!( - target_buffer[*next_offset..].len() >= len, - "target buffer too small (total_len: {}, offset: {}, required_len: {len})", - target_buffer.len(), - *next_offset, - ); - target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice()); - buffer_offsets.push(BufferOffset { - offset: *next_offset, - len, - }); - *next_offset += len; - } - - let mut child_data = Vec::new(); - for child in arrow_array.child_data() { - let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child)?; - child_data.push(child_type_info); - } - - Ok(ArrowTypeInfo { - data_type: arrow_array.data_type().clone(), - len: arrow_array.len(), - null_count: arrow_array.null_count(), - validity: arrow_array.nulls().map(|b| b.validity().to_owned()), - offset: arrow_array.offset(), - buffer_offsets, - child_data, - }) -} - -pub fn required_data_size(array: &ArrayData) -> usize { - let mut size = 0; - for buffer in array.buffers() { - size += buffer.len(); - } - for child in array.child_data() { - size += required_data_size(child); - } - size -} diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs index bc90a2bc..bd5fdd62 100644 --- a/apis/rust/node/src/event_stream/event.rs +++ b/apis/rust/node/src/event_stream/event.rs @@ -1,11 +1,10 @@ use std::{ptr::NonNull, sync::Arc}; -use arrow_schema::DataType; use dora_core::{ config::{DataId, OperatorId}, message::{ArrowTypeInfo, BufferOffset, Metadata}, }; -use eyre::{Context, ContextCompat, Result}; +use eyre::{Context, Result}; use shared_memory_extended::{Shmem, ShmemConf}; #[derive(Debug)] @@ -18,7 +17,7 @@ pub enum Event { Input { id: DataId, metadata: Metadata, - data: ArrowData, + data: arrow::array::ArrayRef, }, InputClosed { id: DataId, @@ -26,56 +25,6 @@ pub enum Event { Error(String), } -#[derive(Debug)] -pub struct ArrowData(arrow::array::ArrayData); - -impl ArrowData { - pub(super) fn new(data: Option, metadata: &Metadata) -> eyre::Result { - let raw_data = Arc::new(data.unwrap_or(Data::Vec(Vec::new()))); - raw_data.into_arrow_array(&metadata.type_info).map(Self) - } - - pub fn as_byte_slice(&self) -> eyre::Result<&[u8]> { - let first_buffer = self.0.buffers().iter().next().context("no buffers")?; - if self.0.buffers().len() != 1 { - eyre::bail!( - "must have exactly one buffer, has {} buffers", - self.0.buffers().len() - ); - } - if !self.0.child_data().is_empty() { - eyre::bail!( - "should have no child data, has {}", - self.0.child_data().len() - ); - } - - if !matches!(self.0.data_type(), DataType::UInt8) { - eyre::bail!("should have DataType::UInt8, has {}", self.0.data_type()); - } - if self.0.len() != first_buffer.len() { - eyre::bail!( - "len ({}) should match len of first buffer ({})", - self.0.len(), - first_buffer.len() - ); - } - - if self.0.null_count() != 0 { - eyre::bail!("should have no nulls, has {}", self.0.null_count()); - } - - if self.0.offset() != 0 { - eyre::bail!( - "should start at offset 0, starts at offset {}", - self.0.offset() - ); - } - - Ok(first_buffer.as_slice()) - } -} - pub(super) enum Data { Vec(Vec), SharedMemory { diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 198711aa..f4b8d4c3 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -pub use event::{ArrowData, Event, MappedInputData}; +pub use event::{Event, MappedInputData}; use futures::{Stream, StreamExt}; use self::{ @@ -134,7 +134,12 @@ impl EventStream { }) }, }; - let data = data.and_then(|data| ArrowData::new(data, &metadata)); + let data = data.and_then(|data| { + let raw_data = Arc::new(data.unwrap_or(Data::Vec(Vec::new()))); + raw_data + .into_arrow_array(&metadata.type_info) + .map(arrow::array::make_array) + }); match data { Ok(data) => Event::Input { id, metadata, data }, Err(err) => Event::Error(format!("{err:?}")), diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 96350bff..c53f72d0 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -13,11 +13,12 @@ //! dora new project_xyz --kind dataflow //! ``` //! +pub use arrow; pub use dora_core; pub use dora_core::message::{uhlc, Metadata, MetadataParameters}; -pub use event_stream::{merged, ArrowData, Event, EventStream, MappedInputData}; +pub use event_stream::{merged, Event, EventStream, MappedInputData}; pub use flume::Receiver; -pub use node::{DataSample, DoraNode, ZERO_COPY_THRESHOLD}; +pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD}; mod daemon_connection; mod event_stream; diff --git a/apis/rust/node/src/node/arrow_utils.rs b/apis/rust/node/src/node/arrow_utils.rs new file mode 100644 index 00000000..765b2052 --- /dev/null +++ b/apis/rust/node/src/node/arrow_utils.rs @@ -0,0 +1,60 @@ +use arrow::array::ArrayData; +use dora_core::message::{ArrowTypeInfo, BufferOffset}; + +pub fn required_data_size(array: &ArrayData) -> usize { + let mut size = 0; + for buffer in array.buffers() { + size += buffer.len(); + } + for child in array.child_data() { + size += required_data_size(child); + } + size +} + +pub fn copy_array_into_sample( + target_buffer: &mut [u8], + arrow_array: &ArrayData, +) -> eyre::Result { + let mut next_offset = 0; + copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array) +} + +fn copy_array_into_sample_inner( + target_buffer: &mut [u8], + next_offset: &mut usize, + arrow_array: &ArrayData, +) -> eyre::Result { + let mut buffer_offsets = Vec::new(); + for buffer in arrow_array.buffers().iter() { + let len = buffer.len(); + assert!( + target_buffer[*next_offset..].len() >= len, + "target buffer too small (total_len: {}, offset: {}, required_len: {len})", + target_buffer.len(), + *next_offset, + ); + target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice()); + buffer_offsets.push(BufferOffset { + offset: *next_offset, + len, + }); + *next_offset += len; + } + + let mut child_data = Vec::new(); + for child in arrow_array.child_data() { + let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child)?; + child_data.push(child_type_info); + } + + Ok(ArrowTypeInfo { + data_type: arrow_array.data_type().clone(), + len: arrow_array.len(), + null_count: arrow_array.null_count(), + validity: arrow_array.nulls().map(|b| b.validity().to_owned()), + offset: arrow_array.offset(), + buffer_offsets, + child_data, + }) +} diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index bc91e581..ce3279a5 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -1,6 +1,11 @@ use crate::EventStream; -use self::{control_channel::ControlChannel, drop_stream::DropStream}; +use self::{ + arrow_utils::{copy_array_into_sample, required_data_size}, + control_channel::ControlChannel, + drop_stream::DropStream, +}; +use arrow::array::Array; use dora_core::{ config::{DataId, NodeId, NodeRunConfig}, daemon_messages::{Data, DropToken, NodeConfig}, @@ -19,6 +24,7 @@ use std::{ #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; +pub mod arrow_utils; mod control_channel; mod drop_stream; @@ -117,7 +123,7 @@ impl DoraNode { /// }).expect("Could not send output"); /// ``` /// - pub fn send_output( + pub fn send_output_raw( &mut self, output_id: DataId, parameters: MetadataParameters, @@ -135,6 +141,25 @@ impl DoraNode { self.send_output_sample(output_id, type_info, parameters, Some(sample)) } + pub fn send_output( + &mut self, + output_id: DataId, + parameters: MetadataParameters, + data: impl Array, + ) -> eyre::Result<()> { + let arrow_array = data.to_data(); + + let total_len = required_data_size(&arrow_array); + + let mut sample = self.allocate_data_sample(total_len)?; + let type_info = copy_array_into_sample(&mut sample, &arrow_array)?; + + self.send_output_sample(output_id, type_info, parameters, Some(sample)) + .wrap_err("failed to send output")?; + + Ok(()) + } + pub fn send_output_bytes( &mut self, output_id: DataId, @@ -142,7 +167,7 @@ impl DoraNode { data_len: usize, data: &[u8], ) -> eyre::Result<()> { - self.send_output(output_id, parameters, data_len, |sample| { + self.send_output_raw(output_id, parameters, data_len, |sample| { sample.copy_from_slice(data) }) } diff --git a/apis/rust/operator/macros/src/lib.rs b/apis/rust/operator/macros/src/lib.rs index 2af1687d..8b5384f6 100644 --- a/apis/rust/operator/macros/src/lib.rs +++ b/apis/rust/operator/macros/src/lib.rs @@ -52,7 +52,7 @@ fn register_operator_impl(item: &TokenStream2) -> syn::Result { let on_event = quote! { #[no_mangle] pub unsafe extern "C" fn dora_on_event( - event: &dora_operator_api::types::RawEvent, + event: &mut dora_operator_api::types::RawEvent, send_output: &dora_operator_api::types::SendOutput, operator_context: *mut std::ffi::c_void, ) -> dora_operator_api::types::OnEventResult { diff --git a/apis/rust/operator/src/lib.rs b/apis/rust/operator/src/lib.rs index a8707a5c..3542a061 100644 --- a/apis/rust/operator/src/lib.rs +++ b/apis/rust/operator/src/lib.rs @@ -21,14 +21,21 @@ pub use dora_operator_api_macros::register_operator; pub use dora_operator_api_types as types; pub use types::DoraStatus; -use types::{Metadata, Output, SendOutput}; +use types::{ + arrow::{ + self, + array::{Array, ArrayRef}, + }, + Metadata, Output, SendOutput, +}; pub mod raw; #[derive(Debug)] #[non_exhaustive] pub enum Event<'a> { - Input { id: &'a str, data: &'a [u8] }, + Input { id: &'a str, data: ArrayRef }, + InputParseError { id: &'a str, error: String }, InputClosed { id: &'a str }, Stop, } @@ -48,10 +55,13 @@ impl DoraOutputSender<'_> { /// Send an output from the operator: /// - `id` is the `output_id` as defined in your dataflow. /// - `data` is the data that should be sent - pub fn send(&mut self, id: String, data: Vec) -> Result<(), String> { + pub fn send(&mut self, id: String, data: impl Array) -> Result<(), String> { + let (data_array, schema) = + arrow::ffi::to_ffi(&data.into_data()).map_err(|err| err.to_string())?; let result = self.0.send_output.call(Output { id: id.into(), - data: data.into(), + data_array, + schema, metadata: Metadata { open_telemetry_context: String::new().into(), // TODO }, diff --git a/apis/rust/operator/src/raw.rs b/apis/rust/operator/src/raw.rs index c3db60e4..7041fba0 100644 --- a/apis/rust/operator/src/raw.rs +++ b/apis/rust/operator/src/raw.rs @@ -1,5 +1,7 @@ use crate::{DoraOperator, DoraOutputSender, DoraStatus, Event}; -use dora_operator_api_types::{DoraInitResult, DoraResult, OnEventResult, RawEvent, SendOutput}; +use dora_operator_api_types::{ + arrow, DoraInitResult, DoraResult, OnEventResult, RawEvent, SendOutput, +}; use std::ffi::c_void; pub type OutputFnRaw = unsafe extern "C" fn( @@ -27,7 +29,7 @@ pub unsafe fn dora_drop_operator(operator_context: *mut c_void) -> DoraResult } pub unsafe fn dora_on_event( - event: &RawEvent, + event: &mut RawEvent, send_output: &SendOutput, operator_context: *mut std::ffi::c_void, ) -> OnEventResult { @@ -35,11 +37,24 @@ pub unsafe fn dora_on_event( let operator: &mut O = unsafe { &mut *operator_context.cast() }; - let event_variant = if let Some(input) = &event.input { - let data = input.data.as_ref().as_slice(); - Event::Input { - id: &input.id, - data, + let event_variant = if let Some(input) = &mut event.input { + let Some(data_array) = input.data_array.take() else { + return OnEventResult { + result: DoraResult { error: Some("data already taken".to_string().into()) }, + status: DoraStatus::Continue, + }; + }; + let data = arrow::ffi::from_ffi(data_array, &input.schema); + + match data { + Ok(data) => Event::Input { + id: &input.id, + data: arrow::array::make_array(data), + }, + Err(err) => Event::InputParseError { + id: &input.id, + error: format!("{err}"), + }, } } else if let Some(input_id) = &event.input_closed { Event::InputClosed { id: input_id } diff --git a/apis/rust/operator/types/Cargo.toml b/apis/rust/operator/types/Cargo.toml index 6ee7b7f5..cb90e30f 100644 --- a/apis/rust/operator/types/Cargo.toml +++ b/apis/rust/operator/types/Cargo.toml @@ -8,6 +8,9 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[dependencies] +arrow = { version = "45.0.0", features = ["ffi"] } + [dependencies.safer-ffi] -version = "0.1.0-rc1" +version = "0.1.2" features = ["headers"] diff --git a/apis/rust/operator/types/src/lib.rs b/apis/rust/operator/types/src/lib.rs index fd2715b5..2ab39c4c 100644 --- a/apis/rust/operator/types/src/lib.rs +++ b/apis/rust/operator/types/src/lib.rs @@ -1,6 +1,9 @@ #![deny(elided_lifetimes_in_paths)] // required for safer-ffi +pub use arrow; pub use safer_ffi; + +use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; use safer_ffi::{closure::ArcDynFn1, derive_ReprC, ffi_export}; use std::path::Path; @@ -46,7 +49,7 @@ pub struct DoraOnEvent { #[repr(transparent)] pub struct OnEventFn( pub unsafe extern "C" fn( - event: &RawEvent, + event: &mut RawEvent, send_output: &SendOutput, operator_context: *mut std::ffi::c_void, ) -> OnEventResult, @@ -64,12 +67,12 @@ pub struct RawEvent { } #[derive_ReprC] -#[ffi_export] -#[repr(C)] +#[repr(opaque)] #[derive(Debug)] pub struct Input { pub id: safer_ffi::String, - pub data: safer_ffi::Vec, + pub data_array: Option, + pub schema: FFI_ArrowSchema, pub metadata: Metadata, } @@ -89,12 +92,12 @@ pub struct SendOutput { } #[derive_ReprC] -#[ffi_export] -#[repr(C)] +#[repr(opaque)] #[derive(Debug)] pub struct Output { pub id: safer_ffi::String, - pub data: safer_ffi::Vec, + pub data_array: FFI_ArrowArray, + pub schema: FFI_ArrowSchema, pub metadata: Metadata, } diff --git a/binaries/cli/src/template/c/operator/operator-template.c b/binaries/cli/src/template/c/operator/operator-template.c index 3ba7fd5d..7639cfc6 100644 --- a/binaries/cli/src/template/c/operator/operator-template.c +++ b/binaries/cli/src/template/c/operator/operator-template.c @@ -23,7 +23,7 @@ DoraResult_t dora_drop_operator(void *operator_context) } OnEventResult_t dora_on_event( - const RawEvent_t *event, + RawEvent_t *event, const SendOutput_t *send_output, void *operator_context) { diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 04d95046..ecacef96 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -37,7 +37,7 @@ clap = { version = "4.0.3", features = ["derive"] } tracing-opentelemetry = { version = "0.18.0", optional = true } pythonize = { version = "0.19.0", optional = true } arrow-schema = "45.0.0" -arrow = { version = "45.0.0" } +arrow = { version = "45.0.0", features = ["ffi"] } [features] default = ["tracing"] diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index 841a8f6a..37ecdc1b 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -218,7 +218,7 @@ async fn run( } OperatorEvent::Output { output_id, - type_info: type_info, + type_info, parameters, data, } => { diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 7d9ee39d..53e67be4 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -271,10 +271,11 @@ mod callback_impl { use super::SendOutputCallback; use arrow::{array::ArrayData, pyarrow::FromPyArrow}; use dora_core::message::ArrowTypeInfo; - use dora_node_api::ZERO_COPY_THRESHOLD; - use dora_operator_api_python::{ - copy_array_into_sample, pydict_to_metadata, required_data_size, + use dora_node_api::{ + arrow_utils::{copy_array_into_sample, required_data_size}, + ZERO_COPY_THRESHOLD, }; + use dora_operator_api_python::pydict_to_metadata; use eyre::{eyre, Context, Result}; use pyo3::{ pymethods, diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 7ada6eee..cdef0156 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -3,10 +3,12 @@ use dora_core::{ adjust_shared_library_path, config::{DataId, NodeId, OperatorId}, descriptor::source_is_url, - message::ArrowTypeInfo, }; use dora_download::download_file; -use dora_node_api::{Event, MetadataParameters}; +use dora_node_api::{ + arrow_utils::{copy_array_into_sample, required_data_size}, + Event, MetadataParameters, +}; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnEvent, DoraResult, DoraStatus, Metadata, OnEventResult, Output, SendOutput, @@ -110,7 +112,8 @@ impl<'lib> SharedLibraryOperator<'lib> { let send_output_closure = Arc::new(move |output: Output| { let Output { id: output_id, - data, + data_array, + schema, metadata: Metadata { open_telemetry_context, }, @@ -120,11 +123,31 @@ impl<'lib> SharedLibraryOperator<'lib> { ..Default::default() }; + let arrow_array = match arrow::ffi::from_ffi(data_array, &schema) { + Ok(a) => a, + Err(err) => { + return DoraResult { + error: Some(err.to_string().into()), + } + } + }; + + let total_len = required_data_size(&arrow_array); + let mut sample = vec![0; total_len]; + let type_info = match copy_array_into_sample(&mut sample, &arrow_array) { + Ok(t) => t, + Err(err) => { + return DoraResult { + error: Some(err.to_string().into()), + } + } + }; + let event = OperatorEvent::Output { output_id: DataId::from(String::from(output_id)), - type_info: ArrowTypeInfo::byte_array(data.len()), + type_info, parameters, - data: Some(data.to_owned().into()), + data: Some(sample.into()), }; let result = self @@ -168,7 +191,7 @@ impl<'lib> SharedLibraryOperator<'lib> { metadata.parameters.open_telemetry_context = string_cx; } - let operator_event = match event { + let mut operator_event = match event { Event::Stop => dora_operator_api_types::RawEvent { input: None, input_closed: None, @@ -180,14 +203,12 @@ impl<'lib> SharedLibraryOperator<'lib> { metadata, data, } => { + let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?; + let operator_input = dora_operator_api_types::Input { id: String::from(input_id).into(), - data: data - .as_byte_slice() - .map(|d| d.to_vec()) - // TODO: don't silence error - .unwrap_or_default() - .into(), + data_array: Some(data_array), + schema, metadata: Metadata { open_telemetry_context: metadata .parameters @@ -232,7 +253,7 @@ impl<'lib> SharedLibraryOperator<'lib> { status, } = unsafe { (self.bindings.on_event.on_event)( - &operator_event, + &mut operator_event, &send_output, operator_context.raw, ) diff --git a/examples/benchmark/node/src/main.rs b/examples/benchmark/node/src/main.rs index 2ec10cad..9080bbc5 100644 --- a/examples/benchmark/node/src/main.rs +++ b/examples/benchmark/node/src/main.rs @@ -31,7 +31,7 @@ fn main() -> eyre::Result<()> { .sample_iter(rand::distributions::Standard) .take(size) .collect(); - node.send_output(latency.clone(), Default::default(), data.len(), |out| { + node.send_output_raw(latency.clone(), Default::default(), data.len(), |out| { out.copy_from_slice(&data); })?; @@ -50,7 +50,7 @@ fn main() -> eyre::Result<()> { .sample_iter(rand::distributions::Standard) .take(size) .collect(); - node.send_output(throughput.clone(), Default::default(), data.len(), |out| { + node.send_output_raw(throughput.clone(), Default::default(), data.len(), |out| { out.copy_from_slice(&data); })?; } diff --git a/examples/c++-dataflow/operator-c-api/operator.cc b/examples/c++-dataflow/operator-c-api/operator.cc index 7c9fd299..c48f49e2 100644 --- a/examples/c++-dataflow/operator-c-api/operator.cc +++ b/examples/c++-dataflow/operator-c-api/operator.cc @@ -31,7 +31,7 @@ extern "C" DoraResult_t dora_drop_operator(void *operator_context) } extern "C" OnEventResult_t dora_on_event( - const RawEvent_t *event, + RawEvent_t *event, const SendOutput_t *send_output, void *operator_context) { diff --git a/examples/c-dataflow/operator.c b/examples/c-dataflow/operator.c index ba64e8e3..d3cc01ca 100644 --- a/examples/c-dataflow/operator.c +++ b/examples/c-dataflow/operator.c @@ -23,7 +23,7 @@ DoraResult_t dora_drop_operator(void *operator_context) } OnEventResult_t dora_on_event( - const RawEvent_t *event, + RawEvent_t *event, const SendOutput_t *send_output, void *operator_context) { diff --git a/examples/multiple-daemons/node/src/main.rs b/examples/multiple-daemons/node/src/main.rs index c52e4618..23602e84 100644 --- a/examples/multiple-daemons/node/src/main.rs +++ b/examples/multiple-daemons/node/src/main.rs @@ -23,7 +23,7 @@ fn main() -> eyre::Result<()> { let random: u64 = rand::random(); println!("tick {i}, sending {random:#x}"); let data: &[u8] = &random.to_le_bytes(); - node.send_output(output.clone(), metadata.parameters, data.len(), |out| { + node.send_output_raw(output.clone(), metadata.parameters, data.len(), |out| { out.copy_from_slice(data); })?; } diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index c52e4618..b90ed47e 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -1,4 +1,9 @@ -use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event}; +use dora_node_api::{ + self, + arrow::{array::PrimitiveBuilder, datatypes::UInt64Type}, + dora_core::config::DataId, + DoraNode, Event, +}; fn main() -> eyre::Result<()> { println!("hello"); @@ -22,10 +27,12 @@ fn main() -> eyre::Result<()> { "tick" => { let random: u64 = rand::random(); println!("tick {i}, sending {random:#x}"); - let data: &[u8] = &random.to_le_bytes(); - node.send_output(output.clone(), metadata.parameters, data.len(), |out| { - out.copy_from_slice(data); - })?; + let data = { + let mut builder: PrimitiveBuilder = PrimitiveBuilder::new(); + builder.append_value(random); + builder.finish() + }; + node.send_output(output.clone(), metadata.parameters, data)?; } other => eprintln!("Ignoring unexpected input `{other}`"), }, diff --git a/examples/rust-dataflow/operator/src/lib.rs b/examples/rust-dataflow/operator/src/lib.rs index 81a1a1f0..fe544049 100644 --- a/examples/rust-dataflow/operator/src/lib.rs +++ b/examples/rust-dataflow/operator/src/lib.rs @@ -1,6 +1,13 @@ #![warn(unsafe_op_in_unsafe_fn)] -use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event}; +use dora_operator_api::{ + register_operator, + types::arrow::{ + array::{AsArray, PrimitiveArray, StringBuilder}, + datatypes::UInt64Type, + }, + DoraOperator, DoraOutputSender, DoraStatus, Event, +}; register_operator!(ExampleOperator); @@ -21,16 +28,20 @@ impl DoraOperator for ExampleOperator { self.ticks += 1; } "random" => { - let parsed = { - let data: [u8; 8] = - (*data).try_into().map_err(|_| "unexpected random data")?; - u64::from_le_bytes(data) - }; + let primitive_array: &PrimitiveArray = + data.as_primitive_opt().ok_or("expected primitive array")?; + let value = primitive_array.value(0); + let output = format!( - "operator received random value {parsed:#x} after {} ticks", + "operator received random value {value:#x} after {} ticks", self.ticks ); - output_sender.send("status".into(), output.into_bytes())?; + let output_data = { + let mut builder = StringBuilder::new(); + builder.append_value(output); + builder.finish() + }; + output_sender.send("status".into(), output_data)?; } other => eprintln!("ignoring unexpected input {other}"), }, diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index 674deacf..b75709a5 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -1,5 +1,9 @@ -use dora_node_api::{self, DoraNode, Event}; -use eyre::{bail, Context}; +use dora_node_api::{ + self, + arrow::array::{AsArray, StringArray}, + DoraNode, Event, +}; +use eyre::{bail, ContextCompat}; fn main() -> eyre::Result<()> { let (_node, mut events) = DoraNode::init_from_env()?; @@ -8,8 +12,10 @@ fn main() -> eyre::Result<()> { match event { Event::Input { id, metadata, data } => match id.as_str() { "message" => { - let received_string = std::str::from_utf8(data.as_byte_slice()?) + let string_array: &StringArray = data + .as_string_opt() .wrap_err("received message was not utf8-encoded")?; + let received_string = string_array.value(0); println!("sink received message: {}", received_string); if !received_string.starts_with("operator received random value ") { bail!("unexpected message format (should start with 'operator received random value')")