Browse Source

Cancel timer tasks once a dataflow is finished

tags/v0.2.0-candidate
Philipp Oppermann 3 years ago
parent
commit
3c4469d4d3
Failed to extract signature
3 changed files with 7 additions and 0 deletions
  1. +1
    -0
      Cargo.lock
  2. +1
    -0
      binaries/daemon/Cargo.toml
  3. +5
    -0
      binaries/daemon/src/main.rs

+ 1
- 0
Cargo.lock View File

@@ -975,6 +975,7 @@ dependencies = [
"dora-message",
"eyre",
"flume",
"futures",
"futures-concurrency 7.0.0",
"serde",
"serde_json",


+ 1
- 0
binaries/daemon/Cargo.toml View File

@@ -21,3 +21,4 @@ flume = "0.10.14"
dora-download = { path = "../../libraries/extensions/download" }
serde_yaml = "0.8.23"
uuid = { version = "1.1.2", features = ["v4"] }
futures = "0.3.25"

+ 5
- 0
binaries/daemon/src/main.rs View File

@@ -5,6 +5,7 @@ use dora_core::{
};
use dora_message::uhlc::HLC;
use eyre::{bail, eyre, Context, ContextCompat};
use futures::FutureExt;
use futures_concurrency::stream::Merge;
use shared_memory::{Shmem, ShmemConf};
use std::{
@@ -187,7 +188,9 @@ impl Daemon {
}
}
};
let (task, handle) = task.remote_handle();
tokio::spawn(task);
dataflow._timer_handles.push(handle);
}

Ok(())
@@ -396,6 +399,8 @@ pub struct RunningDataflow {
subscribe_channels: HashMap<NodeId, flume::Sender<daemon_messages::NodeEvent>>,
mappings: HashMap<OutputId, BTreeSet<InputId>>,
timers: BTreeMap<Duration, BTreeSet<InputId>>,
/// Keep handles to all timer tasks of this dataflow to cancel them on drop.
_timer_handles: Vec<futures::future::RemoteHandle<()>>,
}

type OutputId = (NodeId, DataId);


Loading…
Cancel
Save