|
|
|
@@ -1,15 +1,23 @@ |
|
|
|
use arrow::array::ArrayData; |
|
|
|
use arrow::array::{ArrayData, BufferSpec}; |
|
|
|
use dora_core::message::{ArrowTypeInfo, BufferOffset}; |
|
|
|
|
|
|
|
pub fn required_data_size(array: &ArrayData) -> usize { |
|
|
|
let mut size = 0; |
|
|
|
for buffer in array.buffers() { |
|
|
|
size += buffer.len(); |
|
|
|
let mut next_offset = 0; |
|
|
|
required_data_size_inner(array, &mut next_offset); |
|
|
|
next_offset |
|
|
|
} |
|
|
|
fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) { |
|
|
|
let layout = arrow::array::layout(array.data_type()); |
|
|
|
for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) { |
|
|
|
// consider alignment padding |
|
|
|
if let BufferSpec::FixedWidth { alignment, .. } = spec { |
|
|
|
*next_offset = (*next_offset + alignment - 1) / alignment * alignment; |
|
|
|
} |
|
|
|
*next_offset += buffer.len(); |
|
|
|
} |
|
|
|
for child in array.child_data() { |
|
|
|
size += required_data_size(child); |
|
|
|
required_data_size_inner(child, next_offset); |
|
|
|
} |
|
|
|
size |
|
|
|
} |
|
|
|
|
|
|
|
pub fn copy_array_into_sample( |
|
|
|
@@ -26,7 +34,8 @@ fn copy_array_into_sample_inner( |
|
|
|
arrow_array: &ArrayData, |
|
|
|
) -> eyre::Result<ArrowTypeInfo> { |
|
|
|
let mut buffer_offsets = Vec::new(); |
|
|
|
for buffer in arrow_array.buffers().iter() { |
|
|
|
let layout = arrow::array::layout(arrow_array.data_type()); |
|
|
|
for (buffer, spec) in arrow_array.buffers().iter().zip(&layout.buffers) { |
|
|
|
let len = buffer.len(); |
|
|
|
assert!( |
|
|
|
target_buffer[*next_offset..].len() >= len, |
|
|
|
@@ -34,6 +43,11 @@ fn copy_array_into_sample_inner( |
|
|
|
target_buffer.len(), |
|
|
|
*next_offset, |
|
|
|
); |
|
|
|
// add alignment padding |
|
|
|
if let BufferSpec::FixedWidth { alignment, .. } = spec { |
|
|
|
*next_offset = (*next_offset + alignment - 1) / alignment * alignment; |
|
|
|
} |
|
|
|
|
|
|
|
target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice()); |
|
|
|
buffer_offsets.push(BufferOffset { |
|
|
|
offset: *next_offset, |
|
|
|
|