|
|
|
@@ -265,7 +265,7 @@ impl Daemon { |
|
|
|
.get_mut(&dataflow_id) |
|
|
|
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?; |
|
|
|
|
|
|
|
for channel in dataflow.subscribe_channels.values_mut() { |
|
|
|
for (_node_id, channel) in dataflow.subscribe_channels.drain() { |
|
|
|
let _ = channel.send_async(daemon_messages::NodeEvent::Stop).await; |
|
|
|
} |
|
|
|
Result::<(), eyre::Report>::Ok(()) |
|
|
|
@@ -298,7 +298,13 @@ impl Daemon { |
|
|
|
} |
|
|
|
}; |
|
|
|
for (node_id, params) in nodes { |
|
|
|
dataflow.running_nodes.insert(node_id.clone()); |
|
|
|
for (input_id, mapping) in params.node.run_config.inputs.clone() { |
|
|
|
dataflow |
|
|
|
.open_inputs |
|
|
|
.entry(node_id.clone()) |
|
|
|
.or_default() |
|
|
|
.insert(input_id.clone()); |
|
|
|
match mapping { |
|
|
|
InputMapping::User(mapping) => { |
|
|
|
if mapping.operator.is_some() { |
|
|
|
@@ -366,7 +372,9 @@ impl Daemon { |
|
|
|
dataflow.subscribe_channels.insert(node_id, event_sender); |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
None => Err(format!("no running dataflow with ID `{dataflow_id}`")), |
|
|
|
None => Err(format!( |
|
|
|
"subscribe failed: no running dataflow with ID `{dataflow_id}`" |
|
|
|
)), |
|
|
|
}; |
|
|
|
let _ = reply_sender.send(ControlReply::Result(result)); |
|
|
|
} |
|
|
|
@@ -416,10 +424,9 @@ impl Daemon { |
|
|
|
} = message; |
|
|
|
let data = data.map(|(m, len)| (Rc::new(m), len)); |
|
|
|
|
|
|
|
let dataflow = self |
|
|
|
.running |
|
|
|
.get_mut(&dataflow_id) |
|
|
|
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?; |
|
|
|
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { |
|
|
|
format!("send out failed: no running dataflow with ID `{dataflow_id}`") |
|
|
|
})?; |
|
|
|
|
|
|
|
// figure out receivers from dataflow graph |
|
|
|
let empty_set = BTreeSet::new(); |
|
|
|
@@ -482,7 +489,7 @@ impl Daemon { |
|
|
|
let dataflow = self |
|
|
|
.running |
|
|
|
.get_mut(&dataflow_id) |
|
|
|
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?; |
|
|
|
.wrap_err_with(|| format!("failed to get downstream nodes: no running dataflow with ID `{dataflow_id}`"))?; |
|
|
|
let downstream_nodes: BTreeSet<_> = dataflow |
|
|
|
.mappings |
|
|
|
.iter() |
|
|
|
@@ -499,12 +506,20 @@ impl Daemon { |
|
|
|
id: input_id.clone(), |
|
|
|
}) |
|
|
|
.await; |
|
|
|
|
|
|
|
if let Some(open_inputs) = dataflow.open_inputs.get_mut(receiver_id) { |
|
|
|
open_inputs.remove(input_id); |
|
|
|
if open_inputs.is_empty() { |
|
|
|
// close the subscriber channel |
|
|
|
dataflow.subscribe_channels.remove(receiver_id); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// TODO: notify remote nodes |
|
|
|
|
|
|
|
dataflow.subscribe_channels.remove(&node_id); |
|
|
|
if dataflow.subscribe_channels.is_empty() { |
|
|
|
dataflow.running_nodes.remove(&node_id); |
|
|
|
if dataflow.running_nodes.is_empty() { |
|
|
|
tracing::info!( |
|
|
|
"Dataflow `{dataflow_id}` finished on machine `{}`", |
|
|
|
self.machine_id |
|
|
|
@@ -629,6 +644,8 @@ pub struct RunningDataflow { |
|
|
|
subscribe_channels: HashMap<NodeId, flume::Sender<daemon_messages::NodeEvent>>, |
|
|
|
mappings: HashMap<OutputId, BTreeSet<InputId>>, |
|
|
|
timers: BTreeMap<Duration, BTreeSet<InputId>>, |
|
|
|
open_inputs: BTreeMap<NodeId, BTreeSet<DataId>>, |
|
|
|
running_nodes: BTreeSet<NodeId>, |
|
|
|
/// Keep handles to all timer tasks of this dataflow to cancel them on drop. |
|
|
|
_timer_handles: Vec<futures::future::RemoteHandle<()>>, |
|
|
|
} |
|
|
|
|