Compare commits

...

4 Commits

Author SHA1 Message Date
  Philipp Oppermann d2dacc2832
Adjust template for non-exhaustive input event 1 year ago
  Philipp Oppermann fa73ea1596
Make `Event::Input` `non_exhaustive` 1 year ago
  Philipp Oppermann 7ea74d049d
Report dropped inputs in input event 1 year ago
  Philipp Oppermann 0159da4aee
Add a new event type to notify nodes about dropped inputs 1 year ago
17 changed files with 64 additions and 18 deletions
Split View
  1. +1
    -0
      apis/c++/node/src/lib.rs
  2. +16
    -0
      apis/rust/node/src/event_stream/event.rs
  3. +7
    -1
      apis/rust/node/src/event_stream/mod.rs
  4. +1
    -0
      binaries/cli/src/template/rust/node/main-template.rs
  5. +3
    -0
      binaries/daemon/src/lib.rs
  6. +13
    -8
      binaries/daemon/src/node_communication/mod.rs
  7. +4
    -6
      binaries/runtime/src/lib.rs
  8. +1
    -0
      binaries/runtime/src/operator/shared_lib.rs
  9. +3
    -1
      examples/benchmark/sink/src/main.rs
  10. +1
    -0
      examples/multiple-daemons/node/src/main.rs
  11. +1
    -0
      examples/multiple-daemons/sink/src/main.rs
  12. +1
    -0
      examples/rust-dataflow/node/src/main.rs
  13. +1
    -0
      examples/rust-dataflow/sink/src/main.rs
  14. +3
    -1
      examples/rust-dataflow/status-node/src/main.rs
  15. +4
    -0
      libraries/core/src/daemon_messages.rs
  16. +3
    -1
      tool_nodes/dora-record/src/main.rs
  17. +1
    -0
      tool_nodes/dora-rerun/src/main.rs

+ 1
- 0
apis/c++/node/src/lib.rs View File

