diff --git a/binaries/runtime/src/operator/channel.rs b/binaries/runtime/src/operator/channel.rs index d8f1ca27..135260cc 100644 --- a/binaries/runtime/src/operator/channel.rs +++ b/binaries/runtime/src/operator/channel.rs @@ -38,8 +38,13 @@ impl InputBuffer { outgoing: flume::Sender, ) { let mut send_out_buf = future::Fuse::terminated(); + let mut incoming_closed = false; loop { - let next_incoming = incoming.recv_async(); + let next_incoming = if incoming_closed { + future::Fuse::terminated() + } else { + incoming.recv_async().fuse() + }; match future::select(next_incoming, send_out_buf).await { future::Either::Left((event, mut send_out)) => { match event { @@ -55,6 +60,7 @@ impl InputBuffer { } } Err(flume::RecvError::Disconnected) => { + incoming_closed = true; // the incoming channel was closed -> exit if we sent out all events already if send_out.is_terminated() && self.queue.is_empty() { break;