This avoids an additional copy when sending outputs from the operator thread to the runtime thread.
This commit uses the new `allocate_data_sample` and `send_output_sample` methods introduced in d7cd370.
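The zero-copy pattern can be sketched with Python's standard library. The stand-ins below mimic the `allocate_data_sample` / `send_output_sample` idea (the real methods from d7cd370 likely have different signatures): the operator writes its output directly into shared memory, and only a handle crosses to the runtime side.

```python
from multiprocessing import shared_memory

# Hypothetical stand-ins, not the real dora API: allocate a shared-memory
# sample and send only its name, so the payload is never copied again.
def allocate_data_sample(size):
    return shared_memory.SharedMemory(create=True, size=size)

def send_output_sample(sample, channel):
    channel.append(sample.name)  # hands over a handle, not a copy of the data

data = b"sensor reading"
sample = allocate_data_sample(len(data))
sample.buf[: len(data)] = data  # written in place, no intermediate buffer

channel = []
send_output_sample(sample, channel)

# "runtime thread" side: attach to the same memory without copying
received = shared_memory.SharedMemory(name=channel[0])
roundtrip = bytes(received.buf[: len(data)])

received.close()
sample.close()
sample.unlink()
```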
We didn't check for already-closed inputs, which could result in duplicate `InputClosed` messages when a node closes some inputs before being dropped (or with the new close-on-drop behavior).
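The fix amounts to a small idempotence guard. A minimal sketch with hypothetical names:

```python
# Track which inputs already reported InputClosed so that closing an input
# twice (explicitly, then again on drop) sends the message only once.
closed_inputs = set()
sent_messages = []

def close_input(input_id):
    if input_id in closed_inputs:
        return  # already closed -> no duplicate InputClosed message
    closed_inputs.add(input_id)
    sent_messages.append(("InputClosed", input_id))

close_input("image")  # explicit close by the node
close_input("image")  # close-on-drop closes it again -> ignored
close_input("tick")
```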
We don't want to keep the input open until all drop tokens have been released, for two reasons:
- It adds an unnecessary delay. It is already clear that the output is finished, so reporting it immediately lets receivers react earlier.
- Receivers might be blocked while waiting for new events, which prevents them from sending finished drop tokens. Closing the outputs unblocks them through a new `InputClosed` event, which allows them to send their finished drop tokens. This way, the sender receives the remaining drop tokens faster.
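The second point can be sketched with stdlib queues (the message names are illustrative, not the real protocol): the receiver blocks on its event queue, and an `InputClosed` event is what unblocks it so it can release its pending drop token.

```python
import queue
import threading

events = queue.Queue()
released_tokens = []

def receiver():
    pending_token = "token-1"  # data the receiver has not finished with yet
    while True:
        event = events.get()  # blocks until a new event arrives
        if event == "InputClosed":
            released_tokens.append(pending_token)  # now free to report it
            return

t = threading.Thread(target=receiver)
t.start()
events.put("InputClosed")  # sender closes the output early
t.join()
```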
The Python garbage collector will drop them non-deterministically in the background. Also, the drop cannot happen while our wait code holds the GIL.
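The usual remedy for this kind of non-determinism is an explicit release point instead of relying on the garbage collector. A hedged sketch with a hypothetical `DataSample` type:

```python
# Hypothetical API: release the drop token at a deterministic point (context
# manager exit) rather than whenever Python's GC happens to collect the object.
released = []

class DataSample:
    def __init__(self, token):
        self.token = token

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        released.append(self.token)  # deterministic release point

with DataSample("token-42") as sample:
    pass  # use the data here
# the token is released here, not at some later GC run
```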
There might still be pending drop tokens after the receiving end of the event stream has been closed, so we don't want to exit the receiver thread immediately. Instead, we keep it running until the control channel signals that it expects no more drop tokens by closing the `finished_drop_tokens` channel. This happens either when all required drop tokens have been received or after a timeout.
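A stdlib sketch of this shutdown protocol (channel closure is modeled with a sentinel, since `queue.Queue` cannot be closed): the receiver thread keeps draining drop tokens until the control side signals that no more are expected.

```python
import queue
import threading

finished_drop_tokens = queue.Queue()
CLOSED = object()  # stands in for closing the channel
collected = []

def receiver_thread():
    while True:
        token = finished_drop_tokens.get()
        if token is CLOSED:
            return  # control channel expects no more drop tokens
        collected.append(token)

t = threading.Thread(target=receiver_thread)
t.start()
finished_drop_tokens.put("token-1")  # arrives after the event stream closed
finished_drop_tokens.put("token-2")
finished_drop_tokens.put(CLOSED)     # all tokens received (or timeout hit)
t.join()
```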
Some applications might not need the data in Arrow format. By creating a custom object, we can provide the data both as a classical `PyBytes` and as a zero-copy Arrow array.
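The idea can be sketched in plain Python, using a `memoryview` as a stand-in for the zero-copy Arrow array (so the example needs no `pyarrow` install): one object over one buffer, with both a copying and a zero-copy accessor.

```python
# Hypothetical wrapper type: exposes the same underlying buffer both as a
# copied bytes object (like PyBytes) and as a zero-copy view (like Arrow).
class DataView:
    def __init__(self, buffer):
        self._buffer = bytearray(buffer)

    def to_bytes(self):
        return bytes(self._buffer)       # classical copy

    def to_view(self):
        return memoryview(self._buffer)  # zero-copy view of the same memory

d = DataView(b"\x01\x02\x03")
copy = d.to_bytes()
view = d.to_view()
d._buffer[0] = 9  # mutating the buffer shows which accessor is zero-copy
```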
Don't bind the lifetime of the event to the `next` call anymore. This makes it possible to use the original event in Python, which does not support borrowed data or lifetimes. The drawback is that we no longer have the guarantee that an event is freed before the next call to `recv`.
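From the Python side, the observable effect is that events are owned values: a previously received event stays valid after the next `recv` call. A minimal sketch with a hypothetical receiver:

```python
# Hypothetical event stream: recv() hands the caller an owned event rather
# than data borrowed from the stream, so earlier events remain usable.
class EventStream:
    def __init__(self, events):
        self._events = list(events)

    def recv(self):
        return self._events.pop(0)  # caller takes ownership of the event

stream = EventStream([{"id": 1}, {"id": 2}])
first = stream.recv()
second = stream.recv()  # `first` is still valid and usable here
```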