Browse Source

Adding float for env variable and metadata parameters (#786)

This PR adds support for float64, Listfloat metadata parameters as well
as float64 environment variables.
tags/v0.3.10-rc0
Haixuan Xavier Tao GitHub 11 months ago
parent
commit
d0cefcbe20
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
3 changed files with 30 additions and 5 deletions
  1. +21
    -1
      apis/python/operator/src/lib.rs
  2. +5
    -2
      libraries/message/src/descriptor.rs
  3. +4
    -2
      libraries/message/src/metadata.rs

+ 21
- 1
apis/python/operator/src/lib.rs View File

@@ -13,7 +13,7 @@ use futures::{Stream, StreamExt};
use futures_concurrency::stream::Merge as _;
use pyo3::{
prelude::*,
types::{IntoPyDict, PyBool, PyDict, PyInt, PyList, PyString, PyTuple},
types::{IntoPyDict, PyBool, PyDict, PyFloat, PyInt, PyList, PyString, PyTuple},
};

/// Dora Event
@@ -171,6 +171,8 @@ pub fn pydict_to_metadata(dict: Option<Bound<'_, PyDict>>) -> Result<MetadataPar
parameters.insert(key, Parameter::Bool(value.extract()?))
} else if value.is_instance_of::<PyInt>() {
parameters.insert(key, Parameter::Integer(value.extract::<i64>()?))
} else if value.is_instance_of::<PyFloat>() {
parameters.insert(key, Parameter::Float(value.extract::<f64>()?))
} else if value.is_instance_of::<PyString>() {
parameters.insert(key, Parameter::String(value.extract()?))
} else if value.is_instance_of::<PyTuple>()
@@ -185,6 +187,18 @@ pub fn pydict_to_metadata(dict: Option<Bound<'_, PyDict>>) -> Result<MetadataPar
{
let list: Vec<i64> = value.extract()?;
parameters.insert(key, Parameter::ListInt(list))
} else if value.is_instance_of::<PyTuple>()
&& value.len()? > 0
&& value.get_item(0)?.is_exact_instance_of::<PyFloat>()
{
let list: Vec<f64> = value.extract()?;
parameters.insert(key, Parameter::ListFloat(list))
} else if value.is_instance_of::<PyList>()
&& value.len()? > 0
&& value.get_item(0)?.is_exact_instance_of::<PyFloat>()
{
let list: Vec<f64> = value.extract()?;
parameters.insert(key, Parameter::ListFloat(list))
} else {
println!("could not convert type {value}");
parameters.insert(key, Parameter::String(value.str()?.to_string()))
@@ -207,12 +221,18 @@ pub fn metadata_to_pydict<'a>(
Parameter::Integer(int) => dict
.set_item(k, int)
.context("Could not insert metadata into python dictionary")?,
Parameter::Float(float) => dict
.set_item(k, float)
.context("Could not insert metadata into python dictionary")?,
Parameter::String(s) => dict
.set_item(k, s)
.context("Could not insert metadata into python dictionary")?,
Parameter::ListInt(l) => dict
.set_item(k, l)
.context("Could not insert metadata into python dictionary")?,
Parameter::ListFloat(l) => dict
.set_item(k, l)
.context("Could not insert metadata into python dictionary")?,
}
}



+ 5
- 2
libraries/message/src/descriptor.rs View File

@@ -236,7 +236,9 @@ pub enum EnvValue {
#[serde(deserialize_with = "with_expand_envs")]
Bool(bool),
#[serde(deserialize_with = "with_expand_envs")]
Integer(u64),
Integer(i64),
#[serde(deserialize_with = "with_expand_envs")]
Float(f64),
#[serde(deserialize_with = "with_expand_envs")]
String(String),
}
@@ -245,7 +247,8 @@ impl fmt::Display for EnvValue {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()),
EnvValue::Integer(u64) => fmt.write_str(&u64.to_string()),
EnvValue::Integer(i64) => fmt.write_str(&i64.to_string()),
EnvValue::Float(f64) => fmt.write_str(&f64.to_string()),
EnvValue::String(str) => fmt.write_str(str),
}
}


+ 4
- 2
libraries/message/src/metadata.rs View File

@@ -3,7 +3,7 @@ use std::collections::BTreeMap;
use arrow_schema::DataType;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Metadata {
metadata_version: u16,
timestamp: uhlc::Timestamp,
@@ -55,12 +55,14 @@ pub struct ArrowTypeInfo {
pub child_data: Vec<ArrowTypeInfo>,
}

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum Parameter {
Bool(bool),
Integer(i64),
Float(f64),
String(String),
ListInt(Vec<i64>),
ListFloat(Vec<f64>),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]


Loading…
Cancel
Save