From 0df06fce86e46df7d210702bbc7b45877026a60a Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 9 Sep 2022 10:42:54 +0200 Subject: [PATCH] Send metadata in messages encoded with capnproto Changes the message format from raw data bytes to a higher-level `Message` struct serialized with capnproto. In addition to the raw data, which is sent as a byte array as before, the `Message` struct features `metadata` field. This metadata field can be used to pass open telemetry contexts, deadlines, etc. --- Cargo.lock | 8 +- apis/c/node/src/lib.rs | 4 +- apis/python/node/src/lib.rs | 8 +- apis/rust/node/Cargo.toml | 2 + apis/rust/node/src/communication.rs | 35 +++++-- apis/rust/node/src/lib.rs | 8 +- binaries/coordinator/Cargo.toml | 1 + binaries/coordinator/src/lib.rs | 4 +- binaries/runtime/Cargo.toml | 1 + binaries/runtime/src/operator/python.rs | 2 +- binaries/runtime/src/operator/shared_lib.rs | 28 +++++- .../c++-dataflow/node-rust-api/src/main.rs | 4 +- examples/iceoryx/node/src/main.rs | 3 +- examples/iceoryx/sink/src/main.rs | 2 +- examples/rust-dataflow/node/src/main.rs | 3 +- examples/rust-dataflow/sink/src/main.rs | 2 +- .../extensions/message/schema/message.capnp | 5 +- libraries/extensions/message/src/lib.rs | 96 ++++++++++++++++++- 18 files changed, 182 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7656d737..30be48d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -370,9 +370,9 @@ checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" [[package]] name = "capnp" -version = "0.14.6" +version = "0.14.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21d5d7da973146f1720672faa44f1523cc8f923636190ca1a931c7bc8834de68" +checksum = "b1b5dbbbfbae370eb9e9e86fb86b572acf4d5e0072e44636758cae3fe6ba6015" [[package]] name = "capnpc" @@ -793,6 +793,7 @@ dependencies = [ "bincode", "clap 3.1.12", "dora-core", + "dora-message", "dora-node-api", "eyre", "futures", @@ -849,7 +850,9 @@ dependencies = [ name = "dora-node-api" version = "0.1.0" dependencies = [ + "capnp", "communication-layer-pub-sub", + "dora-message", "eyre", "flume", "once_cell", @@ -920,6 +923,7 @@ version = "0.1.0" dependencies = [ "clap 3.1.12", "dora-core", + "dora-message", "dora-node-api", "dora-operator-api-types", "eyre", diff --git a/apis/c/node/src/lib.rs b/apis/c/node/src/lib.rs index a833cbd0..0e10fb59 100644 --- a/apis/c/node/src/lib.rs +++ b/apis/c/node/src/lib.rs @@ -126,7 +126,7 @@ pub unsafe extern "C" fn read_dora_input_data( out_len: *mut usize, ) { let input: &Input = unsafe { &*input.cast() }; - let data = input.data.as_slice(); + let data = &input.message.data; let ptr = data.as_ptr(); let len = data.len(); unsafe { @@ -192,5 +192,5 @@ unsafe fn try_send_output( let id = std::str::from_utf8(unsafe { slice::from_raw_parts(id_ptr, id_len) })?; let output_id = id.to_owned().into(); let data = unsafe { slice::from_raw_parts(data_ptr, data_len) }; - context.node.send_output(&output_id, data) + context.node.send_output(&output_id, &data.into()) } diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 64118a78..837fec4b 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -16,7 +16,11 @@ pub struct PyInput(Input); impl IntoPy for PyInput { fn into_py(self, py: Python) -> PyObject { - (self.0.id.to_string(), PyBytes::new(py, &self.0.data)).into_py(py) + ( + self.0.id.to_string(), + PyBytes::new(py, &self.0.message.data), + ) + .into_py(py) } } @@ -51,7 +55,7 @@ impl Node { pub fn send_output(&mut self, output_id: String, data: &PyBytes) -> Result<()> { self.node - .send_output(&output_id.into(), data.as_bytes()) + .send_output(&output_id.into(), &data.as_bytes().into()) .wrap_err("Could not send output") } diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index c750fa29..80dd4492 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -19,6 +19,8 @@ tracing = "0.1.33" flume = "0.10.14" communication-layer-pub-sub = { path = "../../../libraries/communication-layer", default-features = false } uuid = { version = "1.1.2", features = ["v4"] } +capnp = "0.14.9" +dora-message = { path = "../../../libraries/extensions/message" } [dev-dependencies] tokio = { version = "1.17.0", features = ["rt"] } diff --git a/apis/rust/node/src/communication.rs b/apis/rust/node/src/communication.rs index b63ee9b3..8e07f3f5 100644 --- a/apis/rust/node/src/communication.rs +++ b/apis/rust/node/src/communication.rs @@ -1,4 +1,5 @@ pub use communication_layer_pub_sub::{CommunicationLayer, Publisher, Subscriber}; +use dora_message::{deserialize_message, Message, Metadata}; use crate::{ config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}, @@ -83,10 +84,18 @@ pub fn subscribe_all( thread::spawn(move || loop { let event = match sub.recv().transpose() { None => break, - Some(Ok(data)) => InputEvent::Input(Input { - id: input_id.clone(), - data, - }), + Some(Ok(data)) => { + match deserialize_message(&data) + .with_context(|| format!("failed to deserialize `{input_id}` message")) + { + Ok(message) => InputEvent::Input(Input { + id: input_id.clone(), + // TODO: make this zero-copy by operating on borrowed data + message: message.into_owned(), + }), + Err(err) => InputEvent::ParseMessageError(err.into()), + } + } Some(Err(err)) => InputEvent::Error(err), }; match sender.send(event) { @@ -133,7 +142,7 @@ pub fn subscribe_all( let (combined_tx, combined) = flume::bounded(1); thread::spawn(move || loop { match inputs_rx.recv() { - Ok(InputEvent::Input(input)) => match combined_tx.send(input) { + Ok(InputEvent::Input(message)) => match combined_tx.send(message) { Ok(()) => {} Err(flume::SendError(_)) => break, }, @@ -143,6 +152,9 @@ pub fn subscribe_all( break; } } + Ok(InputEvent::ParseMessageError(err)) => { + tracing::warn!("{err}"); + } Ok(InputEvent::Error(err)) => panic!("{err}"), Err(_) => break, } @@ -158,10 +170,21 @@ enum InputEvent { operator: Option, }, Error(BoxError), + ParseMessageError(BoxError), } #[derive(Debug)] pub struct Input { pub id: DataId, - pub data: Vec, + pub message: Message<'static>, +} + +impl Input { + pub fn data(&self) -> &[u8] { + &self.message.data + } + + pub fn metadata(&self) -> &Metadata { + &self.message.metadata + } } diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 3b933940..97fb7fba 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -1,4 +1,6 @@ pub use communication::Input; +use dora_message::serialize_message; +pub use dora_message::{Message, Metadata}; pub use flume::Receiver; use communication::STOP_TOPIC; @@ -52,10 +54,12 @@ impl DoraNode { communication::subscribe_all(self.communication.as_mut(), &self.node_config.inputs) } - pub fn send_output(&mut self, output_id: &DataId, data: &[u8]) -> eyre::Result<()> { + pub fn send_output(&mut self, output_id: &DataId, message: &Message) -> eyre::Result<()> { if !self.node_config.outputs.contains(output_id) { eyre::bail!("unknown output"); } + let data = serialize_message(message) + .with_context(|| format!("failed to serialize `{}` message", output_id))?; let self_id = &self.id; @@ -64,7 +68,7 @@ impl DoraNode { .publisher(&topic) .map_err(|err| eyre::eyre!(err)) .wrap_err_with(|| format!("failed create publisher for output {output_id}"))? - .publish(data) + .publish(&data) .map_err(|err| eyre::eyre!(err)) .wrap_err_with(|| format!("failed to send data for output {output_id}"))?; Ok(()) diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index 1dcca35e..0eb02093 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -22,3 +22,4 @@ time = "0.3.9" futures-concurrency = "2.0.3" rand = "0.8.5" dora-core = { version = "0.1.0", path = "../../libraries/core" } +dora-message = { path = "../../libraries/extensions/message" } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index a91f522e..795bc9d2 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -1,4 +1,5 @@ use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor}; +use dora_message::serialize_message; use dora_node_api::{ communication, config::{format_duration, NodeId}, @@ -115,12 +116,13 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() let duration = format_duration(interval); format!("dora/timer/{duration}") }; + let data = serialize_message(&Vec::new().into()).unwrap(); let mut stream = IntervalStream::new(tokio::time::interval(interval)); while (stream.next().await).is_some() { communication .publisher(&topic) .unwrap() - .publish(&[]) + .publish(&data) .expect("failed to publish timer tick message"); } }); diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index bd5fd6b8..f56fb4e3 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -24,3 +24,4 @@ log = "0.4.17" fern = "0.6.1" pyo3 = { version = "0.16.5", features = ["auto-initialize", "eyre"] } flume = "0.10.14" +dora-message = { path = "../../libraries/extensions/message" } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 0e94ae5f..94ce66d3 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -89,7 +89,7 @@ pub fn spawn( "on_input", ( input.id.to_string(), - PyBytes::new(py, &input.data), + PyBytes::new(py, &input.message.data), send_output.clone(), ), ) diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index e1c1674a..af4d4e5d 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -90,14 +90,32 @@ impl<'lib> SharedLibraryOperator<'lib> { }; let send_output_closure = Arc::new(move |output: Output| { - let result = match publishers.get(output.id.deref()) { - Some(publisher) => publisher.publish(&output.data), + let Output { + id, + data, + metadata: Metadata { + open_telemetry_context, + }, + } = output; + let message = dora_node_api::Message { + data: Vec::from(data).into(), + metadata: dora_node_api::Metadata { + open_telemetry_context: String::from(open_telemetry_context).into(), + ..Default::default() + }, + }; + + let data = dora_message::serialize_message(&message) + .context(format!("failed to serialize `{}` message", id.deref())) + .map_err(|err| err.into()); + let result = data.and_then(|data| match publishers.get(id.deref()) { + Some(publisher) => publisher.publish(&data), None => Err(eyre!( "unexpected output {} (not defined in dataflow config)", - output.id.deref() + id.deref() ) .into()), - }; + }); let error = match result { Ok(()) => None, @@ -110,7 +128,7 @@ impl<'lib> SharedLibraryOperator<'lib> { while let Ok(input) = self.inputs.recv() { let operator_input = dora_operator_api_types::Input { id: String::from(input.id).into(), - data: input.data.into(), + data: input.message.data.into_owned().into(), metadata: Metadata { open_telemetry_context: String::new().into(), }, diff --git a/examples/c++-dataflow/node-rust-api/src/main.rs b/examples/c++-dataflow/node-rust-api/src/main.rs index 95c8a42c..3161be76 100644 --- a/examples/c++-dataflow/node-rust-api/src/main.rs +++ b/examples/c++-dataflow/node-rust-api/src/main.rs @@ -34,7 +34,7 @@ fn next_input(inputs: &mut Inputs) -> ffi::DoraInput { Ok(input) => ffi::DoraInput { end_of_input: false, id: input.id.into(), - data: input.data, + data: input.message.data.into(), }, Err(_) => ffi::DoraInput { end_of_input: true, @@ -47,7 +47,7 @@ fn next_input(inputs: &mut Inputs) -> ffi::DoraInput { pub struct OutputSender<'a>(&'a mut DoraNode); fn send_output(sender: &mut OutputSender, id: String, data: &[u8]) -> ffi::DoraResult { - let error = match sender.0.send_output(&id.into(), data) { + let error = match sender.0.send_output(&id.into(), &data.into()) { Ok(()) => String::new(), Err(err) => format!("{err:?}"), }; diff --git a/examples/iceoryx/node/src/main.rs b/examples/iceoryx/node/src/main.rs index a69cb172..25f14161 100644 --- a/examples/iceoryx/node/src/main.rs +++ b/examples/iceoryx/node/src/main.rs @@ -16,7 +16,8 @@ fn main() -> eyre::Result<()> { match input.id.as_str() { "tick" => { let random: u64 = rand::random(); - operator.send_output(&output, &random.to_le_bytes())?; + let data: &[u8] = &random.to_le_bytes(); + operator.send_output(&output, &data.into())?; } other => eprintln!("Ignoring unexpected input `{other}`"), } diff --git a/examples/iceoryx/sink/src/main.rs b/examples/iceoryx/sink/src/main.rs index 32a1277d..a8e7394d 100644 --- a/examples/iceoryx/sink/src/main.rs +++ b/examples/iceoryx/sink/src/main.rs @@ -9,7 +9,7 @@ fn main() -> eyre::Result<()> { while let Ok(input) = inputs.recv() { match input.id.as_str() { "message" => { - let received_string = String::from_utf8(input.data) + let received_string = String::from_utf8(input.message.data.into()) .wrap_err("received message was not utf8-encoded")?; println!("received message: {}", received_string); if !received_string.starts_with("operator received random value ") { diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index a69cb172..25f14161 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -16,7 +16,8 @@ fn main() -> eyre::Result<()> { match input.id.as_str() { "tick" => { let random: u64 = rand::random(); - operator.send_output(&output, &random.to_le_bytes())?; + let data: &[u8] = &random.to_le_bytes(); + operator.send_output(&output, &data.into())?; } other => eprintln!("Ignoring unexpected input `{other}`"), } diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index 32a1277d..a8e7394d 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -9,7 +9,7 @@ fn main() -> eyre::Result<()> { while let Ok(input) = inputs.recv() { match input.id.as_str() { "message" => { - let received_string = String::from_utf8(input.data) + let received_string = String::from_utf8(input.message.data.into()) .wrap_err("received message was not utf8-encoded")?; println!("received message: {}", received_string); if !received_string.starts_with("operator received random value ") { diff --git a/libraries/extensions/message/schema/message.capnp b/libraries/extensions/message/schema/message.capnp index a1954262..b3ef45e8 100644 --- a/libraries/extensions/message/schema/message.capnp +++ b/libraries/extensions/message/schema/message.capnp @@ -9,7 +9,6 @@ struct Metadata { struct Message { - id @0 :UInt32; - metadata @1 :Metadata; - data @2 :Data; + metadata @0 :Metadata; + data @1 :Data; } diff --git a/libraries/extensions/message/src/lib.rs b/libraries/extensions/message/src/lib.rs index 04dfa791..bf1b311a 100644 --- a/libraries/extensions/message/src/lib.rs +++ b/libraries/extensions/message/src/lib.rs @@ -1,23 +1,111 @@ //! Enable serialisation and deserialisation of capnproto messages //! + +use std::borrow::Cow; pub mod message_capnp { include!(concat!(env!("OUT_DIR"), "/message_capnp.rs")); } /// Helper function to serialize message -pub fn serialize_message(data: &[u8], otel_context: &str) -> Vec { +pub fn serialize_message(message: &Message) -> Result, capnp::Error> { + let Message { + data, + metadata: + Metadata { + metadata_version, + watermark, + deadline, + open_telemetry_context: otel_context, + }, + } = message; + // Build the metadata let mut meta_builder = capnp::message::Builder::new_default(); let mut metadata = meta_builder.init_root::(); + metadata.set_metadata_version(*metadata_version); + metadata.set_watermark(*watermark); + metadata.set_deadline(*deadline); metadata.set_otel_context(otel_context); // Build the data of the message let mut builder = capnp::message::Builder::new_default(); let mut message = builder.init_root::(); message.set_data(data); - message.set_metadata(metadata.into_reader()).unwrap(); + message.set_metadata(metadata.into_reader())?; let mut buffer = Vec::new(); - capnp::serialize::write_message(&mut buffer, &builder).unwrap(); - buffer + capnp::serialize::write_message(&mut buffer, &builder)?; + Ok(buffer) +} + +pub fn deserialize_message(mut raw: &[u8]) -> Result { + let deserialized = + capnp::serialize::read_message_from_flat_slice(&mut raw, Default::default())? + .into_typed::(); + + let reader = deserialized.get()?; + let metadata_reader = reader.get_metadata()?; + + let message = Message { + data: reader.get_data()?.into(), + metadata: Metadata { + metadata_version: metadata_reader.get_metadata_version(), + watermark: metadata_reader.get_watermark(), + deadline: metadata_reader.get_deadline(), + open_telemetry_context: metadata_reader.get_otel_context()?.into(), + }, + }; + + // TODO: avoid copying and make this zero copy + Ok(message.into_owned()) +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Message<'a> { + pub data: Cow<'a, [u8]>, + pub metadata: Metadata<'a>, +} + +impl<'a> From<&'a [u8]> for Message<'a> { + fn from(data: &'a [u8]) -> Self { + Self { + data: data.into(), + metadata: Default::default(), + } + } +} + +impl From> for Message<'static> { + fn from(data: Vec) -> Self { + Self { + data: data.into(), + metadata: Default::default(), + } + } +} + +impl Message<'_> { + pub fn into_owned(self) -> Message<'static> { + Message { + data: self.data.into_owned().into(), + metadata: self.metadata.into_owned(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct Metadata<'a> { + pub metadata_version: u16, + pub watermark: u64, + pub deadline: u64, + pub open_telemetry_context: Cow<'a, str>, +} + +impl Metadata<'_> { + fn into_owned(self) -> Metadata<'static> { + Metadata { + open_telemetry_context: self.open_telemetry_context.into_owned().into(), + ..self + } + } }