From 3c4469d4d3e0472d05871ed2d1f5f37c9254680d Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 13 Dec 2022 14:04:05 +0100 Subject: [PATCH] Cancel timer tasks once a dataflow is finished --- Cargo.lock | 1 + binaries/daemon/Cargo.toml | 1 + binaries/daemon/src/main.rs | 5 +++++ 3 files changed, 7 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 3d187199..a806a614 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -975,6 +975,7 @@ dependencies = [ "dora-message", "eyre", "flume", + "futures", "futures-concurrency 7.0.0", "serde", "serde_json", diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 55f6a27d..2b33f259 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -21,3 +21,4 @@ flume = "0.10.14" dora-download = { path = "../../libraries/extensions/download" } serde_yaml = "0.8.23" uuid = { version = "1.1.2", features = ["v4"] } +futures = "0.3.25" diff --git a/binaries/daemon/src/main.rs b/binaries/daemon/src/main.rs index 46a464d2..8e54867e 100644 --- a/binaries/daemon/src/main.rs +++ b/binaries/daemon/src/main.rs @@ -5,6 +5,7 @@ use dora_core::{ }; use dora_message::uhlc::HLC; use eyre::{bail, eyre, Context, ContextCompat}; +use futures::FutureExt; use futures_concurrency::stream::Merge; use shared_memory::{Shmem, ShmemConf}; use std::{ @@ -187,7 +188,9 @@ impl Daemon { } } }; + let (task, handle) = task.remote_handle(); tokio::spawn(task); + dataflow._timer_handles.push(handle); } Ok(()) @@ -396,6 +399,8 @@ pub struct RunningDataflow { subscribe_channels: HashMap>, mappings: HashMap>, timers: BTreeMap>, + /// Keep handles to all timer tasks of this dataflow to cancel them on drop. + _timer_handles: Vec>, } type OutputId = (NodeId, DataId);