| @@ -75,7 +75,7 @@ pub struct DoraNode { | |||||
| control_channel: ControlChannel, | control_channel: ControlChannel, | ||||
| clock: Arc<uhlc::HLC>, | clock: Arc<uhlc::HLC>, | ||||
| sent_out_shared_memory: HashMap<DropToken, ShmemHandle>, | |||||
| sent_out_shared_memory: HashMap<DropToken, (ShmemHandle, uhlc::Timestamp)>, | |||||
| drop_stream: DropStream, | drop_stream: DropStream, | ||||
| cache: VecDeque<ShmemHandle>, | cache: VecDeque<ShmemHandle>, | ||||
| @@ -403,8 +403,9 @@ impl DoraNode { | |||||
| .wrap_err_with(|| format!("failed to send output {output_id}"))?; | .wrap_err_with(|| format!("failed to send output {output_id}"))?; | ||||
| if let Some((shared_memory, drop_token)) = shmem { | if let Some((shared_memory, drop_token)) = shmem { | ||||
| let timestamp = self.clock.new_timestamp(); | |||||
| self.sent_out_shared_memory | self.sent_out_shared_memory | ||||
| .insert(drop_token, shared_memory); | |||||
| .insert(drop_token, (shared_memory, timestamp)); | |||||
| } | } | ||||
| Ok(()) | Ok(()) | ||||
| @@ -469,6 +470,11 @@ impl DoraNode { | |||||
| } | } | ||||
| fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> { | fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> { | ||||
| // TODO: TEMPORARY - This is a workaround for memory pressure issues | |||||
| // Need deeper investigation into why drop tokens aren't being processed in time | |||||
| // First, try to process any pending drop tokens to free up memory | |||||
| let _ = self.handle_finished_drop_tokens(); | |||||
| let cache_index = self | let cache_index = self | ||||
| .cache | .cache | ||||
| .iter() | .iter() | ||||
| @@ -496,10 +502,14 @@ impl DoraNode { | |||||
| } | } | ||||
| fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> { | fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> { | ||||
| // TODO: TEMPORARY - More aggressive cleanup when we have many pending segments | |||||
| // This is a band-aid solution - need to investigate root cause of memory retention | |||||
| const MAX_PENDING_SEGMENTS: usize = 100; // Increased limit for high-frequency data | |||||
| loop { | loop { | ||||
| match self.drop_stream.try_recv() { | match self.drop_stream.try_recv() { | ||||
| Ok(token) => match self.sent_out_shared_memory.remove(&token) { | Ok(token) => match self.sent_out_shared_memory.remove(&token) { | ||||
| Some(region) => self.add_to_cache(region), | |||||
| Some((region, _timestamp)) => self.add_to_cache(region), | |||||
| None => tracing::warn!("received unknown finished drop token `{token:?}`"), | None => tracing::warn!("received unknown finished drop token `{token:?}`"), | ||||
| }, | }, | ||||
| Err(flume::TryRecvError::Empty) => break, | Err(flume::TryRecvError::Empty) => break, | ||||
| @@ -508,6 +518,43 @@ impl DoraNode { | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| // TODO: TEMPORARY - If we have too many pending segments, force cleanup of the oldest ones | |||||
| // WARNING: This forceful cleanup might drop segments still in use by receivers | |||||
| // Need to implement proper reference counting or lifecycle management | |||||
| if self.sent_out_shared_memory.len() > MAX_PENDING_SEGMENTS { | |||||
| tracing::warn!( | |||||
| "Too many pending shared memory segments ({}), forcing cleanup of oldest segments", | |||||
| self.sent_out_shared_memory.len() | |||||
| ); | |||||
| // TODO: TEMPORARY FIX - Remove oldest entries beyond the limit | |||||
| // This properly removes the oldest segments based on timestamp, but the root cause | |||||
| // of why drop tokens aren't being received in time still needs investigation | |||||
| let to_remove = self.sent_out_shared_memory.len() - MAX_PENDING_SEGMENTS; | |||||
| // Collect all entries with their timestamps and sort by age | |||||
| let mut entries: Vec<_> = self | |||||
| .sent_out_shared_memory | |||||
| .iter() | |||||
| .map(|(token, (_, timestamp))| (*token, *timestamp)) | |||||
| .collect(); | |||||
| entries.sort_by_key(|(_, timestamp)| *timestamp); | |||||
| // Remove the oldest entries | |||||
| let keys_to_remove: Vec<_> = entries | |||||
| .into_iter() | |||||
| .take(to_remove) | |||||
| .map(|(token, _)| token) | |||||
| .collect(); | |||||
| for key in keys_to_remove { | |||||
| if let Some((region, _)) = self.sent_out_shared_memory.remove(&key) { | |||||
| self.add_to_cache(region); | |||||
| } | |||||
| } | |||||
| } | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| @@ -561,7 +608,9 @@ impl Drop for DoraNode { | |||||
| match self.drop_stream.recv_timeout(Duration::from_secs(2)) { | match self.drop_stream.recv_timeout(Duration::from_secs(2)) { | ||||
| Ok(token) => { | Ok(token) => { | ||||
| self.sent_out_shared_memory.remove(&token); | |||||
| if let Some((region, _)) = self.sent_out_shared_memory.remove(&token) { | |||||
| self.add_to_cache(region); | |||||
| } | |||||
| } | } | ||||
| Err(flume::RecvTimeoutError::Disconnected) => { | Err(flume::RecvTimeoutError::Disconnected) => { | ||||
| tracing::warn!( | tracing::warn!( | ||||