@@ -2,8 +2,8 @@ use crate::{DaemonNodeEvent, Event};
use dora_core::{
config::{DataId, LocalCommunicationConfig, NodeId},
daemon_messages::{
DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, NodeDropEvent, NodeEvent ,
Timestamped,
DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, InputDropReason ,
NodeDropEvent, NodeEvent, Timestamped,
},
message::uhlc,
};
@@ -298,7 +298,8 @@ impl Listener {
let mut drop_tokens = Vec::new();
// iterate over queued events, newest first
for event in self.queue.iter_mut().rev() {
let mut last_dropped = None;
for (i, event) in self.queue.iter_mut().enumerate().rev() {
let Some(Timestamped {
inner: NodeEvent::Input { id, data, .. },
..
@@ -313,6 +314,7 @@ impl Listener {
drop_tokens.push(drop_token);
}
*event.as_mut() = None;
last_dropped = Some(i);
}
Some(size_remaining) => {
*size_remaining = size_remaining.saturating_sub(1);
@@ -324,8 +326,18 @@ impl Listener {
}
self.report_drop_tokens(drop_tokens).await?;
if dropped > 0 {
if let Some(last_dropped) = last_dropped {
tracing::debug!("dropped {dropped} inputs because event queue was too full");
// replace last dropped event with `DroppedInputs` event
let entry = &mut self.queue[last_dropped];
assert!(entry.is_none());
*entry = Box::new(Some(Timestamped {
inner: NodeEvent::DroppedInputs {
reason: InputDropReason::QueueSize,
number: dropped,
},
timestamp: self.clock.new_timestamp(),
}));
}
Ok(())
}