diff --git a/libraries/extensions/ros2-bridge/python/src/typed/deserialize/array.rs b/libraries/extensions/ros2-bridge/python/src/typed/deserialize/array.rs index 79030abe..170092dc 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/deserialize/array.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/deserialize/array.rs @@ -1,9 +1,14 @@ use arrow::array::ArrayData; use dora_ros2_bridge_msg_gen::types::sequences; +use crate::typed::TypeInfo; + use super::sequence::SequenceVisitor; -pub struct ArrayDeserializer<'a>(pub &'a sequences::Array); +pub struct ArrayDeserializer<'a> { + pub array_type: &'a sequences::Array, + pub type_info: &'a TypeInfo<'a>, +} impl<'de> serde::de::DeserializeSeed<'de> for ArrayDeserializer<'_> { type Value = ArrayData; @@ -12,6 +17,12 @@ impl<'de> serde::de::DeserializeSeed<'de> for ArrayDeserializer<'_> { where D: serde::Deserializer<'de>, { - deserializer.deserialize_tuple(self.0.size, SequenceVisitor(&self.0.value_type)) + deserializer.deserialize_tuple( + self.array_type.size, + SequenceVisitor { + item_type: &self.array_type.value_type, + type_info: self.type_info, + }, + ) } } diff --git a/libraries/extensions/ros2-bridge/python/src/typed/deserialize/mod.rs b/libraries/extensions/ros2-bridge/python/src/typed/deserialize/mod.rs index 93a94eee..ca74e8e5 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/deserialize/mod.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/deserialize/mod.rs @@ -125,13 +125,13 @@ impl<'a, 'de> serde::de::Visitor<'de> for StructVisitor<'a> { } }, dora_ros2_bridge_msg_gen::types::MemberType::Array(a) => { - data.next_element_seed(array::ArrayDeserializer(a))? + data.next_element_seed(array::ArrayDeserializer{ array_type : a, type_info: self.type_info})? }, dora_ros2_bridge_msg_gen::types::MemberType::Sequence(s) => { - data.next_element_seed(sequence::SequenceDeserializer(&s.value_type))? + data.next_element_seed(sequence::SequenceDeserializer{item_type: &s.value_type, type_info: self.type_info})? }, dora_ros2_bridge_msg_gen::types::MemberType::BoundedSequence(s) => { - data.next_element_seed(sequence::SequenceDeserializer(&s.value_type))? + data.next_element_seed(sequence::SequenceDeserializer{ item_type: &s.value_type, type_info: self.type_info})? }, }; diff --git a/libraries/extensions/ros2-bridge/python/src/typed/deserialize/sequence.rs b/libraries/extensions/ros2-bridge/python/src/typed/deserialize/sequence.rs index 47395012..ee918ef3 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/deserialize/sequence.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/deserialize/sequence.rs @@ -5,8 +5,16 @@ use arrow::{ use core::fmt; use dora_ros2_bridge_msg_gen::types::primitives::{self, BasicType, NestableType}; use serde::Deserialize; +use std::{borrow::Cow, ops::Deref}; -pub struct SequenceDeserializer<'a>(pub &'a NestableType); +use crate::typed::TypeInfo; + +use super::{error, StructDeserializer}; + +pub struct SequenceDeserializer<'a> { + pub item_type: &'a NestableType, + pub type_info: &'a TypeInfo<'a>, +} impl<'de> serde::de::DeserializeSeed<'de> for SequenceDeserializer<'_> { type Value = ArrayData; @@ -15,11 +23,17 @@ impl<'de> serde::de::DeserializeSeed<'de> for SequenceDeserializer<'_> { where D: serde::Deserializer<'de>, { - deserializer.deserialize_seq(SequenceVisitor(self.0)) + deserializer.deserialize_seq(SequenceVisitor { + item_type: self.item_type, + type_info: self.type_info, + }) } } -pub struct SequenceVisitor<'a>(pub &'a NestableType); +pub struct SequenceVisitor<'a> { + pub item_type: &'a NestableType, + pub type_info: &'a TypeInfo<'a>, +} impl<'de> serde::de::Visitor<'de> for SequenceVisitor<'_> { type Value = ArrayData; @@ -32,7 +46,7 @@ impl<'de> serde::de::Visitor<'de> for SequenceVisitor<'_> { where A: serde::de::SeqAccess<'de>, { - match &self.0 { + match &self.item_type { NestableType::BasicType(t) => match t { BasicType::I8 => deserialize_primitive_seq::<_, datatypes::Int8Type>(seq), BasicType::I16 => deserialize_primitive_seq::<_, datatypes::Int16Type>(seq), @@ -54,9 +68,30 @@ impl<'de> serde::de::Visitor<'de> for SequenceVisitor<'_> { Ok(array.finish().into()) } }, - NestableType::NamedType(_) => todo!("deserialize sequence of NestableType::NamedType"), - NestableType::NamespacedType(_) => { - todo!("deserialize sequence of NestableType::NamedspacedType") + NestableType::NamedType(name) => { + let deserializer = StructDeserializer { + type_info: Cow::Owned(TypeInfo { + package_name: Cow::Borrowed(&self.type_info.package_name), + message_name: Cow::Borrowed(&name.0), + messages: self.type_info.messages.clone(), + }), + }; + deserialize_struct_array(&mut seq, deserializer) + } + NestableType::NamespacedType(reference) => { + if reference.namespace != "msg" { + return Err(error(format!( + "sequence item references non-message type {reference:?}", + ))); + } + let deserializer = StructDeserializer { + type_info: Cow::Owned(TypeInfo { + package_name: Cow::Borrowed(&reference.package), + message_name: Cow::Borrowed(&reference.name), + messages: self.type_info.messages.clone(), + }), + }; + deserialize_struct_array(&mut seq, deserializer) } NestableType::GenericString(t) => match t { primitives::GenericString::String | primitives::GenericString::BoundedString(_) => { @@ -75,6 +110,23 @@ impl<'de> serde::de::Visitor<'de> for SequenceVisitor<'_> { } } +fn deserialize_struct_array<'de, A>( + seq: &mut A, + deserializer: StructDeserializer<'_>, +) -> Result>::Error> +where + A: serde::de::SeqAccess<'de>, +{ + let mut values = Vec::new(); + while let Some(value) = seq.next_element_seed(deserializer.clone())? { + values.push(arrow::array::make_array(value)); + } + let refs: Vec<_> = values.iter().map(|a| a.deref()).collect(); + arrow::compute::concat(&refs) + .map(|a| a.to_data()) + .map_err(super::error) +} + fn deserialize_primitive_seq<'de, S, T>( mut seq: S, ) -> Result>::Error> diff --git a/libraries/extensions/ros2-bridge/python/src/typed/serialize/array.rs b/libraries/extensions/ros2-bridge/python/src/typed/serialize/array.rs index 517d2e6a..d1454fd8 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/serialize/array.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/serialize/array.rs @@ -1,21 +1,24 @@ -use std::marker::PhantomData; +use std::{borrow::Cow, marker::PhantomData, sync::Arc}; use arrow::{ - array::{Array, ArrayRef, AsArray, PrimitiveArray}, + array::{Array, ArrayRef, AsArray, OffsetSizeTrait, PrimitiveArray}, datatypes::{self, ArrowPrimitiveType}, }; use dora_ros2_bridge_msg_gen::types::{ - primitives::{BasicType, NestableType}, + primitives::{BasicType, GenericString, NestableType}, sequences, }; use serde::ser::SerializeTuple; -use super::error; +use crate::typed::TypeInfo; + +use super::{error, TypedValue}; /// Serialize an array with known size as tuple. pub struct ArraySerializeWrapper<'a> { pub array_info: &'a sequences::Array, pub column: &'a ArrayRef, + pub type_info: &'a TypeInfo<'a>, } impl serde::Serialize for ArraySerializeWrapper<'_> { @@ -106,13 +109,67 @@ impl serde::Serialize for ArraySerializeWrapper<'_> { } .serialize(serializer), }, - NestableType::NamedType(_) => todo!("serializing arrays of NestableType::NamedType"), - NestableType::NamespacedType(_) => { - todo!("serializing arrays of NestableType::NamespacedType") + NestableType::NamedType(name) => { + let array = entry + .as_struct_opt() + .ok_or_else(|| error("not a struct array"))?; + let mut seq = serializer.serialize_tuple(self.array_info.size)?; + for i in 0..array.len() { + let row = array.slice(i, 1); + seq.serialize_element(&TypedValue { + value: &(Arc::new(row) as ArrayRef), + type_info: &crate::typed::TypeInfo { + package_name: Cow::Borrowed(&self.type_info.package_name), + message_name: Cow::Borrowed(&name.0), + messages: self.type_info.messages.clone(), + }, + })?; + } + seq.end() } - NestableType::GenericString(_) => { - todo!("serializing arrays of NestableType::GenericString") + NestableType::NamespacedType(reference) => { + if reference.namespace != "msg" { + return Err(error(format!( + "sequence references non-message type {reference:?}" + ))); + } + + let array = entry + .as_struct_opt() + .ok_or_else(|| error("not a struct array"))?; + let mut seq = serializer.serialize_tuple(self.array_info.size)?; + for i in 0..array.len() { + let row = array.slice(i, 1); + seq.serialize_element(&TypedValue { + value: &(Arc::new(row) as ArrayRef), + type_info: &crate::typed::TypeInfo { + package_name: Cow::Borrowed(&reference.package), + message_name: Cow::Borrowed(&reference.name), + messages: self.type_info.messages.clone(), + }, + })?; + } + seq.end() } + NestableType::GenericString(s) => match s { + GenericString::String | GenericString::BoundedString(_) => { + match entry.as_string_opt::() { + Some(array) => { + serialize_arrow_string(serializer, array, self.array_info.size) + } + None => { + let array = entry + .as_string_opt::() + .ok_or_else(|| error("expected string array"))?; + serialize_arrow_string(serializer, array, self.array_info.size) + } + } + } + GenericString::WString => { + todo!("serializing WString sequences") + } + GenericString::BoundedWString(_) => todo!("serializing BoundedWString sequences"), + }, } } } @@ -184,3 +241,19 @@ impl serde::Serialize for BoolArrayAsTuple<'_> { seq.end() } } + +fn serialize_arrow_string( + serializer: S, + array: &arrow::array::GenericByteArray>, + array_len: usize, +) -> Result<::Ok, ::Error> +where + S: serde::Serializer, + O: OffsetSizeTrait, +{ + let mut seq = serializer.serialize_tuple(array_len)?; + for s in array.iter() { + seq.serialize_element(s.unwrap_or_default())?; + } + seq.end() +} diff --git a/libraries/extensions/ros2-bridge/python/src/typed/serialize/mod.rs b/libraries/extensions/ros2-bridge/python/src/typed/serialize/mod.rs index ab7556d4..0382b355 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/serialize/mod.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/serialize/mod.rs @@ -145,6 +145,7 @@ impl serde::Serialize for TypedValue<'_> { &array::ArraySerializeWrapper { array_info: a, column: column.as_ref(), + type_info: self.type_info, }, )?; } @@ -154,6 +155,7 @@ impl serde::Serialize for TypedValue<'_> { &sequence::SequenceSerializeWrapper { item_type: &v.value_type, column: column.as_ref(), + type_info: self.type_info, }, )?; } @@ -163,6 +165,7 @@ impl serde::Serialize for TypedValue<'_> { &sequence::SequenceSerializeWrapper { item_type: &v.value_type, column: column.as_ref(), + type_info: self.type_info, }, )?; } diff --git a/libraries/extensions/ros2-bridge/python/src/typed/serialize/sequence.rs b/libraries/extensions/ros2-bridge/python/src/typed/serialize/sequence.rs index fd8dd591..3fb6bdae 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/serialize/sequence.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/serialize/sequence.rs @@ -1,18 +1,21 @@ -use std::marker::PhantomData; +use std::{borrow::Cow, marker::PhantomData, sync::Arc}; use arrow::{ - array::{Array, ArrayRef, AsArray, PrimitiveArray}, + array::{Array, ArrayRef, AsArray, OffsetSizeTrait, PrimitiveArray}, datatypes::{self, ArrowPrimitiveType}, }; -use dora_ros2_bridge_msg_gen::types::primitives::{BasicType, NestableType}; +use dora_ros2_bridge_msg_gen::types::primitives::{BasicType, GenericString, NestableType}; use serde::ser::{SerializeSeq, SerializeTuple}; -use super::error; +use crate::typed::TypeInfo; + +use super::{error, TypedValue}; /// Serialize a variable-sized sequence. pub struct SequenceSerializeWrapper<'a> { pub item_type: &'a NestableType, pub column: &'a ArrayRef, + pub type_info: &'a TypeInfo<'a>, } impl serde::Serialize for SequenceSerializeWrapper<'_> { @@ -88,17 +91,84 @@ impl serde::Serialize for SequenceSerializeWrapper<'_> { .serialize(serializer), BasicType::Bool => BoolArray { value: &entry }.serialize(serializer), }, - NestableType::NamedType(_) => todo!("serializing NestableType::NamedType sequences"), - NestableType::NamespacedType(_) => { - todo!("serializing NestableType::NamespacedType sequences") + NestableType::NamedType(name) => { + let array = entry + .as_struct_opt() + .ok_or_else(|| error("not a struct array"))?; + let mut seq = serializer.serialize_seq(Some(array.len()))?; + for i in 0..array.len() { + let row = array.slice(i, 1); + seq.serialize_element(&TypedValue { + value: &(Arc::new(row) as ArrayRef), + type_info: &crate::typed::TypeInfo { + package_name: Cow::Borrowed(&self.type_info.package_name), + message_name: Cow::Borrowed(&name.0), + messages: self.type_info.messages.clone(), + }, + })?; + } + seq.end() } - NestableType::GenericString(_) => { - todo!("serializing NestableType::Genericstring sequences") + NestableType::NamespacedType(reference) => { + if reference.namespace != "msg" { + return Err(error(format!( + "sequence references non-message type {reference:?}" + ))); + } + + let array = entry + .as_struct_opt() + .ok_or_else(|| error("not a struct array"))?; + let mut seq = serializer.serialize_seq(Some(array.len()))?; + for i in 0..array.len() { + let row = array.slice(i, 1); + seq.serialize_element(&TypedValue { + value: &(Arc::new(row) as ArrayRef), + type_info: &crate::typed::TypeInfo { + package_name: Cow::Borrowed(&reference.package), + message_name: Cow::Borrowed(&reference.name), + messages: self.type_info.messages.clone(), + }, + })?; + } + seq.end() } + NestableType::GenericString(s) => match s { + GenericString::String | GenericString::BoundedString(_) => { + match entry.as_string_opt::() { + Some(array) => serialize_arrow_string(serializer, array), + None => { + let array = entry + .as_string_opt::() + .ok_or_else(|| error("expected string array"))?; + serialize_arrow_string(serializer, array) + } + } + } + GenericString::WString => { + todo!("serializing WString sequences") + } + GenericString::BoundedWString(_) => todo!("serializing BoundedWString sequences"), + }, } } } +fn serialize_arrow_string( + serializer: S, + array: &arrow::array::GenericByteArray>, +) -> Result<::Ok, ::Error> +where + S: serde::Serializer, + O: OffsetSizeTrait, +{ + let mut seq = serializer.serialize_seq(Some(array.len()))?; + for s in array.iter() { + seq.serialize_element(s.unwrap_or_default())?; + } + seq.end() +} + struct BasicSequence<'a, T> { value: &'a ArrayRef, ty: PhantomData,