diff --git a/binaries/daemon/src/main.rs b/binaries/daemon/src/main.rs index 1352718a..4286fd36 100644 --- a/binaries/daemon/src/main.rs +++ b/binaries/daemon/src/main.rs @@ -283,9 +283,34 @@ impl Daemon { let _ = reply_sender.send(ControlReply::Result(Ok(()))); } DaemonNodeEvent::Stopped => { - // TODO send stop message to downstream nodes + tracing::info!("Stopped: {dataflow_id}/{node_id}"); let _ = reply_sender.send(ControlReply::Result(Ok(()))); + + // notify downstream nodes + let dataflow = self + .running + .get_mut(&dataflow_id) + .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?; + let downstream_nodes: BTreeSet<_> = dataflow + .mappings + .iter() + .filter(|((source_id, _), _)| source_id == &node_id) + .flat_map(|(_, v)| v) + .collect(); + for (receiver_id, input_id) in downstream_nodes { + let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else { + continue; + }; + + let _ = channel + .send_async(daemon_messages::NodeEvent::InputClosed { + id: input_id.clone(), + }) + .await; + } + + // TODO: notify remote nodes } } Ok(()) diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index ce7408df..442932e7 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -31,6 +31,7 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, + other => eprintln!("Received unexpected input: {other:?}"), } } diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index 59f631f0..57e1f026 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -26,6 +26,11 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, + NodeEvent::InputClosed { id } => { + println!("Input `{id}` was closed -> exiting"); + break; + } + other => eprintln!("Received unexpected input: {other:?}"), } } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index a0494535..e0021b88 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -47,6 +47,7 @@ pub enum ControlReply { } #[derive(Debug, serde::Serialize, serde::Deserialize)] +#[non_exhaustive] pub enum NodeEvent { Stop, Input { @@ -54,6 +55,9 @@ pub enum NodeEvent { metadata: Metadata<'static>, data: Option, }, + InputClosed { + id: DataId, + }, } #[derive(Debug, serde::Serialize, serde::Deserialize)]