Browse Source

Use `tracing` for logging state of nodes, coordinator, and runtimes

tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
7e9c3050f9
Failed to extract signature
11 changed files with 122 additions and 39 deletions
  1. +65
    -10
      Cargo.lock
  2. +1
    -0
      apis/c/node/Cargo.toml
  3. +2
    -2
      apis/c/node/src/lib.rs
  4. +3
    -1
      apis/rust/node/Cargo.toml
  5. +14
    -0
      apis/rust/node/src/lib.rs
  6. +2
    -0
      binaries/coordinator/Cargo.toml
  7. +4
    -2
      binaries/coordinator/src/lib.rs
  8. +13
    -0
      binaries/coordinator/src/main.rs
  9. +6
    -2
      binaries/runtime/Cargo.toml
  10. +10
    -21
      binaries/runtime/src/main.rs
  11. +2
    -1
      binaries/runtime/src/operator/mod.rs

+ 65
- 10
Cargo.lock View File

@@ -805,6 +805,8 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util 0.7.1",
"tracing",
"tracing-subscriber",
"uuid 0.8.2",
]

@@ -861,6 +863,7 @@ dependencies = [
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",
"uuid 1.1.2",
]

@@ -871,6 +874,7 @@ dependencies = [
"dora-node-api",
"eyre",
"flume",
"tracing",
]

