Browse Source

Fix: Don't keep on polling incoming event channel after it's closed

This causes the `send_out_buf` future to completely starve.
tags/v0.2.0-candidate
Philipp Oppermann 2 years ago
parent
commit
a84f91c572
Failed to extract signature
1 changed files with 7 additions and 1 deletions
  1. +7
    -1
      binaries/runtime/src/operator/channel.rs

+ 7
- 1
binaries/runtime/src/operator/channel.rs View File

@@ -38,8 +38,13 @@ impl InputBuffer {
outgoing: flume::Sender<IncomingEvent>,
) {
let mut send_out_buf = future::Fuse::terminated();
let mut incoming_closed = false;
loop {
let next_incoming = incoming.recv_async();
let next_incoming = if incoming_closed {
future::Fuse::terminated()
} else {
incoming.recv_async().fuse()
};
match future::select(next_incoming, send_out_buf).await {
future::Either::Left((event, mut send_out)) => {
match event {
@@ -55,6 +60,7 @@ impl InputBuffer {
}
}
Err(flume::RecvError::Disconnected) => {
incoming_closed = true;
// the incoming channel was closed -> exit if we sent out all events already
if send_out.is_terminated() && self.queue.is_empty() {
break;


Loading…
Cancel
Save