Browse Source

Rework stop message handling to avoid 'unexpected disconnection' errors

With the previous solution, the stop channel thread sometimes panicked with 'stop stream was disconnected unexpectedly'.
tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
60c12f4d69
Failed to extract signature
1 changed files with 39 additions and 56 deletions
  1. +39
    -56
      apis/rust/node/src/communication/mod.rs

+ 39
- 56
apis/rust/node/src/communication/mod.rs View File

@@ -1,11 +1,11 @@
use crate::{
config::{CommunicationConfig, DataId, InputMapping},
config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId},
BoxError,
};
use eyre::{eyre, Context};
use eyre::Context;
pub use flume::Receiver;
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashSet},
mem, thread,
};

@@ -78,25 +78,22 @@ pub trait CommunicationLayer: Send + Sync {
let input_id = input.to_owned();
let sender = inputs_tx.clone();
thread::spawn(move || loop {
match sub.recv().transpose() {
let event = match sub.recv().transpose() {
None => break,
Some(value) => {
let input = value.map(|data| Input {
id: input_id.clone(),
data,
});
match sender.send(input) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
}
}
Some(Ok(data)) => InputEvent::Input(Input {
id: input_id.clone(),
data,
}),
Some(Err(err)) => InputEvent::Error(err),
};
match sender.send(event) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
}
});
}
mem::drop(inputs_tx);

let (stop_tx, stop_rx) = flume::bounded(10);
let mut sources: HashMap<_, _> = inputs
let mut sources: HashSet<_> = inputs
.values()
.map(|v| (v.source().to_owned(), v.operator().to_owned()))
.collect();
@@ -109,58 +106,41 @@ pub trait CommunicationLayer: Send + Sync {
.subscribe(&topic)
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?;

let source_id = source.to_owned();
let sender = stop_tx.clone();
let source = source.to_owned();
let operator = operator.clone();
let sender = inputs_tx.clone();
thread::spawn(move || loop {
match sub.recv().transpose() {
let event = match sub.recv().transpose() {
None => break,
Some(value) => {
let input = value.map(|_| source_id.clone());
match sender.send(input) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
}
}
Some(Ok(_)) => InputEvent::SourceClosed {
source: source.clone(),
operator: operator.clone(),
},
Some(Err(err)) => InputEvent::Error(err),
};
match sender.send(event) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
}
});
}
mem::drop(stop_tx);
mem::drop(inputs_tx);

let (combined_tx, combined) = flume::bounded(1);
thread::spawn(move || loop {
let selector = flume::Selector::new()
.recv(&inputs_rx, |v| match v {
Ok(Ok(value)) => InputEvent::Input(value),
Ok(Err(err)) => InputEvent::Error(err),
Err(flume::RecvError::Disconnected) => InputEvent::Error(BoxError(
eyre!("input stream was disconnected unexpectedly").into(),
)),
})
.recv(&stop_rx, |v| match v {
Ok(Ok(stopped_source)) => {
sources.remove(&stopped_source);
InputEvent::InputClosed {
number_of_remaining_sources: sources.len(),
}
}
Ok(Err(err)) => InputEvent::Error(err),
Err(flume::RecvError::Disconnected) => InputEvent::Error(BoxError(
eyre!("stop stream was disconnected unexpectedly").into(),
)),
});
match selector.wait() {
InputEvent::Input(input) => match combined_tx.send(input) {
match inputs_rx.recv() {
Ok(InputEvent::Input(input)) => match combined_tx.send(input) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
},
InputEvent::InputClosed {
number_of_remaining_sources,
} => {
if number_of_remaining_sources == 0 {
Ok(InputEvent::SourceClosed { source, operator }) => {
sources.remove(&(source, operator));
if sources.is_empty() {
break;
}
}
InputEvent::Error(err) => panic!("{err}"),
Ok(InputEvent::Error(err)) => panic!("{err}"),
Err(_) => break,
}
});

@@ -180,7 +160,10 @@ pub trait Subscriber: Send + Sync {

enum InputEvent {
Input(Input),
InputClosed { number_of_remaining_sources: usize },
SourceClosed {
source: NodeId,
operator: Option<OperatorId>,
},
Error(BoxError),
}



Loading…
Cancel
Save