diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 443e3219..e2ef12e5 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -13,7 +13,7 @@ use futures::{Stream, StreamExt}; use futures_concurrency::stream::Merge as _; use pyo3::{ prelude::*, - types::{IntoPyDict, PyBool, PyDict, PyInt, PyList, PyString, PyTuple}, + types::{IntoPyDict, PyBool, PyDict, PyFloat, PyInt, PyList, PyString, PyTuple}, }; /// Dora Event @@ -171,6 +171,8 @@ pub fn pydict_to_metadata(dict: Option>) -> Result() { parameters.insert(key, Parameter::Integer(value.extract::()?)) + } else if value.is_instance_of::() { + parameters.insert(key, Parameter::Float(value.extract::()?)) } else if value.is_instance_of::() { parameters.insert(key, Parameter::String(value.extract()?)) } else if value.is_instance_of::() @@ -185,6 +187,18 @@ pub fn pydict_to_metadata(dict: Option>) -> Result = value.extract()?; parameters.insert(key, Parameter::ListInt(list)) + } else if value.is_instance_of::() + && value.len()? > 0 + && value.get_item(0)?.is_exact_instance_of::() + { + let list: Vec = value.extract()?; + parameters.insert(key, Parameter::ListFloat(list)) + } else if value.is_instance_of::() + && value.len()? > 0 + && value.get_item(0)?.is_exact_instance_of::() + { + let list: Vec = value.extract()?; + parameters.insert(key, Parameter::ListFloat(list)) } else { println!("could not convert type {value}"); parameters.insert(key, Parameter::String(value.str()?.to_string())) @@ -207,12 +221,18 @@ pub fn metadata_to_pydict<'a>( Parameter::Integer(int) => dict .set_item(k, int) .context("Could not insert metadata into python dictionary")?, + Parameter::Float(float) => dict + .set_item(k, float) + .context("Could not insert metadata into python dictionary")?, Parameter::String(s) => dict .set_item(k, s) .context("Could not insert metadata into python dictionary")?, Parameter::ListInt(l) => dict .set_item(k, l) .context("Could not insert metadata into python dictionary")?, + Parameter::ListFloat(l) => dict + .set_item(k, l) + .context("Could not insert metadata into python dictionary")?, } } diff --git a/libraries/message/src/descriptor.rs b/libraries/message/src/descriptor.rs index a84963e3..a20c3fb7 100644 --- a/libraries/message/src/descriptor.rs +++ b/libraries/message/src/descriptor.rs @@ -236,7 +236,9 @@ pub enum EnvValue { #[serde(deserialize_with = "with_expand_envs")] Bool(bool), #[serde(deserialize_with = "with_expand_envs")] - Integer(u64), + Integer(i64), + #[serde(deserialize_with = "with_expand_envs")] + Float(f64), #[serde(deserialize_with = "with_expand_envs")] String(String), } @@ -245,7 +247,8 @@ impl fmt::Display for EnvValue { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { match self { EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()), - EnvValue::Integer(u64) => fmt.write_str(&u64.to_string()), + EnvValue::Integer(i64) => fmt.write_str(&i64.to_string()), + EnvValue::Float(f64) => fmt.write_str(&f64.to_string()), EnvValue::String(str) => fmt.write_str(str), } } diff --git a/libraries/message/src/metadata.rs b/libraries/message/src/metadata.rs index 35c52db1..a5b5a806 100644 --- a/libraries/message/src/metadata.rs +++ b/libraries/message/src/metadata.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use arrow_schema::DataType; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Metadata { metadata_version: u16, timestamp: uhlc::Timestamp, @@ -55,12 +55,14 @@ pub struct ArrowTypeInfo { pub child_data: Vec, } -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] pub enum Parameter { Bool(bool), Integer(i64), + Float(f64), String(String), ListInt(Vec), + ListFloat(Vec), } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]