| @@ -1,7 +1,10 @@ | |||
| #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] | |||
| use arrow::datatypes::DataType; | |||
| use dora_node_api::{DoraNode, EventStream}; | |||
| use dora_operator_api_python::{process_python_output, pydict_to_metadata, PyEvent}; | |||
| use dora_operator_api_python::{ | |||
| process_python_output, process_python_type, pydict_to_metadata, PyEvent, | |||
| }; | |||
| use eyre::Context; | |||
| use pyo3::prelude::*; | |||
| use pyo3::types::PyDict; | |||
| @@ -79,8 +82,9 @@ impl Node { | |||
| metadata: Option<&PyDict>, | |||
| py: Python, | |||
| ) -> eyre::Result<()> { | |||
| let data_type = process_python_type(&data, py).context("could not get type")?; | |||
| process_python_output(&data, py, |data| { | |||
| self.send_output_slice(output_id, data.len(), data, metadata) | |||
| self.send_output_slice(output_id, data.len(), data_type, data, metadata) | |||
| }) | |||
| } | |||
| @@ -97,12 +101,13 @@ impl Node { | |||
| &mut self, | |||
| output_id: String, | |||
| len: usize, | |||
| data_type: DataType, | |||
| data: &[u8], | |||
| metadata: Option<&PyDict>, | |||
| ) -> eyre::Result<()> { | |||
| let metadata = pydict_to_metadata(metadata)?; | |||
| let parameters = pydict_to_metadata(metadata)?; | |||
| self.node | |||
| .send_output(output_id.into(), metadata, len, |out| { | |||
| .send_typed_output(output_id.into(), data_type, parameters, len, |out| { | |||
| out.copy_from_slice(data); | |||
| }) | |||
| .wrap_err("failed to send output") | |||
| @@ -178,3 +178,13 @@ pub fn process_python_output<T>( | |||
| eyre::bail!("invalid `data` type, must by `PyBytes` or arrow array") | |||
| } | |||
| } | |||
| pub fn process_python_type(data: &PyObject, py: Python) -> Result<DataType> { | |||
| if let Ok(_py_bytes) = data.downcast::<PyBytes>(py) { | |||
| Ok(DataType::UInt8) | |||
| } else if let Ok(arrow_array) = arrow::array::ArrayData::from_pyarrow(data.as_ref(py)) { | |||
| Ok(arrow_array.data_type().clone()) | |||
| } else { | |||
| eyre::bail!("invalid `data` type, must by `PyBytes` or arrow array") | |||
| } | |||
| } | |||
| @@ -1,10 +1,11 @@ | |||
| use std::{ptr::NonNull, sync::Arc}; | |||
| use arrow_schema::DataType; | |||
| use dora_core::{ | |||
| config::{DataId, OperatorId}, | |||
| message::Metadata, | |||
| }; | |||
| use eyre::Context; | |||
| use eyre::{Context, ContextCompat, Result}; | |||
| use shared_memory_extended::{Shmem, ShmemConf}; | |||
| #[derive(Debug)] | |||
| @@ -35,19 +36,15 @@ pub enum Data { | |||
| impl Data { | |||
| pub fn into_arrow_array( | |||
| self: Arc<Self>, | |||
| ) -> Result<arrow::array::ArrayData, arrow::error::ArrowError> { | |||
| data_type: DataType, | |||
| ) -> Result<arrow::array::ArrayData> { | |||
| let ptr = NonNull::new(self.as_ptr() as *mut _).unwrap(); | |||
| let len = self.len(); | |||
| let buffer = unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, self) }; | |||
| arrow::array::ArrayData::try_new( | |||
| arrow::datatypes::DataType::UInt8, | |||
| len, | |||
| None, | |||
| 0, | |||
| vec![buffer], | |||
| vec![], | |||
| ) | |||
| let size = data_type.primitive_width().context("No primitive width")?; | |||
| arrow::array::ArrayData::try_new(data_type, len / size, None, 0, vec![buffer], vec![]) | |||
| .context("Error creating Arrow Array") | |||
| } | |||
| } | |||
| @@ -1,6 +1,7 @@ | |||
| use crate::EventStream; | |||
| use self::{control_channel::ControlChannel, drop_stream::DropStream}; | |||
| use arrow_schema::DataType; | |||
| use dora_core::{ | |||
| config::{DataId, NodeId, NodeRunConfig}, | |||
| daemon_messages::{Data, DropToken, NodeConfig}, | |||
| @@ -22,7 +23,7 @@ use dora_tracing::set_up_tracing; | |||
| mod control_channel; | |||
| mod drop_stream; | |||
| const ZERO_COPY_THRESHOLD: usize = 4096; | |||
| pub const ZERO_COPY_THRESHOLD: usize = 4096; | |||
| pub struct DoraNode { | |||
| id: NodeId, | |||
| @@ -130,7 +131,7 @@ impl DoraNode { | |||
| let mut sample = self.allocate_data_sample(data_len)?; | |||
| data(&mut sample); | |||
| self.send_output_sample(output_id, parameters, Some(sample)) | |||
| self.send_output_sample(output_id, DataType::UInt8, parameters, Some(sample)) | |||
| } | |||
| pub fn send_output_bytes( | |||
| @@ -145,9 +146,27 @@ impl DoraNode { | |||
| }) | |||
| } | |||
| pub fn send_typed_output<F>( | |||
| &mut self, | |||
| output_id: DataId, | |||
| data_type: DataType, | |||
| parameters: MetadataParameters, | |||
| data_len: usize, | |||
| data: F, | |||
| ) -> eyre::Result<()> | |||
| where | |||
| F: FnOnce(&mut [u8]), | |||
| { | |||
| let mut sample = self.allocate_data_sample(data_len)?; | |||
| data(&mut sample); | |||
| self.send_output_sample(output_id, data_type, parameters, Some(sample)) | |||
| } | |||
| pub fn send_output_sample( | |||
| &mut self, | |||
| output_id: DataId, | |||
| data_type: DataType, | |||
| parameters: MetadataParameters, | |||
| sample: Option<DataSample>, | |||
| ) -> eyre::Result<()> { | |||
| @@ -156,8 +175,11 @@ impl DoraNode { | |||
| if !self.node_config.outputs.contains(&output_id) { | |||
| eyre::bail!("unknown output"); | |||
| } | |||
| let metadata = | |||
| Metadata::from_parameters(self.clock.new_timestamp(), parameters.into_owned()); | |||
| let metadata = Metadata::from_parameters( | |||
| self.clock.new_timestamp(), | |||
| data_type, | |||
| parameters.into_owned(), | |||
| ); | |||
| let (data, shmem) = match sample { | |||
| Some(sample) => sample.finalize(), | |||
| @@ -1316,6 +1316,7 @@ impl RunningDataflow { | |||
| let metadata = dora_core::message::Metadata::from_parameters( | |||
| hlc.new_timestamp(), | |||
| arrow_schema::DataType::Null, | |||
| MetadataParameters { | |||
| watermark: 0, | |||
| deadline: 0, | |||
| @@ -218,13 +218,15 @@ async fn run( | |||
| } | |||
| OperatorEvent::Output { | |||
| output_id, | |||
| metadata, | |||
| data_type, | |||
| parameters, | |||
| data, | |||
| } => { | |||
| let output_id = operator_output_id(&operator_id, &output_id); | |||
| let result; | |||
| (node, result) = tokio::task::spawn_blocking(move || { | |||
| let result = node.send_output_sample(output_id, metadata, data); | |||
| let result = | |||
| node.send_output_sample(output_id, data_type, parameters, data); | |||
| (node, result) | |||
| }) | |||
| .await | |||
| @@ -1,3 +1,4 @@ | |||
| use arrow_schema::DataType; | |||
| use dora_core::{ | |||
| config::{DataId, NodeId}, | |||
| descriptor::{Descriptor, OperatorDefinition, OperatorSource}, | |||
| @@ -77,7 +78,8 @@ pub enum OperatorEvent { | |||
| }, | |||
| Output { | |||
| output_id: DataId, | |||
| metadata: MetadataParameters<'static>, | |||
| data_type: DataType, | |||
| parameters: MetadataParameters, | |||
| data: Option<DataSample>, | |||
| }, | |||
| Error(eyre::Error), | |||
| @@ -13,7 +13,6 @@ use dora_operator_api_types::{ | |||
| use eyre::{bail, eyre, Context, Result}; | |||
| use libloading::Symbol; | |||
| use std::{ | |||
| borrow::Cow, | |||
| ffi::c_void, | |||
| panic::{catch_unwind, AssertUnwindSafe}, | |||
| path::Path, | |||
| @@ -115,14 +114,16 @@ impl<'lib> SharedLibraryOperator<'lib> { | |||
| open_telemetry_context, | |||
| }, | |||
| } = output; | |||
| let metadata = MetadataParameters { | |||
| open_telemetry_context: Cow::Owned(open_telemetry_context.into()), | |||
| let parameters = MetadataParameters { | |||
| open_telemetry_context: open_telemetry_context.into(), | |||
| ..Default::default() | |||
| }; | |||
| let data_type = arrow_schema::DataType::UInt8; | |||
| let event = OperatorEvent::Output { | |||
| output_id: DataId::from(String::from(output_id)), | |||
| metadata, | |||
| data_type, | |||
| parameters, | |||
| data: Some(data.to_owned().into()), | |||
| }; | |||
| @@ -164,7 +165,7 @@ impl<'lib> SharedLibraryOperator<'lib> { | |||
| span.set_parent(cx); | |||
| let cx = span.context(); | |||
| let string_cx = serialize_context(&cx); | |||
| metadata.parameters.open_telemetry_context = Cow::Owned(string_cx); | |||
| metadata.parameters.open_telemetry_context = string_cx; | |||
| } | |||
| let operator_event = match event { | |||
| @@ -186,7 +187,6 @@ impl<'lib> SharedLibraryOperator<'lib> { | |||
| open_telemetry_context: metadata | |||
| .parameters | |||
| .open_telemetry_context | |||
| .into_owned() | |||
| .into(), | |||
| }, | |||
| }; | |||
| @@ -1,99 +1,55 @@ | |||
| //! Enable serialisation and deserialisation of capnproto messages | |||
| //! | |||
| use std::borrow::Cow; | |||
| pub mod message_capnp { | |||
| include!("message_capnp.rs"); | |||
| } | |||
| use arrow_schema::{DataType, Field, Schema}; | |||
| use serde::{Deserialize, Serialize}; | |||
| pub use uhlc; | |||
| #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] | |||
| pub struct Metadata<'a> { | |||
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | |||
| pub struct Metadata { | |||
| metadata_version: u16, | |||
| timestamp: uhlc::Timestamp, | |||
| pub parameters: MetadataParameters<'a>, | |||
| pub schema: Schema, | |||
| pub parameters: MetadataParameters, | |||
| } | |||
| #[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)] | |||
| pub struct MetadataParameters<'a> { | |||
| pub struct MetadataParameters { | |||
| pub watermark: u64, | |||
| pub deadline: u64, | |||
| pub open_telemetry_context: Cow<'a, str>, | |||
| pub open_telemetry_context: String, | |||
| } | |||
| impl MetadataParameters<'_> { | |||
| pub fn into_owned(self) -> MetadataParameters<'static> { | |||
| impl MetadataParameters { | |||
| pub fn into_owned(self) -> MetadataParameters { | |||
| MetadataParameters { | |||
| open_telemetry_context: self.open_telemetry_context.into_owned().into(), | |||
| open_telemetry_context: self.open_telemetry_context.into(), | |||
| ..self | |||
| } | |||
| } | |||
| } | |||
| impl<'a> Metadata<'a> { | |||
| pub fn new(timestamp: uhlc::Timestamp) -> Self { | |||
| Self::from_parameters(timestamp, Default::default()) | |||
| impl Metadata { | |||
| pub fn new(timestamp: uhlc::Timestamp, data_type: DataType) -> Self { | |||
| Self::from_parameters(timestamp, data_type, Default::default()) | |||
| } | |||
| pub fn from_parameters(timestamp: uhlc::Timestamp, parameters: MetadataParameters<'a>) -> Self { | |||
| pub fn from_parameters( | |||
| timestamp: uhlc::Timestamp, | |||
| data_type: DataType, | |||
| parameters: MetadataParameters, | |||
| ) -> Self { | |||
| let schema = Schema::new(vec![Field::new_list( | |||
| "value", | |||
| Field::new("item", data_type, false), | |||
| true, | |||
| )]); | |||
| Self { | |||
| metadata_version: 0, | |||
| timestamp, | |||
| parameters, | |||
| } | |||
| } | |||
| pub fn serialize(&self) -> Result<Vec<u8>, capnp::Error> { | |||
| let Metadata { | |||
| metadata_version, | |||
| timestamp, | |||
| parameters: | |||
| MetadataParameters { | |||
| watermark, | |||
| deadline, | |||
| open_telemetry_context: otel_context, | |||
| }, | |||
| } = self; | |||
| let mut meta_builder = capnp::message::Builder::new_default(); | |||
| let mut metadata = meta_builder.init_root::<message_capnp::metadata::Builder>(); | |||
| metadata.set_metadata_version(*metadata_version); | |||
| metadata.set_watermark(*watermark); | |||
| metadata.set_deadline(*deadline); | |||
| metadata.set_otel_context(otel_context); | |||
| metadata.set_timestamp(×tamp.to_string()); | |||
| let mut buffer = Vec::new(); | |||
| capnp::serialize::write_message(&mut buffer, &meta_builder)?; | |||
| Ok(buffer) | |||
| } | |||
| pub fn deserialize(raw: &mut &[u8]) -> Result<Self, capnp::Error> { | |||
| let deserialized = capnp::serialize::read_message_from_flat_slice(raw, Default::default())? | |||
| .into_typed::<message_capnp::metadata::Owned>(); | |||
| let metadata_reader = deserialized.get()?; | |||
| let metadata = Metadata { | |||
| metadata_version: metadata_reader.get_metadata_version(), | |||
| timestamp: metadata_reader | |||
| .get_timestamp()? | |||
| .parse() | |||
| .map_err(|_| capnp::Error::failed("failed to parse timestamp".into()))?, | |||
| parameters: MetadataParameters { | |||
| watermark: metadata_reader.get_watermark(), | |||
| deadline: metadata_reader.get_deadline(), | |||
| open_telemetry_context: metadata_reader.get_otel_context()?.into(), | |||
| }, | |||
| }; | |||
| Ok(metadata.into_owned()) | |||
| } | |||
| fn into_owned(self) -> Metadata<'static> { | |||
| Metadata { | |||
| parameters: self.parameters.into_owned(), | |||
| ..self | |||
| schema, | |||
| } | |||
| } | |||