Browse Source

Implement parsing of new dora timer input keys

tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
3c8ee37795
Failed to extract signature
4 changed files with 117 additions and 31 deletions
  1. +3
    -2
      Cargo.lock
  2. +1
    -0
      apis/rust/node/Cargo.toml
  3. +110
    -15
      apis/rust/node/src/config.rs
  4. +3
    -14
      apis/rust/node/src/lib.rs

+ 3
- 2
Cargo.lock View File

@@ -685,6 +685,7 @@ dependencies = [
"futures",
"futures-concurrency",
"futures-time",
"once_cell",
"serde",
"serde_yaml",
"thiserror",
@@ -1523,9 +1524,9 @@ dependencies = [

[[package]]
name = "once_cell"
version = "1.12.0"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225"
checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1"

[[package]]
name = "opaque-debug"


+ 1
- 0
apis/rust/node/Cargo.toml View File

@@ -12,6 +12,7 @@ eyre = "0.6.7"
futures = "0.3.21"
futures-concurrency = "2.0.3"
futures-time = "1.0.0"
once_cell = "1.13.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.8.23"
thiserror = "1.0.30"


+ 110
- 15
apis/rust/node/src/config.rs View File

@@ -1,9 +1,11 @@
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, BTreeSet},
convert::Infallible,
fmt::Write as _,
str::FromStr,
time::Duration,
};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -84,11 +86,54 @@ impl std::ops::Deref for DataId {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InputMapping {
pub source: NodeId,
pub operator: Option<OperatorId>,
pub output: DataId,
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum InputMapping {
Timer { interval: Duration },
User(UserInputMapping),
}

impl InputMapping {
pub fn source(&self) -> &NodeId {
static DORA_NODE_ID: OnceCell<NodeId> = OnceCell::new();

match self {
InputMapping::User(mapping) => &mapping.source,
InputMapping::Timer { .. } => DORA_NODE_ID.get_or_init(|| NodeId("dora".to_string())),
}
}

pub fn operator(&self) -> &Option<OperatorId> {
match self {
InputMapping::User(mapping) => &mapping.operator,
InputMapping::Timer { .. } => &None,
}
}
}

impl ToString for InputMapping {
fn to_string(&self) -> String {
match self {
InputMapping::Timer { interval } => {
let duration = format_duration(*interval);
format!("dora/timer/{duration}")
}
InputMapping::User(mapping) => {
if let Some(operator) = &mapping.operator {
format!("{}/{operator}/{}", mapping.source, mapping.output)
} else {
format!("{}/{}", mapping.source, mapping.output)
}
}
}
}
}

pub fn format_duration(interval: Duration) -> String {
if interval.subsec_millis() == 0 {
format!("secs/{}", interval.as_secs())
} else {
format!("millis/{}", interval.as_millis())
}
}

impl Serialize for InputMapping {
@@ -96,11 +141,7 @@ impl Serialize for InputMapping {
where
S: serde::Serializer,
{
if let Some(operator) = &self.operator {
serializer.collect_str(&format_args!("{}/{operator}/{}", self.source, self.output))
} else {
serializer.collect_str(&format_args!("{}/{}", self.source, self.output))
}
serializer.serialize_str(&self.to_string())
}
}

@@ -119,14 +160,68 @@ impl<'de> Deserialize<'de> for InputMapping {
.map(|(op, out)| (Some(op), out))
.unwrap_or((None, rest));

Ok(Self {
source: source.to_owned().into(),
operator: operator.map(|o| o.to_owned().into()),
output: output.to_owned().into(),
})
let deserialized = match source {
"dora" => match operator {
Some("timer") => {
let (unit, value) = output.split_once('/').ok_or_else(|| {
serde::de::Error::custom(
"timer input must specify unit and value (e.g. `secs/5` or `millis/100`)",
)
})?;
let interval = match unit {
"secs" => {
let value = value.parse().map_err(|_| {
serde::de::Error::custom(format!(
"secs must be an integer (got `{value}`)"
))
})?;
Duration::from_secs(value)
}
"millis" => {
let value = value.parse().map_err(|_| {
serde::de::Error::custom(format!(
"millis must be an integer (got `{value}`)"
))
})?;
Duration::from_millis(value)
}
other => {
return Err(serde::de::Error::custom(format!(
"timer unit must be either secs or millis (got `{other}`"
)))
}
};
Self::Timer { interval }
}
Some(other) => {
return Err(serde::de::Error::custom(format!(
"unknown dora input `{other}`"
)))
}
None => {
return Err(serde::de::Error::custom(format!(
"dora input has invalid format"
)))
}
},
_ => Self::User(UserInputMapping {
source: source.to_owned().into(),
operator: operator.map(|o| o.to_owned().into()),
output: output.to_owned().into(),
}),
};

Ok(deserialized)
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct UserInputMapping {
pub source: NodeId,
pub operator: Option<OperatorId>,
pub output: DataId,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub enum CommunicationConfig {


+ 3
- 14
apis/rust/node/src/lib.rs View File

@@ -52,19 +52,8 @@ impl DoraNode {

pub async fn inputs(&self) -> eyre::Result<impl futures::Stream<Item = Input> + '_> {
let mut streams = Vec::new();
for (
input,
config::InputMapping {
source,
operator,
output,
},
) in &self.node_config.inputs
{
let topic = match operator {
Some(operator) => format!("{source}/{operator}/{output}"),
None => format!("{source}/{output}"),
};
for (input, mapping) in &self.node_config.inputs {
let topic = mapping.to_string();
let sub = self
.communication
.subscribe(&topic)
@@ -81,7 +70,7 @@ impl DoraNode {
.node_config
.inputs
.values()
.map(|v| (&v.source, &v.operator))
.map(|v| (v.source(), v.operator()))
.collect();
for (source, operator) in &sources {
let topic = match operator {


Loading…
Cancel
Save