Browse Source

refactor: use builder style

fix: logging thread block
tags/v0.3.12-rc0
sjfhsjfh 8 months ago
parent
commit
51190788ba
No known key found for this signature in database GPG Key ID: CA820EAE1C115AED
5 changed files with 131 additions and 74 deletions
  1. +20
    -19
      apis/rust/node/src/node/mod.rs
  2. +27
    -20
      binaries/cli/src/lib.rs
  3. +9
    -5
      binaries/runtime/src/lib.rs
  4. +2
    -2
      libraries/extensions/telemetry/metrics/src/lib.rs
  5. +73
    -28
      libraries/extensions/telemetry/tracing/src/lib.rs

+ 20
- 19
apis/rust/node/src/node/mod.rs View File

@@ -32,9 +32,10 @@ use std::{
use tracing::{info, warn};

#[cfg(feature = "metrics")]
use dora_metrics::init_meter_provider;
use dora_metrics::run_metrics_monitor;
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use dora_tracing::TracingBuilder;

use tokio::runtime::{Handle, Runtime};

pub mod arrow_utils;
@@ -81,8 +82,12 @@ impl DoraNode {
serde_yaml::from_str(&raw).context("failed to deserialize node config")?
};
#[cfg(feature = "tracing")]
set_up_tracing(node_config.node_id.as_ref())
.context("failed to set up tracing subscriber")?;
{
TracingBuilder::new(node_config.node_id.as_ref())
.build()
.wrap_err("failed to set up tracing subscriber")?;
}

Self::init(node_config)
}

@@ -156,24 +161,20 @@ impl DoraNode {
let id = format!("{}/{}", dataflow_id, node_id);

#[cfg(feature = "metrics")]
match &rt {
TokioRuntime::Runtime(rt) => rt.spawn(async {
if let Err(e) = init_meter_provider(id)
.await
.context("failed to init metrics provider")
{
warn!("could not create metric provider with err: {:#?}", e);
}
}),
TokioRuntime::Handle(handle) => handle.spawn(async {
if let Err(e) = init_meter_provider(id)
{
let monitor_task = async move {
if let Err(e) = run_metrics_monitor(id.clone())
.await
.context("failed to init metrics provider")
.wrap_err("metrics monitor exited unexpectedly")
{
warn!("could not create metric provider with err: {:#?}", e);
warn!("metrics monitor failed: {:#?}", e);
}
}),
};
};
match &rt {
TokioRuntime::Runtime(rt) => rt.spawn(monitor_task),
TokioRuntime::Handle(handle) => handle.spawn(monitor_task),
};
}

let event_stream = EventStream::init(
dataflow_id,


+ 27
- 20
binaries/cli/src/lib.rs View File

@@ -16,8 +16,7 @@ use dora_message::{
coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus},
};
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use dora_tracing::{set_up_tracing_opts, FileLogging};
use dora_tracing::TracingBuilder;
use duration_str::parse;
use eyre::{bail, Context};
use formatting::FormatDataflowError;
@@ -311,34 +310,42 @@ fn run(args: Args) -> eyre::Result<()> {
.as_ref()
.map(|id| format!("{name}-{id}"))
.unwrap_or(name.to_string());
let stdout = (!quiet).then_some("info,zenoh=warn");
let file = Some(FileLogging {
file_name: filename,
filter: LevelFilter::INFO,
});
set_up_tracing_opts(name, stdout, file)
.context("failed to set up tracing subscriber")?;

let mut builder = TracingBuilder::new(name);
if !quiet {
builder = builder.with_stdout("info,zenoh=warn");
}
builder = builder.with_file(filename, LevelFilter::INFO)?;
builder
.build()
.wrap_err("failed to set up tracing subscriber")?;
}
Command::Runtime => {
// Do not set the runtime in the cli.
}
Command::Coordinator { quiet, .. } => {
let name = "dora-coordinator";
let stdout = (!quiet).then_some("info");
let file = Some(FileLogging {
file_name: name.to_owned(),
filter: LevelFilter::INFO,
});
set_up_tracing_opts(name, stdout, file)
.context("failed to set up tracing subscriber")?;
let mut builder = TracingBuilder::new(name);
if !quiet {
builder = builder.with_stdout("info");
}
builder = builder.with_file(name, LevelFilter::INFO)?;
builder
.build()
.wrap_err("failed to set up tracing subscriber")?;
}
Command::Run { .. } => {
let log_level = std::env::var("RUST_LOG").ok().or(Some("info".to_string()));
set_up_tracing_opts("run", log_level.as_deref(), None)
.context("failed to set up tracing subscriber")?;
let log_level = std::env::var("RUST_LOG").ok().unwrap_or("info".to_string());
TracingBuilder::new("run")
.with_stdout(log_level)
.build()
.wrap_err("failed to set up tracing subscriber")?;
}
_ => {
set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?;
TracingBuilder::new("dora-cli")
.with_stdout("warn")
.build()
.wrap_err("failed to set up tracing subscriber")?;
}
};



