@@ -24,12 +24,12 @@ use dora_message::{
use eyre::{bail, WrapErr};
use shared_memory_extended::{Shmem, ShmemConf};
use std::{
collections::{HashMap, VecDeque},
collections::{BTreeSet, HashMap, VecDeque},
ops::{Deref, DerefMut},
sync::Arc,
time::Duration,
};
use tracing::info;
use tracing::{ info, warn} ;
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
@@ -52,6 +52,7 @@ pub struct DoraNode {
cache: VecDeque<ShmemHandle>,
dataflow_descriptor: Descriptor,
warned_unknown_output: BTreeSet<DataId>,
}
impl DoraNode {
@@ -157,10 +158,23 @@ impl DoraNode {
drop_stream,
cache: VecDeque::new(),
dataflow_descriptor,
warned_unknown_output: BTreeSet::new(),
};
Ok((node, event_stream))
}
fn validate_output(&mut self, output_id: &DataId) -> bool {
if !self.node_config.outputs.contains(output_id) {
if !self.warned_unknown_output.contains(output_id) {
warn!("Ignoring output `{output_id}` not in node's output list.");
self.warned_unknown_output.insert(output_id.clone());
}
false
} else {
true
}
}
/// Send data from the node to the other nodes.
/// We take a closure as an input to enable zero copy on send.
///
@@ -194,6 +208,9 @@ impl DoraNode {
where
F: FnOnce(&mut [u8]),
{
if !self.validate_output(&output_id) {
return Ok(());
};
let mut sample = self.allocate_data_sample(data_len)?;
data(&mut sample);
@@ -208,6 +225,10 @@ impl DoraNode {
parameters: MetadataParameters,
data: impl Array,
) -> eyre::Result<()> {
if !self.validate_output(&output_id) {
return Ok(());
};
let arrow_array = data.to_data();
let total_len = required_data_size(&arrow_array);
@@ -228,6 +249,9 @@ impl DoraNode {
data_len: usize,
data: &[u8],
) -> eyre::Result<()> {
if !self.validate_output(&output_id) {
return Ok(());
};
self.send_output_raw(output_id, parameters, data_len, |sample| {
sample.copy_from_slice(data)
})
@@ -244,6 +268,10 @@ impl DoraNode {
where
F: FnOnce(&mut [u8]),
{
if !self.validate_output(&output_id) {
return Ok(());
};
let mut sample = self.allocate_data_sample(data_len)?;
data(&mut sample);
@@ -259,9 +287,6 @@ impl DoraNode {
) -> eyre::Result<()> {
self.handle_finished_drop_tokens()?;
if !self.node_config.outputs.contains(&output_id) {
eyre::bail!("unknown dora node output `{output_id}` called by `send_output`. Double-check if this output is defined within your dataflow YAML file.",);
}
let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);
let (data, shmem) = match sample {