diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index ea34b3b4..59c9dbc1 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -289,7 +289,7 @@ pub fn metadata_to_pydict<'a>( #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{ptr::NonNull, sync::Arc}; use aligned_vec::{AVec, ConstAlign}; use arrow::{ @@ -301,9 +301,8 @@ mod tests { }; use arrow_schema::{DataType, Field}; - use dora_node_api::{ - arrow_utils::{copy_array_into_sample, required_data_size}, - RawData, + use dora_node_api::arrow_utils::{ + buffer_into_arrow_array, copy_array_into_sample, required_data_size, }; use eyre::{Context, Result}; @@ -313,9 +312,16 @@ mod tests { let info = copy_array_into_sample(&mut sample, arrow_array); - let serialized_deserialized_arrow_array = RawData::Vec(sample) - .into_arrow_array(&info) - .context("Could not create arrow array")?; + let serialized_deserialized_arrow_array = { + let ptr = NonNull::new(sample.as_ptr() as *mut _).unwrap(); + let len = sample.len(); + + let raw_buffer = unsafe { + arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(sample)) + }; + buffer_into_arrow_array(&raw_buffer, &info)? + }; + assert_eq!(arrow_array, &serialized_deserialized_arrow_array); Ok(()) diff --git a/apis/rust/node/src/event_stream/data_conversion.rs b/apis/rust/node/src/event_stream/data_conversion.rs index 8e39caf8..acc29bb1 100644 --- a/apis/rust/node/src/event_stream/data_conversion.rs +++ b/apis/rust/node/src/event_stream/data_conversion.rs @@ -2,10 +2,12 @@ use std::{ptr::NonNull, sync::Arc}; use aligned_vec::{AVec, ConstAlign}; use dora_arrow_convert::IntoArrow; -use dora_message::metadata::{ArrowTypeInfo, BufferOffset}; +use dora_message::metadata::ArrowTypeInfo; use eyre::Context; use shared_memory_server::{Shmem, ShmemConf}; +use crate::arrow_utils::buffer_into_arrow_array; + pub enum RawData { Empty, Vec(AVec>), @@ -37,49 +39,17 @@ impl RawData { } } -pub struct SharedMemoryData { - pub data: MappedInputData, - pub _drop: flume::Sender<()>, -} - -fn buffer_into_arrow_array( - raw_buffer: &arrow::buffer::Buffer, - type_info: &ArrowTypeInfo, -) -> eyre::Result { - if raw_buffer.is_empty() { - return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type)); - } - - let mut buffers = Vec::new(); - for BufferOffset { offset, len } in &type_info.buffer_offsets { - buffers.push(raw_buffer.slice_with_length(*offset, *len)); - } - - let mut child_data = Vec::new(); - for child_type_info in &type_info.child_data { - child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?) - } - - arrow::array::ArrayData::try_new( - type_info.data_type.clone(), - type_info.len, - type_info - .validity - .clone() - .map(arrow::buffer::Buffer::from_vec), - type_info.offset, - buffers, - child_data, - ) - .context("Error creating Arrow array") -} - impl std::fmt::Debug for RawData { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Data").finish_non_exhaustive() } } +pub struct SharedMemoryData { + pub data: MappedInputData, + pub _drop: flume::Sender<()>, +} + pub struct MappedInputData { memory: Box, len: usize, diff --git a/apis/rust/node/src/node/arrow_utils.rs b/apis/rust/node/src/node/arrow_utils.rs index 955d6f0b..39c714c2 100644 --- a/apis/rust/node/src/node/arrow_utils.rs +++ b/apis/rust/node/src/node/arrow_utils.rs @@ -1,5 +1,6 @@ use arrow::array::{ArrayData, BufferSpec}; use dora_message::metadata::{ArrowTypeInfo, BufferOffset}; +use eyre::Context; pub fn required_data_size(array: &ArrayData) -> usize { let mut next_offset = 0; @@ -69,3 +70,35 @@ fn copy_array_into_sample_inner( child_data, } } + +pub fn buffer_into_arrow_array( + raw_buffer: &arrow::buffer::Buffer, + type_info: &ArrowTypeInfo, +) -> eyre::Result { + if raw_buffer.is_empty() { + return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type)); + } + + let mut buffers = Vec::new(); + for BufferOffset { offset, len } in &type_info.buffer_offsets { + buffers.push(raw_buffer.slice_with_length(*offset, *len)); + } + + let mut child_data = Vec::new(); + for child_type_info in &type_info.child_data { + child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?) + } + + arrow::array::ArrayData::try_new( + type_info.data_type.clone(), + type_info.len, + type_info + .validity + .clone() + .map(arrow::buffer::Buffer::from_vec), + type_info.offset, + buffers, + child_data, + ) + .context("Error creating Arrow array") +}