Browse Source

Fix python roundtrip test by exposing `buffer_into_arrow_array`

pull/1056/head
Philipp Oppermann 6 months ago
parent
commit
933078843a
Failed to extract signature
3 changed files with 54 additions and 45 deletions
  1. +13
    -7
      apis/python/operator/src/lib.rs
  2. +8
    -38
      apis/rust/node/src/event_stream/data_conversion.rs
  3. +33
    -0
      apis/rust/node/src/node/arrow_utils.rs

+ 13
- 7
apis/python/operator/src/lib.rs View File

@@ -289,7 +289,7 @@ pub fn metadata_to_pydict<'a>(

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{ptr::NonNull, sync::Arc};

use aligned_vec::{AVec, ConstAlign};
use arrow::{
@@ -301,9 +301,8 @@ mod tests {
};

use arrow_schema::{DataType, Field};
use dora_node_api::{
arrow_utils::{copy_array_into_sample, required_data_size},
RawData,
use dora_node_api::arrow_utils::{
buffer_into_arrow_array, copy_array_into_sample, required_data_size,
};
use eyre::{Context, Result};

@@ -313,9 +312,16 @@ mod tests {

let info = copy_array_into_sample(&mut sample, arrow_array);

let serialized_deserialized_arrow_array = RawData::Vec(sample)
.into_arrow_array(&info)
.context("Could not create arrow array")?;
let serialized_deserialized_arrow_array = {
let ptr = NonNull::new(sample.as_ptr() as *mut _).unwrap();
let len = sample.len();

let raw_buffer = unsafe {
arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(sample))
};
buffer_into_arrow_array(&raw_buffer, &info)?
};

assert_eq!(arrow_array, &serialized_deserialized_arrow_array);

Ok(())


+ 8
- 38
apis/rust/node/src/event_stream/data_conversion.rs View File

@@ -2,10 +2,12 @@ use std::{ptr::NonNull, sync::Arc};

use aligned_vec::{AVec, ConstAlign};
use dora_arrow_convert::IntoArrow;
use dora_message::metadata::{ArrowTypeInfo, BufferOffset};
use dora_message::metadata::ArrowTypeInfo;
use eyre::Context;
use shared_memory_server::{Shmem, ShmemConf};

use crate::arrow_utils::buffer_into_arrow_array;

pub enum RawData {
Empty,
Vec(AVec<u8, ConstAlign<128>>),
@@ -37,49 +39,17 @@ impl RawData {
}
}

pub struct SharedMemoryData {
pub data: MappedInputData,
pub _drop: flume::Sender<()>,
}

fn buffer_into_arrow_array(
raw_buffer: &arrow::buffer::Buffer,
type_info: &ArrowTypeInfo,
) -> eyre::Result<arrow::array::ArrayData> {
if raw_buffer.is_empty() {
return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
}

let mut buffers = Vec::new();
for BufferOffset { offset, len } in &type_info.buffer_offsets {
buffers.push(raw_buffer.slice_with_length(*offset, *len));
}

let mut child_data = Vec::new();
for child_type_info in &type_info.child_data {
child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
}

arrow::array::ArrayData::try_new(
type_info.data_type.clone(),
type_info.len,
type_info
.validity
.clone()
.map(arrow::buffer::Buffer::from_vec),
type_info.offset,
buffers,
child_data,
)
.context("Error creating Arrow array")
}

impl std::fmt::Debug for RawData {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Data").finish_non_exhaustive()
}
}

pub struct SharedMemoryData {
pub data: MappedInputData,
pub _drop: flume::Sender<()>,
}

pub struct MappedInputData {
memory: Box<Shmem>,
len: usize,


+ 33
- 0
apis/rust/node/src/node/arrow_utils.rs View File

@@ -1,5 +1,6 @@
use arrow::array::{ArrayData, BufferSpec};
use dora_message::metadata::{ArrowTypeInfo, BufferOffset};
use eyre::Context;

pub fn required_data_size(array: &ArrayData) -> usize {
let mut next_offset = 0;
@@ -69,3 +70,35 @@ fn copy_array_into_sample_inner(
child_data,
}
}

pub fn buffer_into_arrow_array(
raw_buffer: &arrow::buffer::Buffer,
type_info: &ArrowTypeInfo,
) -> eyre::Result<arrow::array::ArrayData> {
if raw_buffer.is_empty() {
return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
}

let mut buffers = Vec::new();
for BufferOffset { offset, len } in &type_info.buffer_offsets {
buffers.push(raw_buffer.slice_with_length(*offset, *len));
}

let mut child_data = Vec::new();
for child_type_info in &type_info.child_data {
child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
}

arrow::array::ArrayData::try_new(
type_info.data_type.clone(),
type_info.len,
type_info
.validity
.clone()
.map(arrow::buffer::Buffer::from_vec),
type_info.offset,
buffers,
child_data,
)
.context("Error creating Arrow array")
}

Loading…
Cancel
Save