@@ -11,7 +11,7 @@ use eyre::{eyre, Context};
use futures::{future, task, Future};
use shared_memory_server::{ShmemConf, ShmemServer};
use std::{
collections::{BTreeMap, VecDeque},
collections::{BTreeMap, HashMap, VecDeque},
mem,
net::Ipv4Addr,
sync::Arc,
@@ -151,6 +151,7 @@ struct Listener {
queue: VecDeque<Box<Option<Timestamped<NodeEvent>>>>,
queue_sizes: BTreeMap<DataId, usize>,
clock: Arc<uhlc::HLC>,
dropped_inputs: HashMap<DataId, usize>,
}
impl Listener {
@@ -211,6 +212,7 @@ impl Listener {
queue_sizes,
queue: VecDeque::new(),
clock: hlc.clone(),
dropped_inputs: HashMap::new(),
};
match listener
.run_inner(connection)
@@ -281,7 +283,10 @@ impl Listener {
async fn handle_events(&mut self) -> eyre::Result<()> {
if let Some(events) = &mut self.subscribed_events {
while let Ok(event) = events.try_recv() {
while let Ok(mut event) = events.try_recv() {
if let NodeEvent::Input { id, dropped, .. } = &mut event.inner {
*dropped += self.dropped_inputs.remove(id).unwrap_or_default();
}
self.queue.push_back(Box::new(Some(event)));
}
@@ -294,13 +299,15 @@ impl Listener {
#[tracing::instrument(skip(self), fields(%self.node_id), level = "trace")]
async fn drop_oldest_inputs(&mut self) -> Result<(), eyre::ErrReport> {
let mut queue_size_remaining = self.queue_sizes.clone();
let mut dropped = 0;
let mut drop_tokens = Vec::new();
// iterate over queued events, newest first
for event in self.queue.iter_mut().rev() {
let Some(Timestamped {
inner: NodeEvent::Input { id, data, .. },
inner:
NodeEvent::Input {
id, data, dropped, ..
},
..
}) = event.as_mut()
else {
@@ -308,7 +315,8 @@ impl Listener {
};
match queue_size_remaining.get_mut(id) {
Some(0) => {
dropped += 1;
*self.dropped_inputs.entry(id.clone()).or_default() += *dropped + 1;
if let Some(drop_token) = data.as_ref().and_then(|d| d.drop_token()) {
drop_tokens.push(drop_token);
}
@@ -324,9 +332,6 @@ impl Listener {
}
self.report_drop_tokens(drop_tokens).await?;
if dropped > 0 {
tracing::debug!("dropped {dropped} inputs because event queue was too full");
}
Ok(())
}