diff --git a/apis/rust/node/src/communication/mod.rs b/apis/rust/node/src/communication/mod.rs index 80058df3..55de5235 100644 --- a/apis/rust/node/src/communication/mod.rs +++ b/apis/rust/node/src/communication/mod.rs @@ -1,11 +1,11 @@ use crate::{ - config::{CommunicationConfig, DataId, InputMapping}, + config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}, BoxError, }; -use eyre::{eyre, Context}; +use eyre::Context; pub use flume::Receiver; use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashSet}, mem, thread, }; @@ -78,25 +78,22 @@ pub trait CommunicationLayer: Send + Sync { let input_id = input.to_owned(); let sender = inputs_tx.clone(); thread::spawn(move || loop { - match sub.recv().transpose() { + let event = match sub.recv().transpose() { None => break, - Some(value) => { - let input = value.map(|data| Input { - id: input_id.clone(), - data, - }); - match sender.send(input) { - Ok(()) => {} - Err(flume::SendError(_)) => break, - } - } + Some(Ok(data)) => InputEvent::Input(Input { + id: input_id.clone(), + data, + }), + Some(Err(err)) => InputEvent::Error(err), + }; + match sender.send(event) { + Ok(()) => {} + Err(flume::SendError(_)) => break, } }); } - mem::drop(inputs_tx); - let (stop_tx, stop_rx) = flume::bounded(10); - let mut sources: HashMap<_, _> = inputs + let mut sources: HashSet<_> = inputs .values() .map(|v| (v.source().to_owned(), v.operator().to_owned())) .collect(); @@ -109,58 +106,41 @@ pub trait CommunicationLayer: Send + Sync { .subscribe(&topic) .wrap_err_with(|| format!("failed to subscribe on {topic}"))?; - let source_id = source.to_owned(); - let sender = stop_tx.clone(); + let source = source.to_owned(); + let operator = operator.clone(); + let sender = inputs_tx.clone(); thread::spawn(move || loop { - match sub.recv().transpose() { + let event = match sub.recv().transpose() { None => break, - Some(value) => { - let input = value.map(|_| source_id.clone()); - match sender.send(input) { - Ok(()) => {} - Err(flume::SendError(_)) => break, - } - } + Some(Ok(_)) => InputEvent::SourceClosed { + source: source.clone(), + operator: operator.clone(), + }, + Some(Err(err)) => InputEvent::Error(err), + }; + match sender.send(event) { + Ok(()) => {} + Err(flume::SendError(_)) => break, } }); } - mem::drop(stop_tx); + mem::drop(inputs_tx); let (combined_tx, combined) = flume::bounded(1); thread::spawn(move || loop { - let selector = flume::Selector::new() - .recv(&inputs_rx, |v| match v { - Ok(Ok(value)) => InputEvent::Input(value), - Ok(Err(err)) => InputEvent::Error(err), - Err(flume::RecvError::Disconnected) => InputEvent::Error(BoxError( - eyre!("input stream was disconnected unexpectedly").into(), - )), - }) - .recv(&stop_rx, |v| match v { - Ok(Ok(stopped_source)) => { - sources.remove(&stopped_source); - InputEvent::InputClosed { - number_of_remaining_sources: sources.len(), - } - } - Ok(Err(err)) => InputEvent::Error(err), - Err(flume::RecvError::Disconnected) => InputEvent::Error(BoxError( - eyre!("stop stream was disconnected unexpectedly").into(), - )), - }); - match selector.wait() { - InputEvent::Input(input) => match combined_tx.send(input) { + match inputs_rx.recv() { + Ok(InputEvent::Input(input)) => match combined_tx.send(input) { Ok(()) => {} Err(flume::SendError(_)) => break, }, - InputEvent::InputClosed { - number_of_remaining_sources, - } => { - if number_of_remaining_sources == 0 { + Ok(InputEvent::SourceClosed { source, operator }) => { + sources.remove(&(source, operator)); + if sources.is_empty() { break; } } - InputEvent::Error(err) => panic!("{err}"), + Ok(InputEvent::Error(err)) => panic!("{err}"), + Err(_) => break, } }); @@ -180,7 +160,10 @@ pub trait Subscriber: Send + Sync { enum InputEvent { Input(Input), - InputClosed { number_of_remaining_sources: usize }, + SourceClosed { + source: NodeId, + operator: Option, + }, Error(BoxError), }