From a84f91c572bc47b7e670736711a3df86178f3cbc Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 28 Feb 2023 15:56:15 +0100 Subject: [PATCH] Fix: Don't keep on polling incoming event channel after it's closed This causes the `send_out_buf` future to completely starve. --- binaries/runtime/src/operator/channel.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/binaries/runtime/src/operator/channel.rs b/binaries/runtime/src/operator/channel.rs index d8f1ca27..135260cc 100644 --- a/binaries/runtime/src/operator/channel.rs +++ b/binaries/runtime/src/operator/channel.rs @@ -38,8 +38,13 @@ impl InputBuffer { outgoing: flume::Sender, ) { let mut send_out_buf = future::Fuse::terminated(); + let mut incoming_closed = false; loop { - let next_incoming = incoming.recv_async(); + let next_incoming = if incoming_closed { + future::Fuse::terminated() + } else { + incoming.recv_async().fuse() + }; match future::select(next_incoming, send_out_buf).await { future::Either::Left((event, mut send_out)) => { match event { @@ -55,6 +60,7 @@ impl InputBuffer { } } Err(flume::RecvError::Disconnected) => { + incoming_closed = true; // the incoming channel was closed -> exit if we sent out all events already if send_out.is_terminated() && self.queue.is_empty() { break;