From bc68de3bbd900fa90001d446e54ac0087b3ba7ef Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 21 Jul 2024 08:00:13 +0200 Subject: [PATCH] Change `MetadataParameters` into a `BTreeMap` to allow user defined metadata as well as enable more flexibility in managing metadata --- apis/python/operator/src/lib.rs | 86 +++++++++++++++++---------------- apis/rust/node/src/lib.rs | 2 +- apis/rust/node/src/node/mod.rs | 6 +-- libraries/message/src/lib.rs | 31 ++++++------ 4 files changed, 64 insertions(+), 61 deletions(-) diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 1929bc36..4ac5080c 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -1,20 +1,19 @@ use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, sync::{Arc, Mutex}, }; use arrow::pyarrow::ToPyArrow; use dora_node_api::{ merged::{MergeExternalSend, MergedEvent}, - DoraNode, Event, EventStream, Metadata, MetadataParameters, + DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter, }; use eyre::{Context, Result}; use futures::{Stream, StreamExt}; use futures_concurrency::stream::Merge as _; use pyo3::{ prelude::*, - pybacked::PyBackedStr, - types::{IntoPyDict, PyDict}, + types::{IntoPyDict, PyBool, PyDict, PyInt, PyString}, }; /// Dora Event @@ -94,7 +93,7 @@ impl PyEvent { if let Some(value) = self.value(py)? { pydict.insert("value", value); } - if let Some(metadata) = Self::metadata(event, py) { + if let Some(metadata) = Self::metadata(event, py)? { pydict.insert("metadata", metadata); } if let Some(error) = Self::error(event) { @@ -143,10 +142,14 @@ impl PyEvent { } } - fn metadata(event: &Event, py: Python<'_>) -> Option { + fn metadata(event: &Event, py: Python<'_>) -> Result> { match event { - Event::Input { metadata, .. } => Some(metadata_to_pydict(metadata, py).to_object(py)), - _ => None, + Event::Input { metadata, .. } => Ok(Some( + metadata_to_pydict(metadata, py) + .context("Issue deserializing metadata")? + .to_object(py), + )), + _ => Ok(None), } } @@ -159,44 +162,45 @@ impl PyEvent { } pub fn pydict_to_metadata(dict: Option>) -> Result { - let mut default_metadata = MetadataParameters::default(); - if let Some(metadata) = dict { - for (key, value) in metadata.iter() { - match key - .extract::() - .context("Parsing metadata keys")? - .as_ref() - { - "watermark" => { - default_metadata.watermark = - value.extract().context("parsing watermark failed")?; - } - "deadline" => { - default_metadata.deadline = - value.extract().context("parsing deadline failed")?; - } - "open_telemetry_context" => { - let otel_context: PyBackedStr = value - .extract() - .context("parsing open telemetry context failed")?; - default_metadata.open_telemetry_context = otel_context.to_string(); - } - _ => (), - } + let mut parameters = BTreeMap::default(); + if let Some(pymetadata) = dict { + for (key, value) in pymetadata.iter() { + let key = key.extract::().context("Parsing metadata keys")?; + if value.is_exact_instance_of::() { + parameters.insert(key, Parameter::Bool(value.extract()?)) + } else if value.is_instance_of::() { + parameters.insert(key, Parameter::Integer(value.extract::()?)) + } else if value.is_instance_of::() { + parameters.insert(key, Parameter::String(value.extract()?)) + } else { + println!("could not convert type {value}"); + parameters.insert(key, Parameter::String(value.str()?.to_string())) + }; } } - Ok(default_metadata) + Ok(parameters) } -pub fn metadata_to_pydict<'a>(metadata: &'a Metadata, py: Python<'a>) -> pyo3::Bound<'a, PyDict> { +pub fn metadata_to_pydict<'a>( + metadata: &'a Metadata, + py: Python<'a>, +) -> Result> { let dict = PyDict::new_bound(py); - dict.set_item( - "open_telemetry_context", - &metadata.parameters.open_telemetry_context, - ) - .wrap_err("could not make metadata a python dictionary item") - .unwrap(); - dict + for (k, v) in metadata.parameters.iter() { + match v { + Parameter::Bool(bool) => dict + .set_item(k, bool) + .context(format!("Could not insert metadata into python dictionary"))?, + Parameter::Integer(int) => dict + .set_item(k, int) + .context(format!("Could not insert metadata into python dictionary"))?, + Parameter::String(s) => dict + .set_item(k, s) + .context(format!("Could not insert metadata into python dictionary"))?, + } + } + + Ok(dict) } #[cfg(test)] diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index d2ce63dd..7c61559e 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -16,7 +16,7 @@ pub use arrow; pub use dora_arrow_convert::*; pub use dora_core; -pub use dora_core::message::{uhlc, Metadata, MetadataParameters}; +pub use dora_core::message::{uhlc, Metadata, MetadataParameters, Parameter}; pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData}; pub use flume::Receiver; pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD}; diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 9eb4b18e..8951deb5 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -250,11 +250,7 @@ impl DoraNode { if !self.node_config.outputs.contains(&output_id) { eyre::bail!("unknown output"); } - let metadata = Metadata::from_parameters( - self.clock.new_timestamp(), - type_info, - parameters.into_owned(), - ); + let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters); let (data, shmem) = match sample { Some(sample) => sample.finalize(), diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs index 40c18f99..fa69c2fb 100644 --- a/libraries/message/src/lib.rs +++ b/libraries/message/src/lib.rs @@ -3,12 +3,16 @@ #![allow(clippy::missing_safety_doc)] +use std::collections::BTreeMap; + use arrow_data::ArrayData; use arrow_schema::DataType; use eyre::Context; use serde::{Deserialize, Serialize}; pub use uhlc; +pub type MetadataParameters = BTreeMap; + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Metadata { metadata_version: u16, @@ -105,20 +109,11 @@ pub struct BufferOffset { pub len: usize, } -#[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)] -pub struct MetadataParameters { - pub watermark: u64, - pub deadline: u64, - pub open_telemetry_context: String, -} - -impl MetadataParameters { - pub fn into_owned(self) -> MetadataParameters { - MetadataParameters { - open_telemetry_context: self.open_telemetry_context, - ..self - } - } +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub enum Parameter { + Bool(bool), + Integer(i64), + String(String), } impl Metadata { @@ -142,4 +137,12 @@ impl Metadata { pub fn timestamp(&self) -> uhlc::Timestamp { self.timestamp } + + pub fn open_telemetry_context(&self) -> String { + if let Some(Parameter::String(otel)) = self.parameters.get("open_telemetry_context") { + otel.to_string() + } else { + "".to_string() + } + } }