diff --git a/Cargo.lock b/Cargo.lock index 818069b9..58234124 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -685,6 +685,7 @@ dependencies = [ "futures", "futures-concurrency", "futures-time", + "once_cell", "serde", "serde_yaml", "thiserror", @@ -1523,9 +1524,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" +checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" [[package]] name = "opaque-debug" diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index 4af1a161..5b6840fb 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -12,6 +12,7 @@ eyre = "0.6.7" futures = "0.3.21" futures-concurrency = "2.0.3" futures-time = "1.0.0" +once_cell = "1.13.0" serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.8.23" thiserror = "1.0.30" diff --git a/apis/rust/node/src/config.rs b/apis/rust/node/src/config.rs index 18bda253..5102b0b8 100644 --- a/apis/rust/node/src/config.rs +++ b/apis/rust/node/src/config.rs @@ -1,9 +1,11 @@ +use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use std::{ collections::{BTreeMap, BTreeSet}, convert::Infallible, fmt::Write as _, str::FromStr, + time::Duration, }; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -84,11 +86,54 @@ impl std::ops::Deref for DataId { } } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct InputMapping { - pub source: NodeId, - pub operator: Option, - pub output: DataId, +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub enum InputMapping { + Timer { interval: Duration }, + User(UserInputMapping), +} + +impl InputMapping { + pub fn source(&self) -> &NodeId { + static DORA_NODE_ID: OnceCell = OnceCell::new(); + + match self { + InputMapping::User(mapping) => &mapping.source, + InputMapping::Timer { .. } => DORA_NODE_ID.get_or_init(|| NodeId("dora".to_string())), + } + } + + pub fn operator(&self) -> &Option { + match self { + InputMapping::User(mapping) => &mapping.operator, + InputMapping::Timer { .. } => &None, + } + } +} + +impl ToString for InputMapping { + fn to_string(&self) -> String { + match self { + InputMapping::Timer { interval } => { + let duration = format_duration(*interval); + format!("dora/timer/{duration}") + } + InputMapping::User(mapping) => { + if let Some(operator) = &mapping.operator { + format!("{}/{operator}/{}", mapping.source, mapping.output) + } else { + format!("{}/{}", mapping.source, mapping.output) + } + } + } + } +} + +pub fn format_duration(interval: Duration) -> String { + if interval.subsec_millis() == 0 { + format!("secs/{}", interval.as_secs()) + } else { + format!("millis/{}", interval.as_millis()) + } } impl Serialize for InputMapping { @@ -96,11 +141,7 @@ impl Serialize for InputMapping { where S: serde::Serializer, { - if let Some(operator) = &self.operator { - serializer.collect_str(&format_args!("{}/{operator}/{}", self.source, self.output)) - } else { - serializer.collect_str(&format_args!("{}/{}", self.source, self.output)) - } + serializer.serialize_str(&self.to_string()) } } @@ -119,14 +160,68 @@ impl<'de> Deserialize<'de> for InputMapping { .map(|(op, out)| (Some(op), out)) .unwrap_or((None, rest)); - Ok(Self { - source: source.to_owned().into(), - operator: operator.map(|o| o.to_owned().into()), - output: output.to_owned().into(), - }) + let deserialized = match source { + "dora" => match operator { + Some("timer") => { + let (unit, value) = output.split_once('/').ok_or_else(|| { + serde::de::Error::custom( + "timer input must specify unit and value (e.g. `secs/5` or `millis/100`)", + ) + })?; + let interval = match unit { + "secs" => { + let value = value.parse().map_err(|_| { + serde::de::Error::custom(format!( + "secs must be an integer (got `{value}`)" + )) + })?; + Duration::from_secs(value) + } + "millis" => { + let value = value.parse().map_err(|_| { + serde::de::Error::custom(format!( + "millis must be an integer (got `{value}`)" + )) + })?; + Duration::from_millis(value) + } + other => { + return Err(serde::de::Error::custom(format!( + "timer unit must be either secs or millis (got `{other}`" + ))) + } + }; + Self::Timer { interval } + } + Some(other) => { + return Err(serde::de::Error::custom(format!( + "unknown dora input `{other}`" + ))) + } + None => { + return Err(serde::de::Error::custom(format!( + "dora input has invalid format" + ))) + } + }, + _ => Self::User(UserInputMapping { + source: source.to_owned().into(), + operator: operator.map(|o| o.to_owned().into()), + output: output.to_owned().into(), + }), + }; + + Ok(deserialized) } } +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct UserInputMapping { + pub source: NodeId, + pub operator: Option, + pub output: DataId, +} + #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields, rename_all = "lowercase")] pub enum CommunicationConfig { diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 21510d11..21aa79ef 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -52,19 +52,8 @@ impl DoraNode { pub async fn inputs(&self) -> eyre::Result + '_> { let mut streams = Vec::new(); - for ( - input, - config::InputMapping { - source, - operator, - output, - }, - ) in &self.node_config.inputs - { - let topic = match operator { - Some(operator) => format!("{source}/{operator}/{output}"), - None => format!("{source}/{output}"), - }; + for (input, mapping) in &self.node_config.inputs { + let topic = mapping.to_string(); let sub = self .communication .subscribe(&topic) @@ -81,7 +70,7 @@ impl DoraNode { .node_config .inputs .values() - .map(|v| (&v.source, &v.operator)) + .map(|v| (v.source(), v.operator())) .collect(); for (source, operator) in &sources { let topic = match operator {