Browse Source

Change `MetadataParameters` into a `BTreeMap` to allow user defined metadata as well as enable more flexibility in managing metadata

tags/v0.3.6-rc0
haixuanTao 1 year ago
parent
commit
bc68de3bbd
4 changed files with 64 additions and 61 deletions
  1. +45
    -41
      apis/python/operator/src/lib.rs
  2. +1
    -1
      apis/rust/node/src/lib.rs
  3. +1
    -5
      apis/rust/node/src/node/mod.rs
  4. +17
    -14
      libraries/message/src/lib.rs

+ 45
- 41
apis/python/operator/src/lib.rs View File

@@ -1,20 +1,19 @@
use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
sync::{Arc, Mutex},
};

use arrow::pyarrow::ToPyArrow;
use dora_node_api::{
merged::{MergeExternalSend, MergedEvent},
DoraNode, Event, EventStream, Metadata, MetadataParameters,
DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter,
};
use eyre::{Context, Result};
use futures::{Stream, StreamExt};
use futures_concurrency::stream::Merge as _;
use pyo3::{
prelude::*,
pybacked::PyBackedStr,
types::{IntoPyDict, PyDict},
types::{IntoPyDict, PyBool, PyDict, PyInt, PyString},
};

/// Dora Event
@@ -94,7 +93,7 @@ impl PyEvent {
if let Some(value) = self.value(py)? {
pydict.insert("value", value);
}
if let Some(metadata) = Self::metadata(event, py) {
if let Some(metadata) = Self::metadata(event, py)? {
pydict.insert("metadata", metadata);
}
if let Some(error) = Self::error(event) {
@@ -143,10 +142,14 @@ impl PyEvent {
}
}

fn metadata(event: &Event, py: Python<'_>) -> Option<PyObject> {
fn metadata(event: &Event, py: Python<'_>) -> Result<Option<PyObject>> {
match event {
Event::Input { metadata, .. } => Some(metadata_to_pydict(metadata, py).to_object(py)),
_ => None,
Event::Input { metadata, .. } => Ok(Some(
metadata_to_pydict(metadata, py)
.context("Issue deserializing metadata")?
.to_object(py),
)),
_ => Ok(None),
}
}

@@ -159,44 +162,45 @@ impl PyEvent {
}

pub fn pydict_to_metadata(dict: Option<Bound<'_, PyDict>>) -> Result<MetadataParameters> {
let mut default_metadata = MetadataParameters::default();
if let Some(metadata) = dict {
for (key, value) in metadata.iter() {
match key
.extract::<PyBackedStr>()
.context("Parsing metadata keys")?
.as_ref()
{
"watermark" => {
default_metadata.watermark =
value.extract().context("parsing watermark failed")?;
}
"deadline" => {
default_metadata.deadline =
value.extract().context("parsing deadline failed")?;
}
"open_telemetry_context" => {
let otel_context: PyBackedStr = value
.extract()
.context("parsing open telemetry context failed")?;
default_metadata.open_telemetry_context = otel_context.to_string();
}
_ => (),
}
let mut parameters = BTreeMap::default();
if let Some(pymetadata) = dict {
for (key, value) in pymetadata.iter() {
let key = key.extract::<String>().context("Parsing metadata keys")?;
if value.is_exact_instance_of::<PyBool>() {
parameters.insert(key, Parameter::Bool(value.extract()?))
} else if value.is_instance_of::<PyInt>() {
parameters.insert(key, Parameter::Integer(value.extract::<i64>()?))
} else if value.is_instance_of::<PyString>() {
parameters.insert(key, Parameter::String(value.extract()?))
} else {
println!("could not convert type {value}");
parameters.insert(key, Parameter::String(value.str()?.to_string()))
};
}
}
Ok(default_metadata)
Ok(parameters)
}

pub fn metadata_to_pydict<'a>(metadata: &'a Metadata, py: Python<'a>) -> pyo3::Bound<'a, PyDict> {
pub fn metadata_to_pydict<'a>(
metadata: &'a Metadata,
py: Python<'a>,
) -> Result<pyo3::Bound<'a, PyDict>> {
let dict = PyDict::new_bound(py);
dict.set_item(
"open_telemetry_context",
&metadata.parameters.open_telemetry_context,
)
.wrap_err("could not make metadata a python dictionary item")
.unwrap();
dict
for (k, v) in metadata.parameters.iter() {
match v {
Parameter::Bool(bool) => dict
.set_item(k, bool)
.context(format!("Could not insert metadata into python dictionary"))?,
Parameter::Integer(int) => dict
.set_item(k, int)
.context(format!("Could not insert metadata into python dictionary"))?,
Parameter::String(s) => dict
.set_item(k, s)
.context(format!("Could not insert metadata into python dictionary"))?,
}
}

Ok(dict)
}

#[cfg(test)]


+ 1
- 1
apis/rust/node/src/lib.rs View File

@@ -16,7 +16,7 @@
pub use arrow;
pub use dora_arrow_convert::*;
pub use dora_core;
pub use dora_core::message::{uhlc, Metadata, MetadataParameters};
pub use dora_core::message::{uhlc, Metadata, MetadataParameters, Parameter};
pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData};
pub use flume::Receiver;
pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD};


+ 1
- 5
apis/rust/node/src/node/mod.rs View File

@@ -250,11 +250,7 @@ impl DoraNode {
if !self.node_config.outputs.contains(&output_id) {
eyre::bail!("unknown output");
}
let metadata = Metadata::from_parameters(
self.clock.new_timestamp(),
type_info,
parameters.into_owned(),
);
let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);

let (data, shmem) = match sample {
Some(sample) => sample.finalize(),


+ 17
- 14
libraries/message/src/lib.rs View File

@@ -3,12 +3,16 @@

#![allow(clippy::missing_safety_doc)]

use std::collections::BTreeMap;

use arrow_data::ArrayData;
use arrow_schema::DataType;
use eyre::Context;
use serde::{Deserialize, Serialize};
pub use uhlc;

pub type MetadataParameters = BTreeMap<String, Parameter>;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Metadata {
metadata_version: u16,
@@ -105,20 +109,11 @@ pub struct BufferOffset {
pub len: usize,
}

#[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
pub struct MetadataParameters {
pub watermark: u64,
pub deadline: u64,
pub open_telemetry_context: String,
}

impl MetadataParameters {
pub fn into_owned(self) -> MetadataParameters {
MetadataParameters {
open_telemetry_context: self.open_telemetry_context,
..self
}
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum Parameter {
Bool(bool),
Integer(i64),
String(String),
}

impl Metadata {
@@ -142,4 +137,12 @@ impl Metadata {
pub fn timestamp(&self) -> uhlc::Timestamp {
self.timestamp
}

pub fn open_telemetry_context(&self) -> String {
if let Some(Parameter::String(otel)) = self.parameters.get("open_telemetry_context") {
otel.to_string()
} else {
"".to_string()
}
}
}

Loading…
Cancel
Save