From b3fd01848cd7a90f27f02bf7ee2ac1775cd98b3c Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 4 Oct 2024 13:06:38 +0200 Subject: [PATCH] Report when shared memory region is mapped to allow faster cleanup The shared memory region can be safely removed by the sender once it's mapped in the receiver. The OS will just delete the file handle associated with the shared memory region, but keep the data alive until it has been unmapped from all address spaces. By notifying the sender that a message has been mapped to the address space we enable faster cleanup on exit. The sender can safely close all of its shared memory regions once all of its sent messages are at least mapped. So it does not need to wait until all messages have been _dropped_ anymore, which can take considerably longer, especially if the Python GC is involved. This commit modifies the message format, so we need to bump the version of the `dora-message` crate to `0.5.0`. --- apis/rust/node/src/event_stream/mod.rs | 53 +----------- apis/rust/node/src/event_stream/scheduler.rs | 10 +-- apis/rust/node/src/event_stream/thread.rs | 85 +++++++++++++------ apis/rust/node/src/node/drop_stream.rs | 35 +++++--- apis/rust/node/src/node/mod.rs | 41 +++++++-- binaries/daemon/src/lib.rs | 82 ++++++++++++++---- binaries/daemon/src/node_communication/mod.rs | 8 +- libraries/message/src/common.rs | 16 ++++ libraries/message/src/daemon_to_node.rs | 1 + libraries/message/src/node_to_daemon.rs | 5 +- 10 files changed, 211 insertions(+), 125 deletions(-) diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 7276b6bf..122ba44e 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -6,7 +6,7 @@ use std::{ }; use dora_message::{ - daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent}, + daemon_to_node::{DaemonCommunication, DaemonReply}, id::DataId, node_to_daemon::{DaemonRequest, Timestamped}, DataflowId, @@ -19,10 +19,7 @@ use futures::{ use futures_timer::Delay; use scheduler::{Scheduler, NON_INPUT_EVENT}; -use self::{ - event::SharedMemoryData, - thread::{EventItem, EventStreamThreadHandle}, -}; +use self::thread::{EventItem, EventStreamThreadHandle}; use crate::daemon_connection::DaemonChannel; use dora_core::{ config::{Input, NodeId}, @@ -199,51 +196,7 @@ impl EventStream { fn convert_event_item(item: EventItem) -> Event { match item { - EventItem::NodeEvent { event, ack_channel } => match event { - NodeEvent::Stop => Event::Stop, - NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, - NodeEvent::InputClosed { id } => Event::InputClosed { id }, - NodeEvent::Input { id, metadata, data } => { - let data = match data { - None => Ok(None), - Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), - Some(DataMessage::SharedMemory { - shared_memory_id, - len, - drop_token: _, // handled in `event_stream_loop` - }) => unsafe { - MappedInputData::map(&shared_memory_id, len).map(|data| { - Some(RawData::SharedMemory(SharedMemoryData { - data, - _drop: ack_channel, - })) - }) - }, - }; - let data = data.and_then(|data| { - let raw_data = data.unwrap_or(RawData::Empty); - raw_data - .into_arrow_array(&metadata.type_info) - .map(arrow::array::make_array) - }); - match data { - Ok(data) => Event::Input { - id, - metadata, - data: data.into(), - }, - Err(err) => Event::Error(format!("{err:?}")), - } - } - NodeEvent::AllInputsClosed => { - let err = eyre!( - "received `AllInputsClosed` event, which should be handled by background task" - ); - tracing::error!("{err:?}"); - Event::Error(err.wrap_err("internal error").to_string()) - } - }, - + EventItem::NodeEvent { event } => event, EventItem::FatalError(err) => { Event::Error(format!("fatal event stream error: {err:?}")) } diff --git a/apis/rust/node/src/event_stream/scheduler.rs b/apis/rust/node/src/event_stream/scheduler.rs index c6e15abe..9185050b 100644 --- a/apis/rust/node/src/event_stream/scheduler.rs +++ b/apis/rust/node/src/event_stream/scheduler.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, VecDeque}; use dora_message::{daemon_to_node::NodeEvent, id::DataId}; -use super::thread::EventItem; +use super::{thread::EventItem, Event}; pub const NON_INPUT_EVENT: &str = "dora/non_input_event"; // This scheduler will make sure that there is fairness between @@ -40,13 +40,7 @@ impl Scheduler { pub fn add_event(&mut self, event: EventItem) { let event_id = match &event { EventItem::NodeEvent { - event: - NodeEvent::Input { - id, - metadata: _, - data: _, - }, - ack_channel: _, + event: Event::Input { id, .. }, } => id, _ => &DataId::from(NON_INPUT_EVENT.to_string()), }; diff --git a/apis/rust/node/src/event_stream/thread.rs b/apis/rust/node/src/event_stream/thread.rs index 5e982f74..47aa41e2 100644 --- a/apis/rust/node/src/event_stream/thread.rs +++ b/apis/rust/node/src/event_stream/thread.rs @@ -3,6 +3,7 @@ use dora_core::{ uhlc::{self, Timestamp}, }; use dora_message::{ + common::{DataMessage, DropTokenState, DropTokenStatus}, daemon_to_node::{DaemonReply, NodeEvent}, node_to_daemon::{DaemonRequest, DropToken, Timestamped}, }; @@ -15,6 +16,8 @@ use std::{ use crate::daemon_connection::DaemonChannel; +use super::{event::SharedMemoryData, Event, MappedInputData, RawData}; + pub fn init( node_id: NodeId, tx: flume::Sender, @@ -28,10 +31,7 @@ pub fn init( #[derive(Debug)] pub enum EventItem { - NodeEvent { - event: NodeEvent, - ack_channel: flume::Sender<()>, - }, + NodeEvent { event: super::Event }, FatalError(eyre::Report), TimeoutError(eyre::Report), } @@ -130,25 +130,60 @@ fn event_stream_loop( if let Err(err) = clock.update_with_timestamp(×tamp) { tracing::warn!("failed to update HLC: {err}"); } - let drop_token = match &inner { - NodeEvent::Input { - data: Some(data), .. - } => data.drop_token(), + + let event = match inner { + NodeEvent::Stop => Event::Stop, + NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, + NodeEvent::InputClosed { id } => Event::InputClosed { id }, + NodeEvent::Input { id, metadata, data } => { + let data = match data { + None => Ok(None), + Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), + Some(DataMessage::SharedMemory { + shared_memory_id, + len, + drop_token, + }) => unsafe { + let (drop_tx, drop_rx) = flume::bounded(0); + let data = MappedInputData::map(&shared_memory_id, len).map(|data| { + Some(RawData::SharedMemory(SharedMemoryData { + data, + _drop: drop_tx, + })) + }); + drop_tokens.push(DropTokenStatus { + token: drop_token, + state: DropTokenState::Mapped, + }); + pending_drop_tokens.push((drop_token, drop_rx, Instant::now(), 1)); + data + }, + }; + let data = data.and_then(|data| { + let raw_data = data.unwrap_or(RawData::Empty); + raw_data + .into_arrow_array(&metadata.type_info) + .map(arrow::array::make_array) + }); + match data { + Ok(data) => Event::Input { + id, + metadata, + data: data.into(), + }, + Err(err) => Event::Error(format!("{err:?}")), + } + } NodeEvent::AllInputsClosed => { // close the event stream tx = None; // skip this internal event continue; } - _ => None, }; if let Some(tx) = tx.as_ref() { - let (drop_tx, drop_rx) = flume::bounded(0); - match tx.send(EventItem::NodeEvent { - event: inner, - ack_channel: drop_tx, - }) { + match tx.send(EventItem::NodeEvent { event }) { Ok(()) => {} Err(send_error) => { let event = send_error.into_inner(); @@ -159,12 +194,8 @@ fn event_stream_loop( break 'outer Ok(()); } } - - if let Some(token) = drop_token { - pending_drop_tokens.push((token, drop_rx, Instant::now(), 1)); - } } else { - tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`"); + tracing::warn!("dropping event because event `tx` was already closed: `{event:?}`"); } } }; @@ -196,7 +227,7 @@ fn event_stream_loop( fn handle_pending_drop_tokens( pending_drop_tokens: &mut Vec<(DropToken, flume::Receiver<()>, Instant, u64)>, - drop_tokens: &mut Vec, + drop_tokens: &mut Vec, ) -> eyre::Result<()> { let mut still_pending = Vec::new(); for (token, rx, since, warn) in pending_drop_tokens.drain(..) { @@ -204,7 +235,10 @@ fn handle_pending_drop_tokens( Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), Err(flume::TryRecvError::Disconnected) => { // the event was dropped -> add the drop token to the list - drop_tokens.push(token); + drop_tokens.push(DropTokenStatus { + token, + state: DropTokenState::Dropped, + }); } Err(flume::TryRecvError::Empty) => { let duration = Duration::from_secs(30 * warn); @@ -221,7 +255,7 @@ fn handle_pending_drop_tokens( fn report_remaining_drop_tokens( mut channel: DaemonChannel, - mut drop_tokens: Vec, + mut drop_tokens: Vec, mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)>, timestamp: Timestamp, ) -> eyre::Result<()> { @@ -234,7 +268,10 @@ fn report_remaining_drop_tokens( Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), Err(flume::RecvTimeoutError::Disconnected) => { // the event was dropped -> add the drop token to the list - drop_tokens.push(token); + drop_tokens.push(DropTokenStatus { + token, + state: DropTokenState::Dropped, + }); } Err(flume::RecvTimeoutError::Timeout) => { let duration = Duration::from_secs(1); @@ -259,7 +296,7 @@ fn report_remaining_drop_tokens( } fn report_drop_tokens( - drop_tokens: &mut Vec, + drop_tokens: &mut Vec, channel: &mut DaemonChannel, timestamp: Timestamp, ) -> Result<(), eyre::ErrReport> { diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index d62b0588..d30bf40b 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -3,15 +3,16 @@ use std::{sync::Arc, time::Duration}; use crate::daemon_connection::DaemonChannel; use dora_core::{config::NodeId, uhlc}; use dora_message::{ + common::{DropTokenState, DropTokenStatus}, daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent}, - node_to_daemon::{DaemonRequest, DropToken, Timestamped}, + node_to_daemon::{DaemonRequest, Timestamped}, DataflowId, }; use eyre::{eyre, Context}; use flume::RecvTimeoutError; pub struct DropStream { - receiver: flume::Receiver, + receiver: flume::Receiver, _thread_handle: DropStreamThreadHandle, } @@ -82,7 +83,7 @@ impl DropStream { } impl std::ops::Deref for DropStream { - type Target = flume::Receiver; + type Target = flume::Receiver; fn deref(&self) -> &Self::Target { &self.receiver @@ -92,7 +93,7 @@ impl std::ops::Deref for DropStream { #[tracing::instrument(skip(tx, channel, clock))] fn drop_stream_loop( node_id: NodeId, - tx: flume::Sender, + tx: flume::Sender, mut channel: DaemonChannel, clock: Arc, ) { @@ -125,16 +126,22 @@ fn drop_stream_loop( if let Err(err) = clock.update_with_timestamp(×tamp) { tracing::warn!("failed to update HLC: {err}"); } - match inner { - NodeDropEvent::OutputDropped { drop_token } => { - if tx.send(drop_token).is_err() { - tracing::warn!( - "drop channel was closed already, could not forward \ - drop token`{drop_token:?}`" - ); - break 'outer; - } - } + let event = match inner { + NodeDropEvent::OutputMapped { drop_token } => DropTokenStatus { + token: drop_token, + state: DropTokenState::Mapped, + }, + NodeDropEvent::OutputDropped { drop_token } => DropTokenStatus { + token: drop_token, + state: DropTokenState::Dropped, + }, + }; + if tx.send(event).is_err() { + tracing::warn!( + "drop channel was closed already, could not forward \ + drop token event `{event:?}`" + ); + break 'outer; } } } diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 6ba08af1..7972229a 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -16,6 +16,7 @@ use dora_core::{ }; use dora_message::{ + common::DropTokenStatus, daemon_to_node::{DaemonReply, NodeConfig}, metadata::{ArrowTypeInfo, Metadata, MetadataParameters}, node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped}, @@ -48,6 +49,7 @@ pub struct DoraNode { clock: Arc, sent_out_shared_memory: HashMap, + shared_memory_in_use: HashMap, drop_stream: DropStream, cache: VecDeque, @@ -155,6 +157,7 @@ impl DoraNode { control_channel, clock, sent_out_shared_memory: HashMap::new(), + shared_memory_in_use: HashMap::new(), drop_stream, cache: VecDeque::new(), dataflow_descriptor, @@ -380,10 +383,7 @@ impl DoraNode { fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> { loop { match self.drop_stream.try_recv() { - Ok(token) => match self.sent_out_shared_memory.remove(&token) { - Some(region) => self.add_to_cache(region), - None => tracing::warn!("received unknown finished drop token `{token:?}`"), - }, + Ok(event) => self.handle_drop_token_event(event), Err(flume::TryRecvError::Empty) => break, Err(flume::TryRecvError::Disconnected) => { bail!("event stream was closed before sending all expected drop tokens") @@ -393,6 +393,35 @@ impl DoraNode { Ok(()) } + fn handle_drop_token_event(&mut self, event: DropTokenStatus) { + let DropTokenStatus { token, state } = event; + match state { + dora_message::common::DropTokenState::Mapped => { + let region = self.sent_out_shared_memory.remove(&token); + match region { + Some(region) => { + self.shared_memory_in_use.insert(token, region); + } + None => { + tracing::warn!("received unknown mapped drop token `{token:?}`") + } + }; + } + dora_message::common::DropTokenState::Dropped => { + let region = self + .sent_out_shared_memory + .remove(&token) + .or_else(|| self.shared_memory_in_use.remove(&token)); + match region { + Some(region) => self.add_to_cache(region), + None => { + tracing::warn!("received unknown finished drop token `{token:?}`") + } + } + } + }; + } + fn add_to_cache(&mut self, memory: ShmemHandle) { const MAX_CACHE_SIZE: usize = 20; @@ -435,9 +464,7 @@ impl Drop for DoraNode { } match self.drop_stream.recv_timeout(Duration::from_secs(2)) { - Ok(token) => { - self.sent_out_shared_memory.remove(&token); - } + Ok(event) => self.handle_drop_token_event(event), Err(flume::RecvTimeoutError::Disconnected) => { tracing::warn!( "finished_drop_tokens channel closed while still waiting for drop tokens; \ diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index de531fd0..718adf23 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -12,7 +12,8 @@ use dora_core::{ }; use dora_message::{ common::{ - DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus, + DaemonId, DataMessage, DropToken, DropTokenStatus, LogLevel, NodeError, NodeErrorCause, + NodeExitStatus, }, coordinator_to_cli::DataflowResult, coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes}, @@ -1014,7 +1015,7 @@ impl Daemon { .send_out(dataflow_id, node_id, output_id, metadata, data) .await .context("failed to send out")?, - DaemonNodeEvent::ReportDrop { tokens } => { + DaemonNodeEvent::ReportTokenState { token_events } => { let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { format!( "failed to get handle drop tokens: \ @@ -1024,17 +1025,27 @@ impl Daemon { match dataflow { Ok(dataflow) => { - for token in tokens { + for event in token_events { + let DropTokenStatus { token, state } = event; match dataflow.pending_drop_tokens.get_mut(&token) { - Some(info) => { - if info.pending_nodes.remove(&node_id) { - dataflow.check_drop_token(token, &self.clock).await?; - } else { - tracing::warn!( - "node `{node_id}` is not pending for drop token `{token:?}`" - ); + Some(info) => match state { + dora_message::common::DropTokenState::Mapped => { + let changed = info.pending_nodes.remove(&node_id); + info.mapped_in_nodes.insert(node_id.clone()); + if changed { + dataflow + .check_drop_token_mapped(token, &self.clock) + .await?; + } } - } + dora_message::common::DropTokenState::Dropped => { + let mut changed = info.pending_nodes.remove(&node_id); + changed |= info.mapped_in_nodes.remove(&node_id); + if changed { + dataflow.check_drop_token(token, &self.clock).await?; + } + } + }, None => tracing::warn!("unknown drop token `{token:?}`"), } } @@ -1620,6 +1631,7 @@ async fn send_output_to_local_receivers( .or_insert_with(|| DropTokenInformation { owner: node_id.clone(), pending_nodes: Default::default(), + mapped_in_nodes: Default::default(), }) .pending_nodes .insert(receiver_id.clone()); @@ -1658,6 +1670,7 @@ async fn send_output_to_local_receivers( .or_insert_with(|| DropTokenInformation { owner: node_id.clone(), pending_nodes: Default::default(), + mapped_in_nodes: Default::default(), }); // check if all local subscribers are finished with the token dataflow.check_drop_token(token, clock).await?; @@ -1927,7 +1940,7 @@ impl RunningDataflow { async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> { match self.pending_drop_tokens.entry(token) { std::collections::hash_map::Entry::Occupied(entry) => { - if entry.get().pending_nodes.is_empty() { + if entry.get().pending_nodes.is_empty() && entry.get().mapped_in_nodes.is_empty() { let (drop_token, info) = entry.remove_entry(); let result = match self.drop_channels.get_mut(&info.owner) { Some(channel) => send_with_timestamp( @@ -1962,6 +1975,38 @@ impl RunningDataflow { let OutputId(node_id, output_id) = output_id; format!("dora/{network_id}/{dataflow_id}/output/{node_id}/{output_id}") } + + async fn check_drop_token_mapped(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> { + match self.pending_drop_tokens.entry(token) { + std::collections::hash_map::Entry::Occupied(entry) => { + if entry.get().pending_nodes.is_empty() && !entry.get().mapped_in_nodes.is_empty() { + let info = entry.get(); + let result = match self.drop_channels.get_mut(&info.owner) { + Some(channel) => send_with_timestamp( + channel, + NodeDropEvent::OutputMapped { drop_token: token }, + clock, + ) + .wrap_err("send failed"), + None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)), + }; + if let Err(err) = result.wrap_err_with(|| { + format!( + "failed to report drop token mapped `{token:?}` to owner `{}`", + &info.owner + ) + }) { + tracing::warn!("{err:?}"); + } + } + } + std::collections::hash_map::Entry::Vacant(_) => { + tracing::warn!("check_drop_token_mapped called with already closed token") + } + } + + Ok(()) + } } fn empty_type_info() -> ArrowTypeInfo { @@ -1983,9 +2028,14 @@ type InputId = (NodeId, DataId); struct DropTokenInformation { /// The node that created the associated drop token. owner: NodeId, - /// Contains the set of pending nodes that still have access to the input - /// associated with a drop token. + /// Contains the set of nodes that have not mapped the input associated + /// with a drop token yet. The shared memory region needs to be kept + /// alive until this list is empty. pending_nodes: BTreeSet, + /// Contains the set of nodes that still have the input data associated + /// with a drop token mapped in their address space. The shared memory + /// region must not be overwritten until this list is empty. + mapped_in_nodes: BTreeSet, } #[derive(Debug)] @@ -2033,8 +2083,8 @@ pub enum DaemonNodeEvent { metadata: metadata::Metadata, data: Option, }, - ReportDrop { - tokens: Vec, + ReportTokenState { + token_events: Vec, }, EventStreamDropped { reply_sender: oneshot::Sender, diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index 31b11aa4..efc7ad44 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -5,7 +5,7 @@ use dora_core::{ uhlc, }; use dora_message::{ - common::{DropToken, Timestamped}, + common::{DropTokenState, DropTokenStatus, Timestamped}, daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent, NodeEvent}, node_to_daemon::DaemonRequest, DataflowId, @@ -461,13 +461,13 @@ impl Listener { Ok(()) } - async fn report_drop_tokens(&mut self, drop_tokens: Vec) -> eyre::Result<()> { + async fn report_drop_tokens(&mut self, drop_tokens: Vec) -> eyre::Result<()> { if !drop_tokens.is_empty() { let event = Event::Node { dataflow_id: self.dataflow_id, node_id: self.node_id.clone(), - event: DaemonNodeEvent::ReportDrop { - tokens: drop_tokens, + event: DaemonNodeEvent::ReportTokenState { + token_events: drop_tokens, }, }; let event = Timestamped { diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index 93e2f8d9..ecb217c8 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -189,6 +189,22 @@ impl fmt::Debug for DataMessage { } } +#[derive( + Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +pub struct DropTokenStatus { + pub token: DropToken, + pub state: DropTokenState, +} + +#[derive( + Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +pub enum DropTokenState { + Mapped, + Dropped, +} + #[derive( Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, )] diff --git a/libraries/message/src/daemon_to_node.rs b/libraries/message/src/daemon_to_node.rs index acc1630e..c270919c 100644 --- a/libraries/message/src/daemon_to_node.rs +++ b/libraries/message/src/daemon_to_node.rs @@ -74,5 +74,6 @@ pub enum NodeEvent { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum NodeDropEvent { + OutputMapped { drop_token: DropToken }, OutputDropped { drop_token: DropToken }, } diff --git a/libraries/message/src/node_to_daemon.rs b/libraries/message/src/node_to_daemon.rs index bb5a0850..ce2a8a1f 100644 --- a/libraries/message/src/node_to_daemon.rs +++ b/libraries/message/src/node_to_daemon.rs @@ -2,6 +2,7 @@ pub use crate::common::{ DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped, }; use crate::{ + common::DropTokenStatus, current_crate_version, id::{DataId, NodeId}, metadata::Metadata, @@ -22,10 +23,10 @@ pub enum DaemonRequest { /// required drop tokens. OutputsDone, NextEvent { - drop_tokens: Vec, + drop_tokens: Vec, }, ReportDropTokens { - drop_tokens: Vec, + drop_tokens: Vec, }, SubscribeDrop, NextFinishedDropTokens,