From 1c1891446a4c2bad9cbbbee3fda403d389d6287d Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 18 Oct 2022 12:25:21 +0200 Subject: [PATCH] Add `uhlc` timestamp to message metadata --- Cargo.lock | 21 ++++- apis/c/node/src/lib.rs | 2 +- apis/python/node/src/lib.rs | 2 +- apis/python/operator/src/lib.rs | 19 +++-- apis/rust/node/src/lib.rs | 7 +- binaries/coordinator/src/run/mod.rs | 5 +- binaries/runtime/src/operator/python.rs | 10 ++- binaries/runtime/src/operator/shared_lib.rs | 16 ++-- .../c++-dataflow/node-rust-api/src/main.rs | 2 +- examples/iceoryx/node/src/main.rs | 11 ++- examples/rust-dataflow/node/src/main.rs | 11 ++- libraries/message/Cargo.toml | 1 + libraries/message/schema/message.capnp | 1 + libraries/message/src/lib.rs | 77 ++++++++++++++----- 14 files changed, 134 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d2628b5e..746c4abe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -945,6 +945,7 @@ version = "0.1.0" dependencies = [ "capnp", "capnpc", + "uhlc 0.5.1", ] [[package]] @@ -3841,6 +3842,20 @@ dependencies = [ "uuid 0.8.2", ] +[[package]] +name = "uhlc" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7908438f98a5824af02b34c2b31fb369c5764ef835d26df0badbb9897fb28245" +dependencies = [ + "hex", + "humantime", + "lazy_static", + "log", + "serde", + "uuid 1.1.2", +] + [[package]] name = "unicode-bidi" version = "0.3.8" @@ -4374,7 +4389,7 @@ dependencies = [ "serde_json", "socket2", "stop-token", - "uhlc", + "uhlc 0.4.1", "uuid 0.8.2", "vec_map", "zenoh-buffers", @@ -4633,7 +4648,7 @@ version = "0.6.0-dev.0" source = "git+https://github.com/eclipse-zenoh/zenoh.git#79a136e4fd90b11ff5d775ced981af53c4f1071b" dependencies = [ "log", - "uhlc", + "uhlc 0.4.1", "zenoh-buffers", "zenoh-core", "zenoh-protocol-core", @@ -4647,7 +4662,7 @@ dependencies = [ "hex", "lazy_static", "serde", - "uhlc", + "uhlc 0.4.1", "uuid 0.8.2", "zenoh-core", ] diff --git a/apis/c/node/src/lib.rs b/apis/c/node/src/lib.rs index 63cb209b..0760adb9 100644 --- a/apis/c/node/src/lib.rs +++ b/apis/c/node/src/lib.rs @@ -194,7 +194,7 @@ unsafe fn try_send_output( let data = unsafe { slice::from_raw_parts(data_ptr, data_len) }; context .node - .send_output(&output_id, &Default::default(), data.len(), |out| { + .send_output(&output_id, Default::default(), data.len(), |out| { out.copy_from_slice(data); }) } diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 7fec3e78..357960e5 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -67,7 +67,7 @@ impl Node { let data = &data.as_bytes(); let metadata = pydict_to_metadata(metadata)?; self.node - .send_output(&output_id.into(), &metadata, data.len(), |out| { + .send_output(&output_id.into(), metadata, data.len(), |out| { out.copy_from_slice(data); }) .wrap_err("Could not send output") diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 0dce2cbe..79fcd164 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -1,18 +1,14 @@ use std::borrow::Cow; -use dora_node_api::Metadata; +use dora_node_api::{Metadata, MetadataParameters}; use eyre::{Context, Result}; use pyo3::{prelude::*, types::PyDict}; -pub fn pydict_to_metadata(dict: Option<&PyDict>) -> Result { - let mut default_metadata = Metadata::default(); +pub fn pydict_to_metadata(dict: Option<&PyDict>) -> Result { + let mut default_metadata = MetadataParameters::default(); if let Some(metadata) = dict { for (key, value) in metadata.iter() { match key.extract::<&str>().context("Parsing metadata keys")? { - "metadata_version" => { - default_metadata.metadata_version = - value.extract().context("parsing metadata version failed")?; - } "watermark" => { default_metadata.watermark = value.extract().context("parsing watermark failed")?; @@ -36,8 +32,11 @@ pub fn pydict_to_metadata(dict: Option<&PyDict>) -> Result { pub fn metadata_to_pydict<'a>(metadata: &'a Metadata, py: Python<'a>) -> &'a PyDict { let dict = PyDict::new(py); - dict.set_item("open_telemetry_context", &metadata.open_telemetry_context) - .wrap_err("could not make metadata a python dictionary item") - .unwrap(); + dict.set_item( + "open_telemetry_context", + &metadata.parameters.open_telemetry_context, + ) + .wrap_err("could not make metadata a python dictionary item") + .unwrap(); dict } diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index b75e7306..ef9fe072 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -1,5 +1,5 @@ pub use communication::Input; -pub use dora_message::Metadata; +pub use dora_message::{uhlc, Metadata, MetadataParameters}; pub use flume::Receiver; use communication::STOP_TOPIC; @@ -14,6 +14,7 @@ pub struct DoraNode { id: NodeId, node_config: NodeRunConfig, communication: Box, + hlc: uhlc::HLC, } impl DoraNode { @@ -49,6 +50,7 @@ impl DoraNode { id, node_config, communication, + hlc: uhlc::HLC::default(), }) } @@ -59,7 +61,7 @@ impl DoraNode { pub fn send_output( &mut self, output_id: &DataId, - metadata: &Metadata, + parameters: MetadataParameters, data_len: usize, data: F, ) -> eyre::Result<()> @@ -69,6 +71,7 @@ impl DoraNode { if !self.node_config.outputs.contains(output_id) { eyre::bail!("unknown output"); } + let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters); let serialized_metadata = metadata .serialize() .with_context(|| format!("failed to serialize `{}` message", output_id))?; diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 7b20f303..5afea3ca 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -94,10 +94,11 @@ pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Resul let duration = format_duration(interval); format!("dora/timer/{duration}") }; - let metadata = dora_message::Metadata::default(); - let data = metadata.serialize().unwrap(); + let hlc = dora_message::uhlc::HLC::default(); let mut stream = IntervalStream::new(tokio::time::interval(interval)); while (stream.next().await).is_some() { + let metadata = dora_message::Metadata::new(hlc.new_timestamp()); + let data = metadata.serialize().unwrap(); communication .publisher(&topic) .unwrap() diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index fb90ec86..fe68b502 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -1,6 +1,7 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] use super::{OperatorEvent, Tracer}; +use dora_message::uhlc; use dora_node_api::{communication::Publisher, config::DataId}; use dora_operator_api_python::metadata_to_pydict; use eyre::{bail, eyre, Context}; @@ -49,6 +50,7 @@ pub fn spawn( let send_output = SendOutputCallback { publishers: Arc::new(publishers), + hlc: Arc::new(uhlc::HLC::default()), }; let init_operator = move |py: Python| { @@ -109,7 +111,7 @@ pub fn spawn( let () = tracer; "".to_string() }; - input.metadata.open_telemetry_context = Cow::Owned(string_cx); + input.metadata.parameters.open_telemetry_context = Cow::Owned(string_cx); let status_enum = Python::with_gil(|py| { let input_dict = PyDict::new(py); @@ -173,12 +175,14 @@ pub fn spawn( #[derive(Clone)] struct SendOutputCallback { publishers: Arc>>, + hlc: Arc, } #[allow(unsafe_op_in_unsafe_fn)] mod callback_impl { use super::SendOutputCallback; + use dora_message::Metadata; use dora_operator_api_python::pydict_to_metadata; use eyre::{eyre, Context}; use pyo3::{ @@ -197,7 +201,9 @@ mod callback_impl { ) -> PyResult<()> { match self.publishers.get(output) { Some(publisher) => { - let message = pydict_to_metadata(metadata)? + let parameters = pydict_to_metadata(metadata)?; + let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters); + let message = metadata .serialize() .context(format!("failed to serialize `{}` metadata", output)); message.and_then(|mut message| { diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index dd524734..4d2cdee6 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,5 +1,6 @@ use super::{OperatorEvent, Tracer}; use dora_core::adjust_shared_library_path; +use dora_message::uhlc; use dora_node_api::{communication::Publisher, config::DataId}; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput, @@ -32,12 +33,17 @@ pub fn spawn( libloading::Library::new(&path) .wrap_err_with(|| format!("failed to load shared library at `{}`", path.display()))? }; + let hlc = uhlc::HLC::default(); thread::spawn(move || { let closure = AssertUnwindSafe(|| { let bindings = Bindings::init(&library).context("failed to init operator")?; - let operator = SharedLibraryOperator { inputs, bindings }; + let operator = SharedLibraryOperator { + inputs, + bindings, + hlc, + }; operator.run(publishers, tracer) }); @@ -61,6 +67,7 @@ struct SharedLibraryOperator<'lib> { inputs: Receiver, bindings: Bindings<'lib>, + hlc: uhlc::HLC, } impl<'lib> SharedLibraryOperator<'lib> { @@ -92,10 +99,9 @@ impl<'lib> SharedLibraryOperator<'lib> { open_telemetry_context, }, } = output; - let metadata = dora_node_api::Metadata { - open_telemetry_context: String::from(open_telemetry_context).into(), - ..Default::default() - }; + let mut metadata = dora_node_api::Metadata::new(self.hlc.new_timestamp()); + metadata.parameters.open_telemetry_context = + String::from(open_telemetry_context).into(); let message = metadata .serialize() diff --git a/examples/c++-dataflow/node-rust-api/src/main.rs b/examples/c++-dataflow/node-rust-api/src/main.rs index 6d1cd512..30d37d24 100644 --- a/examples/c++-dataflow/node-rust-api/src/main.rs +++ b/examples/c++-dataflow/node-rust-api/src/main.rs @@ -53,7 +53,7 @@ pub struct OutputSender<'a>(&'a mut DoraNode); fn send_output(sender: &mut OutputSender, id: String, data: &[u8]) -> ffi::DoraResult { let result = sender .0 - .send_output(&id.into(), &Default::default(), data.len(), |out| { + .send_output(&id.into(), Default::default(), data.len(), |out| { out.copy_from_slice(data) }); let error = match result { diff --git a/examples/iceoryx/node/src/main.rs b/examples/iceoryx/node/src/main.rs index 06437786..c8f8b422 100644 --- a/examples/iceoryx/node/src/main.rs +++ b/examples/iceoryx/node/src/main.rs @@ -17,9 +17,14 @@ fn main() -> eyre::Result<()> { "tick" => { let random: u64 = rand::random(); let data: &[u8] = &random.to_le_bytes(); - operator.send_output(&output, input.metadata(), data.len(), |out| { - out.copy_from_slice(data); - })?; + operator.send_output( + &output, + input.metadata().parameters.clone(), + data.len(), + |out| { + out.copy_from_slice(data); + }, + )?; } other => eprintln!("Ignoring unexpected input `{other}`"), } diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index 06437786..c8f8b422 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -17,9 +17,14 @@ fn main() -> eyre::Result<()> { "tick" => { let random: u64 = rand::random(); let data: &[u8] = &random.to_le_bytes(); - operator.send_output(&output, input.metadata(), data.len(), |out| { - out.copy_from_slice(data); - })?; + operator.send_output( + &output, + input.metadata().parameters.clone(), + data.len(), + |out| { + out.copy_from_slice(data); + }, + )?; } other => eprintln!("Ignoring unexpected input `{other}`"), } diff --git a/libraries/message/Cargo.toml b/libraries/message/Cargo.toml index 160a87b9..275b05a4 100644 --- a/libraries/message/Cargo.toml +++ b/libraries/message/Cargo.toml @@ -8,6 +8,7 @@ license = "Apache-2.0" [dependencies] capnp = { version = "0.14.6", features = ["unaligned"] } +uhlc = "0.5.1" [build-dependencies] capnpc = "0.14" diff --git a/libraries/message/schema/message.capnp b/libraries/message/schema/message.capnp index 4cddbeaa..7825bd56 100644 --- a/libraries/message/schema/message.capnp +++ b/libraries/message/schema/message.capnp @@ -5,4 +5,5 @@ struct Metadata { watermark @1 :UInt64; deadline @2 :UInt64; otelContext @3 :Text; # OpenTelemetry Context allowing shared context between nodes. + timestamp @4 :Text; } diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs index db215b71..1709fbf9 100644 --- a/libraries/message/src/lib.rs +++ b/libraries/message/src/lib.rs @@ -5,14 +5,54 @@ use std::borrow::Cow; pub mod message_capnp { include!(concat!(env!("OUT_DIR"), "/message_capnp.rs")); } +pub use uhlc; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Metadata<'a> { + metadata_version: u16, + timestamp: uhlc::Timestamp, + pub parameters: MetadataParameters<'a>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct MetadataParameters<'a> { + pub watermark: u64, + pub deadline: u64, + pub open_telemetry_context: Cow<'a, str>, +} + +impl MetadataParameters<'_> { + fn into_owned(self) -> MetadataParameters<'static> { + MetadataParameters { + open_telemetry_context: self.open_telemetry_context.into_owned().into(), + ..self + } + } +} + +impl<'a> Metadata<'a> { + pub fn new(timestamp: uhlc::Timestamp) -> Self { + Self::from_parameters(timestamp, Default::default()) + } + + pub fn from_parameters(timestamp: uhlc::Timestamp, parameters: MetadataParameters<'a>) -> Self { + Self { + metadata_version: 0, + timestamp, + parameters, + } + } -impl Metadata<'_> { pub fn serialize(&self) -> Result, capnp::Error> { let Metadata { metadata_version, - watermark, - deadline, - open_telemetry_context: otel_context, + timestamp, + parameters: + MetadataParameters { + watermark, + deadline, + open_telemetry_context: otel_context, + }, } = self; let mut meta_builder = capnp::message::Builder::new_default(); @@ -21,6 +61,7 @@ impl Metadata<'_> { metadata.set_watermark(*watermark); metadata.set_deadline(*deadline); metadata.set_otel_context(otel_context); + metadata.set_timestamp(×tamp.to_string()); let mut buffer = Vec::new(); capnp::serialize::write_message(&mut buffer, &meta_builder)?; @@ -35,28 +76,28 @@ impl Metadata<'_> { let metadata = Metadata { metadata_version: metadata_reader.get_metadata_version(), - watermark: metadata_reader.get_watermark(), - deadline: metadata_reader.get_deadline(), - open_telemetry_context: metadata_reader.get_otel_context()?.into(), + timestamp: metadata_reader + .get_timestamp()? + .parse() + .map_err(|_| capnp::Error::failed("failed to parse timestamp".into()))?, + parameters: MetadataParameters { + watermark: metadata_reader.get_watermark(), + deadline: metadata_reader.get_deadline(), + open_telemetry_context: metadata_reader.get_otel_context()?.into(), + }, }; Ok(metadata.into_owned()) } -} -#[derive(Debug, Clone, PartialEq, Eq, Default)] -pub struct Metadata<'a> { - pub metadata_version: u16, - pub watermark: u64, - pub deadline: u64, - pub open_telemetry_context: Cow<'a, str>, -} - -impl Metadata<'_> { fn into_owned(self) -> Metadata<'static> { Metadata { - open_telemetry_context: self.open_telemetry_context.into_owned().into(), + parameters: self.parameters.into_owned(), ..self } } + + pub fn timestamp(&self) -> uhlc::Timestamp { + self.timestamp + } }