From bb8d6edb40fe6f09155fca4694ba7ec75e609514 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 6 Mar 2023 17:49:17 +0100 Subject: [PATCH] Use more reliable `InputClosed` event instead of timer as exit condition We might not receive any `random` input if the startup of the operator is delayed and the source node is already finished. --- examples/rust-dataflow/operator/src/lib.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/examples/rust-dataflow/operator/src/lib.rs b/examples/rust-dataflow/operator/src/lib.rs index 681213c5..81a1a1f0 100644 --- a/examples/rust-dataflow/operator/src/lib.rs +++ b/examples/rust-dataflow/operator/src/lib.rs @@ -1,14 +1,12 @@ #![warn(unsafe_op_in_unsafe_fn)] use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event}; -use std::time::{Duration, Instant}; register_operator!(ExampleOperator); #[derive(Debug, Default)] struct ExampleOperator { ticks: usize, - last_random_at: Option, } impl DoraOperator for ExampleOperator { @@ -33,19 +31,22 @@ impl DoraOperator for ExampleOperator { self.ticks ); output_sender.send("status".into(), output.into_bytes())?; - self.last_random_at = Some(Instant::now()); } other => eprintln!("ignoring unexpected input {other}"), }, Event::Stop => {} - } - - if let Some(last_random_at) = self.last_random_at { - if last_random_at.elapsed() > Duration::from_secs(1) { - // looks like the node sending the random values finished -> exit too - return Ok(DoraStatus::Stop); + Event::InputClosed { id } => { + println!("input `{id}` was closed"); + if *id == "random" { + println!("`random` input was closed -> exiting"); + return Ok(DoraStatus::Stop); + } + } + other => { + println!("received unknown event {other:?}"); } } + Ok(DoraStatus::Continue) } }