Browse Source

Add `uhlc` timestamp to message metadata

tags/v0.0.0-test-pr-120
Philipp Oppermann 3 years ago
parent
commit
1c1891446a
Failed to extract signature
14 changed files with 134 additions and 51 deletions
  1. +18
    -3
      Cargo.lock
  2. +1
    -1
      apis/c/node/src/lib.rs
  3. +1
    -1
      apis/python/node/src/lib.rs
  4. +9
    -10
      apis/python/operator/src/lib.rs
  5. +5
    -2
      apis/rust/node/src/lib.rs
  6. +3
    -2
      binaries/coordinator/src/run/mod.rs
  7. +8
    -2
      binaries/runtime/src/operator/python.rs
  8. +11
    -5
      binaries/runtime/src/operator/shared_lib.rs
  9. +1
    -1
      examples/c++-dataflow/node-rust-api/src/main.rs
  10. +8
    -3
      examples/iceoryx/node/src/main.rs
  11. +8
    -3
      examples/rust-dataflow/node/src/main.rs
  12. +1
    -0
      libraries/message/Cargo.toml
  13. +1
    -0
      libraries/message/schema/message.capnp
  14. +59
    -18
      libraries/message/src/lib.rs

+ 18
- 3
Cargo.lock View File

@@ -945,6 +945,7 @@ version = "0.1.0"
dependencies = [
"capnp",
"capnpc",
"uhlc 0.5.1",
]

[[package]]
@@ -3841,6 +3842,20 @@ dependencies = [
"uuid 0.8.2",
]

[[package]]
name = "uhlc"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7908438f98a5824af02b34c2b31fb369c5764ef835d26df0badbb9897fb28245"
dependencies = [
"hex",
"humantime",
"lazy_static",
"log",
"serde",
"uuid 1.1.2",
]

