Browse Source

Create more complex example with rate limiting and message aggregation

tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
f466170bca
8 changed files with 130 additions and 3 deletions
  1. +1
    -0
      Cargo.lock
  2. +8
    -0
      api/src/config.rs
  3. +1
    -0
      coordinator/Cargo.toml
  4. +21
    -1
      coordinator/examples/example_sink_logger.rs
  5. +1
    -1
      coordinator/examples/example_source_timer.rs
  6. +16
    -1
      coordinator/examples/mini-dataflow.yml
  7. +32
    -0
      coordinator/examples/random_number.rs
  8. +50
    -0
      coordinator/examples/rate_limit.rs

+ 1
- 0
Cargo.lock View File

@@ -563,6 +563,7 @@ dependencies = [
"eyre",
"futures",
"futures-concurrency",
"rand",
"serde",
"serde_yaml",
"time",


+ 8
- 0
api/src/config.rs View File

@@ -42,6 +42,14 @@ impl std::fmt::Display for DataId {
}
}

impl std::ops::Deref for DataId {
type Target = String;

fn deref(&self) -> &Self::Target {
&self.0
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InputMapping {
pub source: NodeId,


+ 1
- 0
coordinator/Cargo.toml View File

@@ -19,3 +19,4 @@ clap = { version = "3.1.8", features = ["derive"] }
uuid = "0.8.2"
time = "0.3.9"
futures-concurrency = "2.0.3"
rand = "0.8.5"

+ 21
- 1
coordinator/examples/example_sink_logger.rs View File

@@ -9,6 +9,8 @@ async fn main() -> eyre::Result<()> {

let mut inputs = operator.inputs().await?;

let mut last_timestamp = None;

loop {
let timeout = Duration::from_secs(2);
let input = match tokio::time::timeout(timeout, inputs.next()).await {
@@ -17,7 +19,25 @@ async fn main() -> eyre::Result<()> {
Err(_) => bail!("timeout while waiting for input"),
};

println!("{}: {}", input.id, String::from_utf8_lossy(&input.data))
match input.id.as_str() {
"time" => {
// only record it, but don't print anything
last_timestamp = Some(String::from_utf8_lossy(&input.data).into_owned());
}
"random" => {
let number = match input.data.try_into() {
Ok(bytes) => u64::from_le_bytes(bytes),
Err(_) => {
eprintln!("Malformed `random` message");
continue;
}
};
if let Some(timestamp) = &last_timestamp {
println!("random at {}: {}", timestamp, number);
}
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}
}

Ok(())


+ 1
- 1
coordinator/examples/example_source_timer.rs View File

@@ -9,7 +9,7 @@ async fn main() -> eyre::Result<()> {
let mut interval = tokio::time::interval(Duration::from_millis(20));

let time_output = DataId::from("time".to_owned());
for _ in 0..100 {
for _ in 0..1000 {
interval.tick().await;
let now = OffsetDateTime::now_utc().to_string();
operator.send_output(&time_output, now.as_bytes()).await?;


+ 16
- 1
coordinator/examples/mini-dataflow.yml View File

@@ -7,7 +7,22 @@ operators:
- time
run: ../target/debug/examples/example_source_timer

- id: rate-limited-timer
run: ../target/debug/examples/rate_limit --seconds 0.5
inputs:
data: timer/time
outputs:
- rate_limited

- id: random
run: ../target/debug/examples/random_number
inputs:
timestamp: rate-limited-timer/rate_limited
outputs:
- number

- id: logger
run: ../target/debug/examples/example_sink_logger
inputs:
data: timer/time
random: random/number
time: timer/time

+ 32
- 0
coordinator/examples/random_number.rs View File

@@ -0,0 +1,32 @@
use dora_api::{self, config::DataId, DoraOperator};
use eyre::bail;
use futures::StreamExt;
use std::time::Duration;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let output = DataId::from("number".to_owned());

let operator = DoraOperator::init_from_args().await?;

let mut inputs = operator.inputs().await?;

loop {
let timeout = Duration::from_secs(3);
let input = match tokio::time::timeout(timeout, inputs.next()).await {
Ok(Some(input)) => input,
Ok(None) => break,
Err(_) => bail!("timeout while waiting for input"),
};

match input.id.as_str() {
"timestamp" => {
let random: u64 = rand::random();
operator.send_output(&output, &random.to_le_bytes()).await?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}
}

Ok(())
}

+ 50
- 0
coordinator/examples/rate_limit.rs View File

@@ -0,0 +1,50 @@
use clap::StructOpt;
use dora_api::{self, config::DataId, DoraOperator};
use eyre::bail;
use futures::StreamExt;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, clap::Parser)]
#[clap(about = "Limit the rate of incoming data")]
struct Args {
/// Minimal interval between two subsequent.
///
/// Intermediate messages are ignored.
#[clap(long)]
seconds: f32,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
let args = Args::parse();
let min_interval = Duration::from_secs_f32(args.seconds);
let output = DataId::from("rate_limited".to_owned());

let operator = DoraOperator::init_from_args().await?;

let mut inputs = operator.inputs().await?;

let mut last_message = Instant::now();

loop {
let timeout = Duration::from_secs(3);
let input = match tokio::time::timeout(timeout, inputs.next()).await {
Ok(Some(input)) => input,
Ok(None) => break,
Err(_) => bail!("timeout while waiting for input"),
};

match input.id.as_str() {
"data" => {
let elapsed = last_message.elapsed();
if elapsed > min_interval {
last_message += elapsed;
operator.send_output(&output, &input.data).await?;
}
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}
}

Ok(())
}

Loading…
Cancel
Save