Browse Source

Make the non input event a priority when collecting next event

tags/0.3.8-rc
haixuanTao 1 year ago
parent
commit
90752a0bb7
2 changed files with 23 additions and 5 deletions
  1. +8
    -2
      apis/rust/node/src/event_stream/mod.rs
  2. +15
    -3
      apis/rust/node/src/event_stream/scheduler.rs

+ 8
- 2
apis/rust/node/src/event_stream/mod.rs View File

@@ -1,4 +1,8 @@
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::Duration,
};

use dora_message::{
daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
@@ -21,6 +25,7 @@ use self::{
use crate::daemon_connection::DaemonChannel;
use dora_core::{
config::{Input, NodeId},
topics::NON_INPUT_EVENT,
uhlc,
};
use eyre::{eyre, Context};
@@ -83,10 +88,11 @@ impl EventStream {
})?
}
};
let queue_size_limit = input_config
let mut queue_size_limit: HashMap<DataId, usize> = input_config
.iter()
.map(|(input, config)| (input.clone(), config.queue_size.unwrap_or(1)))
.collect();
queue_size_limit.insert(DataId::from(NON_INPUT_EVENT.to_string()), 100_000);
let scheduler = Scheduler::new(queue_size_limit);

Self::init_on_channel(


+ 15
- 3
apis/rust/node/src/event_stream/scheduler.rs View File

@@ -1,5 +1,6 @@
use std::collections::{HashMap, VecDeque};

use dora_core::topics::NON_INPUT_EVENT;
use dora_message::{daemon_to_node::NodeEvent, id::DataId};

use super::thread::EventItem;
@@ -42,7 +43,7 @@ impl Scheduler {
},
ack_channel: _,
} => id.clone(),
_ => DataId::from("non_input_event".to_string()),
_ => DataId::from(NON_INPUT_EVENT.to_string()),
};

// Enforce queue size limit
@@ -54,13 +55,24 @@ impl Scheduler {
} else {
self.event_queues
.insert(event_id.clone(), VecDeque::from([event]));
self.last_used.push_front(event_id.clone());
if event_id != DataId::from(NON_INPUT_EVENT.to_string()) {
self.last_used.push_front(event_id.clone());
}
}
}

pub fn next(&mut self) -> Option<EventItem> {
// Process the ID with the oldest timestamp using BTreMap Ordering
// Retreive message from the non input event first that have priority over input messaage.
if let Some(queue) = self
.event_queues
.get_mut(&DataId::from(NON_INPUT_EVENT.to_string()))
{
if let Some(event) = queue.pop_front() {
return Some(event);
}
}

// Process the ID with the oldest timestamp using BTreMap Ordering
for (index, id) in self.last_used.clone().iter().enumerate() {
if let Some(queue) = self.event_queues.get_mut(id) {
if let Some(event) = queue.pop_front() {


Loading…
Cancel
Save