Browse Source

Send metadata in messages encoded with capnproto

Changes the message format from raw data bytes to a higher-level `Message` struct serialized with capnproto. In addition to the raw data, which is sent as a byte array as before, the `Message` struct features `metadata` field. This metadata field can be used to pass open telemetry contexts, deadlines, etc.
tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
0df06fce86
Failed to extract signature
18 changed files with 182 additions and 34 deletions
  1. +6
    -2
      Cargo.lock
  2. +2
    -2
      apis/c/node/src/lib.rs
  3. +6
    -2
      apis/python/node/src/lib.rs
  4. +2
    -0
      apis/rust/node/Cargo.toml
  5. +29
    -6
      apis/rust/node/src/communication.rs
  6. +6
    -2
      apis/rust/node/src/lib.rs
  7. +1
    -0
      binaries/coordinator/Cargo.toml
  8. +3
    -1
      binaries/coordinator/src/lib.rs
  9. +1
    -0
      binaries/runtime/Cargo.toml
  10. +1
    -1
      binaries/runtime/src/operator/python.rs
  11. +23
    -5
      binaries/runtime/src/operator/shared_lib.rs
  12. +2
    -2
      examples/c++-dataflow/node-rust-api/src/main.rs
  13. +2
    -1
      examples/iceoryx/node/src/main.rs
  14. +1
    -1
      examples/iceoryx/sink/src/main.rs
  15. +2
    -1
      examples/rust-dataflow/node/src/main.rs
  16. +1
    -1
      examples/rust-dataflow/sink/src/main.rs
  17. +2
    -3
      libraries/extensions/message/schema/message.capnp
  18. +92
    -4
      libraries/extensions/message/src/lib.rs

+ 6
- 2
Cargo.lock View File

@@ -370,9 +370,9 @@ checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c"

[[package]]
name = "capnp"
version = "0.14.6"
version = "0.14.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21d5d7da973146f1720672faa44f1523cc8f923636190ca1a931c7bc8834de68"
checksum = "b1b5dbbbfbae370eb9e9e86fb86b572acf4d5e0072e44636758cae3fe6ba6015"

