Browse Source

Notify downstream nodes about closed inputs

tags/v0.2.0-candidate
Philipp Oppermann 3 years ago
parent
commit
3cd1b4de65
Failed to extract signature
4 changed files with 36 additions and 1 deletions
  1. +26
    -1
      binaries/daemon/src/main.rs
  2. +1
    -0
      examples/rust-dataflow/node/src/main.rs
  3. +5
    -0
      examples/rust-dataflow/sink/src/main.rs
  4. +4
    -0
      libraries/core/src/daemon_messages.rs

+ 26
- 1
binaries/daemon/src/main.rs View File

@@ -283,9 +283,34 @@ impl Daemon {
let _ = reply_sender.send(ControlReply::Result(Ok(())));
}
DaemonNodeEvent::Stopped => {
// TODO send stop message to downstream nodes
tracing::info!("Stopped: {dataflow_id}/{node_id}");

let _ = reply_sender.send(ControlReply::Result(Ok(())));

// notify downstream nodes
let dataflow = self
.running
.get_mut(&dataflow_id)
.wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?;
let downstream_nodes: BTreeSet<_> = dataflow
.mappings
.iter()
.filter(|((source_id, _), _)| source_id == &node_id)
.flat_map(|(_, v)| v)
.collect();
for (receiver_id, input_id) in downstream_nodes {
let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
continue;
};

let _ = channel
.send_async(daemon_messages::NodeEvent::InputClosed {
id: input_id.clone(),
})
.await;
}

// TODO: notify remote nodes
}
}
Ok(())


+ 1
- 0
examples/rust-dataflow/node/src/main.rs View File

@@ -31,6 +31,7 @@ fn main() -> eyre::Result<()> {
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
other => eprintln!("Received unexpected input: {other:?}"),
}
}



+ 5
- 0
examples/rust-dataflow/sink/src/main.rs View File

@@ -26,6 +26,11 @@ fn main() -> eyre::Result<()> {
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
NodeEvent::InputClosed { id } => {
println!("Input `{id}` was closed -> exiting");
break;
}
other => eprintln!("Received unexpected input: {other:?}"),
}
}



+ 4
- 0
libraries/core/src/daemon_messages.rs View File

@@ -47,6 +47,7 @@ pub enum ControlReply {
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum NodeEvent {
Stop,
Input {
@@ -54,6 +55,9 @@ pub enum NodeEvent {
metadata: Metadata<'static>,
data: Option<InputData>,
},
InputClosed {
id: DataId,
},
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]


Loading…
Cancel
Save