From 90752a0bb75e46bc6d48b495e88bdf9be582ff75 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 4 Dec 2024 12:10:55 +0100 Subject: [PATCH] Make the non input event a priority when collecting next event --- apis/rust/node/src/event_stream/mod.rs | 10 ++++++++-- apis/rust/node/src/event_stream/scheduler.rs | 18 +++++++++++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index cc3f6a38..4a9da8a6 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -1,4 +1,8 @@ -use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, + time::Duration, +}; use dora_message::{ daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent}, @@ -21,6 +25,7 @@ use self::{ use crate::daemon_connection::DaemonChannel; use dora_core::{ config::{Input, NodeId}, + topics::NON_INPUT_EVENT, uhlc, }; use eyre::{eyre, Context}; @@ -83,10 +88,11 @@ impl EventStream { })? } }; - let queue_size_limit = input_config + let mut queue_size_limit: HashMap = input_config .iter() .map(|(input, config)| (input.clone(), config.queue_size.unwrap_or(1))) .collect(); + queue_size_limit.insert(DataId::from(NON_INPUT_EVENT.to_string()), 100_000); let scheduler = Scheduler::new(queue_size_limit); Self::init_on_channel( diff --git a/apis/rust/node/src/event_stream/scheduler.rs b/apis/rust/node/src/event_stream/scheduler.rs index ed76d2b3..680d5856 100644 --- a/apis/rust/node/src/event_stream/scheduler.rs +++ b/apis/rust/node/src/event_stream/scheduler.rs @@ -1,5 +1,6 @@ use std::collections::{HashMap, VecDeque}; +use dora_core::topics::NON_INPUT_EVENT; use dora_message::{daemon_to_node::NodeEvent, id::DataId}; use super::thread::EventItem; @@ -42,7 +43,7 @@ impl Scheduler { }, ack_channel: _, } => id.clone(), - _ => DataId::from("non_input_event".to_string()), + _ => DataId::from(NON_INPUT_EVENT.to_string()), }; // Enforce queue size limit @@ -54,13 +55,24 @@ impl Scheduler { } else { self.event_queues .insert(event_id.clone(), VecDeque::from([event])); - self.last_used.push_front(event_id.clone()); + if event_id != DataId::from(NON_INPUT_EVENT.to_string()) { + self.last_used.push_front(event_id.clone()); + } } } pub fn next(&mut self) -> Option { - // Process the ID with the oldest timestamp using BTreMap Ordering + // Retreive message from the non input event first that have priority over input messaage. + if let Some(queue) = self + .event_queues + .get_mut(&DataId::from(NON_INPUT_EVENT.to_string())) + { + if let Some(event) = queue.pop_front() { + return Some(event); + } + } + // Process the ID with the oldest timestamp using BTreMap Ordering for (index, id) in self.last_used.clone().iter().enumerate() { if let Some(queue) = self.event_queues.get_mut(id) { if let Some(event) = queue.pop_front() {