| Author | SHA1 | Message | Date |
|---|---|---|---|
|
|
d2dacc2832
|
Adjust template for non-exhaustive input event | 1 year ago |
|
|
fa73ea1596
|
Make `Event::Input` `non_exhaustive`
Allows us to add more fields in the future. This is a breaking change. |
1 year ago |
|
|
7ea74d049d
|
Report dropped inputs in input event
Instead of adding a new ˋDroppedInputsˋ event. |
1 year ago |
|
|
0159da4aee
|
Add a new event type to notify nodes about dropped inputs
This gives nodes a way to check whether some inputs have been dropped by dora. The event also includes a drop reason, which is currently only the queue size. |
1 year ago |
| @@ -142,6 +142,7 @@ fn event_as_input(event: Box<DoraEvent>) -> eyre::Result<ffi::DoraInput> { | |||||
| id, | id, | ||||
| metadata: _, | metadata: _, | ||||
| data, | data, | ||||
| .. | |||||
| }) = event.0 | }) = event.0 | ||||
| else { | else { | ||||
| bail!("not an input event"); | bail!("not an input event"); | ||||
| @@ -16,10 +16,15 @@ pub enum Event { | |||||
| Reload { | Reload { | ||||
| operator_id: Option<OperatorId>, | operator_id: Option<OperatorId>, | ||||
| }, | }, | ||||
| #[non_exhaustive] | |||||
| Input { | Input { | ||||
| id: DataId, | id: DataId, | ||||
| metadata: Metadata, | metadata: Metadata, | ||||
| data: ArrowData, | data: ArrowData, | ||||
| /// Number of dropped inputs of this ID. | |||||
| /// | |||||
| /// Specifies the number of inputs of this ID that were dropped _before_ this input. | |||||
| dropped: usize, | |||||
| }, | }, | ||||
| InputClosed { | InputClosed { | ||||
| id: DataId, | id: DataId, | ||||
| @@ -27,6 +32,17 @@ pub enum Event { | |||||
| Error(String), | Error(String), | ||||
| } | } | ||||
| impl Event { | |||||
| pub fn new_input(id: DataId, metadata: Metadata, data: ArrowData) -> Event { | |||||
| Event::Input { | |||||
| id, | |||||
| metadata, | |||||
| data, | |||||
| dropped: 0, | |||||
| } | |||||
| } | |||||
| } | |||||
| pub enum RawData { | pub enum RawData { | ||||
| Empty, | Empty, | ||||
| Vec(AVec<u8, ConstAlign<128>>), | Vec(AVec<u8, ConstAlign<128>>), | ||||
| @@ -136,7 +136,12 @@ impl EventStream { | |||||
| NodeEvent::Stop => Event::Stop, | NodeEvent::Stop => Event::Stop, | ||||
| NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, | NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, | ||||
| NodeEvent::InputClosed { id } => Event::InputClosed { id }, | NodeEvent::InputClosed { id } => Event::InputClosed { id }, | ||||
| NodeEvent::Input { id, metadata, data } => { | |||||
| NodeEvent::Input { | |||||
| id, | |||||
| metadata, | |||||
| data, | |||||
| dropped, | |||||
| } => { | |||||
| let data = match data { | let data = match data { | ||||
| None => Ok(None), | None => Ok(None), | ||||
| Some(daemon_messages::DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), | Some(daemon_messages::DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), | ||||
| @@ -164,6 +169,7 @@ impl EventStream { | |||||
| id, | id, | ||||
| metadata, | metadata, | ||||
| data: data.into(), | data: data.into(), | ||||
| dropped, | |||||
| }, | }, | ||||
| Err(err) => Event::Error(format!("{err:?}")), | Err(err) => Event::Error(format!("{err:?}")), | ||||
| } | } | ||||
| @@ -10,6 +10,7 @@ fn main() -> Result<(), Box<dyn Error>> { | |||||
| id, | id, | ||||
| metadata, | metadata, | ||||
| data: _, | data: _, | ||||
| .. | |||||
| } => match id.as_str() { | } => match id.as_str() { | ||||
| other => eprintln!("Received input `{other}`"), | other => eprintln!("Received input `{other}`"), | ||||
| }, | }, | ||||
| @@ -984,6 +984,7 @@ impl Daemon { | |||||
| id: input_id.clone(), | id: input_id.clone(), | ||||
| metadata: metadata.clone(), | metadata: metadata.clone(), | ||||
| data: None, | data: None, | ||||
| dropped: 0, | |||||
| }, | }, | ||||
| &self.clock, | &self.clock, | ||||
| ); | ); | ||||
| @@ -1031,6 +1032,7 @@ impl Daemon { | |||||
| id: input_id.clone(), | id: input_id.clone(), | ||||
| metadata: metadata.clone(), | metadata: metadata.clone(), | ||||
| data: Some(message.clone()), | data: Some(message.clone()), | ||||
| dropped: 0, | |||||
| }, | }, | ||||
| &self.clock, | &self.clock, | ||||
| ); | ); | ||||
| @@ -1162,6 +1164,7 @@ async fn send_output_to_local_receivers( | |||||
| id: input_id.clone(), | id: input_id.clone(), | ||||
| metadata: metadata.clone(), | metadata: metadata.clone(), | ||||
| data: data.clone(), | data: data.clone(), | ||||
| dropped: 0, | |||||
| }; | }; | ||||
| match channel.send(Timestamped { | match channel.send(Timestamped { | ||||
| inner: item, | inner: item, | ||||
| @@ -11,7 +11,7 @@ use eyre::{eyre, Context}; | |||||
| use futures::{future, task, Future}; | use futures::{future, task, Future}; | ||||
| use shared_memory_server::{ShmemConf, ShmemServer}; | use shared_memory_server::{ShmemConf, ShmemServer}; | ||||
| use std::{ | use std::{ | ||||
| collections::{BTreeMap, VecDeque}, | |||||
| collections::{BTreeMap, HashMap, VecDeque}, | |||||
| mem, | mem, | ||||
| net::Ipv4Addr, | net::Ipv4Addr, | ||||
| sync::Arc, | sync::Arc, | ||||
| @@ -151,6 +151,7 @@ struct Listener { | |||||
| queue: VecDeque<Box<Option<Timestamped<NodeEvent>>>>, | queue: VecDeque<Box<Option<Timestamped<NodeEvent>>>>, | ||||
| queue_sizes: BTreeMap<DataId, usize>, | queue_sizes: BTreeMap<DataId, usize>, | ||||
| clock: Arc<uhlc::HLC>, | clock: Arc<uhlc::HLC>, | ||||
| dropped_inputs: HashMap<DataId, usize>, | |||||
| } | } | ||||
| impl Listener { | impl Listener { | ||||
| @@ -211,6 +212,7 @@ impl Listener { | |||||
| queue_sizes, | queue_sizes, | ||||
| queue: VecDeque::new(), | queue: VecDeque::new(), | ||||
| clock: hlc.clone(), | clock: hlc.clone(), | ||||
| dropped_inputs: HashMap::new(), | |||||
| }; | }; | ||||
| match listener | match listener | ||||
| .run_inner(connection) | .run_inner(connection) | ||||
| @@ -281,7 +283,10 @@ impl Listener { | |||||
| async fn handle_events(&mut self) -> eyre::Result<()> { | async fn handle_events(&mut self) -> eyre::Result<()> { | ||||
| if let Some(events) = &mut self.subscribed_events { | if let Some(events) = &mut self.subscribed_events { | ||||
| while let Ok(event) = events.try_recv() { | |||||
| while let Ok(mut event) = events.try_recv() { | |||||
| if let NodeEvent::Input { id, dropped, .. } = &mut event.inner { | |||||
| *dropped += self.dropped_inputs.remove(id).unwrap_or_default(); | |||||
| } | |||||
| self.queue.push_back(Box::new(Some(event))); | self.queue.push_back(Box::new(Some(event))); | ||||
| } | } | ||||
| @@ -294,13 +299,15 @@ impl Listener { | |||||
| #[tracing::instrument(skip(self), fields(%self.node_id), level = "trace")] | #[tracing::instrument(skip(self), fields(%self.node_id), level = "trace")] | ||||
| async fn drop_oldest_inputs(&mut self) -> Result<(), eyre::ErrReport> { | async fn drop_oldest_inputs(&mut self) -> Result<(), eyre::ErrReport> { | ||||
| let mut queue_size_remaining = self.queue_sizes.clone(); | let mut queue_size_remaining = self.queue_sizes.clone(); | ||||
| let mut dropped = 0; | |||||
| let mut drop_tokens = Vec::new(); | let mut drop_tokens = Vec::new(); | ||||
| // iterate over queued events, newest first | // iterate over queued events, newest first | ||||
| for event in self.queue.iter_mut().rev() { | for event in self.queue.iter_mut().rev() { | ||||
| let Some(Timestamped { | let Some(Timestamped { | ||||
| inner: NodeEvent::Input { id, data, .. }, | |||||
| inner: | |||||
| NodeEvent::Input { | |||||
| id, data, dropped, .. | |||||
| }, | |||||
| .. | .. | ||||
| }) = event.as_mut() | }) = event.as_mut() | ||||
| else { | else { | ||||
| @@ -308,7 +315,8 @@ impl Listener { | |||||
| }; | }; | ||||
| match queue_size_remaining.get_mut(id) { | match queue_size_remaining.get_mut(id) { | ||||
| Some(0) => { | Some(0) => { | ||||
| dropped += 1; | |||||
| *self.dropped_inputs.entry(id.clone()).or_default() += *dropped + 1; | |||||
| if let Some(drop_token) = data.as_ref().and_then(|d| d.drop_token()) { | if let Some(drop_token) = data.as_ref().and_then(|d| d.drop_token()) { | ||||
| drop_tokens.push(drop_token); | drop_tokens.push(drop_token); | ||||
| } | } | ||||
| @@ -324,9 +332,6 @@ impl Listener { | |||||
| } | } | ||||
| self.report_drop_tokens(drop_tokens).await?; | self.report_drop_tokens(drop_tokens).await?; | ||||
| if dropped > 0 { | |||||
| tracing::debug!("dropped {dropped} inputs because event queue was too full"); | |||||
| } | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| @@ -248,7 +248,9 @@ async fn run( | |||||
| RuntimeEvent::Event(Event::Reload { operator_id: None }) => { | RuntimeEvent::Event(Event::Reload { operator_id: None }) => { | ||||
| tracing::warn!("Reloading runtime nodes is not supported"); | tracing::warn!("Reloading runtime nodes is not supported"); | ||||
| } | } | ||||
| RuntimeEvent::Event(Event::Input { id, metadata, data }) => { | |||||
| RuntimeEvent::Event(Event::Input { | |||||
| id, metadata, data, .. | |||||
| }) => { | |||||
| let Some((operator_id, input_id)) = id.as_str().split_once('/') else { | let Some((operator_id, input_id)) = id.as_str().split_once('/') else { | ||||
| tracing::warn!("received non-operator input {id}"); | tracing::warn!("received non-operator input {id}"); | ||||
| continue; | continue; | ||||
| @@ -261,11 +263,7 @@ async fn run( | |||||
| }; | }; | ||||
| if let Err(err) = operator_channel | if let Err(err) = operator_channel | ||||
| .send_async(Event::Input { | |||||
| id: input_id.clone(), | |||||
| metadata, | |||||
| data, | |||||
| }) | |||||
| .send_async(Event::new_input(input_id.clone(), metadata, data)) | |||||
| .await | .await | ||||
| .wrap_err_with(|| { | .wrap_err_with(|| { | ||||
| format!("failed to send input `{input_id}` to operator `{operator_id}`") | format!("failed to send input `{input_id}` to operator `{operator_id}`") | ||||
| @@ -191,6 +191,7 @@ impl<'lib> SharedLibraryOperator<'lib> { | |||||
| id: input_id, | id: input_id, | ||||
| metadata, | metadata, | ||||
| data, | data, | ||||
| .. | |||||
| } => { | } => { | ||||
| let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?; | let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?; | ||||
| @@ -20,7 +20,9 @@ fn main() -> eyre::Result<()> { | |||||
| while let Some(event) = events.recv() { | while let Some(event) = events.recv() { | ||||
| match event { | match event { | ||||
| Event::Input { id, metadata, data } => { | |||||
| Event::Input { | |||||
| id, metadata, data, .. | |||||
| } => { | |||||
| // check if new size bracket | // check if new size bracket | ||||
| let data_len = data.len(); | let data_len = data.len(); | ||||
| if data_len != current_size { | if data_len != current_size { | ||||
| @@ -18,6 +18,7 @@ fn main() -> eyre::Result<()> { | |||||
| id, | id, | ||||
| metadata, | metadata, | ||||
| data: _, | data: _, | ||||
| .. | |||||
| } => match id.as_str() { | } => match id.as_str() { | ||||
| "tick" => { | "tick" => { | ||||
| let random: u64 = rand::random(); | let random: u64 = rand::random(); | ||||
| @@ -10,6 +10,7 @@ fn main() -> eyre::Result<()> { | |||||
| id, | id, | ||||
| metadata: _, | metadata: _, | ||||
| data, | data, | ||||
| .. | |||||
| } => match id.as_str() { | } => match id.as_str() { | ||||
| "message" => { | "message" => { | ||||
| let received_string: &str = | let received_string: &str = | ||||
| @@ -18,6 +18,7 @@ fn main() -> eyre::Result<()> { | |||||
| id, | id, | ||||
| metadata, | metadata, | ||||
| data: _, | data: _, | ||||
| .. | |||||
| } => match id.as_str() { | } => match id.as_str() { | ||||
| "tick" => { | "tick" => { | ||||
| let random: u64 = rand::random(); | let random: u64 = rand::random(); | ||||
| @@ -10,6 +10,7 @@ fn main() -> eyre::Result<()> { | |||||
| id, | id, | ||||
| metadata: _, | metadata: _, | ||||
| data, | data, | ||||
| .. | |||||
| } => match id.as_str() { | } => match id.as_str() { | ||||
| "message" => { | "message" => { | ||||
| let received_string: &str = | let received_string: &str = | ||||
| @@ -10,7 +10,9 @@ fn main() -> eyre::Result<()> { | |||||
| let mut ticks = 0; | let mut ticks = 0; | ||||
| while let Some(event) = events.recv() { | while let Some(event) = events.recv() { | ||||
| match event { | match event { | ||||
| Event::Input { id, metadata, data } => match id.as_ref() { | |||||
| Event::Input { | |||||
| id, metadata, data, .. | |||||
| } => match id.as_ref() { | |||||
| "tick" => { | "tick" => { | ||||
| ticks += 1; | ticks += 1; | ||||
| } | } | ||||
| @@ -155,6 +155,10 @@ pub enum NodeEvent { | |||||
| id: DataId, | id: DataId, | ||||
| metadata: Metadata, | metadata: Metadata, | ||||
| data: Option<DataMessage>, | data: Option<DataMessage>, | ||||
| /// Number of dropped inputs of this ID. | |||||
| /// | |||||
| /// Specifies the number of inputs of this ID that were dropped _before_ this input. | |||||
| dropped: usize, | |||||
| }, | }, | ||||
| InputClosed { | InputClosed { | ||||
| id: DataId, | id: DataId, | ||||
| @@ -25,7 +25,9 @@ async fn main() -> eyre::Result<()> { | |||||
| while let Some(event) = events.recv() { | while let Some(event) = events.recv() { | ||||
| match event { | match event { | ||||
| Event::Input { id, data, metadata } => { | |||||
| Event::Input { | |||||
| id, data, metadata, .. | |||||
| } => { | |||||
| match writers.get(&id) { | match writers.get(&id) { | ||||
| None => { | None => { | ||||
| let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, false); | let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, false); | ||||
| @@ -43,6 +43,7 @@ fn main() -> Result<()> { | |||||
| id, | id, | ||||
| data, | data, | ||||
| metadata: _, | metadata: _, | ||||
| .. | |||||
| } = event | } = event | ||||
| { | { | ||||
| if id.as_str().contains("image") { | if id.as_str().contains("image") { | ||||