Browse Source

Push error into the `init_done` channel for debugging context

Since #236 was added, any initialization error is shadowed by the bail on
the sender channel.

Bailing used to happen here: ccec196234/binaries/runtime/src/lib.rs (L136)

This makes it hard to debug.

This PR pushes the error into the sender channel and reports any
initialization error, making it easier to debug.

Now the channel is used without error, but any initialization error
will make the runtime bail here: ccec196234/binaries/runtime/src/lib.rs (L137)
tags/v0.2.2-rc
haixuanTao 2 years ago
parent
commit
bfd32f1fd7
4 changed files with 28 additions and 13 deletions
  1. +3
    -2
      binaries/runtime/src/lib.rs
  2. +2
    -2
      binaries/runtime/src/operator/mod.rs
  3. +11
    -4
      binaries/runtime/src/operator/python.rs
  4. +12
    -5
      binaries/runtime/src/operator/shared_lib.rs

+ 3
- 2
binaries/runtime/src/lib.rs View File

@@ -117,7 +117,7 @@ async fn run(
config: NodeConfig,
operator_events: impl Stream<Item = Event> + Unpin,
mut operator_channels: HashMap<OperatorId, flume::Sender<operator::IncomingEvent>>,
init_done: oneshot::Receiver<()>,
init_done: oneshot::Receiver<Result<()>>,
) -> eyre::Result<()> {
#[cfg(feature = "metrics")]
let _started = {
@@ -133,7 +133,8 @@ async fn run(

init_done
.await
.wrap_err("the `init_done` channel was closed unexpectedly")?;
.wrap_err("the `init_done` channel was closed unexpectedly")?
.wrap_err("failed to init an operator")?;
tracing::info!("All operators are ready, starting runtime");

let (mut node, daemon_events) = DoraNode::init(config)?;


+ 2
- 2
binaries/runtime/src/operator/mod.rs View File

@@ -4,7 +4,7 @@ use dora_core::{
message::{Metadata, MetadataParameters},
};
use dora_operator_api_python::metadata_to_pydict;
use eyre::Context;
use eyre::{Context, Result};
#[cfg(feature = "telemetry")]
use opentelemetry::sdk::trace::Tracer;
use pyo3::{
@@ -26,7 +26,7 @@ pub fn run_operator(
operator_definition: OperatorDefinition,
incoming_events: flume::Receiver<IncomingEvent>,
events_tx: Sender<OperatorEvent>,
init_done: oneshot::Sender<()>,
init_done: oneshot::Sender<Result<()>>,
) -> eyre::Result<()> {
#[cfg(feature = "telemetry")]
let tracer = dora_tracing::telemetry::init_tracing(


+ 11
- 4
binaries/runtime/src/operator/python.rs View File

@@ -33,7 +33,7 @@ pub fn run(
events_tx: Sender<OperatorEvent>,
incoming_events: flume::Receiver<IncomingEvent>,
tracer: Tracer,
init_done: oneshot::Sender<()>,
init_done: oneshot::Sender<Result<()>>,
) -> eyre::Result<()> {
let path = if source_is_url(source) {
let target_path = Path::new("build")
@@ -98,9 +98,16 @@ pub fn run(

let python_runner = move || {
let operator =
Python::with_gil(init_operator).wrap_err("failed to init python operator")?;

let _ = init_done.send(());
match Python::with_gil(init_operator).wrap_err("failed to init python operator") {
Ok(op) => {
let _ = init_done.send(Ok(()));
op
}
Err(err) => {
let _ = init_done.send(Err(err));
bail!("Could not init python operator")
}
};

let reason = loop {
let Ok(mut event) = incoming_events.recv() else { break StopReason::InputsClosed };


+ 12
- 5
binaries/runtime/src/operator/shared_lib.rs View File

@@ -10,7 +10,7 @@ use dora_operator_api_types::{
safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnEvent,
DoraResult, DoraStatus, Metadata, OnEventResult, Output, SendOutput,
};
use eyre::{bail, eyre, Context};
use eyre::{bail, eyre, Context, Result};
use libloading::Symbol;
use std::{
borrow::Cow,
@@ -28,7 +28,7 @@ pub fn run(
events_tx: Sender<OperatorEvent>,
incoming_events: flume::Receiver<IncomingEvent>,
tracer: Tracer,
init_done: oneshot::Sender<()>,
init_done: oneshot::Sender<Result<()>>,
) -> eyre::Result<()> {
let path = if source_is_url(source) {
let target_path = adjust_shared_library_path(
@@ -86,14 +86,21 @@ struct SharedLibraryOperator<'lib> {
}

impl<'lib> SharedLibraryOperator<'lib> {
fn run(self, tracer: Tracer, init_done: oneshot::Sender<()>) -> eyre::Result<StopReason> {
fn run(
self,
tracer: Tracer,
init_done: oneshot::Sender<Result<()>>,
) -> eyre::Result<StopReason> {
let operator_context = {
let DoraInitResult {
result,
operator_context,
} = unsafe { (self.bindings.init_operator.init_operator)() };
let raw = match result.error {
Some(error) => bail!("init_operator failed: {}", String::from(error)),
Some(error) => {
let _ = init_done.send(Err(eyre!(error.to_string())));
bail!("init_operator failed: {}", String::from(error))
}
None => operator_context,
};
OperatorContext {
@@ -102,7 +109,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
}
};

let _ = init_done.send(());
let _ = init_done.send(Ok(()));

let send_output_closure = Arc::new(move |output: Output| {
let Output {


Loading…
Cancel
Save