[[package]]
name = "capnpc"
@@ -793,6 +793,7 @@ dependencies = [
"bincode",
"clap 3.1.12",
"dora-core",
"dora-message",
"dora-node-api",
"eyre",
"futures",
@@ -849,7 +850,9 @@ dependencies = [
name = "dora-node-api"
version = "0.1.0"
dependencies = [
"capnp",
"communication-layer-pub-sub",
"dora-message",
"eyre",
"flume",
"once_cell",
@@ -920,6 +923,7 @@ version = "0.1.0"
dependencies = [
"clap 3.1.12",
"dora-core",
"dora-message",
"dora-node-api",
"dora-operator-api-types",
"eyre",


+ 2
- 2
apis/c/node/src/lib.rs View File

@@ -126,7 +126,7 @@ pub unsafe extern "C" fn read_dora_input_data(
out_len: *mut usize,
) {
let input: &Input = unsafe { &*input.cast() };
let data = input.data.as_slice();
let data = &input.message.data;
let ptr = data.as_ptr();
let len = data.len();
unsafe {
@@ -192,5 +192,5 @@ unsafe fn try_send_output(
let id = std::str::from_utf8(unsafe { slice::from_raw_parts(id_ptr, id_len) })?;
let output_id = id.to_owned().into();
let data = unsafe { slice::from_raw_parts(data_ptr, data_len) };
context.node.send_output(&output_id, data)
context.node.send_output(&output_id, &data.into())
}

+ 6
- 2
apis/python/node/src/lib.rs View File

@@ -16,7 +16,11 @@ pub struct PyInput(Input);

impl IntoPy<PyObject> for PyInput {
fn into_py(self, py: Python) -> PyObject {
(self.0.id.to_string(), PyBytes::new(py, &self.0.data)).into_py(py)
(
self.0.id.to_string(),
PyBytes::new(py, &self.0.message.data),
)
.into_py(py)
}
}

@@ -51,7 +55,7 @@ impl Node {

pub fn send_output(&mut self, output_id: String, data: &PyBytes) -> Result<()> {
self.node
.send_output(&output_id.into(), data.as_bytes())
.send_output(&output_id.into(), &data.as_bytes().into())
.wrap_err("Could not send output")
}



+ 2
- 0
apis/rust/node/Cargo.toml View File

@@ -19,6 +19,8 @@ tracing = "0.1.33"
flume = "0.10.14"
communication-layer-pub-sub = { path = "../../../libraries/communication-layer", default-features = false }
uuid = { version = "1.1.2", features = ["v4"] }
capnp = "0.14.9"
dora-message = { path = "../../../libraries/extensions/message" }

[dev-dependencies]
tokio = { version = "1.17.0", features = ["rt"] }

+ 29
- 6
apis/rust/node/src/communication.rs View File

@@ -1,4 +1,5 @@
pub use communication_layer_pub_sub::{CommunicationLayer, Publisher, Subscriber};
use dora_message::{deserialize_message, Message, Metadata};

use crate::{
config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId},
@@ -83,10 +84,18 @@ pub fn subscribe_all(
thread::spawn(move || loop {
let event = match sub.recv().transpose() {
None => break,
Some(Ok(data)) => InputEvent::Input(Input {
id: input_id.clone(),
data,
}),
Some(Ok(data)) => {
match deserialize_message(&data)
.with_context(|| format!("failed to deserialize `{input_id}` message"))
{
Ok(message) => InputEvent::Input(Input {
id: input_id.clone(),
// TODO: make this zero-copy by operating on borrowed data
message: message.into_owned(),
}),
Err(err) => InputEvent::ParseMessageError(err.into()),
}
}
Some(Err(err)) => InputEvent::Error(err),
};
match sender.send(event) {
@@ -133,7 +142,7 @@ pub fn subscribe_all(
let (combined_tx, combined) = flume::bounded(1);
thread::spawn(move || loop {
match inputs_rx.recv() {
Ok(InputEvent::Input(input)) => match combined_tx.send(input) {
Ok(InputEvent::Input(message)) => match combined_tx.send(message) {
Ok(()) => {}
Err(flume::SendError(_)) => break,
},
@@ -143,6 +152,9 @@ pub fn subscribe_all(
break;
}
}
Ok(InputEvent::ParseMessageError(err)) => {
tracing::warn!("{err}");
}
Ok(InputEvent::Error(err)) => panic!("{err}"),
Err(_) => break,
}
@@ -158,10 +170,21 @@ enum InputEvent {
operator: Option<OperatorId>,
},
Error(BoxError),
ParseMessageError(BoxError),
}

#[derive(Debug)]
pub struct Input {
pub id: DataId,
pub data: Vec<u8>,
pub message: Message<'static>,
}

impl Input {
pub fn data(&self) -> &[u8] {
&self.message.data
}

pub fn metadata(&self) -> &Metadata {
&self.message.metadata
}
}

+ 6
- 2
apis/rust/node/src/lib.rs View File

@@ -1,4 +1,6 @@
pub use communication::Input;
use dora_message::serialize_message;
pub use dora_message::{Message, Metadata};
pub use flume::Receiver;

use communication::STOP_TOPIC;
@@ -52,10 +54,12 @@ impl DoraNode {
communication::subscribe_all(self.communication.as_mut(), &self.node_config.inputs)
}

pub fn send_output(&mut self, output_id: &DataId, data: &[u8]) -> eyre::Result<()> {
pub fn send_output(&mut self, output_id: &DataId, message: &Message) -> eyre::Result<()> {
if !self.node_config.outputs.contains(output_id) {
eyre::bail!("unknown output");
}
let data = serialize_message(message)
.with_context(|| format!("failed to serialize `{}` message", output_id))?;

let self_id = &self.id;

@@ -64,7 +68,7 @@ impl DoraNode {
.publisher(&topic)
.map_err(|err| eyre::eyre!(err))
.wrap_err_with(|| format!("failed create publisher for output {output_id}"))?
.publish(data)
.publish(&data)
.map_err(|err| eyre::eyre!(err))
.wrap_err_with(|| format!("failed to send data for output {output_id}"))?;
Ok(())


+ 1
- 0
binaries/coordinator/Cargo.toml View File

@@ -22,3 +22,4 @@ time = "0.3.9"
futures-concurrency = "2.0.3"
rand = "0.8.5"
dora-core = { version = "0.1.0", path = "../../libraries/core" }
dora-message = { path = "../../libraries/extensions/message" }

+ 3
- 1
binaries/coordinator/src/lib.rs View File

@@ -1,4 +1,5 @@
use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor};
use dora_message::serialize_message;
use dora_node_api::{
communication,
config::{format_duration, NodeId},
@@ -115,12 +116,13 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()
let duration = format_duration(interval);
format!("dora/timer/{duration}")
};
let data = serialize_message(&Vec::new().into()).unwrap();
let mut stream = IntervalStream::new(tokio::time::interval(interval));
while (stream.next().await).is_some() {
communication
.publisher(&topic)
.unwrap()
.publish(&[])
.publish(&data)
.expect("failed to publish timer tick message");
}
});


+ 1
- 0
binaries/runtime/Cargo.toml View File

@@ -24,3 +24,4 @@ log = "0.4.17"
fern = "0.6.1"
pyo3 = { version = "0.16.5", features = ["auto-initialize", "eyre"] }
flume = "0.10.14"
dora-message = { path = "../../libraries/extensions/message" }

+ 1
- 1
binaries/runtime/src/operator/python.rs View File

@@ -89,7 +89,7 @@ pub fn spawn(
"on_input",
(
input.id.to_string(),
PyBytes::new(py, &input.data),
PyBytes::new(py, &input.message.data),
send_output.clone(),
),
)


+ 23
- 5
binaries/runtime/src/operator/shared_lib.rs View File

@@ -90,14 +90,32 @@ impl<'lib> SharedLibraryOperator<'lib> {
};

let send_output_closure = Arc::new(move |output: Output| {
let result = match publishers.get(output.id.deref()) {
Some(publisher) => publisher.publish(&output.data),
let Output {
id,
data,
metadata: Metadata {
open_telemetry_context,
},
} = output;
let message = dora_node_api::Message {
data: Vec::from(data).into(),
metadata: dora_node_api::Metadata {
open_telemetry_context: String::from(open_telemetry_context).into(),
..Default::default()
},
};

let data = dora_message::serialize_message(&message)
.context(format!("failed to serialize `{}` message", id.deref()))
.map_err(|err| err.into());
let result = data.and_then(|data| match publishers.get(id.deref()) {
Some(publisher) => publisher.publish(&data),
None => Err(eyre!(
"unexpected output {} (not defined in dataflow config)",
output.id.deref()
id.deref()
)
.into()),
};
});

let error = match result {
Ok(()) => None,
@@ -110,7 +128,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
while let Ok(input) = self.inputs.recv() {
let operator_input = dora_operator_api_types::Input {
id: String::from(input.id).into(),
data: input.data.into(),
data: input.message.data.into_owned().into(),
metadata: Metadata {
open_telemetry_context: String::new().into(),
},


+ 2
- 2
examples/c++-dataflow/node-rust-api/src/main.rs View File

@@ -34,7 +34,7 @@ fn next_input(inputs: &mut Inputs) -> ffi::DoraInput {
Ok(input) => ffi::DoraInput {
end_of_input: false,
id: input.id.into(),
data: input.data,
data: input.message.data.into(),
},
Err(_) => ffi::DoraInput {
end_of_input: true,
@@ -47,7 +47,7 @@ fn next_input(inputs: &mut Inputs) -> ffi::DoraInput {
pub struct OutputSender<'a>(&'a mut DoraNode);

fn send_output(sender: &mut OutputSender, id: String, data: &[u8]) -> ffi::DoraResult {
let error = match sender.0.send_output(&id.into(), data) {
let error = match sender.0.send_output(&id.into(), &data.into()) {
Ok(()) => String::new(),
Err(err) => format!("{err:?}"),
};


+ 2
- 1
examples/iceoryx/node/src/main.rs View File

@@ -16,7 +16,8 @@ fn main() -> eyre::Result<()> {
match input.id.as_str() {
"tick" => {
let random: u64 = rand::random();
operator.send_output(&output, &random.to_le_bytes())?;
let data: &[u8] = &random.to_le_bytes();
operator.send_output(&output, &data.into())?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}


+ 1
- 1
examples/iceoryx/sink/src/main.rs View File

@@ -9,7 +9,7 @@ fn main() -> eyre::Result<()> {
while let Ok(input) = inputs.recv() {
match input.id.as_str() {
"message" => {
let received_string = String::from_utf8(input.data)
let received_string = String::from_utf8(input.message.data.into())
.wrap_err("received message was not utf8-encoded")?;
println!("received message: {}", received_string);
if !received_string.starts_with("operator received random value ") {


+ 2
- 1
examples/rust-dataflow/node/src/main.rs View File

@@ -16,7 +16,8 @@ fn main() -> eyre::Result<()> {
match input.id.as_str() {
"tick" => {
let random: u64 = rand::random();
operator.send_output(&output, &random.to_le_bytes())?;
let data: &[u8] = &random.to_le_bytes();
operator.send_output(&output, &data.into())?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}


+ 1
- 1
examples/rust-dataflow/sink/src/main.rs View File

@@ -9,7 +9,7 @@ fn main() -> eyre::Result<()> {
while let Ok(input) = inputs.recv() {
match input.id.as_str() {
"message" => {
let received_string = String::from_utf8(input.data)
let received_string = String::from_utf8(input.message.data.into())
.wrap_err("received message was not utf8-encoded")?;
println!("received message: {}", received_string);
if !received_string.starts_with("operator received random value ") {


+ 2
- 3
libraries/extensions/message/schema/message.capnp View File

@@ -9,7 +9,6 @@ struct Metadata {


struct Message {
id @0 :UInt32;
metadata @1 :Metadata;
data @2 :Data;
metadata @0 :Metadata;
data @1 :Data;
}

+ 92
- 4
libraries/extensions/message/src/lib.rs View File

@@ -1,23 +1,111 @@
//! Enable serialisation and deserialisation of capnproto messages
//!

use std::borrow::Cow;
pub mod message_capnp {
include!(concat!(env!("OUT_DIR"), "/message_capnp.rs"));
}

/// Helper function to serialize message
pub fn serialize_message(data: &[u8], otel_context: &str) -> Vec<u8> {
pub fn serialize_message(message: &Message) -> Result<Vec<u8>, capnp::Error> {
let Message {
data,
metadata:
Metadata {
metadata_version,
watermark,
deadline,
open_telemetry_context: otel_context,
},
} = message;

// Build the metadata
let mut meta_builder = capnp::message::Builder::new_default();
let mut metadata = meta_builder.init_root::<message_capnp::metadata::Builder>();
metadata.set_metadata_version(*metadata_version);
metadata.set_watermark(*watermark);
metadata.set_deadline(*deadline);
metadata.set_otel_context(otel_context);

// Build the data of the message
let mut builder = capnp::message::Builder::new_default();
let mut message = builder.init_root::<message_capnp::message::Builder>();
message.set_data(data);
message.set_metadata(metadata.into_reader()).unwrap();
message.set_metadata(metadata.into_reader())?;

let mut buffer = Vec::new();
capnp::serialize::write_message(&mut buffer, &builder).unwrap();
buffer
capnp::serialize::write_message(&mut buffer, &builder)?;
Ok(buffer)
}

pub fn deserialize_message(mut raw: &[u8]) -> Result<Message, capnp::Error> {
let deserialized =
capnp::serialize::read_message_from_flat_slice(&mut raw, Default::default())?
.into_typed::<message_capnp::message::Owned>();

let reader = deserialized.get()?;
let metadata_reader = reader.get_metadata()?;

let message = Message {
data: reader.get_data()?.into(),
metadata: Metadata {
metadata_version: metadata_reader.get_metadata_version(),
watermark: metadata_reader.get_watermark(),
deadline: metadata_reader.get_deadline(),
open_telemetry_context: metadata_reader.get_otel_context()?.into(),
},
};

// TODO: avoid copying and make this zero copy
Ok(message.into_owned())
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Message<'a> {
pub data: Cow<'a, [u8]>,
pub metadata: Metadata<'a>,
}

impl<'a> From<&'a [u8]> for Message<'a> {
fn from(data: &'a [u8]) -> Self {
Self {
data: data.into(),
metadata: Default::default(),
}
}
}

impl From<Vec<u8>> for Message<'static> {
fn from(data: Vec<u8>) -> Self {
Self {
data: data.into(),
metadata: Default::default(),
}
}
}

impl Message<'_> {
pub fn into_owned(self) -> Message<'static> {
Message {
data: self.data.into_owned().into(),
metadata: self.metadata.into_owned(),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Metadata<'a> {
pub metadata_version: u16,
pub watermark: u64,
pub deadline: u64,
pub open_telemetry_context: Cow<'a, str>,
}

impl Metadata<'_> {
fn into_owned(self) -> Metadata<'static> {
Metadata {
open_telemetry_context: self.open_telemetry_context.into_owned().into(),
..self
}
}
}

Loading…
Cancel
Save