From 7bcb132075d703d697c621895c509c739ab36166 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 21 Jul 2024 08:01:44 +0200 Subject: [PATCH] Retrieve `open_telemetry_context` from metadata parameters. --- binaries/daemon/src/lib.rs | 21 +++++++++------- binaries/runtime/src/operator/python.rs | 23 ++++++++++++------ binaries/runtime/src/operator/shared_lib.rs | 27 ++++++++++++--------- node-hub/dora-record/src/main.rs | 9 +++++-- 4 files changed, 50 insertions(+), 30 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index de799ad6..d8e19e3d 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -8,7 +8,7 @@ use dora_core::daemon_messages::{ }; use dora_core::descriptor::runtime_node_inputs; use dora_core::message::uhlc::{self, HLC}; -use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters}; +use dora_core::message::{ArrowTypeInfo, Metadata}; use dora_core::topics::LOCALHOST; use dora_core::topics::{ DataflowDaemonResult, DataflowResult, NodeError, NodeErrorCause, NodeExitStatus, @@ -23,6 +23,7 @@ use dora_core::{ descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, }; +use dora_node_api::Parameter; use eyre::{bail, eyre, Context, ContextCompat, Result}; use futures::{future, stream, FutureExt, TryFutureExt}; use futures_concurrency::stream::Merge; @@ -1543,17 +1544,19 @@ impl RunningDataflow { let span = tracing::span!(tracing::Level::TRACE, "tick"); let _ = span.enter(); + let mut parameters = BTreeMap::new(); + parameters.insert( + "open_telemetry_context".to_string(), + #[cfg(feature = "telemetry")] + Parameter::String(serialize_context(&span.context())), + #[cfg(not(feature = "telemetry"))] + Parameter::String("".into()), + ); + let metadata = dora_core::message::Metadata::from_parameters( hlc.new_timestamp(), ArrowTypeInfo::empty(), - MetadataParameters { - watermark: 0, - deadline: 0, - #[cfg(feature = "telemetry")] - open_telemetry_context: serialize_context(&span.context()), - #[cfg(not(feature = "telemetry"))] - open_telemetry_context: "".into(), - }, + parameters, ); let event = Timestamped { diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index fd436f1a..21e00ec6 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -6,7 +6,7 @@ use dora_core::{ descriptor::{source_is_url, Descriptor, PythonSource}, }; use dora_download::download_file; -use dora_node_api::{merged::MergedEvent, Event}; +use dora_node_api::{merged::MergedEvent, Event, Parameter}; use dora_operator_api_python::PyEvent; use dora_operator_api_types::DoraStatus; use eyre::{bail, eyre, Context, Result}; @@ -201,11 +201,15 @@ pub fn run( use tracing_opentelemetry::OpenTelemetrySpanExt; span.record("input_id", input_id.as_str()); - let cx = deserialize_context(&metadata.parameters.open_telemetry_context); + let otel = metadata.open_telemetry_context(); + let cx = deserialize_context(&otel); span.set_parent(cx); let cx = span.context(); let string_cx = serialize_context(&cx); - metadata.parameters.open_telemetry_context = string_cx; + metadata.parameters.insert( + "open_telemetry_context".to_string(), + Parameter::String(string_cx), + ); } let py_event = PyEvent { @@ -317,17 +321,22 @@ mod callback_impl { metadata: Option>, py: Python, ) -> Result<()> { - let parameters = pydict_to_metadata(metadata) - .wrap_err("failed to parse metadata")? - .into_owned(); + let parameters = pydict_to_metadata(metadata).wrap_err("failed to parse metadata")?; let span = span!( tracing::Level::TRACE, "send_output", output_id = field::Empty ); span.record("output_id", output); + let otel = if let Some(dora_node_api::Parameter::String(otel)) = + parameters.get("open_telemetry_context") + { + otel.to_string() + } else { + "".to_string() + }; - let cx = deserialize_context(¶meters.open_telemetry_context); + let cx = deserialize_context(&otel); span.set_parent(cx); let _ = span.enter(); diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 984a760b..70fccff4 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -8,7 +8,7 @@ use dora_core::{ use dora_download::download_file; use dora_node_api::{ arrow_utils::{copy_array_into_sample, required_data_size}, - Event, MetadataParameters, + Event, Parameter, }; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnEvent, @@ -17,6 +17,7 @@ use dora_operator_api_types::{ use eyre::{bail, eyre, Context, Result}; use libloading::Symbol; use std::{ + collections::BTreeMap, ffi::c_void, panic::{catch_unwind, AssertUnwindSafe}, path::Path, @@ -119,10 +120,11 @@ impl<'lib> SharedLibraryOperator<'lib> { open_telemetry_context, }, } = output; - let parameters = MetadataParameters { - open_telemetry_context: open_telemetry_context.into(), - ..Default::default() - }; + let mut parameters = BTreeMap::new(); + parameters.insert( + "open_telemetry_context".to_string(), + Parameter::String(open_telemetry_context.to_string()), + ); let arrow_array = match unsafe { arrow::ffi::from_ffi(data_array, &schema) } { Ok(a) => a, @@ -173,11 +175,15 @@ impl<'lib> SharedLibraryOperator<'lib> { use tracing_opentelemetry::OpenTelemetrySpanExt; span.record("input_id", input_id.as_str()); - let cx = deserialize_context(&metadata.parameters.open_telemetry_context); + let otel = metadata.open_telemetry_context(); + let cx = deserialize_context(&otel); span.set_parent(cx); let cx = span.context(); let string_cx = serialize_context(&cx); - metadata.parameters.open_telemetry_context = string_cx; + metadata.parameters.insert( + "open_telemetry_context".to_string(), + Parameter::String(string_cx), + ); } let mut operator_event = match event { @@ -193,16 +199,13 @@ impl<'lib> SharedLibraryOperator<'lib> { data, } => { let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?; - + let otel = metadata.open_telemetry_context(); let operator_input = dora_operator_api_types::Input { id: String::from(input_id).into(), data_array: Some(data_array), schema, metadata: Metadata { - open_telemetry_context: metadata - .parameters - .open_telemetry_context - .into(), + open_telemetry_context: otel.into(), }, }; dora_operator_api_types::RawEvent { diff --git a/node-hub/dora-record/src/main.rs b/node-hub/dora-record/src/main.rs index 3d9268fe..0737583f 100644 --- a/node-hub/dora-record/src/main.rs +++ b/node-hub/dora-record/src/main.rs @@ -100,7 +100,12 @@ async fn main() -> eyre::Result<()> { None => {} Some(tx) => drop(tx), }, - _ => {} + Event::Error(err) => { + println!("Error: {}", err); + } + event => { + println!("Event: {event:#?}") + } } } @@ -137,7 +142,7 @@ async fn write_event( let timestamp_utc = TimestampMillisecondArray::from(vec![dt.timestamp_millis()]); let timestamp_utc = make_array(timestamp_utc.into()); - let string_otel_context = metadata.parameters.open_telemetry_context.to_string(); + let string_otel_context = metadata.open_telemetry_context(); let otel_context = deserialize_to_hashmap(&string_otel_context); let traceparent = otel_context.get("traceparent"); let trace_id = match traceparent {