| @@ -1,9 +1,14 @@ | |||
| use arrow::array::ArrayData; | |||
| use dora_ros2_bridge_msg_gen::types::sequences; | |||
| use crate::typed::TypeInfo; | |||
| use super::sequence::SequenceVisitor; | |||
| pub struct ArrayDeserializer<'a>(pub &'a sequences::Array); | |||
| pub struct ArrayDeserializer<'a> { | |||
| pub array_type: &'a sequences::Array, | |||
| pub type_info: &'a TypeInfo<'a>, | |||
| } | |||
| impl<'de> serde::de::DeserializeSeed<'de> for ArrayDeserializer<'_> { | |||
| type Value = ArrayData; | |||
| @@ -12,6 +17,12 @@ impl<'de> serde::de::DeserializeSeed<'de> for ArrayDeserializer<'_> { | |||
| where | |||
| D: serde::Deserializer<'de>, | |||
| { | |||
| deserializer.deserialize_tuple(self.0.size, SequenceVisitor(&self.0.value_type)) | |||
| deserializer.deserialize_tuple( | |||
| self.array_type.size, | |||
| SequenceVisitor { | |||
| item_type: &self.array_type.value_type, | |||
| type_info: self.type_info, | |||
| }, | |||
| ) | |||
| } | |||
| } | |||
| @@ -125,13 +125,13 @@ impl<'a, 'de> serde::de::Visitor<'de> for StructVisitor<'a> { | |||
| } | |||
| }, | |||
| dora_ros2_bridge_msg_gen::types::MemberType::Array(a) => { | |||
| data.next_element_seed(array::ArrayDeserializer(a))? | |||
| data.next_element_seed(array::ArrayDeserializer{ array_type : a, type_info: self.type_info})? | |||
| }, | |||
| dora_ros2_bridge_msg_gen::types::MemberType::Sequence(s) => { | |||
| data.next_element_seed(sequence::SequenceDeserializer(&s.value_type))? | |||
| data.next_element_seed(sequence::SequenceDeserializer{item_type: &s.value_type, type_info: self.type_info})? | |||
| }, | |||
| dora_ros2_bridge_msg_gen::types::MemberType::BoundedSequence(s) => { | |||
| data.next_element_seed(sequence::SequenceDeserializer(&s.value_type))? | |||
| data.next_element_seed(sequence::SequenceDeserializer{ item_type: &s.value_type, type_info: self.type_info})? | |||
| }, | |||
| }; | |||
| @@ -5,8 +5,16 @@ use arrow::{ | |||
| use core::fmt; | |||
| use dora_ros2_bridge_msg_gen::types::primitives::{self, BasicType, NestableType}; | |||
| use serde::Deserialize; | |||
| use std::{borrow::Cow, ops::Deref}; | |||
| pub struct SequenceDeserializer<'a>(pub &'a NestableType); | |||
| use crate::typed::TypeInfo; | |||
| use super::{error, StructDeserializer}; | |||
| pub struct SequenceDeserializer<'a> { | |||
| pub item_type: &'a NestableType, | |||
| pub type_info: &'a TypeInfo<'a>, | |||
| } | |||
| impl<'de> serde::de::DeserializeSeed<'de> for SequenceDeserializer<'_> { | |||
| type Value = ArrayData; | |||
| @@ -15,11 +23,17 @@ impl<'de> serde::de::DeserializeSeed<'de> for SequenceDeserializer<'_> { | |||
| where | |||
| D: serde::Deserializer<'de>, | |||
| { | |||
| deserializer.deserialize_seq(SequenceVisitor(self.0)) | |||
| deserializer.deserialize_seq(SequenceVisitor { | |||
| item_type: self.item_type, | |||
| type_info: self.type_info, | |||
| }) | |||
| } | |||
| } | |||
| pub struct SequenceVisitor<'a>(pub &'a NestableType); | |||
| pub struct SequenceVisitor<'a> { | |||
| pub item_type: &'a NestableType, | |||
| pub type_info: &'a TypeInfo<'a>, | |||
| } | |||
| impl<'de> serde::de::Visitor<'de> for SequenceVisitor<'_> { | |||
| type Value = ArrayData; | |||
| @@ -32,7 +46,7 @@ impl<'de> serde::de::Visitor<'de> for SequenceVisitor<'_> { | |||
| where | |||
| A: serde::de::SeqAccess<'de>, | |||
| { | |||
| match &self.0 { | |||
| match &self.item_type { | |||
| NestableType::BasicType(t) => match t { | |||
| BasicType::I8 => deserialize_primitive_seq::<_, datatypes::Int8Type>(seq), | |||
| BasicType::I16 => deserialize_primitive_seq::<_, datatypes::Int16Type>(seq), | |||
| @@ -54,9 +68,30 @@ impl<'de> serde::de::Visitor<'de> for SequenceVisitor<'_> { | |||
| Ok(array.finish().into()) | |||
| } | |||
| }, | |||
| NestableType::NamedType(_) => todo!("deserialize sequence of NestableType::NamedType"), | |||
| NestableType::NamespacedType(_) => { | |||
| todo!("deserialize sequence of NestableType::NamedspacedType") | |||
| NestableType::NamedType(name) => { | |||
| let deserializer = StructDeserializer { | |||
| type_info: Cow::Owned(TypeInfo { | |||
| package_name: Cow::Borrowed(&self.type_info.package_name), | |||
| message_name: Cow::Borrowed(&name.0), | |||
| messages: self.type_info.messages.clone(), | |||
| }), | |||
| }; | |||
| deserialize_struct_array(&mut seq, deserializer) | |||
| } | |||
| NestableType::NamespacedType(reference) => { | |||
| if reference.namespace != "msg" { | |||
| return Err(error(format!( | |||
| "sequence item references non-message type {reference:?}", | |||
| ))); | |||
| } | |||
| let deserializer = StructDeserializer { | |||
| type_info: Cow::Owned(TypeInfo { | |||
| package_name: Cow::Borrowed(&reference.package), | |||
| message_name: Cow::Borrowed(&reference.name), | |||
| messages: self.type_info.messages.clone(), | |||
| }), | |||
| }; | |||
| deserialize_struct_array(&mut seq, deserializer) | |||
| } | |||
| NestableType::GenericString(t) => match t { | |||
| primitives::GenericString::String | primitives::GenericString::BoundedString(_) => { | |||
| @@ -75,6 +110,23 @@ impl<'de> serde::de::Visitor<'de> for SequenceVisitor<'_> { | |||
| } | |||
| } | |||
| fn deserialize_struct_array<'de, A>( | |||
| seq: &mut A, | |||
| deserializer: StructDeserializer<'_>, | |||
| ) -> Result<ArrayData, <A as serde::de::SeqAccess<'de>>::Error> | |||
| where | |||
| A: serde::de::SeqAccess<'de>, | |||
| { | |||
| let mut values = Vec::new(); | |||
| while let Some(value) = seq.next_element_seed(deserializer.clone())? { | |||
| values.push(arrow::array::make_array(value)); | |||
| } | |||
| let refs: Vec<_> = values.iter().map(|a| a.deref()).collect(); | |||
| arrow::compute::concat(&refs) | |||
| .map(|a| a.to_data()) | |||
| .map_err(super::error) | |||
| } | |||
| fn deserialize_primitive_seq<'de, S, T>( | |||
| mut seq: S, | |||
| ) -> Result<ArrayData, <S as serde::de::SeqAccess<'de>>::Error> | |||
| @@ -1,21 +1,24 @@ | |||
| use std::marker::PhantomData; | |||
| use std::{borrow::Cow, marker::PhantomData, sync::Arc}; | |||
| use arrow::{ | |||
| array::{Array, ArrayRef, AsArray, PrimitiveArray}, | |||
| array::{Array, ArrayRef, AsArray, OffsetSizeTrait, PrimitiveArray}, | |||
| datatypes::{self, ArrowPrimitiveType}, | |||
| }; | |||
| use dora_ros2_bridge_msg_gen::types::{ | |||
| primitives::{BasicType, NestableType}, | |||
| primitives::{BasicType, GenericString, NestableType}, | |||
| sequences, | |||
| }; | |||
| use serde::ser::SerializeTuple; | |||
| use super::error; | |||
| use crate::typed::TypeInfo; | |||
| use super::{error, TypedValue}; | |||
| /// Serialize an array with known size as tuple. | |||
| pub struct ArraySerializeWrapper<'a> { | |||
| pub array_info: &'a sequences::Array, | |||
| pub column: &'a ArrayRef, | |||
| pub type_info: &'a TypeInfo<'a>, | |||
| } | |||
| impl serde::Serialize for ArraySerializeWrapper<'_> { | |||
| @@ -106,13 +109,67 @@ impl serde::Serialize for ArraySerializeWrapper<'_> { | |||
| } | |||
| .serialize(serializer), | |||
| }, | |||
| NestableType::NamedType(_) => todo!("serializing arrays of NestableType::NamedType"), | |||
| NestableType::NamespacedType(_) => { | |||
| todo!("serializing arrays of NestableType::NamespacedType") | |||
| NestableType::NamedType(name) => { | |||
| let array = entry | |||
| .as_struct_opt() | |||
| .ok_or_else(|| error("not a struct array"))?; | |||
| let mut seq = serializer.serialize_tuple(self.array_info.size)?; | |||
| for i in 0..array.len() { | |||
| let row = array.slice(i, 1); | |||
| seq.serialize_element(&TypedValue { | |||
| value: &(Arc::new(row) as ArrayRef), | |||
| type_info: &crate::typed::TypeInfo { | |||
| package_name: Cow::Borrowed(&self.type_info.package_name), | |||
| message_name: Cow::Borrowed(&name.0), | |||
| messages: self.type_info.messages.clone(), | |||
| }, | |||
| })?; | |||
| } | |||
| seq.end() | |||
| } | |||
| NestableType::GenericString(_) => { | |||
| todo!("serializing arrays of NestableType::GenericString") | |||
| NestableType::NamespacedType(reference) => { | |||
| if reference.namespace != "msg" { | |||
| return Err(error(format!( | |||
| "sequence references non-message type {reference:?}" | |||
| ))); | |||
| } | |||
| let array = entry | |||
| .as_struct_opt() | |||
| .ok_or_else(|| error("not a struct array"))?; | |||
| let mut seq = serializer.serialize_tuple(self.array_info.size)?; | |||
| for i in 0..array.len() { | |||
| let row = array.slice(i, 1); | |||
| seq.serialize_element(&TypedValue { | |||
| value: &(Arc::new(row) as ArrayRef), | |||
| type_info: &crate::typed::TypeInfo { | |||
| package_name: Cow::Borrowed(&reference.package), | |||
| message_name: Cow::Borrowed(&reference.name), | |||
| messages: self.type_info.messages.clone(), | |||
| }, | |||
| })?; | |||
| } | |||
| seq.end() | |||
| } | |||
| NestableType::GenericString(s) => match s { | |||
| GenericString::String | GenericString::BoundedString(_) => { | |||
| match entry.as_string_opt::<i32>() { | |||
| Some(array) => { | |||
| serialize_arrow_string(serializer, array, self.array_info.size) | |||
| } | |||
| None => { | |||
| let array = entry | |||
| .as_string_opt::<i64>() | |||
| .ok_or_else(|| error("expected string array"))?; | |||
| serialize_arrow_string(serializer, array, self.array_info.size) | |||
| } | |||
| } | |||
| } | |||
| GenericString::WString => { | |||
| todo!("serializing WString sequences") | |||
| } | |||
| GenericString::BoundedWString(_) => todo!("serializing BoundedWString sequences"), | |||
| }, | |||
| } | |||
| } | |||
| } | |||
| @@ -184,3 +241,19 @@ impl serde::Serialize for BoolArrayAsTuple<'_> { | |||
| seq.end() | |||
| } | |||
| } | |||
| fn serialize_arrow_string<S, O>( | |||
| serializer: S, | |||
| array: &arrow::array::GenericByteArray<datatypes::GenericStringType<O>>, | |||
| array_len: usize, | |||
| ) -> Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error> | |||
| where | |||
| S: serde::Serializer, | |||
| O: OffsetSizeTrait, | |||
| { | |||
| let mut seq = serializer.serialize_tuple(array_len)?; | |||
| for s in array.iter() { | |||
| seq.serialize_element(s.unwrap_or_default())?; | |||
| } | |||
| seq.end() | |||
| } | |||
| @@ -145,6 +145,7 @@ impl serde::Serialize for TypedValue<'_> { | |||
| &array::ArraySerializeWrapper { | |||
| array_info: a, | |||
| column: column.as_ref(), | |||
| type_info: self.type_info, | |||
| }, | |||
| )?; | |||
| } | |||
| @@ -154,6 +155,7 @@ impl serde::Serialize for TypedValue<'_> { | |||
| &sequence::SequenceSerializeWrapper { | |||
| item_type: &v.value_type, | |||
| column: column.as_ref(), | |||
| type_info: self.type_info, | |||
| }, | |||
| )?; | |||
| } | |||
| @@ -163,6 +165,7 @@ impl serde::Serialize for TypedValue<'_> { | |||
| &sequence::SequenceSerializeWrapper { | |||
| item_type: &v.value_type, | |||
| column: column.as_ref(), | |||
| type_info: self.type_info, | |||
| }, | |||
| )?; | |||
| } | |||
| @@ -1,18 +1,21 @@ | |||
| use std::marker::PhantomData; | |||
| use std::{borrow::Cow, marker::PhantomData, sync::Arc}; | |||
| use arrow::{ | |||
| array::{Array, ArrayRef, AsArray, PrimitiveArray}, | |||
| array::{Array, ArrayRef, AsArray, OffsetSizeTrait, PrimitiveArray}, | |||
| datatypes::{self, ArrowPrimitiveType}, | |||
| }; | |||
| use dora_ros2_bridge_msg_gen::types::primitives::{BasicType, NestableType}; | |||
| use dora_ros2_bridge_msg_gen::types::primitives::{BasicType, GenericString, NestableType}; | |||
| use serde::ser::{SerializeSeq, SerializeTuple}; | |||
| use super::error; | |||
| use crate::typed::TypeInfo; | |||
| use super::{error, TypedValue}; | |||
| /// Serialize a variable-sized sequence. | |||
| pub struct SequenceSerializeWrapper<'a> { | |||
| pub item_type: &'a NestableType, | |||
| pub column: &'a ArrayRef, | |||
| pub type_info: &'a TypeInfo<'a>, | |||
| } | |||
| impl serde::Serialize for SequenceSerializeWrapper<'_> { | |||
| @@ -88,17 +91,84 @@ impl serde::Serialize for SequenceSerializeWrapper<'_> { | |||
| .serialize(serializer), | |||
| BasicType::Bool => BoolArray { value: &entry }.serialize(serializer), | |||
| }, | |||
| NestableType::NamedType(_) => todo!("serializing NestableType::NamedType sequences"), | |||
| NestableType::NamespacedType(_) => { | |||
| todo!("serializing NestableType::NamespacedType sequences") | |||
| NestableType::NamedType(name) => { | |||
| let array = entry | |||
| .as_struct_opt() | |||
| .ok_or_else(|| error("not a struct array"))?; | |||
| let mut seq = serializer.serialize_seq(Some(array.len()))?; | |||
| for i in 0..array.len() { | |||
| let row = array.slice(i, 1); | |||
| seq.serialize_element(&TypedValue { | |||
| value: &(Arc::new(row) as ArrayRef), | |||
| type_info: &crate::typed::TypeInfo { | |||
| package_name: Cow::Borrowed(&self.type_info.package_name), | |||
| message_name: Cow::Borrowed(&name.0), | |||
| messages: self.type_info.messages.clone(), | |||
| }, | |||
| })?; | |||
| } | |||
| seq.end() | |||
| } | |||
| NestableType::GenericString(_) => { | |||
| todo!("serializing NestableType::Genericstring sequences") | |||
| NestableType::NamespacedType(reference) => { | |||
| if reference.namespace != "msg" { | |||
| return Err(error(format!( | |||
| "sequence references non-message type {reference:?}" | |||
| ))); | |||
| } | |||
| let array = entry | |||
| .as_struct_opt() | |||
| .ok_or_else(|| error("not a struct array"))?; | |||
| let mut seq = serializer.serialize_seq(Some(array.len()))?; | |||
| for i in 0..array.len() { | |||
| let row = array.slice(i, 1); | |||
| seq.serialize_element(&TypedValue { | |||
| value: &(Arc::new(row) as ArrayRef), | |||
| type_info: &crate::typed::TypeInfo { | |||
| package_name: Cow::Borrowed(&reference.package), | |||
| message_name: Cow::Borrowed(&reference.name), | |||
| messages: self.type_info.messages.clone(), | |||
| }, | |||
| })?; | |||
| } | |||
| seq.end() | |||
| } | |||
| NestableType::GenericString(s) => match s { | |||
| GenericString::String | GenericString::BoundedString(_) => { | |||
| match entry.as_string_opt::<i32>() { | |||
| Some(array) => serialize_arrow_string(serializer, array), | |||
| None => { | |||
| let array = entry | |||
| .as_string_opt::<i64>() | |||
| .ok_or_else(|| error("expected string array"))?; | |||
| serialize_arrow_string(serializer, array) | |||
| } | |||
| } | |||
| } | |||
| GenericString::WString => { | |||
| todo!("serializing WString sequences") | |||
| } | |||
| GenericString::BoundedWString(_) => todo!("serializing BoundedWString sequences"), | |||
| }, | |||
| } | |||
| } | |||
| } | |||
| fn serialize_arrow_string<S, O>( | |||
| serializer: S, | |||
| array: &arrow::array::GenericByteArray<datatypes::GenericStringType<O>>, | |||
| ) -> Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error> | |||
| where | |||
| S: serde::Serializer, | |||
| O: OffsetSizeTrait, | |||
| { | |||
| let mut seq = serializer.serialize_seq(Some(array.len()))?; | |||
| for s in array.iter() { | |||
| seq.serialize_element(s.unwrap_or_default())?; | |||
| } | |||
| seq.end() | |||
| } | |||
| struct BasicSequence<'a, T> { | |||
| value: &'a ArrayRef, | |||
| ty: PhantomData<T>, | |||