diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs index 75b3c595..426b344c 100644 --- a/apis/rust/node/src/event_stream/event.rs +++ b/apis/rust/node/src/event_stream/event.rs @@ -2,6 +2,7 @@ use std::{ptr::NonNull, sync::Arc}; use aligned_vec::{AVec, ConstAlign}; use dora_arrow_convert::{ArrowData, IntoArrow}; +pub use dora_core::daemon_messages::InputDropReason; use dora_core::{ config::{DataId, OperatorId}, message::{ArrowTypeInfo, BufferOffset, Metadata}, @@ -25,6 +26,10 @@ pub enum Event { id: DataId, }, Error(String), + DroppedInputs { + reason: InputDropReason, + number: usize, + }, } pub enum RawData { diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 9575a8d7..d3985fca 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -175,6 +175,9 @@ impl EventStream { tracing::error!("{err:?}"); Event::Error(err.wrap_err("internal error").to_string()) } + NodeEvent::DroppedInputs { reason, number } => { + Event::DroppedInputs { reason, number } + } }, EventItem::FatalError(err) => { diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index 8fd200de..76f5cc04 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -2,8 +2,8 @@ use crate::{DaemonNodeEvent, Event}; use dora_core::{ config::{DataId, LocalCommunicationConfig, NodeId}, daemon_messages::{ - DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, NodeDropEvent, NodeEvent, - Timestamped, + DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, InputDropReason, + NodeDropEvent, NodeEvent, Timestamped, }, message::uhlc, }; @@ -298,7 +298,8 @@ impl Listener { let mut drop_tokens = Vec::new(); // iterate over queued events, newest first - for event in self.queue.iter_mut().rev() { + let mut last_dropped = None; + for (i, event) in self.queue.iter_mut().enumerate().rev() { let Some(Timestamped { inner: NodeEvent::Input { id, data, .. }, .. @@ -313,6 +314,7 @@ impl Listener { drop_tokens.push(drop_token); } *event.as_mut() = None; + last_dropped = Some(i); } Some(size_remaining) => { *size_remaining = size_remaining.saturating_sub(1); @@ -324,8 +326,18 @@ impl Listener { } self.report_drop_tokens(drop_tokens).await?; - if dropped > 0 { + if let Some(last_dropped) = last_dropped { tracing::debug!("dropped {dropped} inputs because event queue was too full"); + // replace last dropped event with `DroppedInputs` event + let entry = &mut self.queue[last_dropped]; + assert!(entry.is_none()); + *entry = Box::new(Some(Timestamped { + inner: NodeEvent::DroppedInputs { + reason: InputDropReason::QueueSize, + number: dropped, + }, + timestamp: self.clock.new_timestamp(), + })); } Ok(()) } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 91c634cc..647750cd 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -160,6 +160,16 @@ pub enum NodeEvent { id: DataId, }, AllInputsClosed, + DroppedInputs { + reason: InputDropReason, + number: usize, + }, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[non_exhaustive] +pub enum InputDropReason { + QueueSize, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]