From 287b71d8936c93ddb00a819846059016aa4f40a3 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 9 Jul 2025 14:50:02 +0200 Subject: [PATCH] Document remaining items of `dora-node-api` crate --- apis/rust/node/src/event_stream/event.rs | 10 +- apis/rust/node/src/event_stream/merged.rs | 28 ++++++ apis/rust/node/src/event_stream/mod.rs | 85 ++++++++++++++++- apis/rust/node/src/event_stream/scheduler.rs | 98 ++++++++++++++++---- apis/rust/node/src/lib.rs | 2 +- apis/rust/node/src/node/arrow_utils.rs | 13 +++ apis/rust/node/src/node/mod.rs | 75 ++++++++++++++- libraries/arrow-convert/src/lib.rs | 9 ++ libraries/message/src/config.rs | 1 + libraries/message/src/lib.rs | 3 + libraries/message/src/metadata.rs | 5 + 11 files changed, 302 insertions(+), 27 deletions(-) diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs index aec7b10f..72dfe118 100644 --- a/apis/rust/node/src/event_stream/event.rs +++ b/apis/rust/node/src/event_stream/event.rs @@ -45,7 +45,7 @@ pub enum Event { /// The [`StopCause`] field contains the reason for the event stream closure. /// /// Typically, nodes should exit once the event stream closes. One notable - /// exception are nodes with no inputs, which will receive a /// `Event::Stop(StopCause::AllInputsClosed)` right at startup. Source nodes /// might still want to keep producing outputs. (There is currently an open /// discussion of changing this behavior and not sending `AllInputsClosed` @@ -55,7 +55,15 @@ /// issued through `dora stop` or a `ctrl-c`. Nodes **must exit** once receiving /// such a stop event, otherwise they will be killed by Dora. Stop(StopCause), + /// Instructs the node to reload itself or one of its operators. + /// + /// This event is currently only used for reloading Python operators that are + /// started by a `dora runtime` process. So this event should not be sent to normal + /// nodes yet. Reload { + /// The ID of the operator that should be reloaded. + /// + /// There is currently no case where `operator_id` is `None`. operator_id: Option<OperatorId>, }, /// Notifies the node about an unexpected error that happened inside Dora. diff --git a/apis/rust/node/src/event_stream/merged.rs b/apis/rust/node/src/event_stream/merged.rs index 43c456be..adeeafad 100644 --- a/apis/rust/node/src/event_stream/merged.rs +++ b/apis/rust/node/src/event_stream/merged.rs @@ -1,18 +1,32 @@ +//! Merge an external stream into an [`EventStream`][super::EventStream]. +//! +//! Sometimes nodes need to listen to external events, in addition to Dora events. +//! This module provides support for that by providing the [`MergeExternal`] trait. + use futures::{Stream, StreamExt}; use futures_concurrency::stream::Merge; +/// A Dora event or an event from an external source. #[derive(Debug)] pub enum MergedEvent<E> { + /// A Dora event Dora(super::Event), + /// An external event + /// + /// Yielded by the stream that was merged into the Dora [`EventStream`][super::EventStream]. External(E), } +/// A general enum to represent a value of two possible types. pub enum Either<A, B> { + /// Value is of the first type, type `A`. First(A), + /// Value is of the second type, type `B`. Second(B), } impl<A> Either<A, A> { + /// Unwraps an `Either` instance where both types are identical. pub fn flatten(self) -> A { match self { Either::First(a) => a, @@ -21,19 +35,33 @@ impl<A> Either<A, A> { } } +/// Allows merging an external event stream into an existing event stream.
// TODO: use impl trait return type once stable pub trait MergeExternal<'a, E> { + /// The item type yielded from the merged stream. type Item; + /// Merge the given stream into an existing event stream. + /// + /// Returns a new event stream that yields items from both streams. + /// The ordering between the two streams is not guaranteed. fn merge_external( self, external_events: impl Stream<Item = E> + Unpin + 'a, ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>; } +/// Allows merging a sendable external event stream into an existing (sendable) event stream. +/// +/// By implementing [`Send`], the streams can be sent to different threads. pub trait MergeExternalSend<'a, E> { + /// The item type yielded from the merged stream. type Item; + /// Merge the given stream into an existing event stream. + /// + /// Returns a new event stream that yields items from both streams. + /// The ordering between the two streams is not guaranteed. fn merge_external_send( self, external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a, diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 120aa090..c7a3aefc 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -30,12 +30,33 @@ use dora_core::{ }; use eyre::{eyre, Context}; +pub use scheduler::Scheduler as EventScheduler; + mod data_conversion; mod event; pub mod merged; mod scheduler; mod thread; +/// Asynchronous iterator over the incoming [`Event`]s destined for this node. +/// +/// This struct [implements](#impl-Stream-for-EventStream) the [`Stream`] trait, +/// so you can use methods of the [`StreamExt`] trait +/// on this struct. A common pattern is `while let Some(event) = event_stream.next().await`. +/// +/// Nodes should iterate over this event stream and react to events that they are interested in. +/// Typically, the most important event type is [`Event::Input`]. +/// You don't need to handle all events; it's fine to ignore events that are not relevant to your node. +/// +/// The event stream will close itself after an [`Event::Stop`] was received. +/// A manual `break` on [`Event::Stop`] is typically not needed. +/// _(You probably do need to use a manual `break` on stop events when using the +/// [`StreamExt::merge`][`futures_concurrency::stream::StreamExt::merge`] implementation on +/// [`EventStream`] to combine the stream with an external one.)_ +/// +/// Once the event stream has finished, nodes should exit. +/// Note that Dora kills nodes that don't exit quickly after an [`Event::Stop`] of type +/// [`StopCause::Manual`] was received. pub struct EventStream { node_id: NodeId, receiver: flume::r#async::RecvStream<'static, EventItem>, @@ -158,16 +179,61 @@ impl EventStream { }) } - /// wait for the next event on the events stream. + /// Synchronously waits for the next event. + /// + /// Blocks the thread until the next event arrives. + /// Returns [`None`] once the event stream is closed. + /// + /// For an asynchronous variant of this method, see [`recv_async`][Self::recv_async]. + /// + /// ## Event Reordering + /// + /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the + /// events might be returned in a different order than they occurred. For details, check the + /// documentation of the [`EventScheduler`] struct. + /// + /// If you want to receive the events in their original chronological order, use the + /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the + /// [`Stream`] trait).
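As an illustration of the `MergeExternal` trait and the manual `break` on stop events described above, here is a minimal sketch of a node that merges a hypothetical external stream into its `EventStream`. The external source (a plain iterator stream) and the event handling are assumptions for illustration, not part of the patch:

```rust
use dora_node_api::futures::{executor::block_on, stream, StreamExt};
use dora_node_api::merged::{MergeExternal, MergedEvent};
use dora_node_api::{DoraNode, Event};

fn main() {
    let (_node, events) = DoraNode::init_from_env().expect("failed to init node");

    // hypothetical external event source; any `Stream + Unpin` works here
    let external = stream::iter(vec![1u32, 2, 3]);
    let mut merged = events.merge_external(external);

    block_on(async {
        while let Some(event) = merged.next().await {
            match event {
                // manual break: the merged stream does not end on its own after a stop event
                MergedEvent::Dora(Event::Stop(_)) => break,
                MergedEvent::Dora(_other_dora_event) => { /* handle inputs etc. */ }
                MergedEvent::External(value) => println!("external event: {value}"),
            }
        }
    });
}
```

The explicit `break` mirrors the note above: once an external stream is merged in, the combined stream no longer closes itself when the Dora side stops.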
pub fn recv(&mut self) -> Option<Event> { futures::executor::block_on(self.recv_async()) } - /// wait for the next event on the events stream until timeout + /// Receives the next incoming [`Event`] synchronously with a timeout. + /// + /// Blocks the thread until the next event arrives or the timeout is reached. + /// Returns an [`Event::Error`] if no event was received within the given duration. + /// + /// Returns [`None`] once the event stream is closed. + /// + /// For an asynchronous variant of this method, see [`recv_async_timeout`][Self::recv_async_timeout]. + /// + /// ## Event Reordering + /// + /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the + /// events might be returned in a different order than they occurred. For details, check the + /// documentation of the [`EventScheduler`] struct. + /// + /// If you want to receive the events in their original chronological order, use the + /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the + /// [`Stream`] trait). pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> { futures::executor::block_on(self.recv_async_timeout(dur)) } + /// Receives the next incoming [`Event`] asynchronously, using an [`EventScheduler`] for fairness. + /// + /// Returns [`None`] once the event stream is closed. + /// + /// ## Event Reordering + /// + /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the + /// events might be returned in a different order than they occurred. For details, check the + /// documentation of the [`EventScheduler`] struct. + /// + /// If you want to receive the events in their original chronological order, use the + /// [`StreamExt::next`] method instead + /// ([`EventStream`] implements the [`Stream`] trait). pub async fn recv_async(&mut self) -> Option<Event> { loop { if self.scheduler.is_empty() { @@ -188,6 +254,21 @@ impl EventStream { event.map(Self::convert_event_item) } + /// Receives the next incoming [`Event`] asynchronously with a timeout. + /// + /// Returns an [`Event::Error`] if no event was received within the given duration. + /// + /// Returns [`None`] once the event stream is closed. + /// + /// ## Event Reordering + /// + /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the + /// events might be returned in a different order than they occurred. For details, check the + /// documentation of the [`EventScheduler`] struct. + /// + /// If you want to receive the events in their original chronological order, use the + /// [`StreamExt::next`] method with a custom timeout future instead + /// ([`EventStream`] implements the [`Stream`] trait). pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> { match select(Delay::new(dur), pin!(self.recv_async())).await { Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError( diff --git a/apis/rust/node/src/event_stream/scheduler.rs b/apis/rust/node/src/event_stream/scheduler.rs index defdc209..6ef8c14f 100644 --- a/apis/rust/node/src/event_stream/scheduler.rs +++ b/apis/rust/node/src/event_stream/scheduler.rs @@ -3,27 +3,89 @@ use std::collections::{HashMap, VecDeque}; use dora_message::{daemon_to_node::NodeEvent, id::DataId}; use super::thread::EventItem; -pub const NON_INPUT_EVENT: &str = "dora/non_input_event"; +pub(crate) const NON_INPUT_EVENT: &str = "dora/non_input_event"; -// This scheduler will make sure that there is fairness between -// inputs.
-// -// It is going to always make sure that the input that has not been used for the longest period is the first one to be used next. -// -// Ex: -// In the case that one input has a very high frequency and another one with a very slow frequency.\ -// -// The Node will always alternate between the two inputs when each input is available -// Avoiding one input to be overwhelmingly present. -// +/// This scheduler will make sure that there is fairness between inputs. +/// +/// The scheduler reorders events in the following way: +/// +/// - **Non-input events are prioritized** +/// +/// If the node received any events that are not input events, they are returned first. The +/// intention of this reordering is that the nodes can react quickly to dataflow-related events +/// even when their input queues are very full. +/// +/// This reordering has some side effects that might be unexpected: +/// - An [`InputClosed`][super::Event::InputClosed] event might be yielded before the last +/// input events of that ID. +/// +/// Usually, an `InputClosed` event indicates that there won't be any subsequent inputs +/// of a certain ID. This invariant does not hold anymore for a scheduled event stream. +/// - The [`Stop`][super::Event::Stop] event might not be the last event of the stream anymore. +/// +/// Usually, the `Stop` event is the last event that is sent to a node before the event stream +/// is closed. Because of the reordering, the stream might return more events after a `Stop` +/// event. +/// - **Input events are grouped by ID** and yielded in a **least-recently used order (by ID)**. +/// +/// The scheduler keeps a separate queue for each input ID, where the incoming input events are +/// placed in their chronological order. When yielding the next event, the scheduler iterates over +/// these queues in least-recently used order. This means that the queue corresponding to the +/// last yielded event will be checked last. The scheduler will return the oldest event from the +/// first non-empty queue. +/// +/// The side effect of this change is that input events of different IDs are no longer in their +/// chronological order. This might lead to unexpected results for input events that are caused by +/// each other. +/// +/// ## Example 1 +/// Consider the case that one input has a very high frequency and another one has a very low +/// frequency. The event stream will always alternate between the two inputs when each input is +/// available. +/// Without the scheduling, the high-frequency input would be returned much more often. +/// +/// ## Example 2 +/// Again, let's consider the case that one input has a very high frequency and the other has a +/// very low frequency. This time, we define a small maximum queue size for the low-frequency +/// input, but a large queue size for the high-frequency one. +/// Using the scheduler, the event stream will always alternate between high and low-frequency +/// inputs as long as inputs of both types are available. +/// +/// Without scheduling, the low-frequency input might never be yielded before +/// it's dropped because there is almost always an older high-frequency input available that is +/// yielded first. Once the low-frequency input would be the next one chronologically, it might +/// have been dropped already because the node received newer low-frequency inputs in the +/// meantime (the queue length is small). At this point, the next-oldest input is a high-frequency +/// input again.
+/// +/// ## Example 3 +/// Consider a high-frequency camera input and a low-frequency bounding box input, which is based +/// on the latest camera image. The dataflow YAML file specifies a large queue size for the camera +/// input and a small queue size for the bounding box input. +/// +/// With scheduling, the number of +/// buffered camera inputs might grow over time. As a result, the camera inputs yielded from the +/// stream (in oldest-first order) are not synchronized with the bounding box inputs anymore. So +/// the node receives an up-to-date bounding box, but a considerably outdated image. +/// +/// Without scheduling, the events are returned in chronological order. This time, the bounding +/// box might be slightly outdated if the camera sent new images before the bounding box was +/// ready. However, the time difference between the two input types is independent of the +/// queue size this time. +/// +/// (If a perfectly matching bounding box is required, we recommend forwarding the input image as +/// part of the bounding box output. This way, the receiving node only needs to subscribe to one +/// input, so no mismatches can happen.) #[derive(Debug)] pub struct Scheduler { - last_used: VecDeque<DataId>, // Tracks the last-used event ID - event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>, // Tracks events per ID + /// Tracks the last-used event ID + last_used: VecDeque<DataId>, + /// Tracks events per ID + event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>, } impl Scheduler { - pub fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self { + pub(crate) fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self { let topic = VecDeque::from_iter( event_queues .keys() @@ -36,7 +98,7 @@ impl Scheduler { } } - pub fn add_event(&mut self, event: EventItem) { + pub(crate) fn add_event(&mut self, event: EventItem) { let event_id = match &event { EventItem::NodeEvent { event: @@ -63,7 +125,7 @@ impl Scheduler { } } - pub fn next(&mut self) -> Option<EventItem> { + pub(crate) fn next(&mut self) -> Option<EventItem> { // Retrieve message from the non input event first that have priority over input message. if let Some((_size, queue)) = self .event_queues @@ -89,7 +151,7 @@ impl Scheduler { None } - pub fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { self.event_queues .iter() .all(|(_id, (_size, queue))| queue.is_empty()) diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index b79f14ee..bdc9c35d 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -84,7 +84,7 @@ pub use dora_message::{ metadata::{Metadata, MetadataParameters, Parameter}, DataflowId, }; -pub use event_stream::{merged, Event, EventStream, StopCause}; +pub use event_stream::{merged, Event, EventScheduler, EventStream, StopCause}; pub use flume::Receiver; pub use futures; pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD}; diff --git a/apis/rust/node/src/node/arrow_utils.rs b/apis/rust/node/src/node/arrow_utils.rs index 39c714c2..9a75e9f7 100644 --- a/apis/rust/node/src/node/arrow_utils.rs +++ b/apis/rust/node/src/node/arrow_utils.rs @@ -1,7 +1,11 @@ +//! Utility functions for converting Arrow arrays to/from raw data. +//! use arrow::array::{ArrayData, BufferSpec}; use dora_message::metadata::{ArrowTypeInfo, BufferOffset}; use eyre::Context; +/// Calculates the data size in bytes required for storing a continuous copy of the given Arrow +/// array.
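A small sketch of how the `arrow_utils` helpers documented here fit together. The `Int32Array` input, the `arrow` and `eyre` dependencies, and the `eyre::Result<ArrayData>` return type of `buffer_into_arrow_array` are assumptions (the return type is not visible in this hunk):

```rust
use arrow::array::{Array, ArrayData, Int32Array};
use arrow::buffer::Buffer;
use dora_node_api::arrow_utils::{
    buffer_into_arrow_array, copy_array_into_sample, required_data_size,
};

fn roundtrip() -> eyre::Result<()> {
    let array: ArrayData = Int32Array::from(vec![1, 2, 3]).into_data();

    // compute the number of bytes needed for a contiguous copy, then copy the buffers
    let len = required_data_size(&array);
    let mut sample = vec![0u8; len];
    let type_info = copy_array_into_sample(&mut sample, &array);

    // reconstruct the array from the raw bytes plus the returned type info
    // (assumes an `eyre::Result<ArrayData>` return type)
    let restored = buffer_into_arrow_array(&Buffer::from(sample), &type_info)?;
    assert_eq!(restored.len(), 3);
    Ok(())
}
```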
pub fn required_data_size(array: &ArrayData) -> usize { let mut next_offset = 0; required_data_size_inner(array, &mut next_offset); @@ -21,6 +25,12 @@ fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) { } } +/// Copy the given Arrow array into the provided buffer. +/// +/// If the Arrow array consists of multiple buffers, they are placed continuously in the target +/// buffer (there might be some padding for alignment). +/// +/// Panics if the buffer is not large enough. pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo { let mut next_offset = 0; copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array) @@ -71,6 +81,9 @@ fn copy_array_into_sample_inner( } } +/// Tries to convert the given raw Arrow buffer into an Arrow array. +/// +/// The `type_info` is required for decoding the `raw_buffer` correctly. pub fn buffer_into_arrow_array( raw_buffer: &arrow::buffer::Buffer, type_info: &ArrowTypeInfo, diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index a1fd01de..f3757991 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -63,6 +63,10 @@ enum TokioRuntime { Handle(Handle), } +/// Allows sending outputs and retrieving node information. +/// +/// The main purpose of this struct is to send outputs via Dora. There are also functions available +/// for retrieving the node configuration. pub struct DoraNode { id: NodeId, dataflow_id: DataflowId, @@ -80,7 +84,11 @@ pub struct DoraNode { } impl DoraNode { - /// Initiate a node from environment variables set by `dora-coordinator` + /// Initiate a node from environment variables set by the Dora daemon. + /// + /// This is the recommended initialization function for Dora nodes, which are spawned by + /// Dora daemon instances. + /// /// /// ```no_run /// use dora_node_api::DoraNode; @@ -107,6 +115,8 @@ impl DoraNode { /// Initiate a node from a dataflow id and a node id. /// + /// This initialization function should be used for [_dynamic nodes_](index.html#dynamic-nodes). + /// /// ```no_run /// use dora_node_api::DoraNode; /// use dora_node_api::dora_core::config::NodeId; @@ -139,6 +149,11 @@ impl DoraNode { } } + /// Flexible initialization function for nodes that are sometimes used as dynamic nodes. + /// + /// This function first tries initializing the traditional way through + /// [`init_from_env`][Self::init_from_env]. If this fails, it falls back to + /// [`init_from_node_id`][Self::init_from_node_id]. pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> { if std::env::var("DORA_NODE_CONFIG").is_ok() { info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`"); Self::init_from_env() } else { } } + /// Internal initialization routine that should not be used outside of Dora. + #[doc(hidden)] #[tracing::instrument] pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> { let NodeConfig { @@ -232,7 +249,8 @@ impl DoraNode { } } - /// Send data from the node to the other nodes. + /// Send raw data from the node to the other nodes. + /// /// We take a closure as an input to enable zero copy on send. /// /// ```no_run @@ -255,6 +273,8 @@ impl DoraNode { /// }).expect("Could not send output"); /// ``` /// + /// Ignores the output if the given `output_id` is not specified as node output in the dataflow + /// configuration file.
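A minimal sketch of the initialization paths documented above. The node ID `my-node` is a placeholder, and the error handling via `expect` mirrors the existing `no_run` doc examples:

```rust
use dora_node_api::dora_core::config::NodeId;
use dora_node_api::DoraNode;

fn main() {
    // uses the env-based config when spawned via `dora start`,
    // and falls back to dynamic-node initialization otherwise
    let (_node, mut events) = DoraNode::init_flexible(NodeId::from("my-node".to_string()))
        .expect("failed to init node");

    while let Some(_event) = events.recv() {
        // react to inputs here and exit once the stream closes
    }
}
```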
pub fn send_output_raw( &mut self, output_id: DataId, @@ -276,6 +296,14 @@ impl DoraNode { self.send_output_sample(output_id, type_info, parameters, Some(sample)) } + /// Sends the given Arrow array as an output message. + /// + /// Uses shared memory for efficient data transfer if suitable. + /// + /// This method might copy the message once to move it to shared memory. + /// + /// Ignores the output if the given `output_id` is not specified as node output in the dataflow + /// configuration file. pub fn send_output( &mut self, output_id: DataId, @@ -299,6 +327,12 @@ impl DoraNode { Ok(()) } + /// Send the given raw byte data as output. + /// + /// Might copy the data once to move it into shared memory. + /// + /// Ignores the output if the given `output_id` is not specified as node output in the dataflow + /// configuration file. pub fn send_output_bytes( &mut self, output_id: DataId, @@ -314,6 +348,12 @@ impl DoraNode { }) } + /// Send the given raw byte data with the provided type information. + /// + /// It is recommended to use a function like [`send_output`][Self::send_output] instead. + /// + /// Ignores the output if the given `output_id` is not specified as node output in the dataflow + /// configuration file. pub fn send_typed_output( &mut self, output_id: DataId, @@ -335,6 +375,12 @@ impl DoraNode { self.send_output_sample(output_id, type_info, parameters, Some(sample)) } + /// Sends the given [`DataSample`] as output, combined with the given type information. + /// + /// It is recommended to use a function like [`send_output`][Self::send_output] instead. + /// + /// Ignores the output if the given `output_id` is not specified as node output in the dataflow + /// configuration file. pub fn send_output_sample( &mut self, output_id: DataId, @@ -363,32 +409,46 @@ impl DoraNode { Ok(()) } - pub fn close_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> { - for output_id in &outputs { + /// Report the given output IDs as closed. + /// + /// The node is not allowed to send more outputs with the closed IDs. + /// + /// Closing outputs early can be helpful to receivers. + pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> eyre::Result<()> { + for output_id in &outputs_ids { if !self.node_config.outputs.remove(output_id) { eyre::bail!("unknown output {output_id}"); } } self.control_channel - .report_closed_outputs(outputs) + .report_closed_outputs(outputs_ids) .wrap_err("failed to report closed outputs to daemon")?; Ok(()) } + /// Returns the ID of the node as specified in the dataflow configuration file. pub fn id(&self) -> &NodeId { &self.id } + /// Returns the unique identifier for the running dataflow instance. + /// + /// Dora assigns each dataflow instance a random identifier when started. pub fn dataflow_id(&self) -> &DataflowId { &self.dataflow_id } + /// Returns the input and output configuration of this node. pub fn node_config(&self) -> &NodeRunConfig { &self.node_config } + /// Allocates a [`DataSample`] of the specified size. + /// + /// The data sample will use shared memory when suitable to enable efficient data transfer + /// when sending an output message. pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> { let data = if data_len >= ZERO_COPY_THRESHOLD { // create shared memory region @@ -527,6 +587,11 @@ impl Drop for DoraNode { } } +/// A data region suitable for sending as an output message. +/// +/// The region is stored in shared memory when suitable to enable efficient data transfer.
+/// +/// `DataSample` implements the [`Deref`] and [`DerefMut`] traits to read and write the mapped data. pub struct DataSample { inner: DataSampleInner, len: usize, diff --git a/libraries/arrow-convert/src/lib.rs b/libraries/arrow-convert/src/lib.rs index f9b8f21e..32185a3f 100644 --- a/libraries/arrow-convert/src/lib.rs +++ b/libraries/arrow-convert/src/lib.rs @@ -1,3 +1,7 @@ +//! Provides functions for converting between Apache Arrow arrays and Rust data types. + +#![warn(missing_docs)] + use arrow::array::{ Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, UInt32Array, UInt8Array, @@ -10,12 +14,16 @@ use std::ops::{Deref, DerefMut}; mod from_impls; mod into_impls; +/// Data that can be converted to an Arrow array. pub trait IntoArrow { + /// The Array type that the data can be converted to. type A: Array; + /// Convert the data into an Arrow array. fn into_arrow(self) -> Self::A; } +/// Wrapper type for an Arrow [`ArrayRef`](arrow::array::ArrayRef). #[derive(Debug)] pub struct ArrowData(pub arrow::array::ArrayRef); @@ -35,6 +43,7 @@ impl DerefMut for ArrowData { macro_rules! register_array_handlers { ($(($variant:path, $array_type:ty, $type_name:expr)),* $(,)?) => { + /// Tries to convert the given Arrow array into a `Vec` of integers or floats. pub fn into_vec<T>(data: &ArrowData) -> Result<Vec<T>> where T: Copy + NumCast + 'static, diff --git a/libraries/message/src/config.rs b/libraries/message/src/config.rs index 203a302f..21d34a26 100644 --- a/libraries/message/src/config.rs +++ b/libraries/message/src/config.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; pub use crate::id::{DataId, NodeId, OperatorId}; +/// Contains the input and output configuration of the node. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] pub struct NodeRunConfig { /// Inputs for the nodes as a map from input ID to `node_id/output_id`. diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs index e5e2e33f..dfaba4b9 100644 --- a/libraries/message/src/lib.rs +++ b/libraries/message/src/lib.rs @@ -26,6 +26,9 @@ pub use arrow_data; pub use arrow_schema; use uuid::{Timestamp, Uuid}; +/// Unique identifier for a dataflow instance. +/// +/// Dora assigns each dataflow instance a unique ID on start. pub type DataflowId = uuid::Uuid; #[derive( diff --git a/libraries/message/src/metadata.rs b/libraries/message/src/metadata.rs index 9b9fa2b2..d2cda9d6 100644 --- a/libraries/message/src/metadata.rs +++ b/libraries/message/src/metadata.rs @@ -3,6 +3,9 @@ use std::collections::BTreeMap; use arrow_schema::DataType; use serde::{Deserialize, Serialize}; +/// Additional data that is sent as part of output messages. +/// +/// Includes a timestamp, type information, and additional user-provided parameters. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Metadata { metadata_version: u16, @@ -42,6 +45,7 @@ impl Metadata { } } +/// Additional metadata that can be sent as part of output messages. pub type MetadataParameters = BTreeMap<String, Parameter>; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -55,6 +59,7 @@ pub struct ArrowTypeInfo { pub child_data: Vec<ArrowTypeInfo>, } +/// A metadata parameter that can be sent as part of output messages. #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] pub enum Parameter { Bool(bool),
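To tie the pieces together, here is a hedged sketch of sending an output built via the `IntoArrow` trait documented above. It assumes that `send_output` accepts a `MetadataParameters` map plus an Arrow array, that `IntoArrow` is implemented for `Vec<f32>`, and that `eyre` is a dependency; these are suggested, but not fully shown, by this patch:

```rust
use dora_arrow_convert::IntoArrow;
use dora_node_api::dora_core::config::DataId;
use dora_node_api::{DoraNode, MetadataParameters};

fn send_measurements(node: &mut DoraNode) -> eyre::Result<()> {
    // convert a plain Rust vector into an Arrow array via `IntoArrow`
    let array = vec![1.0f32, 2.0, 3.0].into_arrow();
    node.send_output(
        DataId::from("measurements".to_owned()),
        MetadataParameters::default(),
        array,
    )?;
    Ok(())
}
```

On the receiving side, the `into_vec` helper from `dora-arrow-convert` can turn such an input back into a `Vec<f32>`.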