[[package]]
name = "unicode-bidi"
version = "0.3.8"
@@ -4374,7 +4389,7 @@ dependencies = [
"serde_json",
"socket2",
"stop-token",
"uhlc",
"uhlc 0.4.1",
"uuid 0.8.2",
"vec_map",
"zenoh-buffers",
@@ -4633,7 +4648,7 @@ version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#79a136e4fd90b11ff5d775ced981af53c4f1071b"
dependencies = [
"log",
"uhlc",
"uhlc 0.4.1",
"zenoh-buffers",
"zenoh-core",
"zenoh-protocol-core",
@@ -4647,7 +4662,7 @@ dependencies = [
"hex",
"lazy_static",
"serde",
"uhlc",
"uhlc 0.4.1",
"uuid 0.8.2",
"zenoh-core",
]


+ 1
- 1
apis/c/node/src/lib.rs View File

@@ -194,7 +194,7 @@ unsafe fn try_send_output(
let data = unsafe { slice::from_raw_parts(data_ptr, data_len) };
context
.node
.send_output(&output_id, &Default::default(), data.len(), |out| {
.send_output(&output_id, Default::default(), data.len(), |out| {
out.copy_from_slice(data);
})
}

+ 1
- 1
apis/python/node/src/lib.rs View File

@@ -67,7 +67,7 @@ impl Node {
let data = &data.as_bytes();
let metadata = pydict_to_metadata(metadata)?;
self.node
.send_output(&output_id.into(), &metadata, data.len(), |out| {
.send_output(&output_id.into(), metadata, data.len(), |out| {
out.copy_from_slice(data);
})
.wrap_err("Could not send output")


+ 9
- 10
apis/python/operator/src/lib.rs View File

@@ -1,18 +1,14 @@
use std::borrow::Cow;

use dora_node_api::Metadata;
use dora_node_api::{Metadata, MetadataParameters};
use eyre::{Context, Result};
use pyo3::{prelude::*, types::PyDict};

pub fn pydict_to_metadata(dict: Option<&PyDict>) -> Result<Metadata> {
let mut default_metadata = Metadata::default();
pub fn pydict_to_metadata(dict: Option<&PyDict>) -> Result<MetadataParameters> {
let mut default_metadata = MetadataParameters::default();
if let Some(metadata) = dict {
for (key, value) in metadata.iter() {
match key.extract::<&str>().context("Parsing metadata keys")? {
"metadata_version" => {
default_metadata.metadata_version =
value.extract().context("parsing metadata version failed")?;
}
"watermark" => {
default_metadata.watermark =
value.extract().context("parsing watermark failed")?;
@@ -36,8 +32,11 @@ pub fn pydict_to_metadata(dict: Option<&PyDict>) -> Result<Metadata> {

pub fn metadata_to_pydict<'a>(metadata: &'a Metadata, py: Python<'a>) -> &'a PyDict {
let dict = PyDict::new(py);
dict.set_item("open_telemetry_context", &metadata.open_telemetry_context)
.wrap_err("could not make metadata a python dictionary item")
.unwrap();
dict.set_item(
"open_telemetry_context",
&metadata.parameters.open_telemetry_context,
)
.wrap_err("could not make metadata a python dictionary item")
.unwrap();
dict
}

+ 5
- 2
apis/rust/node/src/lib.rs View File

@@ -1,5 +1,5 @@
pub use communication::Input;
pub use dora_message::Metadata;
pub use dora_message::{uhlc, Metadata, MetadataParameters};
pub use flume::Receiver;

use communication::STOP_TOPIC;
@@ -14,6 +14,7 @@ pub struct DoraNode {
id: NodeId,
node_config: NodeRunConfig,
communication: Box<dyn CommunicationLayer>,
hlc: uhlc::HLC,
}

impl DoraNode {
@@ -49,6 +50,7 @@ impl DoraNode {
id,
node_config,
communication,
hlc: uhlc::HLC::default(),
})
}

@@ -59,7 +61,7 @@ impl DoraNode {
pub fn send_output<F>(
&mut self,
output_id: &DataId,
metadata: &Metadata,
parameters: MetadataParameters,
data_len: usize,
data: F,
) -> eyre::Result<()>
@@ -69,6 +71,7 @@ impl DoraNode {
if !self.node_config.outputs.contains(output_id) {
eyre::bail!("unknown output");
}
let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters);
let serialized_metadata = metadata
.serialize()
.with_context(|| format!("failed to serialize `{}` message", output_id))?;


+ 3
- 2
binaries/coordinator/src/run/mod.rs View File

@@ -94,10 +94,11 @@ pub async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Resul
let duration = format_duration(interval);
format!("dora/timer/{duration}")
};
let metadata = dora_message::Metadata::default();
let data = metadata.serialize().unwrap();
let hlc = dora_message::uhlc::HLC::default();
let mut stream = IntervalStream::new(tokio::time::interval(interval));
while (stream.next().await).is_some() {
let metadata = dora_message::Metadata::new(hlc.new_timestamp());
let data = metadata.serialize().unwrap();
communication
.publisher(&topic)
.unwrap()


+ 8
- 2
binaries/runtime/src/operator/python.rs View File

@@ -1,6 +1,7 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use super::{OperatorEvent, Tracer};
use dora_message::uhlc;
use dora_node_api::{communication::Publisher, config::DataId};
use dora_operator_api_python::metadata_to_pydict;
use eyre::{bail, eyre, Context};
@@ -49,6 +50,7 @@ pub fn spawn(

let send_output = SendOutputCallback {
publishers: Arc::new(publishers),
hlc: Arc::new(uhlc::HLC::default()),
};

let init_operator = move |py: Python| {
@@ -109,7 +111,7 @@ pub fn spawn(
let () = tracer;
"".to_string()
};
input.metadata.open_telemetry_context = Cow::Owned(string_cx);
input.metadata.parameters.open_telemetry_context = Cow::Owned(string_cx);

let status_enum = Python::with_gil(|py| {
let input_dict = PyDict::new(py);
@@ -173,12 +175,14 @@ pub fn spawn(
#[derive(Clone)]
struct SendOutputCallback {
publishers: Arc<HashMap<DataId, Box<dyn Publisher>>>,
hlc: Arc<uhlc::HLC>,
}

#[allow(unsafe_op_in_unsafe_fn)]
mod callback_impl {

use super::SendOutputCallback;
use dora_message::Metadata;
use dora_operator_api_python::pydict_to_metadata;
use eyre::{eyre, Context};
use pyo3::{
@@ -197,7 +201,9 @@ mod callback_impl {
) -> PyResult<()> {
match self.publishers.get(output) {
Some(publisher) => {
let message = pydict_to_metadata(metadata)?
let parameters = pydict_to_metadata(metadata)?;
let metadata = Metadata::from_parameters(self.hlc.new_timestamp(), parameters);
let message = metadata
.serialize()
.context(format!("failed to serialize `{}` metadata", output));
message.and_then(|mut message| {


+ 11
- 5
binaries/runtime/src/operator/shared_lib.rs View File

@@ -1,5 +1,6 @@
use super::{OperatorEvent, Tracer};
use dora_core::adjust_shared_library_path;
use dora_message::uhlc;
use dora_node_api::{communication::Publisher, config::DataId};
use dora_operator_api_types::{
safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput,
@@ -32,12 +33,17 @@ pub fn spawn(
libloading::Library::new(&path)
.wrap_err_with(|| format!("failed to load shared library at `{}`", path.display()))?
};
let hlc = uhlc::HLC::default();

thread::spawn(move || {
let closure = AssertUnwindSafe(|| {
let bindings = Bindings::init(&library).context("failed to init operator")?;

let operator = SharedLibraryOperator { inputs, bindings };
let operator = SharedLibraryOperator {
inputs,
bindings,
hlc,
};

operator.run(publishers, tracer)
});
@@ -61,6 +67,7 @@ struct SharedLibraryOperator<'lib> {
inputs: Receiver<dora_node_api::Input>,

bindings: Bindings<'lib>,
hlc: uhlc::HLC,
}

impl<'lib> SharedLibraryOperator<'lib> {
@@ -92,10 +99,9 @@ impl<'lib> SharedLibraryOperator<'lib> {
open_telemetry_context,
},
} = output;
let metadata = dora_node_api::Metadata {
open_telemetry_context: String::from(open_telemetry_context).into(),
..Default::default()
};
let mut metadata = dora_node_api::Metadata::new(self.hlc.new_timestamp());
metadata.parameters.open_telemetry_context =
String::from(open_telemetry_context).into();

let message = metadata
.serialize()


+ 1
- 1
examples/c++-dataflow/node-rust-api/src/main.rs View File

@@ -53,7 +53,7 @@ pub struct OutputSender<'a>(&'a mut DoraNode);
fn send_output(sender: &mut OutputSender, id: String, data: &[u8]) -> ffi::DoraResult {
let result = sender
.0
.send_output(&id.into(), &Default::default(), data.len(), |out| {
.send_output(&id.into(), Default::default(), data.len(), |out| {
out.copy_from_slice(data)
});
let error = match result {


+ 8
- 3
examples/iceoryx/node/src/main.rs View File

@@ -17,9 +17,14 @@ fn main() -> eyre::Result<()> {
"tick" => {
let random: u64 = rand::random();
let data: &[u8] = &random.to_le_bytes();
operator.send_output(&output, input.metadata(), data.len(), |out| {
out.copy_from_slice(data);
})?;
operator.send_output(
&output,
input.metadata().parameters.clone(),
data.len(),
|out| {
out.copy_from_slice(data);
},
)?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}


+ 8
- 3
examples/rust-dataflow/node/src/main.rs View File

@@ -17,9 +17,14 @@ fn main() -> eyre::Result<()> {
"tick" => {
let random: u64 = rand::random();
let data: &[u8] = &random.to_le_bytes();
operator.send_output(&output, input.metadata(), data.len(), |out| {
out.copy_from_slice(data);
})?;
operator.send_output(
&output,
input.metadata().parameters.clone(),
data.len(),
|out| {
out.copy_from_slice(data);
},
)?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}


+ 1
- 0
libraries/message/Cargo.toml View File

@@ -8,6 +8,7 @@ license = "Apache-2.0"

[dependencies]
capnp = { version = "0.14.6", features = ["unaligned"] }
uhlc = "0.5.1"

[build-dependencies]
capnpc = "0.14"

+ 1
- 0
libraries/message/schema/message.capnp View File

@@ -5,4 +5,5 @@ struct Metadata {
watermark @1 :UInt64;
deadline @2 :UInt64;
otelContext @3 :Text; # OpenTelemetry Context allowing shared context between nodes.
timestamp @4 :Text;
}

+ 59
- 18
libraries/message/src/lib.rs View File

@@ -5,14 +5,54 @@ use std::borrow::Cow;
pub mod message_capnp {
include!(concat!(env!("OUT_DIR"), "/message_capnp.rs"));
}
pub use uhlc;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Metadata<'a> {
metadata_version: u16,
timestamp: uhlc::Timestamp,
pub parameters: MetadataParameters<'a>,
}

#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct MetadataParameters<'a> {
pub watermark: u64,
pub deadline: u64,
pub open_telemetry_context: Cow<'a, str>,
}

impl MetadataParameters<'_> {
fn into_owned(self) -> MetadataParameters<'static> {
MetadataParameters {
open_telemetry_context: self.open_telemetry_context.into_owned().into(),
..self
}
}
}

impl<'a> Metadata<'a> {
pub fn new(timestamp: uhlc::Timestamp) -> Self {
Self::from_parameters(timestamp, Default::default())
}

pub fn from_parameters(timestamp: uhlc::Timestamp, parameters: MetadataParameters<'a>) -> Self {
Self {
metadata_version: 0,
timestamp,
parameters,
}
}

impl Metadata<'_> {
pub fn serialize(&self) -> Result<Vec<u8>, capnp::Error> {
let Metadata {
metadata_version,
watermark,
deadline,
open_telemetry_context: otel_context,
timestamp,
parameters:
MetadataParameters {
watermark,
deadline,
open_telemetry_context: otel_context,
},
} = self;

let mut meta_builder = capnp::message::Builder::new_default();
@@ -21,6 +61,7 @@ impl Metadata<'_> {
metadata.set_watermark(*watermark);
metadata.set_deadline(*deadline);
metadata.set_otel_context(otel_context);
metadata.set_timestamp(&timestamp.to_string());

let mut buffer = Vec::new();
capnp::serialize::write_message(&mut buffer, &meta_builder)?;
@@ -35,28 +76,28 @@ impl Metadata<'_> {

let metadata = Metadata {
metadata_version: metadata_reader.get_metadata_version(),
watermark: metadata_reader.get_watermark(),
deadline: metadata_reader.get_deadline(),
open_telemetry_context: metadata_reader.get_otel_context()?.into(),
timestamp: metadata_reader
.get_timestamp()?
.parse()
.map_err(|_| capnp::Error::failed("failed to parse timestamp".into()))?,
parameters: MetadataParameters {
watermark: metadata_reader.get_watermark(),
deadline: metadata_reader.get_deadline(),
open_telemetry_context: metadata_reader.get_otel_context()?.into(),
},
};

Ok(metadata.into_owned())
}
}

#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Metadata<'a> {
pub metadata_version: u16,
pub watermark: u64,
pub deadline: u64,
pub open_telemetry_context: Cow<'a, str>,
}

impl Metadata<'_> {
fn into_owned(self) -> Metadata<'static> {
Metadata {
open_telemetry_context: self.open_telemetry_context.into_owned().into(),
parameters: self.parameters.into_owned(),
..self
}
}

pub fn timestamp(&self) -> uhlc::Timestamp {
self.timestamp
}
}

Loading…
Cancel
Save