Browse Source

Fix: Don't stop operator before all of its events have been processed

We don't want to stop the operator if there are still events in the operator event queue, e.g. pending outputs.
tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
5c674d786f
2 changed files with 37 additions and 29 deletions
  1. +36
    -29
      runtime/src/main.rs
  2. +1
    -0
      runtime/src/operator/mod.rs

+ 36
- 29
runtime/src/main.rs View File

@@ -14,7 +14,10 @@ use futures::{
};
use futures_concurrency::Merge;
use operator::{Operator, OperatorEvent};
use std::{collections::BTreeMap, mem};
use std::{
collections::{BTreeMap, HashMap},
mem,
};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamMap};

@@ -42,6 +45,7 @@ async fn main() -> eyre::Result<()> {

let mut operator_map = BTreeMap::new();
let mut operator_events = StreamMap::new();
let mut operator_events_tx = HashMap::new();
for operator_config in &operators {
let (events_tx, events) = mpsc::channel(1);
let operator = Operator::init(operator_config.clone(), events_tx.clone())
@@ -49,6 +53,7 @@ async fn main() -> eyre::Result<()> {
.wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?;
operator_map.insert(&operator_config.id, operator);
operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events));
operator_events_tx.insert(operator_config.id.clone(), events_tx);
}

let communication: Box<dyn CommunicationLayer> =
@@ -86,35 +91,14 @@ async fn main() -> eyre::Result<()> {
})?;
}
SubscribeEvent::InputsStopped { target_operator } => {
// --------------------------------------------------------
// TODO FIXME: For some reason, these zenoh publish calls
// (and also subsequent ones) are not visible to other
// nodes. This includes the stop command, so the input
// streams of dependent nodes are not closed properly.
// --------------------------------------------------------

communication
.publish("/HHH", &[])
.await
.wrap_err("failed to send on /HHH")?;
if operator_map.remove(&target_operator).is_some() {
println!("operator {node_id}/{target_operator} finished");
// send stopped message
publish(
&node_id,
target_operator.clone(),
STOP_TOPIC.to_owned().into(),
&[],
communication.as_ref(),
)
.await.with_context(|| {
format!("failed to send stop message for operator `{node_id}/{target_operator}`")
})?;
}
let events_tx = operator_events_tx.get(&target_operator).ok_or_else(|| {
eyre!("failed to get events_tx for operator {target_operator}")
})?;

if operator_map.is_empty() {
break;
}
let events_tx = events_tx.clone();
tokio::spawn(async move {
let _ = events_tx.send(OperatorEvent::EndOfInput).await;
});
}
},
Event::Operator { id, event } => {
@@ -134,6 +118,29 @@ async fn main() -> eyre::Result<()> {
bail!(err.wrap_err(format!("operator {id} failed")))
}
OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload),
OperatorEvent::EndOfInput => {
if operator_map.remove(&id).is_some() {
println!("operator {node_id}/{id} finished");
// send stopped message
publish(
&node_id,
id.clone(),
STOP_TOPIC.to_owned().into(),
&[],
communication.as_ref(),
)
.await
.with_context(|| {
format!("failed to send stop message for operator `{node_id}/{id}`")
})?;

operator_events_tx.remove(&id);
}

if operator_map.is_empty() {
break;
}
}
}
}
}


+ 1
- 0
runtime/src/operator/mod.rs View File

@@ -60,6 +60,7 @@ pub enum OperatorEvent {
Output { id: DataId, value: Vec<u8> },
Error(eyre::Error),
Panic(Box<dyn Any + Send>),
EndOfInput,
}

pub struct OperatorInput {


Loading…
Cancel
Save