+ 9
- 5
binaries/runtime/src/lib.rs View File

@@ -5,15 +5,14 @@ use dora_core::{
descriptor::OperatorConfig,
};
use dora_message::daemon_to_node::{NodeConfig, RuntimeConfig};
use dora_metrics::init_meter_provider;
use dora_metrics::run_metrics_monitor;
use dora_node_api::{DoraNode, Event};
use dora_tracing::TracingBuilder;
use eyre::{bail, Context, Result};
use futures::{Stream, StreamExt};
use futures_concurrency::stream::Merge;
use operator::{run_operator, OperatorEvent, StopReason};

#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
mem,
@@ -37,7 +36,12 @@ pub fn main() -> eyre::Result<()> {
} = config;
let node_id = config.node_id.clone();
#[cfg(feature = "tracing")]
set_up_tracing(node_id.as_ref()).context("failed to set up tracing subscriber")?;
{
TracingBuilder::new(node_id.as_ref())
.with_stdout("warn")
.build()
.wrap_err("failed to set up tracing subscriber")?;
}

let dataflow_descriptor = config.dataflow_descriptor.clone();

@@ -123,7 +127,7 @@ async fn run(
init_done: oneshot::Receiver<Result<()>>,
) -> eyre::Result<()> {
#[cfg(feature = "metrics")]
let _meter_provider = init_meter_provider(config.node_id.to_string());
let _meter_provider = run_metrics_monitor(config.node_id.to_string());
init_done
.await
.wrap_err("the `init_done` channel was closed unexpectedly")?


+ 2
- 2
libraries/extensions/telemetry/metrics/src/lib.rs View File

@@ -31,7 +31,7 @@ pub fn init_metrics() -> SdkMeterProvider {
.build()
}

pub async fn init_meter_provider(meter_id: String) -> Result<SdkMeterProvider> {
pub async fn run_metrics_monitor(meter_id: String) -> Result<()> {
let meter_provider = init_metrics();
global::set_meter_provider(meter_provider.clone());
let scope = InstrumentationScope::builder(meter_id)
@@ -40,5 +40,5 @@ pub async fn init_meter_provider(meter_id: String) -> Result<SdkMeterProvider> {
let meter = global::meter_with_scope(scope);

init_process_observer(meter).await.unwrap();
Ok(meter_provider)
Ok(())
}

+ 73
- 28
libraries/extensions/telemetry/tracing/src/lib.rs View File

@@ -11,38 +11,61 @@ use tracing_subscriber::{
filter::FilterExt, prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Layer,
};

use eyre::ContextCompat;
use tracing_subscriber::Registry;
pub mod telemetry;

/// Setup tracing with a default configuration.
///
/// This will set up a global subscriber that logs to stdout with a filter level of "warn".
///
/// Should **ONLY** be used in `DoraNode` implementations.
pub fn set_up_tracing(name: &str) -> eyre::Result<()> {
set_up_tracing_opts(name, Some("warn"), None)
TracingBuilder::new(name)
.with_stdout("warn")
.build()
.wrap_err(format!(
"failed to set tracing global subscriber for {name}"
))?;
Ok(())
}

pub struct FileLogging {
pub file_name: String,
pub filter: LevelFilter,
#[must_use = "call `build` to finalize the tracing setup"]
pub struct TracingBuilder {
name: String,
layers: Vec<Box<dyn Layer<Registry> + Send + Sync>>,
}

pub fn set_up_tracing_opts(
name: &str,
stdout_filter: Option<impl AsRef<str>>,
file: Option<FileLogging>,
) -> eyre::Result<()> {
let mut layers = Vec::new();
impl TracingBuilder {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
layers: Vec::new(),
}
}

if let Some(filter) = stdout_filter {
/// Add a layer that write logs to the [std::io::stdout] with the given filter.
///
/// **DO NOT** use this in `DoraNode` implementations,
/// it uses [std::io::stdout] which is synchronous
/// and might block the logging thread.
pub fn with_stdout(mut self, filter: impl AsRef<str>) -> Self {
let parsed = EnvFilter::builder().parse_lossy(filter);
// Filter log using `RUST_LOG`. More useful for CLI.
let env_filter = EnvFilter::from_default_env().or(parsed);
let layer = tracing_subscriber::fmt::layer()
.compact()
.with_writer(std::io::stdout)
.with_filter(env_filter);
layers.push(layer.boxed());
self.layers.push(layer.boxed());
self
}

if let Some(file) = file {
let FileLogging { file_name, filter } = file;
/// Add a layer that write logs to a file with the given name and filter.
pub fn with_file(
mut self,
file_name: impl Into<String>,
filter: LevelFilter,
) -> eyre::Result<Self> {
let file_name = file_name.into();
let out_dir = Path::new("out");
std::fs::create_dir_all(out_dir).context("failed to create `out` directory")?;
let path = out_dir.join(file_name).with_extension("txt");
@@ -51,26 +74,48 @@ pub fn set_up_tracing_opts(
.append(true)
.open(path)
.context("failed to create log file")?;
// Filter log using `RUST_LOG`. More useful for CLI.
let layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(file)
.with_filter(filter);
layers.push(layer.boxed());
self.layers.push(layer.boxed());
Ok(self)
}

if let Some(endpoint) = std::env::var_os("DORA_JAEGER_TRACING") {
let endpoint = endpoint
.to_str()
.wrap_err("Could not parse env variable: DORA_JAEGER_TRACING")?;
let tracer = crate::telemetry::init_jaeger_tracing(name, endpoint)
pub fn with_jaeger_tracing(mut self) -> eyre::Result<Self> {
let endpoint = std::env::var("DORA_JAEGER_TRACING")
.wrap_err("DORA_JAEGER_TRACING environment variable not set")?;
let tracer = crate::telemetry::init_jaeger_tracing(&self.name, &endpoint)
.wrap_err("Could not instantiate tracing")?;
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
layers.push(telemetry.boxed());
self.layers.push(telemetry.boxed());
Ok(self)
}

pub fn add_layer<L>(mut self, layer: L) -> Self
where
L: Layer<Registry> + Send + Sync + 'static,
{
self.layers.push(layer.boxed());
self
}

let registry = Registry::default().with(layers);
tracing::subscriber::set_global_default(registry).context(format!(
"failed to set tracing global subscriber for {name}"
))
pub fn with_layers<I, L>(mut self, layers: I) -> Self
where
I: IntoIterator<Item = L>,
L: Layer<Registry> + Send + Sync + 'static,
{
for layer in layers {
self.layers.push(layer.boxed());
}
self
}

pub fn build(self) -> eyre::Result<()> {
let registry = Registry::default().with(self.layers);
tracing::subscriber::set_global_default(registry).context(format!(
"failed to set tracing global subscriber for {}",
self.name
))
}
}

Loading…
Cancel
Save