Browse Source

Send dora timer messages from coordinator

tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
20501ca461
Failed to extract signature
4 changed files with 49 additions and 16 deletions
  1. +1
    -1
      apis/rust/node/src/config.rs
  2. +29
    -2
      binaries/coordinator/src/main.rs
  3. +1
    -0
      libraries/core/src/descriptor/mod.rs
  4. +18
    -13
      libraries/core/src/descriptor/visualize.rs

+ 1
- 1
apis/rust/node/src/config.rs View File

@@ -222,7 +222,7 @@ pub struct UserInputMapping {
pub output: DataId,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub enum CommunicationConfig {
Zenoh {


+ 29
- 2
binaries/coordinator/src/main.rs View File

@@ -1,8 +1,12 @@
use dora_core::descriptor::{self, CoreNodeKind, Descriptor};
use dora_node_api::config::NodeId;
use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor};
use dora_node_api::{
communication,
config::{format_duration, NodeId},
};
use eyre::{bail, eyre, WrapErr};
use futures::{stream::FuturesUnordered, StreamExt};
use std::path::{Path, PathBuf};
use tokio_stream::wrappers::IntervalStream;

#[derive(Debug, Clone, clap::Parser)]
#[clap(about = "Dora coordinator")]
@@ -57,6 +61,7 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()
})?;

let nodes = descriptor.resolve_aliases();
let dora_timers = collect_dora_timers(&nodes);
let mut communication = descriptor.communication;

if nodes
@@ -94,6 +99,28 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()
}
}

for interval in dora_timers {
let communication = communication.clone();
let task = tokio::spawn(async move {
let communication = communication::init(&communication)
.await
.wrap_err("failed to init communication layer")?;
let topic = {
let duration = format_duration(interval);
format!("dora/timer/{duration}")
};
let mut stream = IntervalStream::new(tokio::time::interval(interval));
while let Some(_) = stream.next().await {
let publish = communication.publish(&topic, &[]);
publish
.await
.wrap_err("failed to publish timer tick message")?;
}
Ok(())
});
tasks.push(task);
}

while let Some(task_result) = tasks.next().await {
task_result
.wrap_err("failed to join async task")?


+ 1
- 0
libraries/core/src/descriptor/mod.rs View File

@@ -8,6 +8,7 @@ use std::{
collections::{BTreeMap, BTreeSet},
path::PathBuf,
};
pub use visualize::collect_dora_timers;

mod visualize;



+ 18
- 13
libraries/core/src/descriptor/visualize.rs View File

@@ -15,19 +15,7 @@ pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String {
all_nodes.insert(&node.id, node);
}

let mut dora_timers = BTreeSet::new();
for node in nodes {
match &node.kind {
CoreNodeKind::Runtime(node) => {
for operator in &node.operators {
collect_dora_nodes(operator.config.inputs.values(), &mut dora_timers);
}
}
CoreNodeKind::Custom(node) => {
collect_dora_nodes(node.run_config.inputs.values(), &mut dora_timers);
}
}
}
let dora_timers = collect_dora_timers(nodes);
if !dora_timers.is_empty() {
writeln!(flowchart, "subgraph ___dora___ [dora]").unwrap();
writeln!(flowchart, " subgraph ___timer_timer___ [timer]").unwrap();
@@ -46,6 +34,23 @@ pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String {
flowchart
}

pub fn collect_dora_timers(nodes: &[ResolvedNode]) -> BTreeSet<Duration> {
let mut dora_timers = BTreeSet::new();
for node in nodes {
match &node.kind {
CoreNodeKind::Runtime(node) => {
for operator in &node.operators {
collect_dora_nodes(operator.config.inputs.values(), &mut dora_timers);
}
}
CoreNodeKind::Custom(node) => {
collect_dora_nodes(node.run_config.inputs.values(), &mut dora_timers);
}
}
}
dora_timers
}

fn collect_dora_nodes(
values: std::collections::btree_map::Values<DataId, InputMapping>,
dora_timers: &mut BTreeSet<Duration>,


Loading…
Cancel
Save