Report dropped inputs through a `dropped` field on the `Input` event instead of adding a new `DroppedInputs` event. (dropped-inputs-event)
| @@ -142,6 +142,7 @@ fn event_as_input(event: Box<DoraEvent>) -> eyre::Result<ffi::DoraInput> { | |||
| id, | |||
| metadata: _, | |||
| data, | |||
| dropped, | |||
| }) = event.0 | |||
| else { | |||
| bail!("not an input event"); | |||
| @@ -2,7 +2,6 @@ use std::{ptr::NonNull, sync::Arc}; | |||
| use aligned_vec::{AVec, ConstAlign}; | |||
| use dora_arrow_convert::{ArrowData, IntoArrow}; | |||
| pub use dora_core::daemon_messages::InputDropReason; | |||
| use dora_core::{ | |||
| config::{DataId, OperatorId}, | |||
| message::{ArrowTypeInfo, BufferOffset, Metadata}, | |||
| @@ -21,15 +20,15 @@ pub enum Event { | |||
| id: DataId, | |||
| metadata: Metadata, | |||
| data: ArrowData, | |||
| /// Number of dropped inputs of this ID. | |||
| /// | |||
| /// Specifies the number of inputs of this ID that were dropped _before_ this input. | |||
| dropped: usize, | |||
| }, | |||
| InputClosed { | |||
| id: DataId, | |||
| }, | |||
| Error(String), | |||
| DroppedInputs { | |||
| reason: InputDropReason, | |||
| number: usize, | |||
| }, | |||
| } | |||
| pub enum RawData { | |||
| @@ -136,7 +136,12 @@ impl EventStream { | |||
| NodeEvent::Stop => Event::Stop, | |||
| NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, | |||
| NodeEvent::InputClosed { id } => Event::InputClosed { id }, | |||
| NodeEvent::Input { id, metadata, data } => { | |||
| NodeEvent::Input { | |||
| id, | |||
| metadata, | |||
| data, | |||
| dropped, | |||
| } => { | |||
| let data = match data { | |||
| None => Ok(None), | |||
| Some(daemon_messages::DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), | |||
| @@ -164,6 +169,7 @@ impl EventStream { | |||
| id, | |||
| metadata, | |||
| data: data.into(), | |||
| dropped, | |||
| }, | |||
| Err(err) => Event::Error(format!("{err:?}")), | |||
| } | |||
| @@ -175,9 +181,6 @@ impl EventStream { | |||
| tracing::error!("{err:?}"); | |||
| Event::Error(err.wrap_err("internal error").to_string()) | |||
| } | |||
| NodeEvent::DroppedInputs { reason, number } => { | |||
| Event::DroppedInputs { reason, number } | |||
| } | |||
| }, | |||
| EventItem::FatalError(err) => { | |||
| @@ -984,6 +984,7 @@ impl Daemon { | |||
| id: input_id.clone(), | |||
| metadata: metadata.clone(), | |||
| data: None, | |||
| dropped: 0, | |||
| }, | |||
| &self.clock, | |||
| ); | |||
| @@ -1031,6 +1032,7 @@ impl Daemon { | |||
| id: input_id.clone(), | |||
| metadata: metadata.clone(), | |||
| data: Some(message.clone()), | |||
| dropped: 0, | |||
| }, | |||
| &self.clock, | |||
| ); | |||
| @@ -1162,6 +1164,7 @@ async fn send_output_to_local_receivers( | |||
| id: input_id.clone(), | |||
| metadata: metadata.clone(), | |||
| data: data.clone(), | |||
| dropped: 0, | |||
| }; | |||
| match channel.send(Timestamped { | |||
| inner: item, | |||
| @@ -2,8 +2,8 @@ use crate::{DaemonNodeEvent, Event}; | |||
| use dora_core::{ | |||
| config::{DataId, LocalCommunicationConfig, NodeId}, | |||
| daemon_messages::{ | |||
| DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, InputDropReason, | |||
| NodeDropEvent, NodeEvent, Timestamped, | |||
| DaemonCommunication, DaemonReply, DaemonRequest, DataflowId, NodeDropEvent, NodeEvent, | |||
| Timestamped, | |||
| }, | |||
| message::uhlc, | |||
| }; | |||
| @@ -11,7 +11,7 @@ use eyre::{eyre, Context}; | |||
| use futures::{future, task, Future}; | |||
| use shared_memory_server::{ShmemConf, ShmemServer}; | |||
| use std::{ | |||
| collections::{BTreeMap, VecDeque}, | |||
| collections::{BTreeMap, HashMap, VecDeque}, | |||
| mem, | |||
| net::Ipv4Addr, | |||
| sync::Arc, | |||
| @@ -151,6 +151,7 @@ struct Listener { | |||
| queue: VecDeque<Box<Option<Timestamped<NodeEvent>>>>, | |||
| queue_sizes: BTreeMap<DataId, usize>, | |||
| clock: Arc<uhlc::HLC>, | |||
| dropped_inputs: HashMap<DataId, usize>, | |||
| } | |||
| impl Listener { | |||
| @@ -211,6 +212,7 @@ impl Listener { | |||
| queue_sizes, | |||
| queue: VecDeque::new(), | |||
| clock: hlc.clone(), | |||
| dropped_inputs: HashMap::new(), | |||
| }; | |||
| match listener | |||
| .run_inner(connection) | |||
| @@ -281,7 +283,10 @@ impl Listener { | |||
| async fn handle_events(&mut self) -> eyre::Result<()> { | |||
| if let Some(events) = &mut self.subscribed_events { | |||
| while let Ok(event) = events.try_recv() { | |||
| while let Ok(mut event) = events.try_recv() { | |||
| if let NodeEvent::Input { id, dropped, .. } = &mut event.inner { | |||
| *dropped += self.dropped_inputs.remove(id).unwrap_or_default(); | |||
| } | |||
| self.queue.push_back(Box::new(Some(event))); | |||
| } | |||
| @@ -294,14 +299,15 @@ impl Listener { | |||
| #[tracing::instrument(skip(self), fields(%self.node_id), level = "trace")] | |||
| async fn drop_oldest_inputs(&mut self) -> Result<(), eyre::ErrReport> { | |||
| let mut queue_size_remaining = self.queue_sizes.clone(); | |||
| let mut dropped = 0; | |||
| let mut drop_tokens = Vec::new(); | |||
| // iterate over queued events, newest first | |||
| let mut last_dropped = None; | |||
| for (i, event) in self.queue.iter_mut().enumerate().rev() { | |||
| for event in self.queue.iter_mut().rev() { | |||
| let Some(Timestamped { | |||
| inner: NodeEvent::Input { id, data, .. }, | |||
| inner: | |||
| NodeEvent::Input { | |||
| id, data, dropped, .. | |||
| }, | |||
| .. | |||
| }) = event.as_mut() | |||
| else { | |||
| @@ -309,12 +315,15 @@ impl Listener { | |||
| }; | |||
| match queue_size_remaining.get_mut(id) { | |||
| Some(0) => { | |||
| dropped += 1; | |||
| // Count this input plus any drops it was already carrying, so that the | |||
| // next delivered input of this ID reports the full number of lost inputs. | |||
| // `saturating_add` returns the sum rather than updating in place, so the | |||
| // result must be written back into the map entry. | |||
| let dropped_total = self.dropped_inputs.entry(id.clone()).or_default(); | |||
| *dropped_total = dropped_total.saturating_add(dropped.saturating_add(1)); | |||
| if let Some(drop_token) = data.as_ref().and_then(|d| d.drop_token()) { | |||
| drop_tokens.push(drop_token); | |||
| } | |||
| *event.as_mut() = None; | |||
| last_dropped = Some(i); | |||
| } | |||
| Some(size_remaining) => { | |||
| *size_remaining = size_remaining.saturating_sub(1); | |||
| @@ -326,19 +335,6 @@ impl Listener { | |||
| } | |||
| self.report_drop_tokens(drop_tokens).await?; | |||
| if let Some(last_dropped) = last_dropped { | |||
| tracing::debug!("dropped {dropped} inputs because event queue was too full"); | |||
| // replace last dropped event with `DroppedInputs` event | |||
| let entry = &mut self.queue[last_dropped]; | |||
| assert!(entry.is_none()); | |||
| *entry = Box::new(Some(Timestamped { | |||
| inner: NodeEvent::DroppedInputs { | |||
| reason: InputDropReason::QueueSize, | |||
| number: dropped, | |||
| }, | |||
| timestamp: self.clock.new_timestamp(), | |||
| })); | |||
| } | |||
| Ok(()) | |||
| } | |||
| @@ -248,7 +248,12 @@ async fn run( | |||
| RuntimeEvent::Event(Event::Reload { operator_id: None }) => { | |||
| tracing::warn!("Reloading runtime nodes is not supported"); | |||
| } | |||
| RuntimeEvent::Event(Event::Input { id, metadata, data }) => { | |||
| RuntimeEvent::Event(Event::Input { | |||
| id, | |||
| metadata, | |||
| data, | |||
| dropped, | |||
| }) => { | |||
| let Some((operator_id, input_id)) = id.as_str().split_once('/') else { | |||
| tracing::warn!("received non-operator input {id}"); | |||
| continue; | |||
| @@ -265,6 +270,7 @@ async fn run( | |||
| id: input_id.clone(), | |||
| metadata, | |||
| data, | |||
| dropped, | |||
| }) | |||
| .await | |||
| .wrap_err_with(|| { | |||
| @@ -191,6 +191,7 @@ impl<'lib> SharedLibraryOperator<'lib> { | |||
| id: input_id, | |||
| metadata, | |||
| data, | |||
| dropped, | |||
| } => { | |||
| let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?; | |||
| @@ -20,7 +20,12 @@ fn main() -> eyre::Result<()> { | |||
| while let Some(event) = events.recv() { | |||
| match event { | |||
| Event::Input { id, metadata, data } => { | |||
| Event::Input { | |||
| id, | |||
| metadata, | |||
| data, | |||
| dropped, | |||
| } => { | |||
| // check if new size bracket | |||
| let data_len = data.len(); | |||
| if data_len != current_size { | |||
| @@ -18,6 +18,7 @@ fn main() -> eyre::Result<()> { | |||
| id, | |||
| metadata, | |||
| data: _, | |||
| dropped, | |||
| } => match id.as_str() { | |||
| "tick" => { | |||
| let random: u64 = rand::random(); | |||
| @@ -10,6 +10,7 @@ fn main() -> eyre::Result<()> { | |||
| id, | |||
| metadata: _, | |||
| data, | |||
| dropped, | |||
| } => match id.as_str() { | |||
| "message" => { | |||
| let received_string: &str = | |||
| @@ -18,6 +18,7 @@ fn main() -> eyre::Result<()> { | |||
| id, | |||
| metadata, | |||
| data: _, | |||
| dropped, | |||
| } => match id.as_str() { | |||
| "tick" => { | |||
| let random: u64 = rand::random(); | |||
| @@ -155,21 +155,15 @@ pub enum NodeEvent { | |||
| id: DataId, | |||
| metadata: Metadata, | |||
| data: Option<DataMessage>, | |||
| /// Number of dropped inputs of this ID. | |||
| /// | |||
| /// Specifies the number of inputs of this ID that were dropped _before_ this input. | |||
| dropped: usize, | |||
| }, | |||
| InputClosed { | |||
| id: DataId, | |||
| }, | |||
| AllInputsClosed, | |||
| DroppedInputs { | |||
| reason: InputDropReason, | |||
| number: usize, | |||
| }, | |||
| } | |||
| #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] | |||
| #[non_exhaustive] | |||
| pub enum InputDropReason { | |||
| QueueSize, | |||
| } | |||
| #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] | |||
| @@ -25,7 +25,12 @@ async fn main() -> eyre::Result<()> { | |||
| while let Some(event) = events.recv() { | |||
| match event { | |||
| Event::Input { id, data, metadata } => { | |||
| Event::Input { | |||
| id, | |||
| data, | |||
| metadata, | |||
| dropped, | |||
| } => { | |||
| match writers.get(&id) { | |||
| None => { | |||
| let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, false); | |||