From bfd32f1fd7767ec28c6ed0090fbef1b45b4cba64 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 27 Mar 2023 19:35:08 +0800 Subject: [PATCH] Push error into the `init_done` channel for debugging context After adding #236, initialization errors are shadowed by the bail on the sender channel. The bail used to happen here: https://github.com/dora-rs/dora/blob/ccec196234c46fc5df7a0f7c7cd15d29a2bda670/binaries/runtime/src/lib.rs#L136 This makes initialization failures hard to debug. This PR pushes the error into the sender channel and reports any initialization error, making it easier to debug. The channel is now used without error, but any initialization error will make the runtime bail here: https://github.com/dora-rs/dora/blob/ccec196234c46fc5df7a0f7c7cd15d29a2bda670/binaries/runtime/src/lib.rs#L137 --- binaries/runtime/src/lib.rs | 5 +++-- binaries/runtime/src/operator/mod.rs | 4 ++-- binaries/runtime/src/operator/python.rs | 15 +++++++++++---- binaries/runtime/src/operator/shared_lib.rs | 17 ++++++++++++----- 4 files changed, 28 insertions(+), 13 deletions(-) diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index 70df334f..bfe574c7 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -117,7 +117,7 @@ async fn run( config: NodeConfig, operator_events: impl Stream + Unpin, mut operator_channels: HashMap>, - init_done: oneshot::Receiver<()>, + init_done: oneshot::Receiver>, ) -> eyre::Result<()> { #[cfg(feature = "metrics")] let _started = { @@ -133,7 +133,8 @@ async fn run( init_done .await - .wrap_err("the `init_done` channel was closed unexpectedly")?; + .wrap_err("the `init_done` channel was closed unexpectedly")? 
+ .wrap_err("failed to init an operator")?; tracing::info!("All operators are ready, starting runtime"); let (mut node, daemon_events) = DoraNode::init(config)?; diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index 90291d9c..c5f50286 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -4,7 +4,7 @@ use dora_core::{ message::{Metadata, MetadataParameters}, }; use dora_operator_api_python::metadata_to_pydict; -use eyre::Context; +use eyre::{Context, Result}; #[cfg(feature = "telemetry")] use opentelemetry::sdk::trace::Tracer; use pyo3::{ @@ -26,7 +26,7 @@ pub fn run_operator( operator_definition: OperatorDefinition, incoming_events: flume::Receiver, events_tx: Sender, - init_done: oneshot::Sender<()>, + init_done: oneshot::Sender>, ) -> eyre::Result<()> { #[cfg(feature = "telemetry")] let tracer = dora_tracing::telemetry::init_tracing( diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index ee21d213..7bad1a9c 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -33,7 +33,7 @@ pub fn run( events_tx: Sender, incoming_events: flume::Receiver, tracer: Tracer, - init_done: oneshot::Sender<()>, + init_done: oneshot::Sender>, ) -> eyre::Result<()> { let path = if source_is_url(source) { let target_path = Path::new("build") @@ -98,9 +98,16 @@ pub fn run( let python_runner = move || { let operator = - Python::with_gil(init_operator).wrap_err("failed to init python operator")?; - - let _ = init_done.send(()); + match Python::with_gil(init_operator).wrap_err("failed to init python operator") { + Ok(op) => { + let _ = init_done.send(Ok(())); + op + } + Err(err) => { + let _ = init_done.send(Err(err)); + bail!("Could not init python operator") + } + }; let reason = loop { let Ok(mut event) = incoming_events.recv() else { break StopReason::InputsClosed }; diff --git 
a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 2e1c463e..56f18b3d 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -10,7 +10,7 @@ use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnEvent, DoraResult, DoraStatus, Metadata, OnEventResult, Output, SendOutput, }; -use eyre::{bail, eyre, Context}; +use eyre::{bail, eyre, Context, Result}; use libloading::Symbol; use std::{ borrow::Cow, @@ -28,7 +28,7 @@ pub fn run( events_tx: Sender, incoming_events: flume::Receiver, tracer: Tracer, - init_done: oneshot::Sender<()>, + init_done: oneshot::Sender>, ) -> eyre::Result<()> { let path = if source_is_url(source) { let target_path = adjust_shared_library_path( @@ -86,14 +86,21 @@ struct SharedLibraryOperator<'lib> { } impl<'lib> SharedLibraryOperator<'lib> { - fn run(self, tracer: Tracer, init_done: oneshot::Sender<()>) -> eyre::Result { + fn run( + self, + tracer: Tracer, + init_done: oneshot::Sender>, + ) -> eyre::Result { let operator_context = { let DoraInitResult { result, operator_context, } = unsafe { (self.bindings.init_operator.init_operator)() }; let raw = match result.error { - Some(error) => bail!("init_operator failed: {}", String::from(error)), + Some(error) => { + let _ = init_done.send(Err(eyre!(error.to_string()))); + bail!("init_operator failed: {}", String::from(error)) + } None => operator_context, }; OperatorContext { @@ -102,7 +109,7 @@ impl<'lib> SharedLibraryOperator<'lib> { } }; - let _ = init_done.send(()); + let _ = init_done.send(Ok(())); let send_output_closure = Arc::new(move |output: Output| { let Output {