From 7232a31b57c034751538992cba01b88b217ae58e Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 6 Mar 2023 17:45:29 +0100 Subject: [PATCH] Send `InputClosed` events to operators too --- apis/c/operator/operator_types.h | 3 +++ apis/rust/operator/src/lib.rs | 2 ++ apis/rust/operator/src/raw.rs | 2 ++ apis/rust/operator/types/src/lib.rs | 1 + binaries/runtime/src/lib.rs | 18 ++++++++++++++++++ binaries/runtime/src/operator/mod.rs | 9 +++++++++ binaries/runtime/src/operator/shared_lib.rs | 7 +++++++ 7 files changed, 42 insertions(+) diff --git a/apis/c/operator/operator_types.h b/apis/c/operator/operator_types.h index ca95fc2e..4c07b808 100644 --- a/apis/c/operator/operator_types.h +++ b/apis/c/operator/operator_types.h @@ -111,6 +111,9 @@ typedef struct RawEvent { /** */ Input_t * input; + /** */ + Vec_uint8_t input_closed; + /** */ bool stop; } RawEvent_t; diff --git a/apis/rust/operator/src/lib.rs b/apis/rust/operator/src/lib.rs index 4744fd73..9db91b97 100644 --- a/apis/rust/operator/src/lib.rs +++ b/apis/rust/operator/src/lib.rs @@ -8,9 +8,11 @@ use types::{Metadata, Output, SendOutput}; pub mod raw; +#[derive(Debug)] #[non_exhaustive] pub enum Event<'a> { Input { id: &'a str, data: &'a [u8] }, + InputClosed { id: &'a str }, Stop, } diff --git a/apis/rust/operator/src/raw.rs b/apis/rust/operator/src/raw.rs index dee87bb6..c3db60e4 100644 --- a/apis/rust/operator/src/raw.rs +++ b/apis/rust/operator/src/raw.rs @@ -41,6 +41,8 @@ pub unsafe fn dora_on_event( id: &input.id, data, } + } else if let Some(input_id) = &event.input_closed { + Event::InputClosed { id: input_id } } else if event.stop { Event::Stop } else { diff --git a/apis/rust/operator/types/src/lib.rs b/apis/rust/operator/types/src/lib.rs index dc8117a2..e1b39cba 100644 --- a/apis/rust/operator/types/src/lib.rs +++ b/apis/rust/operator/types/src/lib.rs @@ -58,6 +58,7 @@ pub struct OnEventFn( #[derive(Debug)] pub struct RawEvent { pub input: Option>, + pub input_closed: Option, pub stop: bool, } diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index b6c733e0..1892dd1d 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -237,6 +237,24 @@ async fn run( let operator_id = OperatorId::from(operator_id.to_owned()); let input_id = DataId::from(input_id.to_owned()); + let Some(operator_channel) = operator_channels.get(&operator_id) else { + tracing::warn!("received input {id} for unknown operator"); + continue; + }; + if let Err(err) = operator_channel + .send_async(operator::IncomingEvent::InputClosed { + input_id: input_id.clone(), + }) + .await + .wrap_err_with(|| { + format!( + "failed to send InputClosed({input_id}) to operator `{operator_id}`" + ) + }) + { + tracing::warn!("{err}"); + } + if let Some(open_inputs) = open_operator_inputs.get_mut(&operator_id) { open_inputs.remove(&input_id); if open_inputs.is_empty() { diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index df2548e2..2803e482 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -97,6 +97,9 @@ pub enum IncomingEvent { metadata: Metadata<'static>, data: Option>, }, + InputClosed { + input_id: DataId, + }, } impl IntoPy for IncomingEvent { @@ -124,6 +127,12 @@ impl IntoPy for IncomingEvent { .unwrap(); "INPUT" } + Self::InputClosed { input_id } => { + dict.set_item("id", input_id.to_string()) + .wrap_err("failed to add input ID") + .unwrap(); + "INPUT_CLOSED" + } }; dict.set_item("type", ty) diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 9e6e5666..aaeb8da6 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -170,6 +170,7 @@ impl<'lib> SharedLibraryOperator<'lib> { let operator_event = match event { IncomingEvent::Stop => dora_operator_api_types::RawEvent { input: None, + input_closed: None, stop: true, }, IncomingEvent::Input { @@ -190,9 +191,15 @@ impl<'lib> SharedLibraryOperator<'lib> { }; dora_operator_api_types::RawEvent { input: Some(Box::new(operator_input).into()), + input_closed: None, stop: false, } } + IncomingEvent::InputClosed { input_id } => dora_operator_api_types::RawEvent { + input_closed: Some(input_id.to_string().into()), + input: None, + stop: false, + }, }; let send_output = SendOutput {