From ea446f910b7363186da8e5a2284ac205c0fcbcd1 Mon Sep 17 00:00:00 2001
From: rozgo
Date: Mon, 28 Jul 2025 17:26:18 -0600
Subject: [PATCH] temp fix for avoiding shared mem crash

---
 apis/rust/node/src/node/mod.rs | 57 +++++++++++++++++++++++++++++++---
 1 file changed, 53 insertions(+), 4 deletions(-)

diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs
index 46ea61ac..32a92f34 100644
--- a/apis/rust/node/src/node/mod.rs
+++ b/apis/rust/node/src/node/mod.rs
@@ -75,7 +75,7 @@ pub struct DoraNode {
     control_channel: ControlChannel,
     clock: Arc<uhlc::HLC>,
 
-    sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
+    sent_out_shared_memory: HashMap<DropToken, (ShmemHandle, uhlc::Timestamp)>,
     drop_stream: DropStream,
     cache: VecDeque<ShmemHandle>,
 
@@ -403,8 +403,9 @@ impl DoraNode {
             .wrap_err_with(|| format!("failed to send output {output_id}"))?;
 
         if let Some((shared_memory, drop_token)) = shmem {
+            let timestamp = self.clock.new_timestamp();
             self.sent_out_shared_memory
-                .insert(drop_token, shared_memory);
+                .insert(drop_token, (shared_memory, timestamp));
         }
 
         Ok(())
@@ -469,6 +470,11 @@ impl DoraNode {
     }
 
     fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
+        // TODO: TEMPORARY - This is a workaround for memory pressure issues
+        // Need deeper investigation into why drop tokens aren't being processed in time
+        // First, try to process any pending drop tokens to free up memory
+        let _ = self.handle_finished_drop_tokens();
+
         let cache_index = self
             .cache
             .iter()
@@ -496,10 +502,14 @@ impl DoraNode {
     }
 
     fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
+        // TODO: TEMPORARY - More aggressive cleanup when we have many pending segments
+        // This is a band-aid solution - need to investigate root cause of memory retention
+        const MAX_PENDING_SEGMENTS: usize = 100; // Increased limit for high-frequency data
+
         loop {
             match self.drop_stream.try_recv() {
                 Ok(token) => match self.sent_out_shared_memory.remove(&token) {
-                    Some(region) => self.add_to_cache(region),
+                    Some((region, _timestamp)) => self.add_to_cache(region),
                     None => tracing::warn!("received unknown finished drop token `{token:?}`"),
                 },
                 Err(flume::TryRecvError::Empty) => break,
@@ -508,6 +518,43 @@ impl DoraNode {
                 }
             }
         }
+
+        // TODO: TEMPORARY - If we have too many pending segments, force cleanup of the oldest ones
+        // WARNING: This forceful cleanup might drop segments still in use by receivers
+        // Need to implement proper reference counting or lifecycle management
+        if self.sent_out_shared_memory.len() > MAX_PENDING_SEGMENTS {
+            tracing::warn!(
+                "Too many pending shared memory segments ({}), forcing cleanup of oldest segments",
+                self.sent_out_shared_memory.len()
+            );
+
+            // TODO: TEMPORARY FIX - Remove oldest entries beyond the limit
+            // This properly removes the oldest segments based on timestamp, but the root cause
+            // of why drop tokens aren't being received in time still needs investigation
+            let to_remove = self.sent_out_shared_memory.len() - MAX_PENDING_SEGMENTS;
+
+            // Collect all entries with their timestamps and sort by age
+            let mut entries: Vec<_> = self
+                .sent_out_shared_memory
+                .iter()
+                .map(|(token, (_, timestamp))| (*token, *timestamp))
+                .collect();
+            entries.sort_by_key(|(_, timestamp)| *timestamp);
+
+            // Remove the oldest entries
+            let keys_to_remove: Vec<_> = entries
+                .into_iter()
+                .take(to_remove)
+                .map(|(token, _)| token)
+                .collect();
+
+            for key in keys_to_remove {
+                if let Some((region, _)) = self.sent_out_shared_memory.remove(&key) {
+                    self.add_to_cache(region);
+                }
+            }
+        }
+
         Ok(())
     }
 
@@ -561,7 +608,9 @@ impl Drop for DoraNode {
 
         match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
             Ok(token) => {
-                self.sent_out_shared_memory.remove(&token);
+                if let Some((region, _)) = self.sent_out_shared_memory.remove(&token) {
+                    self.add_to_cache(region);
+                }
             }
             Err(flume::RecvTimeoutError::Disconnected) => {
                 tracing::warn!(