[[package]]
@@ -932,11 +936,12 @@ dependencies = [
"futures",
"futures-concurrency",
"libloading",
"log",
"pyo3",
"serde_yaml",
"tokio",
"tokio-stream",
"tracing",
"tracing-subscriber",
"zenoh",
"zenoh-config",
]
@@ -2204,9 +2209,9 @@ dependencies = [

[[package]]
name = "pin-project-lite"
version = "0.2.8"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"

[[package]]
name = "pin-utils"
@@ -2996,6 +3001,15 @@ dependencies = [
"opaque-debug 0.3.0",
]

[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]

[[package]]
name = "shared_memory"
version = "0.12.0"
@@ -3215,6 +3229,15 @@ dependencies = [
"syn",
]

[[package]]
name = "thread_local"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180"
dependencies = [
"once_cell",
]

[[package]]
name = "threadpool"
version = "1.8.1"
@@ -3420,9 +3443,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"

[[package]]
name = "tracing"
version = "0.1.33"
version = "0.1.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80b9fa4360528139bc96100c160b7ae879f5567f49f1782b0b02035b0358ebf3"
checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307"
dependencies = [
"cfg-if",
"log",
@@ -3433,9 +3456,9 @@ dependencies = [

[[package]]
name = "tracing-attributes"
version = "0.1.20"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b"
checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2"
dependencies = [
"proc-macro2",
"quote",
@@ -3444,11 +3467,12 @@ dependencies = [

[[package]]
name = "tracing-core"
version = "0.1.23"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c"
checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7"
dependencies = [
"lazy_static",
"once_cell",
"valuable",
]

[[package]]
@@ -3461,6 +3485,31 @@ dependencies = [
"tracing",
]

[[package]]
name = "tracing-log"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]

[[package]]
name = "tracing-subscriber"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60db860322da191b40952ad9affe65ea23e7dd6a5c442c2c42865810c6ab8e6b"
dependencies = [
"ansi_term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core",
"tracing-log",
]

[[package]]
name = "try-lock"
version = "0.2.3"
@@ -3632,6 +3681,12 @@ dependencies = [
"unzip-n",
]

[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"

[[package]]
name = "value-bag"
version = "1.0.0-alpha.9"


+ 1
- 0
apis/c/node/Cargo.toml View File

@@ -12,6 +12,7 @@ crate-type = ["staticlib"]
[dependencies]
eyre = "0.6.8"
flume = "0.10.14"
tracing = "0.1.33"

[dependencies.dora-node-api]
default-features = false


+ 2
- 2
apis/c/node/src/lib.rs View File

@@ -30,7 +30,7 @@ pub extern "C" fn init_dora_context_from_env() -> *mut c_void {
Ok(n) => n,
Err(err) => {
let err: eyre::Error = err;
eprintln!("{err:?}");
tracing::error!("{err:?}");
return ptr::null_mut();
}
};
@@ -175,7 +175,7 @@ pub unsafe extern "C" fn dora_send_output(
match unsafe { try_send_output(context, id_ptr, id_len, data_ptr, data_len) } {
Ok(()) => 0,
Err(err) => {
eprintln!("{err:?}");
tracing::error!("{err:?}");
-1
}
}


+ 3
- 1
apis/rust/node/Cargo.toml View File

@@ -5,9 +5,10 @@ edition = "2021"
license = "Apache-2.0"

[features]
default = ["zenoh", "iceoryx"]
default = ["zenoh", "iceoryx", "tracing-subscriber"]
zenoh = ["communication-layer-pub-sub/zenoh"]
iceoryx = ["communication-layer-pub-sub/iceoryx"]
tracing-subscriber = ["dep:tracing-subscriber"]

[dependencies]
eyre = "0.6.7"
@@ -16,6 +17,7 @@ serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.8.23"
thiserror = "1.0.30"
tracing = "0.1.33"
tracing-subscriber = { version = "0.3.15", optional = true }
flume = "0.10.14"
communication-layer-pub-sub = { path = "../../../libraries/communication-layer", default-features = false }
uuid = { version = "1.1.2", features = ["v4"] }


+ 14
- 0
apis/rust/node/src/lib.rs View File

@@ -19,6 +19,9 @@ pub struct DoraNode {

impl DoraNode {
pub fn init_from_env() -> eyre::Result<Self> {
#[cfg(feature = "tracing-subscriber")]
set_up_tracing().context("failed to set up tracing subscriber")?;

let id = {
let raw =
std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?;
@@ -84,6 +87,7 @@ impl DoraNode {
}

impl Drop for DoraNode {
#[tracing::instrument(skip(self), fields(self.id = %self.id))]
fn drop(&mut self) {
let self_id = &self.id;
let topic = format!("{self_id}/{STOP_TOPIC}");
@@ -110,6 +114,16 @@ impl Drop for DoraNode {

pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

#[cfg(feature = "tracing-subscriber")]
fn set_up_tracing() -> eyre::Result<()> {
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;

let stdout_log = tracing_subscriber::fmt::layer().pretty();
let subscriber = tracing_subscriber::Registry::default().with(stdout_log);
tracing::subscriber::set_global_default(subscriber)
.context("failed to set tracing global subscriber")
}

#[cfg(test)]
mod tests {
use super::*;


+ 2
- 0
binaries/coordinator/Cargo.toml View File

@@ -23,3 +23,5 @@ futures-concurrency = "2.0.3"
rand = "0.8.5"
dora-core = { version = "0.1.0", path = "../../libraries/core" }
dora-message = { path = "../../libraries/message" }
tracing = "0.1.36"
tracing-subscriber = "0.3.15"

+ 4
- 2
binaries/coordinator/src/lib.rs View File

@@ -137,6 +137,7 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()
Ok(())
}

#[tracing::instrument]
fn spawn_custom_node(
node_id: NodeId,
node: &descriptor::CustomNode,
@@ -180,7 +181,7 @@ fn spawn_custom_node(
let result = tokio::spawn(async move {
let status = child.wait().await.context("child process failed")?;
if status.success() {
println!("node {node_id} finished");
tracing::info!("node {node_id} finished");
Ok(())
} else if let Some(code) = status.code() {
Err(eyre!("node {node_id} failed with exit code: {code}"))
@@ -191,6 +192,7 @@ fn spawn_custom_node(
Ok(result)
}

#[tracing::instrument(skip(node))]
fn spawn_runtime_node(
runtime: &Path,
node_id: NodeId,
@@ -211,7 +213,7 @@ fn spawn_runtime_node(
let result = tokio::spawn(async move {
let status = child.wait().await.context("child process failed")?;
if status.success() {
println!("runtime node {node_id} finished");
tracing::info!("runtime node {node_id} finished");
Ok(())
} else if let Some(code) = status.code() {
Err(eyre!(


+ 13
- 0
binaries/coordinator/src/main.rs View File

@@ -1,5 +1,18 @@
use eyre::Context;

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing().context("failed to set up tracing subscriber")?;

let command = clap::Parser::parse();
dora_coordinator::run(command).await
}

fn set_up_tracing() -> eyre::Result<()> {
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;

let stdout_log = tracing_subscriber::fmt::layer().pretty();
let subscriber = tracing_subscriber::Registry::default().with(stdout_log);
tracing::subscriber::set_global_default(subscriber)
.context("failed to set tracing global subscriber")
}

+ 6
- 2
binaries/runtime/Cargo.toml View File

@@ -8,7 +8,10 @@ license = "Apache-2.0"

[dependencies]
clap = { version = "3.1.12", features = ["derive"] }
dora-node-api = { path = "../../apis/rust/node" }
dora-node-api = { path = "../../apis/rust/node", default-features = false, features = [
"zenoh",
"iceoryx",
] }
dora-operator-api-types = { path = "../../apis/rust/operator/types" }
dora-core = { version = "0.1.0", path = "../../libraries/core" }
eyre = "0.6.8"
@@ -20,8 +23,9 @@ tokio = { version = "1.17.0", features = ["full"] }
tokio-stream = "0.1.8"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
log = "0.4.17"
fern = "0.6.1"
pyo3 = { version = "0.16.5", features = ["auto-initialize", "eyre"] }
flume = "0.10.14"
dora-message = { path = "../../libraries/message" }
tracing = "0.1.36"
tracing-subscriber = "0.3.15"

+ 10
- 21
binaries/runtime/src/main.rs View File

@@ -19,7 +19,7 @@ use tokio_stream::{wrappers::ReceiverStream, StreamMap};
mod operator;

fn main() -> eyre::Result<()> {
set_up_logger()?;
set_up_tracing().context("failed to set up tracing subscriber")?;

let node_id = {
let raw =
@@ -94,7 +94,7 @@ async fn run(
OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload),
OperatorEvent::Finished => {
if let Some(stop_publisher) = operator_stop_publishers.remove(&id) {
println!("operator {node_id}/{id} finished");
tracing::info!("operator {node_id}/{id} finished");
stopped_operators.insert(id.clone());
// send stopped message
tokio::task::spawn_blocking(move || stop_publisher.publish(&[]))
@@ -110,7 +110,7 @@ async fn run(
break;
}
} else {
log::warn!("no stop publisher for {id}");
tracing::warn!("no stop publisher for {id}");
}
}
}
@@ -143,22 +143,11 @@ enum Event {
},
}

fn set_up_logger() -> Result<(), fern::InitError> {
fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
" [{}][{}] {}",
record.target(),
record.level(),
message
))
})
.level(log::LevelFilter::Debug)
.level_for("zenoh", log::LevelFilter::Warn)
.level_for("zenoh_transport", log::LevelFilter::Warn)
.level_for("zenoh_link_tcp", log::LevelFilter::Warn)
.chain(std::io::stdout())
.chain(fern::log_file("runtime.log")?)
.apply()?;
Ok(())
fn set_up_tracing() -> eyre::Result<()> {
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;

let stdout_log = tracing_subscriber::fmt::layer().pretty();
let subscriber = tracing_subscriber::Registry::default().with(stdout_log);
tracing::subscriber::set_global_default(subscriber)
.context("failed to set tracing global subscriber")
}

+ 2
- 1
binaries/runtime/src/operator/mod.rs View File

@@ -10,6 +10,7 @@ use tokio::sync::mpsc::Sender;
mod python;
mod shared_lib;

#[tracing::instrument(skip(communication))]
pub fn spawn_operator(
node_id: &NodeId,
operator_definition: OperatorDefinition,
@@ -59,7 +60,7 @@ pub fn spawn_operator(
})?;
}
OperatorSource::Wasm(_path) => {
eprintln!("WARNING: WASM operators are not supported yet");
tracing::error!("WASM operators are not supported yet");
}
}
Ok(())


Loading…
Cancel
Save