From 3ae07203cc1f1522b69b58e09490a0ea5acb0ebe Mon Sep 17 00:00:00 2001
From: haixuanTao
Date: Thu, 19 Oct 2023 10:06:13 +0200
Subject: [PATCH] replacing `zenoh-logger` with `dora-record`

---
 .gitignore                                    |   3 +
 Cargo.lock                                    |  17 ++-
 Cargo.toml                                    |   2 +-
 .../python-operator-dataflow/dataflow.yml     |   7 +
 examples/rust-dataflow/dataflow.yml           |   8 ++
 .../{zenoh-logger => dora-record}/Cargo.toml  |   7 +-
 libraries/extensions/dora-record/src/main.rs  | 120 ++++++++++++++++++
 .../telemetry/tracing/src/telemetry.rs        |  11 +-
 libraries/extensions/zenoh-logger/src/main.rs |  15 ---
 9 files changed, 162 insertions(+), 28 deletions(-)
 rename libraries/extensions/{zenoh-logger => dora-record}/Cargo.toml (62%)
 create mode 100644 libraries/extensions/dora-record/src/main.rs
 delete mode 100644 libraries/extensions/zenoh-logger/src/main.rs

diff --git a/.gitignore b/.gitignore
index c9121e8d..557fa2e1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,9 @@
 # These are backup files generated by rustfmt
 **/*.rs.bk
 
+# Remove arrow file from dora-record
+**/*.arrow
+
 # Removing images.
 *.jpg
 *.png
diff --git a/Cargo.lock b/Cargo.lock
index 885ac2d1..0901276f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1619,6 +1619,16 @@ dependencies = [
  "safer-ffi",
 ]
 
+[[package]]
+name = "dora-record"
+version = "0.2.6"
+dependencies = [
+ "chrono",
+ "dora-node-api",
+ "dora-tracing",
+ "eyre",
+]
+
 [[package]]
 name = "dora-ros2-bridge"
 version = "0.1.0"
@@ -6569,13 +6579,6 @@ dependencies = [
  "zenoh-sync",
 ]
 
-[[package]]
-name = "zenoh-logger"
-version = "0.2.6"
-dependencies = [
- "zenoh",
-]
-
 [[package]]
 name = "zenoh-macros"
 version = "0.7.0-rc"
diff --git a/Cargo.toml b/Cargo.toml
index cd8a6fa5..283c412f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,7 +22,7 @@ members = [
     "libraries/shared-memory-server",
     "libraries/extensions/download",
     "libraries/extensions/telemetry/*",
-    "libraries/extensions/zenoh-logger",
+    "libraries/extensions/dora-record",
     "libraries/extensions/ros2-bridge",
     "libraries/extensions/ros2-bridge/msg-gen",
     "libraries/extensions/ros2-bridge/msg-gen-macro",
diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml
index 92bf5f2b..fb08e5eb 100644
--- a/examples/python-operator-dataflow/dataflow.yml
+++ b/examples/python-operator-dataflow/dataflow.yml
@@ -21,3 +21,10 @@ nodes:
       inputs:
         image: webcam/image
         bbox: object_detection/bbox
+  - id: dora-record
+    custom:
+      build: cargo build -p dora-record
+      source: ../../target/release/dora-record
+      inputs:
+        image: webcam/image
+        bbox: object_detection/bbox
diff --git a/examples/rust-dataflow/dataflow.yml b/examples/rust-dataflow/dataflow.yml
index 84dd4404..f19ed516 100644
--- a/examples/rust-dataflow/dataflow.yml
+++ b/examples/rust-dataflow/dataflow.yml
@@ -23,3 +23,11 @@ nodes:
       source: ../../target/debug/rust-dataflow-example-sink
       inputs:
         message: runtime-node/rust-operator/status
+  - id: dora-record
+    custom:
+      build: cargo build -p dora-record
+      source: ../../target/debug/dora-record
+      inputs:
+        message: runtime-node/rust-operator/status
+        tick: dora/timer/millis/100
+        random: rust-node/random
\ No newline at end of file
diff --git a/libraries/extensions/zenoh-logger/Cargo.toml b/libraries/extensions/dora-record/Cargo.toml
similarity index 62%
rename from libraries/extensions/zenoh-logger/Cargo.toml
rename to libraries/extensions/dora-record/Cargo.toml
index 15666b60..1d8dbe73 100644
--- a/libraries/extensions/zenoh-logger/Cargo.toml
+++ b/libraries/extensions/dora-record/Cargo.toml
@@ -1,5 +1,5 @@
 [package]
-name = "zenoh-logger"
"zenoh-logger" +name = "dora-record" version.workspace = true edition = "2021" documentation.workspace = true @@ -9,4 +9,7 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -zenoh = "0.7.0-rc" +dora-node-api = { workspace = true, features = ["tracing"] } +eyre = "0.6.8" +chrono = "0.4.31" +dora-tracing = { workspace = true } diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs new file mode 100644 index 00000000..14d6ccba --- /dev/null +++ b/libraries/extensions/dora-record/src/main.rs @@ -0,0 +1,120 @@ +use chrono::{DateTime, Utc}; +use dora_node_api::{ + self, + arrow::{ + array::{make_array, Array, Int64Array, ListArray, StringArray}, + buffer::{OffsetBuffer, ScalarBuffer}, + datatypes::{DataType, Field, Schema}, + ipc::writer::FileWriter, + record_batch::RecordBatch, + }, + DoraNode, Event, Metadata, +}; +use dora_tracing::telemetry::deserialize_to_hashmap; +use eyre::{Context, ContextCompat}; +use std::{collections::HashMap, fs::File, sync::Arc}; + +// Remove once arrow-rs 48.0 is published with access to writer schema. +// See: https://github.com/apache/arrow-rs/pull/4940 +struct WriterContext { + writer: FileWriter, + schema: Arc, +} + +fn main() -> eyre::Result<()> { + let (_node, mut events) = DoraNode::init_from_env()?; + + let mut writers = HashMap::new(); + while let Some(event) = events.recv() { + match event { + Event::Input { id, data, metadata } => { + match writers.get_mut(&id) { + None => { + let field_timestamp = Field::new("timestamp", DataType::Int64, true); + let field_otel_context = Field::new("otel_context", DataType::Utf8, true); + let field_values = + Arc::new(Field::new("item", data.data_type().clone(), true)); + let field_data = Field::new(id.clone(), DataType::List(field_values), true); + + let schema = Arc::new(Schema::new(vec![ + field_otel_context, + field_timestamp, + field_data, + ])); + let file = std::fs::File::create(format!("{id}.arrow")).unwrap(); + + let writer = FileWriter::try_new(file, &schema).unwrap(); + let mut writer_context = WriterContext { writer, schema }; + write_event(&mut writer_context, data.into(), &metadata) + .context("could not write first record data")?; + writers.insert(id.clone(), writer_context); + } + Some(writer_context) => { + write_event(writer_context, data.into(), &metadata) + .context("could not write first record data")?; + } + }; + } + Event::InputClosed { id } => match writers.remove(&id) { + None => {} + Some(mut writer) => writer + .writer + .finish() + .context("Could not finish arrow file")?, + }, + _ => {} + } + } + + let result: eyre::Result> = writers + .iter_mut() + .map(|(_, wc)| -> eyre::Result<()> { + wc.writer + .finish() + .context("Could not finish writing arrow file")?; + Ok(()) + }) + .collect(); + result.context("One of the input recorder file writer failed to finish")?; + + Ok(()) +} + +fn write_event( + writer_context: &mut WriterContext, + data: Arc, + metadata: &Metadata, +) -> eyre::Result<()> { + let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, data.len() as i32])); + let field = Arc::new(Field::new("item", data.data_type().clone(), true)); + let list = ListArray::new(field.clone(), offsets, data.clone(), None); + + let timestamp = metadata.timestamp(); + let timestamp = timestamp.get_time().to_system_time(); + + let dt: DateTime = timestamp.into(); + let timestamp_array = Int64Array::from(vec![dt.timestamp_millis()]); + let timestamp_array = 
+
+    let string_otel_context = metadata.parameters.open_telemetry_context.to_string();
+    let otel_context = deserialize_to_hashmap(&string_otel_context);
+    let traceparent = otel_context.get("traceparent").clone();
+    let traceparent = match traceparent {
+        None => "",
+        Some(trace) => trace.split("-").nth(2).context("Trace is malformatted")?,
+    };
+    let otel_context_array = StringArray::from(vec![traceparent]);
+    let otel_context_array = make_array(otel_context_array.into());
+
+    let record = RecordBatch::try_new(
+        writer_context.schema.clone(),
+        vec![otel_context_array, timestamp_array, make_array(list.into())],
+    )
+    .context("Could not create record batch with the given data")?;
+    writer_context
+        .writer
+        .write(&record)
+        .context("Could not write recordbatch to file")?;
+
+    Ok(())
+}
diff --git a/libraries/extensions/telemetry/tracing/src/telemetry.rs b/libraries/extensions/telemetry/tracing/src/telemetry.rs
index c24eb957..526fe970 100644
--- a/libraries/extensions/telemetry/tracing/src/telemetry.rs
+++ b/libraries/extensions/telemetry/tracing/src/telemetry.rs
@@ -54,12 +54,17 @@ pub fn serialize_context(context: &Context) -> String {
 }
 
 pub fn deserialize_context(string_context: &str) -> Context {
-    let mut map = MetadataMap(HashMap::new());
+    let map = MetadataMap(deserialize_to_hashmap(string_context));
+    global::get_text_map_propagator(|prop| prop.extract(&map))
+}
+
+pub fn deserialize_to_hashmap(string_context: &str) -> HashMap<&str, &str> {
+    let mut map = HashMap::new();
     for s in string_context.split(';') {
         let mut values = s.split(':');
         let key = values.next().unwrap();
         let value = values.next().unwrap_or("");
-        map.0.insert(key, value);
+        map.insert(key, value);
     }
-    global::get_text_map_propagator(|prop| prop.extract(&map))
+    map
 }
diff --git a/libraries/extensions/zenoh-logger/src/main.rs b/libraries/extensions/zenoh-logger/src/main.rs
deleted file mode 100644
index 782cdfed..00000000
--- a/libraries/extensions/zenoh-logger/src/main.rs
+++ /dev/null
@@ -1,15 +0,0 @@
-use zenoh::prelude::{sync::SyncResolve, Config};
-
-fn main() {
-    let zenoh = zenoh::open(Config::default()).res_sync().unwrap();
-    let sub = zenoh
-        .declare_subscriber("/**")
-        .reliable()
-        .res_sync()
-        .unwrap();
-
-    loop {
-        let msg = sub.recv().unwrap();
-        println!("{}", msg.key_expr);
-    }
-}
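
Reading back a recording (illustrative sketch, not part of the diff above): dora-record writes one Arrow IPC file per input id, named `{id}.arrow`, with the columns otel_context, timestamp (milliseconds), and the input data wrapped in a list column named after the input. Assuming the `arrow` crate is available as a direct dependency and that the rust-dataflow example produced `message.arrow` in the working directory, such a file could be inspected roughly like this:

    use std::fs::File;

    use arrow::ipc::reader::FileReader;

    fn main() -> eyre::Result<()> {
        // dora-record names each recording after the input id, e.g. `message.arrow`.
        let file = File::open("message.arrow")?;
        // `None` keeps every column: otel_context, timestamp, and the data list.
        let reader = FileReader::try_new(file, None)?;
        for batch in reader {
            // dora-record writes one record batch per received input event.
            let batch = batch?;
            println!("{} row(s), schema: {:?}", batch.num_rows(), batch.schema());
        }
        Ok(())
    }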