diff --git a/apis/c++/operator/src/lib.rs b/apis/c++/operator/src/lib.rs index ebf20c49..511b4470 100644 --- a/apis/c++/operator/src/lib.rs +++ b/apis/c++/operator/src/lib.rs @@ -1,7 +1,9 @@ #![cfg(not(test))] #![warn(unsafe_op_in_unsafe_fn)] -use dora_operator_api::{self, register_operator, DoraOperator, DoraOutputSender, DoraStatus}; +use dora_operator_api::{ + self, register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event, +}; use ffi::DoraSendOutputResult; #[cxx::bridge] @@ -64,23 +66,30 @@ impl Default for OperatorWrapper { } impl DoraOperator for OperatorWrapper { - fn on_input( + fn on_event( &mut self, - id: &str, - data: &[u8], + event: &Event, output_sender: &mut DoraOutputSender, ) -> Result { - let operator = self.operator.as_mut().unwrap(); - let mut output_sender = OutputSender(output_sender); - - let result = ffi::on_input(operator, id, data, &mut output_sender); - if result.error.is_empty() { - Ok(match result.stop { - false => DoraStatus::Continue, - true => DoraStatus::Stop, - }) - } else { - Err(result.error) + match event { + Event::Input { id, data } => { + let operator = self.operator.as_mut().unwrap(); + let mut output_sender = OutputSender(output_sender); + + let result = ffi::on_input(operator, id, data, &mut output_sender); + if result.error.is_empty() { + Ok(match result.stop { + false => DoraStatus::Continue, + true => DoraStatus::Stop, + }) + } else { + Err(result.error) + } + } + _ => { + // ignore other events for now + Ok(DoraStatus::Continue) + } } } } diff --git a/examples/benchmark/run.rs b/examples/benchmark/run.rs index 37f26473..798b420c 100644 --- a/examples/benchmark/run.rs +++ b/examples/benchmark/run.rs @@ -14,7 +14,7 @@ async fn main() -> eyre::Result<()> { let dataflow = Path::new("dataflow.yml"); build_dataflow(dataflow).await?; - dora_daemon::Daemon::run_dataflow(dataflow).await?; + dora_daemon::Daemon::run_dataflow(dataflow, None).await?; Ok(()) } diff --git a/examples/c++-dataflow/run.rs b/examples/c++-dataflow/run.rs index a6c3c285..3e75823d 100644 --- a/examples/c++-dataflow/run.rs +++ b/examples/c++-dataflow/run.rs @@ -88,7 +88,7 @@ async fn main() -> eyre::Result<()> { // build_package("dora-runtime").await?; let dataflow = Path::new("dataflow.yml").to_owned(); - dora_daemon::Daemon::run_dataflow(&dataflow).await?; + dora_daemon::Daemon::run_dataflow(&dataflow, None).await?; Ok(()) } diff --git a/examples/c-dataflow/run.rs b/examples/c-dataflow/run.rs index b041b773..39290ff0 100644 --- a/examples/c-dataflow/run.rs +++ b/examples/c-dataflow/run.rs @@ -18,7 +18,7 @@ async fn main() -> eyre::Result<()> { build_c_node(root, "sink.c", "c_sink").await?; let dataflow = Path::new("dataflow.yml").to_owned(); - dora_daemon::Daemon::run_dataflow(&dataflow).await?; + dora_daemon::Daemon::run_dataflow(&dataflow, None).await?; Ok(()) } diff --git a/examples/rust-dataflow-url/run.rs b/examples/rust-dataflow-url/run.rs index 7d0698a3..3c384b6d 100644 --- a/examples/rust-dataflow-url/run.rs +++ b/examples/rust-dataflow-url/run.rs @@ -10,7 +10,7 @@ async fn main() -> eyre::Result<()> { let dataflow = Path::new("dataflow.yml"); build_dataflow(dataflow).await?; - dora_daemon::Daemon::run_dataflow(dataflow).await?; + dora_daemon::Daemon::run_dataflow(dataflow, None).await?; Ok(()) }