Compare commits

...

2 Commits

Author SHA1 Message Date
  Philipp Oppermann 738507fa04
Batch dropping of messages by introducing new queue overshoot parameter 2 years ago
  Philipp Oppermann 0b53324cea
Increase default queue size to 50 2 years ago
1 changed files with 4 additions and 2 deletions
Unified View
  1. +4
    -2
      binaries/daemon/src/listener/mod.rs

+ 4
- 2
binaries/daemon/src/listener/mod.rs View File

@@ -98,6 +98,7 @@ struct Listener<C> {
subscribed_events: Option<flume::Receiver<NodeEvent>>, subscribed_events: Option<flume::Receiver<NodeEvent>>,
queue: Vec<NodeEvent>, queue: Vec<NodeEvent>,
max_queue_len: usize, max_queue_len: usize,
queue_overshoot: usize,
connection: C, connection: C,
} }


@@ -146,7 +147,8 @@ where
daemon_tx, daemon_tx,
shmem_handler_tx, shmem_handler_tx,
subscribed_events: None, subscribed_events: None,
max_queue_len: 10, // TODO: make this configurable
max_queue_len: 50, // TODO: make this configurable
queue_overshoot: 30, // TODO: make this configurable
queue: Vec::new(), queue: Vec::new(),
}; };
match listener.run_inner().await.wrap_err("listener failed") { match listener.run_inner().await.wrap_err("listener failed") {
@@ -218,7 +220,7 @@ where
.filter(|e| matches!(e, NodeEvent::Input { .. })) .filter(|e| matches!(e, NodeEvent::Input { .. }))
.count(); .count();
let drop_n = input_event_count.saturating_sub(self.max_queue_len); let drop_n = input_event_count.saturating_sub(self.max_queue_len);
if drop_n > 0 {
if drop_n > self.queue_overshoot {
self.drop_oldest_inputs(drop_n).await?; self.drop_oldest_inputs(drop_n).await?;
} }
} }


Loading…
Cancel
Save