diff --git a/Cargo.lock b/Cargo.lock index 09b47157..edec8aa0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -563,6 +563,7 @@ dependencies = [ "eyre", "futures", "futures-concurrency", + "rand", "serde", "serde_yaml", "time", diff --git a/api/src/config.rs b/api/src/config.rs index 48d92e92..394dea8a 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -42,6 +42,14 @@ impl std::fmt::Display for DataId { } } +impl std::ops::Deref for DataId { + type Target = String; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct InputMapping { pub source: NodeId, diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index d453735e..42209323 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -19,3 +19,4 @@ clap = { version = "3.1.8", features = ["derive"] } uuid = "0.8.2" time = "0.3.9" futures-concurrency = "2.0.3" +rand = "0.8.5" diff --git a/coordinator/examples/example_sink_logger.rs b/coordinator/examples/example_sink_logger.rs index 464b8072..30a00311 100644 --- a/coordinator/examples/example_sink_logger.rs +++ b/coordinator/examples/example_sink_logger.rs @@ -9,6 +9,8 @@ async fn main() -> eyre::Result<()> { let mut inputs = operator.inputs().await?; + let mut last_timestamp = None; + loop { let timeout = Duration::from_secs(2); let input = match tokio::time::timeout(timeout, inputs.next()).await { @@ -17,7 +19,25 @@ async fn main() -> eyre::Result<()> { Err(_) => bail!("timeout while waiting for input"), }; - println!("{}: {}", input.id, String::from_utf8_lossy(&input.data)) + match input.id.as_str() { + "time" => { + // only record it, but don't print anything + last_timestamp = Some(String::from_utf8_lossy(&input.data).into_owned()); + } + "random" => { + let number = match input.data.try_into() { + Ok(bytes) => u64::from_le_bytes(bytes), + Err(_) => { + eprintln!("Malformed `random` message"); + continue; + } + }; + if let Some(timestamp) = &last_timestamp { + println!("random at {}: {}", timestamp, number); + } + } + other => eprintln!("Ignoring unexpected input `{other}`"), + } } Ok(()) diff --git a/coordinator/examples/example_source_timer.rs b/coordinator/examples/example_source_timer.rs index ba5b30ad..bcad8b9c 100644 --- a/coordinator/examples/example_source_timer.rs +++ b/coordinator/examples/example_source_timer.rs @@ -9,7 +9,7 @@ async fn main() -> eyre::Result<()> { let mut interval = tokio::time::interval(Duration::from_millis(20)); let time_output = DataId::from("time".to_owned()); - for _ in 0..100 { + for _ in 0..1000 { interval.tick().await; let now = OffsetDateTime::now_utc().to_string(); operator.send_output(&time_output, now.as_bytes()).await?; diff --git a/coordinator/examples/mini-dataflow.yml b/coordinator/examples/mini-dataflow.yml index 13b758c5..089cc7c5 100644 --- a/coordinator/examples/mini-dataflow.yml +++ b/coordinator/examples/mini-dataflow.yml @@ -7,7 +7,22 @@ operators: - time run: ../target/debug/examples/example_source_timer + - id: rate-limited-timer + run: ../target/debug/examples/rate_limit --seconds 0.5 + inputs: + data: timer/time + outputs: + - rate_limited + + - id: random + run: ../target/debug/examples/random_number + inputs: + timestamp: rate-limited-timer/rate_limited + outputs: + - number + - id: logger run: ../target/debug/examples/example_sink_logger inputs: - data: timer/time + random: random/number + time: timer/time diff --git a/coordinator/examples/random_number.rs b/coordinator/examples/random_number.rs new file mode 100644 index 00000000..176c123c --- /dev/null +++ b/coordinator/examples/random_number.rs @@ -0,0 +1,32 @@ +use dora_api::{self, config::DataId, DoraOperator}; +use eyre::bail; +use futures::StreamExt; +use std::time::Duration; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let output = DataId::from("number".to_owned()); + + let operator = DoraOperator::init_from_args().await?; + + let mut inputs = operator.inputs().await?; + + loop { + let timeout = Duration::from_secs(3); + let input = match tokio::time::timeout(timeout, inputs.next()).await { + Ok(Some(input)) => input, + Ok(None) => break, + Err(_) => bail!("timeout while waiting for input"), + }; + + match input.id.as_str() { + "timestamp" => { + let random: u64 = rand::random(); + operator.send_output(&output, &random.to_le_bytes()).await?; + } + other => eprintln!("Ignoring unexpected input `{other}`"), + } + } + + Ok(()) +} diff --git a/coordinator/examples/rate_limit.rs b/coordinator/examples/rate_limit.rs new file mode 100644 index 00000000..09e3c5db --- /dev/null +++ b/coordinator/examples/rate_limit.rs @@ -0,0 +1,50 @@ +use clap::StructOpt; +use dora_api::{self, config::DataId, DoraOperator}; +use eyre::bail; +use futures::StreamExt; +use std::time::{Duration, Instant}; + +#[derive(Debug, Clone, clap::Parser)] +#[clap(about = "Limit the rate of incoming data")] +struct Args { + /// Minimal interval between two subsequent. + /// + /// Intermediate messages are ignored. + #[clap(long)] + seconds: f32, +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let args = Args::parse(); + let min_interval = Duration::from_secs_f32(args.seconds); + let output = DataId::from("rate_limited".to_owned()); + + let operator = DoraOperator::init_from_args().await?; + + let mut inputs = operator.inputs().await?; + + let mut last_message = Instant::now(); + + loop { + let timeout = Duration::from_secs(3); + let input = match tokio::time::timeout(timeout, inputs.next()).await { + Ok(Some(input)) => input, + Ok(None) => break, + Err(_) => bail!("timeout while waiting for input"), + }; + + match input.id.as_str() { + "data" => { + let elapsed = last_message.elapsed(); + if elapsed > min_interval { + last_message += elapsed; + operator.send_output(&output, &input.data).await?; + } + } + other => eprintln!("Ignoring unexpected input `{other}`"), + } + } + + Ok(()) +}