From 5c674d786fb37af35ea1fd85d4d435fe4cf43235 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 10 Jun 2022 11:00:15 +0200 Subject: [PATCH] Fix: Don't stop operator before all of its events have been processed We don't want to stop the operator if there are still events in the operator event queue, e.g. pending outputs. --- runtime/src/main.rs | 65 ++++++++++++++++++++----------------- runtime/src/operator/mod.rs | 1 + 2 files changed, 37 insertions(+), 29 deletions(-) diff --git a/runtime/src/main.rs b/runtime/src/main.rs index 96ba50d0..6f91715c 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -14,7 +14,10 @@ use futures::{ }; use futures_concurrency::Merge; use operator::{Operator, OperatorEvent}; -use std::{collections::BTreeMap, mem}; +use std::{ + collections::{BTreeMap, HashMap}, + mem, +}; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamMap}; @@ -42,6 +45,7 @@ async fn main() -> eyre::Result<()> { let mut operator_map = BTreeMap::new(); let mut operator_events = StreamMap::new(); + let mut operator_events_tx = HashMap::new(); for operator_config in &operators { let (events_tx, events) = mpsc::channel(1); let operator = Operator::init(operator_config.clone(), events_tx.clone()) @@ -49,6 +53,7 @@ async fn main() -> eyre::Result<()> { .wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?; operator_map.insert(&operator_config.id, operator); operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events)); + operator_events_tx.insert(operator_config.id.clone(), events_tx); } let communication: Box = @@ -86,35 +91,14 @@ async fn main() -> eyre::Result<()> { })?; } SubscribeEvent::InputsStopped { target_operator } => { - // -------------------------------------------------------- - // TODO FIXME: For some reason, these zenoh publish calls - // (and also subsequent ones) are not visible to other - // nodes. This includes the stop command, so the input - // streams of dependent nodes are not closed properly. - // -------------------------------------------------------- - - communication - .publish("/HHH", &[]) - .await - .wrap_err("failed to send on /HHH")?; - if operator_map.remove(&target_operator).is_some() { - println!("operator {node_id}/{target_operator} finished"); - // send stopped message - publish( - &node_id, - target_operator.clone(), - STOP_TOPIC.to_owned().into(), - &[], - communication.as_ref(), - ) - .await.with_context(|| { - format!("failed to send stop message for operator `{node_id}/{target_operator}`") - })?; - } + let events_tx = operator_events_tx.get(&target_operator).ok_or_else(|| { + eyre!("failed to get events_tx for operator {target_operator}") + })?; - if operator_map.is_empty() { - break; - } + let events_tx = events_tx.clone(); + tokio::spawn(async move { + let _ = events_tx.send(OperatorEvent::EndOfInput).await; + }); } }, Event::Operator { id, event } => { @@ -134,6 +118,29 @@ async fn main() -> eyre::Result<()> { bail!(err.wrap_err(format!("operator {id} failed"))) } OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload), + OperatorEvent::EndOfInput => { + if operator_map.remove(&id).is_some() { + println!("operator {node_id}/{id} finished"); + // send stopped message + publish( + &node_id, + id.clone(), + STOP_TOPIC.to_owned().into(), + &[], + communication.as_ref(), + ) + .await + .with_context(|| { + format!("failed to send stop message for operator `{node_id}/{id}`") + })?; + + operator_events_tx.remove(&id); + } + + if operator_map.is_empty() { + break; + } + } } } } diff --git a/runtime/src/operator/mod.rs b/runtime/src/operator/mod.rs index 732042fd..dbfd1cee 100644 --- a/runtime/src/operator/mod.rs +++ b/runtime/src/operator/mod.rs @@ -60,6 +60,7 @@ pub enum OperatorEvent { Output { id: DataId, value: Vec }, Error(eyre::Error), Panic(Box), + EndOfInput, } pub struct OperatorInput {