|
|
|
@@ -35,7 +35,7 @@ use tracing::{info, warn}; |
|
|
|
use dora_metrics::init_meter_provider; |
|
|
|
#[cfg(feature = "tracing")] |
|
|
|
use dora_tracing::set_up_tracing; |
|
|
|
use tokio::runtime::Handle; |
|
|
|
use tokio::runtime::{Handle, Runtime}; |
|
|
|
|
|
|
|
pub mod arrow_utils; |
|
|
|
mod control_channel; |
|
|
|
@@ -43,6 +43,11 @@ mod drop_stream; |
|
|
|
|
|
|
|
pub const ZERO_COPY_THRESHOLD: usize = 4096; |
|
|
|
|
|
|
|
enum TokioRuntime { |
|
|
|
Runtime(Runtime), |
|
|
|
Handle(Handle), |
|
|
|
} |
|
|
|
|
|
|
|
pub struct DoraNode { |
|
|
|
id: NodeId, |
|
|
|
dataflow_id: DataflowId, |
|
|
|
@@ -56,7 +61,7 @@ pub struct DoraNode { |
|
|
|
|
|
|
|
dataflow_descriptor: Descriptor, |
|
|
|
warned_unknown_output: BTreeSet<DataId>, |
|
|
|
_rt: Handle, |
|
|
|
_rt: TokioRuntime, |
|
|
|
} |
|
|
|
|
|
|
|
impl DoraNode { |
|
|
|
@@ -138,26 +143,37 @@ impl DoraNode { |
|
|
|
let input_config = run_config.inputs.clone(); |
|
|
|
|
|
|
|
let rt = match Handle::try_current() { |
|
|
|
Ok(rt) => rt, |
|
|
|
Err(_) => tokio::runtime::Builder::new_multi_thread() |
|
|
|
.worker_threads(2) |
|
|
|
.enable_all() |
|
|
|
.build() |
|
|
|
.context("tokio runtime failed")? |
|
|
|
.handle() |
|
|
|
.clone(), |
|
|
|
Ok(handle) => TokioRuntime::Handle(handle), |
|
|
|
Err(_) => TokioRuntime::Runtime( |
|
|
|
tokio::runtime::Builder::new_multi_thread() |
|
|
|
.worker_threads(2) |
|
|
|
.enable_all() |
|
|
|
.build() |
|
|
|
.context("tokio runtime failed")?, |
|
|
|
), |
|
|
|
}; |
|
|
|
|
|
|
|
let id = node_id.to_string(); |
|
|
|
let id = format!("{}/{}", dataflow_id, node_id); |
|
|
|
|
|
|
|
#[cfg(feature = "metrics")] |
|
|
|
rt.spawn(async { |
|
|
|
if let Err(e) = init_meter_provider(id) |
|
|
|
.await |
|
|
|
.context("failed to init metrics provider") |
|
|
|
{ |
|
|
|
warn!("could not create metric provider with err: {:#?}", e); |
|
|
|
} |
|
|
|
}); |
|
|
|
match &rt { |
|
|
|
TokioRuntime::Runtime(rt) => rt.spawn(async { |
|
|
|
if let Err(e) = init_meter_provider(id) |
|
|
|
.await |
|
|
|
.context("failed to init metrics provider") |
|
|
|
{ |
|
|
|
warn!("could not create metric provider with err: {:#?}", e); |
|
|
|
} |
|
|
|
}), |
|
|
|
TokioRuntime::Handle(handle) => handle.spawn(async { |
|
|
|
if let Err(e) = init_meter_provider(id) |
|
|
|
.await |
|
|
|
.context("failed to init metrics provider") |
|
|
|
{ |
|
|
|
warn!("could not create metric provider with err: {:#?}", e); |
|
|
|
} |
|
|
|
}), |
|
|
|
}; |
|
|
|
|
|
|
|
let event_stream = EventStream::init( |
|
|
|
dataflow_id, |
|
|
|
|