@@ -142,6 +142,7 @@ fn event_as_input(event: Box<DoraEvent>) -> eyre::Result<ffi::DoraInput> {
id,
metadata: _,
data,
..
}) = event.0
else {
bail!("not an input event");


+ 16
- 0
apis/rust/node/src/event_stream/event.rs View File

@@ -16,10 +16,15 @@ pub enum Event {
Reload {
operator_id: Option<OperatorId>,
},
#[non_exhaustive]
Input {
id: DataId,
metadata: Metadata,
data: ArrowData,
/// Number of dropped inputs of this ID.
///
/// Specifies the number of inputs of this ID that were dropped _before_ this input.
dropped: usize,
},
InputClosed {
id: DataId,
@@ -27,6 +32,17 @@ pub enum Event {
Error(String),
}

impl Event {
pub fn new_input(id: DataId, metadata: Metadata, data: ArrowData) -> Event {
Event::Input {
id,
metadata,
data,
dropped: 0,
}
}
}

pub enum RawData {
Empty,
Vec(AVec<u8, ConstAlign<128>>),


+ 7
- 1
apis/rust/node/src/event_stream/mod.rs View File

@@ -136,7 +136,12 @@ impl EventStream {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
NodeEvent::Input {
id,
metadata,
data,
dropped,
} => {
let data = match data {
None => Ok(None),
Some(daemon_messages::DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
@@ -164,6 +169,7 @@ impl EventStream {
id,
metadata,
data: data.into(),
dropped,
},
Err(err) => Event::Error(format!("{err:?}")),
}


+ 1
- 0
binaries/cli/src/template/rust/node/main-template.rs View File

@@ -10,6 +10,7 @@ fn main() -> Result<(), Box<dyn Error>> {
id,
metadata,
data: _,
..
} => match id.as_str() {
other => eprintln!("Received input `{other}`"),
},


+ 3
- 0
binaries/daemon/src/lib.rs View File

@@ -984,6 +984,7 @@ impl Daemon {
id: input_id.clone(),
metadata: metadata.clone(),
data: None,
dropped: 0,
},
&self.clock,
);
@@ -1031,6 +1032,7 @@ impl Daemon {
id: input_id.clone(),
metadata: metadata.clone(),
data: Some(message.clone()),
dropped: 0,
},
&self.clock,
);
@@ -1162,6 +1164,7 @@ async fn send_output_to_local_receivers(
id: input_id.clone(),
metadata: metadata.clone(),
data: data.clone(),
dropped: 0,
};
match channel.send(Timestamped {
inner: item,


+ 13
- 8
binaries/daemon/src/node_communication/mod.rs View File

@@ -11,7 +11,7 @@ use eyre::{eyre, Context};
use futures::{future, task, Future};
use shared_memory_server::{ShmemConf, ShmemServer};
use std::{
collections::{BTreeMap, VecDeque},
collections::{BTreeMap, HashMap, VecDeque},
mem,
net::Ipv4Addr,
sync::Arc,
@@ -151,6 +151,7 @@ struct Listener {
queue: VecDeque<Box<Option<Timestamped<NodeEvent>>>>,
queue_sizes: BTreeMap<DataId, usize>,
clock: Arc<uhlc::HLC>,
dropped_inputs: HashMap<DataId, usize>,
}

impl Listener {
@@ -211,6 +212,7 @@ impl Listener {
queue_sizes,
queue: VecDeque::new(),
clock: hlc.clone(),
dropped_inputs: HashMap::new(),
};
match listener
.run_inner(connection)
@@ -281,7 +283,10 @@ impl Listener {

async fn handle_events(&mut self) -> eyre::Result<()> {
if let Some(events) = &mut self.subscribed_events {
while let Ok(event) = events.try_recv() {
while let Ok(mut event) = events.try_recv() {
if let NodeEvent::Input { id, dropped, .. } = &mut event.inner {
*dropped += self.dropped_inputs.remove(id).unwrap_or_default();
}
self.queue.push_back(Box::new(Some(event)));
}

@@ -294,13 +299,15 @@ impl Listener {
#[tracing::instrument(skip(self), fields(%self.node_id), level = "trace")]
async fn drop_oldest_inputs(&mut self) -> Result<(), eyre::ErrReport> {
let mut queue_size_remaining = self.queue_sizes.clone();
let mut dropped = 0;
let mut drop_tokens = Vec::new();

// iterate over queued events, newest first
for event in self.queue.iter_mut().rev() {
let Some(Timestamped {
inner: NodeEvent::Input { id, data, .. },
inner:
NodeEvent::Input {
id, data, dropped, ..
},
..
}) = event.as_mut()
else {
@@ -308,7 +315,8 @@ impl Listener {
};
match queue_size_remaining.get_mut(id) {
Some(0) => {
dropped += 1;
*self.dropped_inputs.entry(id.clone()).or_default() += *dropped + 1;

if let Some(drop_token) = data.as_ref().and_then(|d| d.drop_token()) {
drop_tokens.push(drop_token);
}
@@ -324,9 +332,6 @@ impl Listener {
}
self.report_drop_tokens(drop_tokens).await?;

if dropped > 0 {
tracing::debug!("dropped {dropped} inputs because event queue was too full");
}
Ok(())
}



+ 4
- 6
binaries/runtime/src/lib.rs View File

@@ -248,7 +248,9 @@ async fn run(
RuntimeEvent::Event(Event::Reload { operator_id: None }) => {
tracing::warn!("Reloading runtime nodes is not supported");
}
RuntimeEvent::Event(Event::Input { id, metadata, data }) => {
RuntimeEvent::Event(Event::Input {
id, metadata, data, ..
}) => {
let Some((operator_id, input_id)) = id.as_str().split_once('/') else {
tracing::warn!("received non-operator input {id}");
continue;
@@ -261,11 +263,7 @@ async fn run(
};

if let Err(err) = operator_channel
.send_async(Event::Input {
id: input_id.clone(),
metadata,
data,
})
.send_async(Event::new_input(input_id.clone(), metadata, data))
.await
.wrap_err_with(|| {
format!("failed to send input `{input_id}` to operator `{operator_id}`")


+ 1
- 0
binaries/runtime/src/operator/shared_lib.rs View File

@@ -191,6 +191,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
id: input_id,
metadata,
data,
..
} => {
let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?;



+ 3
- 1
examples/benchmark/sink/src/main.rs View File

@@ -20,7 +20,9 @@ fn main() -> eyre::Result<()> {

while let Some(event) = events.recv() {
match event {
Event::Input { id, metadata, data } => {
Event::Input {
id, metadata, data, ..
} => {
// check if new size bracket
let data_len = data.len();
if data_len != current_size {


+ 1
- 0
examples/multiple-daemons/node/src/main.rs View File

@@ -18,6 +18,7 @@ fn main() -> eyre::Result<()> {
id,
metadata,
data: _,
..
} => match id.as_str() {
"tick" => {
let random: u64 = rand::random();


+ 1
- 0
examples/multiple-daemons/sink/src/main.rs View File

@@ -10,6 +10,7 @@ fn main() -> eyre::Result<()> {
id,
metadata: _,
data,
..
} => match id.as_str() {
"message" => {
let received_string: &str =


+ 1
- 0
examples/rust-dataflow/node/src/main.rs View File

@@ -18,6 +18,7 @@ fn main() -> eyre::Result<()> {
id,
metadata,
data: _,
..
} => match id.as_str() {
"tick" => {
let random: u64 = rand::random();


+ 1
- 0
examples/rust-dataflow/sink/src/main.rs View File

@@ -10,6 +10,7 @@ fn main() -> eyre::Result<()> {
id,
metadata: _,
data,
..
} => match id.as_str() {
"message" => {
let received_string: &str =


+ 3
- 1
examples/rust-dataflow/status-node/src/main.rs View File

@@ -10,7 +10,9 @@ fn main() -> eyre::Result<()> {
let mut ticks = 0;
while let Some(event) = events.recv() {
match event {
Event::Input { id, metadata, data } => match id.as_ref() {
Event::Input {
id, metadata, data, ..
} => match id.as_ref() {
"tick" => {
ticks += 1;
}


+ 4
- 0
libraries/core/src/daemon_messages.rs View File

@@ -155,6 +155,10 @@ pub enum NodeEvent {
id: DataId,
metadata: Metadata,
data: Option<DataMessage>,
/// Number of dropped inputs of this ID.
///
/// Specifies the number of inputs of this ID that were dropped _before_ this input.
dropped: usize,
},
InputClosed {
id: DataId,


+ 3
- 1
tool_nodes/dora-record/src/main.rs View File

@@ -25,7 +25,9 @@ async fn main() -> eyre::Result<()> {

while let Some(event) = events.recv() {
match event {
Event::Input { id, data, metadata } => {
Event::Input {
id, data, metadata, ..
} => {
match writers.get(&id) {
None => {
let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, false);


+ 1
- 0
tool_nodes/dora-rerun/src/main.rs View File

@@ -43,6 +43,7 @@ fn main() -> Result<()> {
id,
data,
metadata: _,
..
} = event
{
if id.as_str().contains("image") {


Loading…
Cancel
Save