diff --git a/apis/rust/node/src/config.rs b/apis/rust/node/src/config.rs index 5102b0b8..d2a20643 100644 --- a/apis/rust/node/src/config.rs +++ b/apis/rust/node/src/config.rs @@ -222,7 +222,7 @@ pub struct UserInputMapping { pub output: DataId, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(deny_unknown_fields, rename_all = "lowercase")] pub enum CommunicationConfig { Zenoh { diff --git a/binaries/coordinator/src/main.rs b/binaries/coordinator/src/main.rs index f6a743f0..e860c58e 100644 --- a/binaries/coordinator/src/main.rs +++ b/binaries/coordinator/src/main.rs @@ -1,8 +1,12 @@ -use dora_core::descriptor::{self, CoreNodeKind, Descriptor}; -use dora_node_api::config::NodeId; +use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor}; +use dora_node_api::{ + communication, + config::{format_duration, NodeId}, +}; use eyre::{bail, eyre, WrapErr}; use futures::{stream::FuturesUnordered, StreamExt}; use std::path::{Path, PathBuf}; +use tokio_stream::wrappers::IntervalStream; #[derive(Debug, Clone, clap::Parser)] #[clap(about = "Dora coordinator")] @@ -57,6 +61,7 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() })?; let nodes = descriptor.resolve_aliases(); + let dora_timers = collect_dora_timers(&nodes); let mut communication = descriptor.communication; if nodes @@ -94,6 +99,28 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() } } + for interval in dora_timers { + let communication = communication.clone(); + let task = tokio::spawn(async move { + let communication = communication::init(&communication) + .await + .wrap_err("failed to init communication layer")?; + let topic = { + let duration = format_duration(interval); + format!("dora/timer/{duration}") + }; + let mut stream = IntervalStream::new(tokio::time::interval(interval)); + while let Some(_) = stream.next().await { + let publish = communication.publish(&topic, &[]); + publish + .await + .wrap_err("failed to publish timer tick message")?; + } + Ok(()) + }); + tasks.push(task); + } + while let Some(task_result) = tasks.next().await { task_result .wrap_err("failed to join async task")? diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 4797f06c..5ae27129 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -8,6 +8,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, path::PathBuf, }; +pub use visualize::collect_dora_timers; mod visualize; diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index aeed8842..8d4a8d0a 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -15,19 +15,7 @@ pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String { all_nodes.insert(&node.id, node); } - let mut dora_timers = BTreeSet::new(); - for node in nodes { - match &node.kind { - CoreNodeKind::Runtime(node) => { - for operator in &node.operators { - collect_dora_nodes(operator.config.inputs.values(), &mut dora_timers); - } - } - CoreNodeKind::Custom(node) => { - collect_dora_nodes(node.run_config.inputs.values(), &mut dora_timers); - } - } - } + let dora_timers = collect_dora_timers(nodes); if !dora_timers.is_empty() { writeln!(flowchart, "subgraph ___dora___ [dora]").unwrap(); writeln!(flowchart, " subgraph ___timer_timer___ [timer]").unwrap(); @@ -46,6 +34,23 @@ pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String { flowchart } +pub fn collect_dora_timers(nodes: &[ResolvedNode]) -> BTreeSet { + let mut dora_timers = BTreeSet::new(); + for node in nodes { + match &node.kind { + CoreNodeKind::Runtime(node) => { + for operator in &node.operators { + collect_dora_nodes(operator.config.inputs.values(), &mut dora_timers); + } + } + CoreNodeKind::Custom(node) => { + collect_dora_nodes(node.run_config.inputs.values(), &mut dora_timers); + } + } + } + dora_timers +} + fn collect_dora_nodes( values: std::collections::btree_map::Values, dora_timers: &mut BTreeSet,