Browse Source

Retrieve `open_telemetry_context` from metadata parameters.

tags/v0.3.6-rc0
haixuanTao 1 year ago
parent
commit
7bcb132075
4 changed files with 50 additions and 30 deletions
  1. +12
    -9
      binaries/daemon/src/lib.rs
  2. +16
    -7
      binaries/runtime/src/operator/python.rs
  3. +15
    -12
      binaries/runtime/src/operator/shared_lib.rs
  4. +7
    -2
      node-hub/dora-record/src/main.rs

+ 12
- 9
binaries/daemon/src/lib.rs View File

@@ -8,7 +8,7 @@ use dora_core::daemon_messages::{
};
use dora_core::descriptor::runtime_node_inputs;
use dora_core::message::uhlc::{self, HLC};
use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters};
use dora_core::message::{ArrowTypeInfo, Metadata};
use dora_core::topics::LOCALHOST;
use dora_core::topics::{
DataflowDaemonResult, DataflowResult, NodeError, NodeErrorCause, NodeExitStatus,
@@ -23,6 +23,7 @@ use dora_core::{
descriptor::{CoreNodeKind, Descriptor, ResolvedNode},
};

use dora_node_api::Parameter;
use eyre::{bail, eyre, Context, ContextCompat, Result};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
@@ -1543,17 +1544,19 @@ impl RunningDataflow {
let span = tracing::span!(tracing::Level::TRACE, "tick");
let _ = span.enter();

let mut parameters = BTreeMap::new();
parameters.insert(
"open_telemetry_context".to_string(),
#[cfg(feature = "telemetry")]
Parameter::String(serialize_context(&span.context())),
#[cfg(not(feature = "telemetry"))]
Parameter::String("".into()),
);

let metadata = dora_core::message::Metadata::from_parameters(
hlc.new_timestamp(),
ArrowTypeInfo::empty(),
MetadataParameters {
watermark: 0,
deadline: 0,
#[cfg(feature = "telemetry")]
open_telemetry_context: serialize_context(&span.context()),
#[cfg(not(feature = "telemetry"))]
open_telemetry_context: "".into(),
},
parameters,
);

let event = Timestamped {


+ 16
- 7
binaries/runtime/src/operator/python.rs View File

@@ -6,7 +6,7 @@ use dora_core::{
descriptor::{source_is_url, Descriptor, PythonSource},
};
use dora_download::download_file;
use dora_node_api::{merged::MergedEvent, Event};
use dora_node_api::{merged::MergedEvent, Event, Parameter};
use dora_operator_api_python::PyEvent;
use dora_operator_api_types::DoraStatus;
use eyre::{bail, eyre, Context, Result};
@@ -201,11 +201,15 @@ pub fn run(
use tracing_opentelemetry::OpenTelemetrySpanExt;
span.record("input_id", input_id.as_str());

let cx = deserialize_context(&metadata.parameters.open_telemetry_context);
let otel = metadata.open_telemetry_context();
let cx = deserialize_context(&otel);
span.set_parent(cx);
let cx = span.context();
let string_cx = serialize_context(&cx);
metadata.parameters.open_telemetry_context = string_cx;
metadata.parameters.insert(
"open_telemetry_context".to_string(),
Parameter::String(string_cx),
);
}

let py_event = PyEvent {
@@ -317,17 +321,22 @@ mod callback_impl {
metadata: Option<Bound<'_, PyDict>>,
py: Python,
) -> Result<()> {
let parameters = pydict_to_metadata(metadata)
.wrap_err("failed to parse metadata")?
.into_owned();
let parameters = pydict_to_metadata(metadata).wrap_err("failed to parse metadata")?;
let span = span!(
tracing::Level::TRACE,
"send_output",
output_id = field::Empty
);
span.record("output_id", output);
let otel = if let Some(dora_node_api::Parameter::String(otel)) =
parameters.get("open_telemetry_context")
{
otel.to_string()
} else {
"".to_string()
};

let cx = deserialize_context(&parameters.open_telemetry_context);
let cx = deserialize_context(&otel);
span.set_parent(cx);
let _ = span.enter();



+ 15
- 12
binaries/runtime/src/operator/shared_lib.rs View File

@@ -8,7 +8,7 @@ use dora_core::{
use dora_download::download_file;
use dora_node_api::{
arrow_utils::{copy_array_into_sample, required_data_size},
Event, MetadataParameters,
Event, Parameter,
};
use dora_operator_api_types::{
safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnEvent,
@@ -17,6 +17,7 @@ use dora_operator_api_types::{
use eyre::{bail, eyre, Context, Result};
use libloading::Symbol;
use std::{
collections::BTreeMap,
ffi::c_void,
panic::{catch_unwind, AssertUnwindSafe},
path::Path,
@@ -119,10 +120,11 @@ impl<'lib> SharedLibraryOperator<'lib> {
open_telemetry_context,
},
} = output;
let parameters = MetadataParameters {
open_telemetry_context: open_telemetry_context.into(),
..Default::default()
};
let mut parameters = BTreeMap::new();
parameters.insert(
"open_telemetry_context".to_string(),
Parameter::String(open_telemetry_context.to_string()),
);

let arrow_array = match unsafe { arrow::ffi::from_ffi(data_array, &schema) } {
Ok(a) => a,
@@ -173,11 +175,15 @@ impl<'lib> SharedLibraryOperator<'lib> {
use tracing_opentelemetry::OpenTelemetrySpanExt;
span.record("input_id", input_id.as_str());

let cx = deserialize_context(&metadata.parameters.open_telemetry_context);
let otel = metadata.open_telemetry_context();
let cx = deserialize_context(&otel);
span.set_parent(cx);
let cx = span.context();
let string_cx = serialize_context(&cx);
metadata.parameters.open_telemetry_context = string_cx;
metadata.parameters.insert(
"open_telemetry_context".to_string(),
Parameter::String(string_cx),
);
}

let mut operator_event = match event {
@@ -193,16 +199,13 @@ impl<'lib> SharedLibraryOperator<'lib> {
data,
} => {
let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?;
let otel = metadata.open_telemetry_context();
let operator_input = dora_operator_api_types::Input {
id: String::from(input_id).into(),
data_array: Some(data_array),
schema,
metadata: Metadata {
open_telemetry_context: metadata
.parameters
.open_telemetry_context
.into(),
open_telemetry_context: otel.into(),
},
};
dora_operator_api_types::RawEvent {


+ 7
- 2
node-hub/dora-record/src/main.rs View File

@@ -100,7 +100,12 @@ async fn main() -> eyre::Result<()> {
None => {}
Some(tx) => drop(tx),
},
_ => {}
Event::Error(err) => {
println!("Error: {}", err);
}
event => {
println!("Event: {event:#?}")
}
}
}

@@ -137,7 +142,7 @@ async fn write_event(
let timestamp_utc = TimestampMillisecondArray::from(vec![dt.timestamp_millis()]);
let timestamp_utc = make_array(timestamp_utc.into());

let string_otel_context = metadata.parameters.open_telemetry_context.to_string();
let string_otel_context = metadata.open_telemetry_context();
let otel_context = deserialize_to_hashmap(&string_otel_context);
let traceparent = otel_context.get("traceparent");
let trace_id = match traceparent {


Loading…
Cancel
Save