diff --git a/Cargo.lock b/Cargo.lock index 3d187199..a806a614 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -975,6 +975,7 @@ dependencies = [ "dora-message", "eyre", "flume", + "futures", "futures-concurrency 7.0.0", "serde", "serde_json", diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 55f6a27d..2b33f259 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -21,3 +21,4 @@ flume = "0.10.14" dora-download = { path = "../../libraries/extensions/download" } serde_yaml = "0.8.23" uuid = { version = "1.1.2", features = ["v4"] } +futures = "0.3.25" diff --git a/binaries/daemon/src/main.rs b/binaries/daemon/src/main.rs index 46a464d2..8e54867e 100644 --- a/binaries/daemon/src/main.rs +++ b/binaries/daemon/src/main.rs @@ -5,6 +5,7 @@ use dora_core::{ }; use dora_message::uhlc::HLC; use eyre::{bail, eyre, Context, ContextCompat}; +use futures::FutureExt; use futures_concurrency::stream::Merge; use shared_memory::{Shmem, ShmemConf}; use std::{ @@ -187,7 +188,9 @@ impl Daemon { } } }; + let (task, handle) = task.remote_handle(); tokio::spawn(task); + dataflow._timer_handles.push(handle); } Ok(()) @@ -396,6 +399,8 @@ pub struct RunningDataflow { subscribe_channels: HashMap>, mappings: HashMap>, timers: BTreeMap>, + /// Keep handles to all timer tasks of this dataflow to cancel them on drop. + _timer_handles: Vec>, } type OutputId = (NodeId, DataId);