From fe065eec6f0d2e472a6d0f3b64a8f95f1fb1ffda Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 2 Nov 2023 15:55:02 +0100 Subject: [PATCH] replace const cache line with 128 to match arrow --- apis/python/operator/src/lib.rs | 7 +++++-- apis/rust/node/src/event_stream/event.rs | 4 ++-- apis/rust/node/src/node/mod.rs | 14 +++++++++----- binaries/daemon/src/lib.rs | 4 ++-- binaries/runtime/src/operator/python.rs | 8 ++++++-- binaries/runtime/src/operator/shared_lib.rs | 8 ++++++-- libraries/core/src/daemon_messages.rs | 6 +++--- 7 files changed, 33 insertions(+), 18 deletions(-) diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 9e9ed68e..26a13ca0 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -151,7 +151,7 @@ pub fn metadata_to_pydict<'a>(metadata: &'a Metadata, py: Python<'a>) -> &'a PyD mod tests { use std::sync::Arc; - use aligned_vec::avec; + use aligned_vec::{avec, AVec, ConstAlign}; use arrow::{ array::{ ArrayData, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, Int8Array, @@ -169,7 +169,10 @@ mod tests { fn assert_roundtrip(arrow_array: &ArrayData) -> Result<()> { let size = required_data_size(arrow_array); - let mut sample = avec![0; size]; + let mut sample: AVec> = AVec::new(128); + for _ in 0..size { + sample.push(0); + } let info = copy_array_into_sample(&mut sample, arrow_array)?; diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs index 05bb3706..75b3c595 100644 --- a/apis/rust/node/src/event_stream/event.rs +++ b/apis/rust/node/src/event_stream/event.rs @@ -1,6 +1,6 @@ use std::{ptr::NonNull, sync::Arc}; -use aligned_vec::{AVec, ConstAlign, CACHELINE_ALIGN}; +use aligned_vec::{AVec, ConstAlign}; use dora_arrow_convert::{ArrowData, IntoArrow}; use dora_core::{ config::{DataId, OperatorId}, @@ -29,7 +29,7 @@ pub enum Event { pub enum RawData { Empty, - Vec(AVec>), + Vec(AVec>), SharedMemory(SharedMemoryData), } diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 8f3a2e60..e917a8ee 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -5,7 +5,7 @@ use self::{ control_channel::ControlChannel, drop_stream::DropStream, }; -use aligned_vec::{avec, AVec, ConstAlign, CACHELINE_ALIGN}; +use aligned_vec::{AVec, ConstAlign}; use arrow::array::Array; use dora_core::{ config::{DataId, NodeId, NodeRunConfig}, @@ -257,7 +257,11 @@ impl DoraNode { len: data_len, } } else { - avec![0u8; data_len].into() + let mut avec: AVec> = AVec::new(128); + for _ in 0..data_len { + avec.push(0); + } + avec.into() }; Ok(data) @@ -420,8 +424,8 @@ impl DerefMut for DataSample { } } -impl From>> for DataSample { - fn from(value: AVec>) -> Self { +impl From>> for DataSample { + fn from(value: AVec>) -> Self { Self { len: value.len(), inner: DataSampleInner::Vec(value), @@ -444,7 +448,7 @@ impl std::fmt::Debug for DataSample { enum DataSampleInner { Shmem(ShmemHandle), - Vec(AVec>), + Vec(AVec>), } struct ShmemHandle(Box); diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index b5911b03..bb806859 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1,4 +1,4 @@ -use aligned_vec::{AVec, ConstAlign, CACHELINE_ALIGN}; +use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; use dora_core::config::{Input, OperatorId}; use dora_core::coordinator_messages::CoordinatorRequest; @@ -1073,7 +1073,7 @@ async fn send_output_to_local_receivers( metadata: &dora_core::message::Metadata, data: Option, clock: &HLC, -) -> Result>>, eyre::ErrReport> { +) -> Result>>, eyre::ErrReport> { let timestamp = metadata.timestamp(); let empty_set = BTreeSet::new(); let output_id = OutputId(node_id, output_id); diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 1d415a67..97762783 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -269,7 +269,7 @@ mod callback_impl { use crate::operator::OperatorEvent; use super::SendOutputCallback; - use aligned_vec::avec; + use aligned_vec::{AVec, ConstAlign}; use arrow::{array::ArrayData, pyarrow::FromPyArrow}; use dora_core::message::ArrowTypeInfo; use dora_node_api::{ @@ -312,7 +312,11 @@ mod callback_impl { .wrap_err("failed to request output sample")? .wrap_err("failed to allocate output sample") } else { - Ok(avec![0; data_len].into()) + let mut avec: AVec> = AVec::new(128); + for _ in 0..data_len { + avec.push(0); + } + Ok(avec.into()) } }; diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 39137077..65f6b098 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,5 +1,5 @@ use super::{OperatorEvent, StopReason}; -use aligned_vec::avec; +use aligned_vec::{AVec, ConstAlign}; use dora_core::{ adjust_shared_library_path, config::{DataId, NodeId, OperatorId}, @@ -134,7 +134,11 @@ impl<'lib> SharedLibraryOperator<'lib> { }; let total_len = required_data_size(&arrow_array); - let mut sample = avec![0; total_len]; + let mut sample: AVec> = AVec::new(128); + for _ in 0..total_len { + sample.push(0); + } + let type_info = match copy_array_into_sample(&mut sample, &arrow_array) { Ok(t) => t, Err(err) => { diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index d9276f99..a9a6bef8 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -9,7 +9,7 @@ use crate::{ config::{DataId, NodeId, NodeRunConfig, OperatorId}, descriptor::{Descriptor, OperatorDefinition, ResolvedNode}, }; -use aligned_vec::{AVec, ConstAlign, CACHELINE_ALIGN}; +use aligned_vec::{AVec, ConstAlign}; use dora_message::{uhlc, Metadata}; use uuid::Uuid; @@ -88,7 +88,7 @@ impl DaemonRequest { #[derive(serde::Serialize, serde::Deserialize, Clone)] pub enum DataMessage { - Vec(AVec>), + Vec(AVec>), SharedMemory { shared_memory_id: String, len: usize, @@ -234,7 +234,7 @@ pub enum InterDaemonEvent { node_id: NodeId, output_id: DataId, metadata: Metadata, - data: Option>>, + data: Option>>, }, InputsClosed { dataflow_id: DataflowId,