diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index e3476de8..b6a9ea17 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -142,6 +142,7 @@ fn event_as_input(event: Box) -> eyre::Result { id, metadata: _, data, + dropped, }) = event.0 else { bail!("not an input event"); diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs index 426b344c..cec736fc 100644 --- a/apis/rust/node/src/event_stream/event.rs +++ b/apis/rust/node/src/event_stream/event.rs @@ -2,7 +2,6 @@ use std::{ptr::NonNull, sync::Arc}; use aligned_vec::{AVec, ConstAlign}; use dora_arrow_convert::{ArrowData, IntoArrow}; -pub use dora_core::daemon_messages::InputDropReason; use dora_core::{ config::{DataId, OperatorId}, message::{ArrowTypeInfo, BufferOffset, Metadata}, @@ -21,15 +20,15 @@ pub enum Event { id: DataId, metadata: Metadata, data: ArrowData, + /// Number of dropped inputs of this ID. + /// + /// Specifies the number of inputs of this ID that were dropped _before_ this input. + dropped: usize, }, InputClosed { id: DataId, }, Error(String), - DroppedInputs { - reason: InputDropReason, - number: usize, - }, } pub enum RawData { diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index d3985fca..af114feb 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -136,7 +136,12 @@ impl EventStream { NodeEvent::Stop => Event::Stop, NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, NodeEvent::InputClosed { id } => Event::InputClosed { id }, - NodeEvent::Input { id, metadata, data } => { + NodeEvent::Input { + id, + metadata, + data, + dropped, + } => { let data = match data { None => Ok(None), Some(daemon_messages::DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), @@ -164,6 +169,7 @@ impl EventStream { id, metadata, data: data.into(), + dropped, }, Err(err) => Event::Error(format!("{err:?}")), } @@ -175,9 +181,6 @@ impl EventStream { tracing::error!("{err:?}"); Event::Error(err.wrap_err("internal error").to_string()) } - NodeEvent::DroppedInputs { reason, number } => { - Event::DroppedInputs { reason, number } - } }, EventItem::FatalError(err) => { diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 8b85ee8e..d7f71ac0 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -984,6 +984,7 @@ impl Daemon { id: input_id.clone(), metadata: metadata.clone(), data: None, + dropped: 0, }, &self.clock, ); @@ -1031,6 +1032,7 @@ impl Daemon { id: input_id.clone(), metadata: metadata.clone(), data: Some(message.clone()), + dropped: 0, }, &self.clock, ); @@ -1162,6 +1164,7 @@ async fn send_output_to_local_receivers( id: input_id.clone(), metadata: metadata.clone(), data: data.clone(), + dropped: 0, }; match channel.send(Timestamped { inner: item, diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index 76f5cc04..dcbad385 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -2,8 +2,8 @@ use crate::{DaemonNodeEvent, Event}; use dora_core::{ config::{DataId, LocalCommunicationConfig, NodeId}, daemon_messages::{ - DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, InputDropReason, - NodeDropEvent, NodeEvent, Timestamped, + DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, NodeDropEvent, NodeEvent, + Timestamped, }, message::uhlc, }; @@ -11,7 +11,7 @@ use eyre::{eyre, Context}; use futures::{future, task, Future}; use shared_memory_server::{ShmemConf, ShmemServer}; use std::{ - collections::{BTreeMap, VecDeque}, + collections::{BTreeMap, HashMap, VecDeque}, mem, net::Ipv4Addr, sync::Arc, @@ -151,6 +151,7 @@ struct Listener { queue: VecDeque>>>, queue_sizes: BTreeMap, clock: Arc, + dropped_inputs: HashMap, } impl Listener { @@ -211,6 +212,7 @@ impl Listener { queue_sizes, queue: VecDeque::new(), clock: hlc.clone(), + dropped_inputs: HashMap::new(), }; match listener .run_inner(connection) @@ -281,7 +283,10 @@ impl Listener { async fn handle_events(&mut self) -> eyre::Result<()> { if let Some(events) = &mut self.subscribed_events { - while let Ok(event) = events.try_recv() { + while let Ok(mut event) = events.try_recv() { + if let NodeEvent::Input { id, dropped, .. } = &mut event.inner { + *dropped += self.dropped_inputs.remove(id).unwrap_or_default(); + } self.queue.push_back(Box::new(Some(event))); } @@ -294,14 +299,15 @@ impl Listener { #[tracing::instrument(skip(self), fields(%self.node_id), level = "trace")] async fn drop_oldest_inputs(&mut self) -> Result<(), eyre::ErrReport> { let mut queue_size_remaining = self.queue_sizes.clone(); - let mut dropped = 0; let mut drop_tokens = Vec::new(); // iterate over queued events, newest first - let mut last_dropped = None; - for (i, event) in self.queue.iter_mut().enumerate().rev() { + for event in self.queue.iter_mut().rev() { let Some(Timestamped { - inner: NodeEvent::Input { id, data, .. }, + inner: + NodeEvent::Input { + id, data, dropped, .. + }, .. }) = event.as_mut() else { @@ -309,12 +315,15 @@ impl Listener { }; match queue_size_remaining.get_mut(id) { Some(0) => { - dropped += 1; + self.dropped_inputs + .entry(id.clone()) + .or_default() + .saturating_add(*dropped + 1); + if let Some(drop_token) = data.as_ref().and_then(|d| d.drop_token()) { drop_tokens.push(drop_token); } *event.as_mut() = None; - last_dropped = Some(i); } Some(size_remaining) => { *size_remaining = size_remaining.saturating_sub(1); @@ -326,19 +335,6 @@ impl Listener { } self.report_drop_tokens(drop_tokens).await?; - if let Some(last_dropped) = last_dropped { - tracing::debug!("dropped {dropped} inputs because event queue was too full"); - // replace last dropped event with `DroppedInputs` event - let entry = &mut self.queue[last_dropped]; - assert!(entry.is_none()); - *entry = Box::new(Some(Timestamped { - inner: NodeEvent::DroppedInputs { - reason: InputDropReason::QueueSize, - number: dropped, - }, - timestamp: self.clock.new_timestamp(), - })); - } Ok(()) } diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index 308d59f1..bf5934f9 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -248,7 +248,12 @@ async fn run( RuntimeEvent::Event(Event::Reload { operator_id: None }) => { tracing::warn!("Reloading runtime nodes is not supported"); } - RuntimeEvent::Event(Event::Input { id, metadata, data }) => { + RuntimeEvent::Event(Event::Input { + id, + metadata, + data, + dropped, + }) => { let Some((operator_id, input_id)) = id.as_str().split_once('/') else { tracing::warn!("received non-operator input {id}"); continue; @@ -265,6 +270,7 @@ async fn run( id: input_id.clone(), metadata, data, + dropped, }) .await .wrap_err_with(|| { diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 811c3cd0..3c4ca560 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -191,6 +191,7 @@ impl<'lib> SharedLibraryOperator<'lib> { id: input_id, metadata, data, + dropped, } => { let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?; diff --git a/examples/benchmark/sink/src/main.rs b/examples/benchmark/sink/src/main.rs index 154b47d2..34aa84b7 100644 --- a/examples/benchmark/sink/src/main.rs +++ b/examples/benchmark/sink/src/main.rs @@ -20,7 +20,12 @@ fn main() -> eyre::Result<()> { while let Some(event) = events.recv() { match event { - Event::Input { id, metadata, data } => { + Event::Input { + id, + metadata, + data, + dropped, + } => { // check if new size bracket let data_len = data.len(); if data_len != current_size { diff --git a/examples/multiple-daemons/node/src/main.rs b/examples/multiple-daemons/node/src/main.rs index 36f42d57..f4664708 100644 --- a/examples/multiple-daemons/node/src/main.rs +++ b/examples/multiple-daemons/node/src/main.rs @@ -18,6 +18,7 @@ fn main() -> eyre::Result<()> { id, metadata, data: _, + dropped, } => match id.as_str() { "tick" => { let random: u64 = rand::random(); diff --git a/examples/multiple-daemons/sink/src/main.rs b/examples/multiple-daemons/sink/src/main.rs index e180af08..4f4351c5 100644 --- a/examples/multiple-daemons/sink/src/main.rs +++ b/examples/multiple-daemons/sink/src/main.rs @@ -10,6 +10,7 @@ fn main() -> eyre::Result<()> { id, metadata: _, data, + dropped, } => match id.as_str() { "message" => { let received_string: &str = diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index 36f42d57..f4664708 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -18,6 +18,7 @@ fn main() -> eyre::Result<()> { id, metadata, data: _, + dropped, } => match id.as_str() { "tick" => { let random: u64 = rand::random(); diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 647750cd..de4272fb 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -155,21 +155,15 @@ pub enum NodeEvent { id: DataId, metadata: Metadata, data: Option, + /// Number of dropped inputs of this ID. + /// + /// Specifies the number of inputs of this ID that were dropped _before_ this input. + dropped: usize, }, InputClosed { id: DataId, }, AllInputsClosed, - DroppedInputs { - reason: InputDropReason, - number: usize, - }, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -#[non_exhaustive] -pub enum InputDropReason { - QueueSize, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/tool_nodes/dora-record/src/main.rs b/tool_nodes/dora-record/src/main.rs index fb10bf18..116c3339 100644 --- a/tool_nodes/dora-record/src/main.rs +++ b/tool_nodes/dora-record/src/main.rs @@ -25,7 +25,12 @@ async fn main() -> eyre::Result<()> { while let Some(event) = events.recv() { match event { - Event::Input { id, data, metadata } => { + Event::Input { + id, + data, + metadata, + dropped, + } => { match writers.get(&id) { None => { let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, false);