Browse Source

Use more reliable `InputClosed` event instead of timer as exit condition

We might not receive any `random` input if the startup of the operator is delayed and the source node is already finished.
tags/v0.2.0-candidate
Philipp Oppermann 2 years ago
parent
commit
bb8d6edb40
Failed to extract signature
1 changed files with 10 additions and 9 deletions
  1. +10
    -9
      examples/rust-dataflow/operator/src/lib.rs

+ 10
- 9
examples/rust-dataflow/operator/src/lib.rs View File

@@ -1,14 +1,12 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};
use std::time::{Duration, Instant};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
ticks: usize,
last_random_at: Option<Instant>,
}

impl DoraOperator for ExampleOperator {
@@ -33,19 +31,22 @@ impl DoraOperator for ExampleOperator {
self.ticks
);
output_sender.send("status".into(), output.into_bytes())?;
self.last_random_at = Some(Instant::now());
}
other => eprintln!("ignoring unexpected input {other}"),
},
Event::Stop => {}
}

if let Some(last_random_at) = self.last_random_at {
if last_random_at.elapsed() > Duration::from_secs(1) {
// looks like the node sending the random values finished -> exit too
return Ok(DoraStatus::Stop);
Event::InputClosed { id } => {
println!("input `{id}` was closed");
if *id == "random" {
println!("`random` input was closed -> exiting");
return Ok(DoraStatus::Stop);
}
}
other => {
println!("received unknown event {other:?}");
}
}

Ok(DoraStatus::Continue)
}
}

Loading…
Cancel
Save