Browse Source

Send `InputClosed` events to operators too

tags/v0.2.0-candidate
Philipp Oppermann 2 years ago
parent
commit
7232a31b57
Failed to extract signature
7 changed files with 42 additions and 0 deletions
  1. +3
    -0
      apis/c/operator/operator_types.h
  2. +2
    -0
      apis/rust/operator/src/lib.rs
  3. +2
    -0
      apis/rust/operator/src/raw.rs
  4. +1
    -0
      apis/rust/operator/types/src/lib.rs
  5. +18
    -0
      binaries/runtime/src/lib.rs
  6. +9
    -0
      binaries/runtime/src/operator/mod.rs
  7. +7
    -0
      binaries/runtime/src/operator/shared_lib.rs

+ 3
- 0
apis/c/operator/operator_types.h View File

@@ -111,6 +111,9 @@ typedef struct RawEvent {
/** <No documentation available> */
Input_t * input;

/** <No documentation available> */
Vec_uint8_t input_closed;

/** <No documentation available> */
bool stop;
} RawEvent_t;


+ 2
- 0
apis/rust/operator/src/lib.rs View File

@@ -8,9 +8,11 @@ use types::{Metadata, Output, SendOutput};

pub mod raw;

#[derive(Debug)]
#[non_exhaustive]
pub enum Event<'a> {
Input { id: &'a str, data: &'a [u8] },
InputClosed { id: &'a str },
Stop,
}



+ 2
- 0
apis/rust/operator/src/raw.rs View File

@@ -41,6 +41,8 @@ pub unsafe fn dora_on_event<O: DoraOperator>(
id: &input.id,
data,
}
} else if let Some(input_id) = &event.input_closed {
Event::InputClosed { id: input_id }
} else if event.stop {
Event::Stop
} else {


+ 1
- 0
apis/rust/operator/types/src/lib.rs View File

@@ -58,6 +58,7 @@ pub struct OnEventFn(
#[derive(Debug)]
pub struct RawEvent {
pub input: Option<safer_ffi::boxed::Box<Input>>,
pub input_closed: Option<safer_ffi::String>,
pub stop: bool,
}



+ 18
- 0
binaries/runtime/src/lib.rs View File

@@ -237,6 +237,24 @@ async fn run(
let operator_id = OperatorId::from(operator_id.to_owned());
let input_id = DataId::from(input_id.to_owned());

let Some(operator_channel) = operator_channels.get(&operator_id) else {
tracing::warn!("received input {id} for unknown operator");
continue;
};
if let Err(err) = operator_channel
.send_async(operator::IncomingEvent::InputClosed {
input_id: input_id.clone(),
})
.await
.wrap_err_with(|| {
format!(
"failed to send InputClosed({input_id}) to operator `{operator_id}`"
)
})
{
tracing::warn!("{err}");
}

if let Some(open_inputs) = open_operator_inputs.get_mut(&operator_id) {
open_inputs.remove(&input_id);
if open_inputs.is_empty() {


+ 9
- 0
binaries/runtime/src/operator/mod.rs View File

@@ -97,6 +97,9 @@ pub enum IncomingEvent {
metadata: Metadata<'static>,
data: Option<Vec<u8>>,
},
InputClosed {
input_id: DataId,
},
}

impl IntoPy<PyObject> for IncomingEvent {
@@ -124,6 +127,12 @@ impl IntoPy<PyObject> for IncomingEvent {
.unwrap();
"INPUT"
}
Self::InputClosed { input_id } => {
dict.set_item("id", input_id.to_string())
.wrap_err("failed to add input ID")
.unwrap();
"INPUT_CLOSED"
}
};

dict.set_item("type", ty)


+ 7
- 0
binaries/runtime/src/operator/shared_lib.rs View File

@@ -170,6 +170,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
let operator_event = match event {
IncomingEvent::Stop => dora_operator_api_types::RawEvent {
input: None,
input_closed: None,
stop: true,
},
IncomingEvent::Input {
@@ -190,9 +191,15 @@ impl<'lib> SharedLibraryOperator<'lib> {
};
dora_operator_api_types::RawEvent {
input: Some(Box::new(operator_input).into()),
input_closed: None,
stop: false,
}
}
IncomingEvent::InputClosed { input_id } => dora_operator_api_types::RawEvent {
input_closed: Some(input_id.to_string().into()),
input: None,
stop: false,
},
};

let send_output = SendOutput {


Loading…
Cancel
Save