| @@ -45,7 +45,7 @@ pub enum Event { | |||||
| /// The [`StopCause`] field contains the reason for the event stream closure. | /// The [`StopCause`] field contains the reason for the event stream closure. | ||||
| /// | /// | ||||
| /// Typically, nodes should exit once the event stream closes. One notable | /// Typically, nodes should exit once the event stream closes. One notable | ||||
| /// exception are nodes with no inputs, which will receive a | |||||
| /// exception are nodes with no inputs, which will receive an |||||
| /// `Event::Stop(StopCause::AllInputsClosed)` right at startup. Source nodes | /// `Event::Stop(StopCause::AllInputsClosed)` right at startup. Source nodes | ||||
| /// might want to keep producing outputs still. (There is currently an open | /// might want to keep producing outputs still. (There is currently an open | ||||
| /// discussion of changing this behavior and not sending `AllInputsClosed` | /// discussion of changing this behavior and not sending `AllInputsClosed` | ||||
| @@ -55,7 +55,15 @@ pub enum Event { | |||||
| /// issued through `dora stop` or a `ctrl-c`. Nodes **must exit** once receiving | /// issued through `dora stop` or a `ctrl-c`. Nodes **must exit** once receiving | ||||
| /// such a stop event, otherwise they will be killed by Dora. | /// such a stop event, otherwise they will be killed by Dora. | ||||
| Stop(StopCause), | Stop(StopCause), | ||||
| /// Instructs the node to reload itself or one of its operators. | |||||
| /// | |||||
| /// This event is currently only used for reloading Python operators that are | |||||
| /// started by a `dora runtime` process, so it should not be sent to normal nodes yet. |||||
| Reload { | Reload { | ||||
| /// The ID of the operator that should be reloaded. | |||||
| /// | |||||
| /// There is currently no case where `operator_id` is `None`. | |||||
| operator_id: Option<OperatorId>, | operator_id: Option<OperatorId>, | ||||
| }, | }, | ||||
| /// Notifies the node about an unexpected error that happened inside Dora. | /// Notifies the node about an unexpected error that happened inside Dora. | ||||
| @@ -1,18 +1,32 @@ | |||||
| //! Merge an external stream into an [`EventStream`][super::EventStream]. |||||
| //! | |||||
| //! Sometimes nodes need to listen to external events, in addition to Dora events. | |||||
| //! This module provides the [`MergeExternal`] and [`MergeExternalSend`] traits for that purpose. |||||
| use futures::{Stream, StreamExt}; | use futures::{Stream, StreamExt}; | ||||
| use futures_concurrency::stream::Merge; | use futures_concurrency::stream::Merge; | ||||
| /// A Dora event or an event from an external source. | |||||
| #[derive(Debug)] | #[derive(Debug)] | ||||
| pub enum MergedEvent<E> { | pub enum MergedEvent<E> { | ||||
| /// A Dora event | |||||
| Dora(super::Event), | Dora(super::Event), | ||||
| /// An external event | |||||
| /// | |||||
| /// Yielded by the stream that was merged into the Dora [`EventStream`][super::EventStream]. | |||||
| External(E), | External(E), | ||||
| } | } | ||||
| /// A general enum to represent a value of two possible types. | |||||
| pub enum Either<A, B> { | pub enum Either<A, B> { | ||||
| /// Value is of the first type, type `A`. | |||||
| First(A), | First(A), | ||||
| /// Value is of the second type, type `B`. | |||||
| Second(B), | Second(B), | ||||
| } | } | ||||
| impl<A> Either<A, A> { | impl<A> Either<A, A> { | ||||
| /// Unwraps an `Either` instance where both types are identical. | |||||
| pub fn flatten(self) -> A { | pub fn flatten(self) -> A { | ||||
| match self { | match self { | ||||
| Either::First(a) => a, | Either::First(a) => a, | ||||
| @@ -21,19 +35,33 @@ impl<A> Either<A, A> { | |||||
| } | } | ||||
| } | } | ||||
| /// Allows merging an external event stream into an existing event stream. | |||||
| // TODO: use impl trait return type once stable | // TODO: use impl trait return type once stable | ||||
| pub trait MergeExternal<'a, E> { | pub trait MergeExternal<'a, E> { | ||||
| /// The item type yielded from the merged stream. | |||||
| type Item; | type Item; | ||||
| /// Merge the given stream into an existing event stream. | |||||
| /// | |||||
| /// Returns a new event stream that yields items from both streams. | |||||
| /// The ordering between the two streams is not guaranteed. | |||||
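| /// |||||
| /// A rough usage sketch (here, `tick_stream` is a stand-in for whatever external stream |||||
| /// the node wants to listen to in addition to its Dora events): |||||
| /// |||||
| /// ```no_run |||||
| /// use dora_node_api::{DoraNode, Event, merged::{MergeExternal, MergedEvent}}; |||||
| /// use dora_node_api::futures::{executor::block_on, stream, StreamExt}; |||||
| /// |||||
| /// let (_node, events) = DoraNode::init_from_env().expect("init failed"); |||||
| /// // placeholder external stream; any `Stream + Unpin` works |||||
| /// let tick_stream = stream::iter(vec![1u32, 2, 3]); |||||
| /// let mut merged = events.merge_external(tick_stream); |||||
| /// block_on(async { |||||
| ///     while let Some(event) = merged.next().await { |||||
| ///         match event { |||||
| ///             MergedEvent::Dora(Event::Stop(_)) => break, |||||
| ///             MergedEvent::Dora(_other) => { /* handle other Dora events */ } |||||
| ///             MergedEvent::External(tick) => println!("external tick {tick}"), |||||
| ///         } |||||
| ///     } |||||
| /// }); |||||
| /// ``` |||||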
| fn merge_external( | fn merge_external( | ||||
| self, | self, | ||||
| external_events: impl Stream<Item = E> + Unpin + 'a, | external_events: impl Stream<Item = E> + Unpin + 'a, | ||||
| ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>; | ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>; | ||||
| } | } | ||||
| /// Allows merging a sendable external event stream into an existing (sendable) event stream. | |||||
| /// | |||||
| /// By implementing [`Send`], the streams can be sent to different threads. | |||||
| pub trait MergeExternalSend<'a, E> { | pub trait MergeExternalSend<'a, E> { | ||||
| /// The item type yielded from the merged stream. | |||||
| type Item; | type Item; | ||||
| /// Merge the given stream into an existing event stream. | |||||
| /// | |||||
| /// Returns a new event stream that yields items from both streams. | |||||
| /// The ordering between the two streams is not guaranteed. | |||||
| fn merge_external_send( | fn merge_external_send( | ||||
| self, | self, | ||||
| external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a, | external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a, | ||||
| @@ -30,12 +30,33 @@ use dora_core::{ | |||||
| }; | }; | ||||
| use eyre::{eyre, Context}; | use eyre::{eyre, Context}; | ||||
| pub use scheduler::Scheduler as EventScheduler; | |||||
| mod data_conversion; | mod data_conversion; | ||||
| mod event; | mod event; | ||||
| pub mod merged; | pub mod merged; | ||||
| mod scheduler; | mod scheduler; | ||||
| mod thread; | mod thread; | ||||
| /// Asynchronous iterator over the incoming [`Event`]s destined for this node. | |||||
| /// | |||||
| /// This struct [implements](#impl-Stream-for-EventStream) the [`Stream`] trait, | |||||
| /// so you can use methods of the [`StreamExt`] trait | |||||
| /// on this struct. A common pattern is `while let Some(event) = event_stream.next().await`. | |||||
| /// | |||||
| /// Nodes should iterate over this event stream and react to events that they are interested in. | |||||
| /// Typically, the most important event type is [`Event::Input`]. | |||||
| /// You don't need to handle all events; it's fine to ignore events that are not relevant to your node. |||||
| /// | |||||
| /// The event stream will close itself after an [`Event::Stop`] was received. |||||
| /// A manual `break` on [`Event::Stop`] is typically not needed. | |||||
| /// _(You probably do need to use a manual `break` on stop events when using the | |||||
| /// [`StreamExt::merge`][`futures_concurrency::stream::StreamExt::merge`] implementation on | |||||
| /// [`EventStream`] to combine the stream with an external one.)_ | |||||
| /// | |||||
| /// Once the event stream has finished, nodes should exit. |||||
| /// Note that Dora kills nodes that don't exit quickly after an [`Event::Stop`] of type |||||
| /// [`StopCause::Manual`] was received. | |||||
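| /// |||||
| /// A minimal sketch of the asynchronous pattern described above (error handling is |||||
| /// shortened to `expect`): |||||
| /// |||||
| /// ```no_run |||||
| /// use dora_node_api::{DoraNode, Event}; |||||
| /// use dora_node_api::futures::{executor::block_on, StreamExt}; |||||
| /// |||||
| /// let (_node, mut events) = DoraNode::init_from_env().expect("init failed"); |||||
| /// block_on(async { |||||
| ///     while let Some(event) = events.next().await { |||||
| ///         match event { |||||
| ///             Event::Input { id, .. } => println!("received input `{id}`"), |||||
| ///             _ => {} // ignore events that are not relevant to this node |||||
| ///         } |||||
| ///     } |||||
| /// }); |||||
| /// ``` |||||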
| pub struct EventStream { | pub struct EventStream { | ||||
| node_id: NodeId, | node_id: NodeId, | ||||
| receiver: flume::r#async::RecvStream<'static, EventItem>, | receiver: flume::r#async::RecvStream<'static, EventItem>, | ||||
| @@ -158,16 +179,61 @@ impl EventStream { | |||||
| }) | }) | ||||
| } | } | ||||
| /// wait for the next event on the events stream. | |||||
| /// Synchronously waits for the next event. | |||||
| /// | |||||
| /// Blocks the thread until the next event arrives. | |||||
| /// Returns [`None`] once the event stream is closed. | |||||
| /// | |||||
| /// For an asynchronous variant of this method see [`recv_async`][Self::recv_async]. | |||||
| /// | |||||
| /// ## Event Reordering | |||||
| /// | |||||
| /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the | |||||
| /// events might be returned in a different order than they occurred. For details, check the | |||||
| /// documentation of the [`EventScheduler`] struct. | |||||
| /// | |||||
| /// If you want to receive the events in their original chronological order, use the | |||||
| /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the | |||||
| /// [`Stream`] trait). | |||||
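| /// |||||
| /// A minimal synchronous sketch: |||||
| /// |||||
| /// ```no_run |||||
| /// use dora_node_api::{DoraNode, Event}; |||||
| /// |||||
| /// let (_node, mut events) = DoraNode::init_from_env().expect("init failed"); |||||
| /// while let Some(event) = events.recv() { |||||
| ///     match event { |||||
| ///         Event::Input { id, .. } => println!("received input `{id}`"), |||||
| ///         _ => {} // ignore other events |||||
| ///     } |||||
| /// } |||||
| /// ``` |||||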
| pub fn recv(&mut self) -> Option<Event> { | pub fn recv(&mut self) -> Option<Event> { | ||||
| futures::executor::block_on(self.recv_async()) | futures::executor::block_on(self.recv_async()) | ||||
| } | } | ||||
| /// wait for the next event on the events stream until timeout | |||||
| /// Receives the next incoming [`Event`] synchronously with a timeout. | |||||
| /// | |||||
| /// Blocks the thread until the next event arrives or the timeout is reached. | |||||
| /// Returns an [`Event::Error`] if no event was received within the given duration. |||||
| /// | |||||
| /// Returns [`None`] once the event stream is closed. | |||||
| /// | |||||
| /// For an asynchronous variant of this method see [`recv_async_timeout`][Self::recv_async_timeout]. | |||||
| /// | |||||
| /// ## Event Reordering | |||||
| /// | |||||
| /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the | |||||
| /// events might be returned in a different order than they occurred. For details, check the | |||||
| /// documentation of the [`EventScheduler`] struct. | |||||
| /// | |||||
| /// If you want to receive the events in their original chronological order, use the | |||||
| /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the | |||||
| /// [`Stream`] trait). | |||||
| pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> { | pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> { | ||||
| futures::executor::block_on(self.recv_async_timeout(dur)) | futures::executor::block_on(self.recv_async_timeout(dur)) | ||||
| } | } | ||||
| /// Receives the next incoming [`Event`] asynchronously, using an [`EventScheduler`] for fairness. | |||||
| /// | |||||
| /// Returns [`None`] once the event stream is closed. | |||||
| /// | |||||
| /// ## Event Reordering | |||||
| /// | |||||
| /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the | |||||
| /// events might be returned in a different order than they occurred. For details, check the | |||||
| /// documentation of the [`EventScheduler`] struct. | |||||
| /// | |||||
| /// If you want to receive the events in their original chronological order, use the |||||
| /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the |||||
| /// [`Stream`] trait). |||||
| pub async fn recv_async(&mut self) -> Option<Event> { | pub async fn recv_async(&mut self) -> Option<Event> { | ||||
| loop { | loop { | ||||
| if self.scheduler.is_empty() { | if self.scheduler.is_empty() { | ||||
| @@ -188,6 +254,21 @@ impl EventStream { | |||||
| event.map(Self::convert_event_item) | event.map(Self::convert_event_item) | ||||
| } | } | ||||
| /// Receives the next incoming [`Event`] asynchronously with a timeout. | |||||
| /// | |||||
| /// Returns an [`Event::Error`] if no event was received within the given duration. |||||
| /// | |||||
| /// Returns [`None`] once the event stream is closed. | |||||
| /// | |||||
| /// ## Event Reordering | |||||
| /// | |||||
| /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the | |||||
| /// events might be returned in a different order than they occurred. For details, check the | |||||
| /// documentation of the [`EventScheduler`] struct. | |||||
| /// | |||||
| /// If you want to receive the events in their original chronological order, use the | |||||
| /// [`StreamExt::next`] method with a custom timeout future instead | |||||
| /// ([`EventStream`] implements the [`Stream`] trait). | |||||
| pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> { | pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> { | ||||
| match select(Delay::new(dur), pin!(self.recv_async())).await { | match select(Delay::new(dur), pin!(self.recv_async())).await { | ||||
| Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError( | Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError( | ||||
| @@ -3,27 +3,89 @@ use std::collections::{HashMap, VecDeque}; | |||||
| use dora_message::{daemon_to_node::NodeEvent, id::DataId}; | use dora_message::{daemon_to_node::NodeEvent, id::DataId}; | ||||
| use super::thread::EventItem; | use super::thread::EventItem; | ||||
| pub const NON_INPUT_EVENT: &str = "dora/non_input_event"; | |||||
| pub(crate) const NON_INPUT_EVENT: &str = "dora/non_input_event"; | |||||
| // This scheduler will make sure that there is fairness between | |||||
| // inputs. | |||||
| // | |||||
| // It is going to always make sure that the input that has not been used for the longest period is the first one to be used next. | |||||
| // | |||||
| // Ex: | |||||
| // In the case that one input has a very high frequency and another one with a very slow frequency.\ | |||||
| // | |||||
| // The Node will always alternate between the two inputs when each input is available | |||||
| // Avoiding one input to be overwhelmingly present. | |||||
| // | |||||
| /// This scheduler will make sure that there is fairness between inputs. | |||||
| /// | |||||
| /// The scheduler reorders events in the following way: | |||||
| /// | |||||
| /// - **Non-input events are prioritized** | |||||
| /// | |||||
| /// If the node received any events that are not input events, they are returned first. The | |||||
| /// intention of this reordering is that the nodes can react quickly to dataflow-related events | |||||
| /// even when their input queues are very full. | |||||
| /// | |||||
| /// This reordering has some side effects that might be unexpected: | |||||
| /// - An [`InputClosed`][super::Event::InputClosed] event might be yielded before the last | |||||
| /// input events of that ID. | |||||
| /// | |||||
| /// Usually, an `InputClosed` event indicates that there won't be any subsequent inputs | |||||
| /// of a certain ID. This invariant does not hold anymore for a scheduled event stream. | |||||
| /// - The [`Stop`][super::Event::Stop] event might not be the last event of the stream anymore. | |||||
| /// | |||||
| /// Usually, the `Stop` event is the last event that is sent to a node before the event stream | |||||
| /// is closed. Because of the reordering, the stream might return more events after a `Stop` | |||||
| /// event. | |||||
| /// - **Input events are grouped by ID** and yielded in a **least-recently used order (by ID)**. | |||||
| /// | |||||
| /// The scheduler keeps a separate queue for each input ID, where the incoming input events are | |||||
| /// placed in their chronological order. When yielding the next event, the scheduler iterates over | |||||
| /// these queues in least-recently used order. This means that the queue corresponding to the | |||||
| /// last yielded event will be checked last. The scheduler will return the oldest event from the | |||||
| /// first non-empty queue. | |||||
| /// | |||||
| /// The side effect of this change is that input events of different IDs are no longer in their |||||
| /// chronological order. This might lead to unexpected results for input events that are causally |||||
| /// related to each other. |||||
| /// | |||||
| /// ## Example 1 | |||||
| /// Consider the case that one input has a very high frequency and another one a very low |||||
| /// frequency. The event stream will always alternate between the two inputs as long as both are |||||
| /// available. |||||
| /// Without the scheduling, the high-frequency input would be returned much more often. | |||||
| /// | |||||
| /// ## Example 2 | |||||
| /// Again, let's consider the case that one input has a very high frequency and the other has a | |||||
| /// very low frequency. This time, we define a small maximum queue size for the low-frequency |||||
| /// input, but a large queue size for the high-frequency one. | |||||
| /// Using the scheduler, the event stream will always alternate between high and low-frequency | |||||
| /// inputs as long as inputs of both types are available. | |||||
| /// | |||||
| /// Without scheduling, the low-frequency input might never be yielded before | |||||
| /// it's dropped because there is almost always an older high-frequency input available that is | |||||
| /// yielded first. By the time the low-frequency input would be next chronologically, it might |||||
| /// already have been dropped because the node received newer low-frequency inputs in the |||||
| /// meantime (the queue length is small). At this point, the next-oldest input is a high-frequency | |||||
| /// input again. | |||||
| /// | |||||
| /// ## Example 3 | |||||
| /// Consider a high-frequency camera input and a low-frequency bounding box input, which is based | |||||
| /// on the latest camera image. The dataflow YAML file specifies a large queue size for the camera | |||||
| /// input and a small queue size for the bounding box input. | |||||
| /// | |||||
| /// With scheduling, the number of | |||||
| /// buffered camera inputs might grow over time. As a result, the camera inputs yielded from the |||||
| /// stream (in oldest-first order) are not synchronized with the bounding box inputs anymore. So | |||||
| /// the node receives an up-to-date bounding box, but a considerably outdated image. | |||||
| /// | |||||
| /// Without scheduling, the events are returned in chronological order. This time, the bounding | |||||
| /// box might be slightly outdated if the camera sent new images before the bounding box was | |||||
| /// ready. However, the time difference between the two input types is independent of the | |||||
| /// queue size. |||||
| /// | |||||
| /// (If a perfectly matching bounding box is required, we recommend forwarding the input image as |||||
| /// part of the bounding box output. This way, the receiving node only needs to subscribe to one | |||||
| /// input so no mismatches can happen.) | |||||
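| /// |||||
| /// ## Selection sketch |||||
| /// |||||
| /// The selection between the per-ID input queues, reduced to a self-contained sketch (the real |||||
| /// implementation additionally prioritizes non-input events and enforces the per-ID queue size |||||
| /// limits): |||||
| /// |||||
| /// ```no_run |||||
| /// use std::collections::{HashMap, VecDeque}; |||||
| /// |||||
| /// // `last_used` is ordered from least- to most-recently yielded input ID. |||||
| /// fn next_id( |||||
| ///     last_used: &mut VecDeque<String>, |||||
| ///     queues: &HashMap<String, VecDeque<u64>>, |||||
| /// ) -> Option<String> { |||||
| ///     // pick the least-recently used ID that has a pending event |||||
| ///     let pos = last_used |||||
| ///         .iter() |||||
| ///         .position(|id| queues.get(id).map_or(false, |queue| !queue.is_empty()))?; |||||
| ///     let id = last_used.remove(pos)?; |||||
| ///     // move the chosen ID to the back so other inputs get a turn first next time |||||
| ///     last_used.push_back(id.clone()); |||||
| ///     Some(id) |||||
| /// } |||||
| /// ``` |||||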
| #[derive(Debug)] | #[derive(Debug)] | ||||
| pub struct Scheduler { | pub struct Scheduler { | ||||
| last_used: VecDeque<DataId>, // Tracks the last-used event ID | |||||
| event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>, // Tracks events per ID | |||||
| /// Tracks event IDs from least- to most-recently used |||||
| last_used: VecDeque<DataId>, | |||||
| /// Tracks events per ID | |||||
| event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>, | |||||
| } | } | ||||
| impl Scheduler { | impl Scheduler { | ||||
| pub fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self { | |||||
| pub(crate) fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self { | |||||
| let topic = VecDeque::from_iter( | let topic = VecDeque::from_iter( | ||||
| event_queues | event_queues | ||||
| .keys() | .keys() | ||||
| @@ -36,7 +98,7 @@ impl Scheduler { | |||||
| } | } | ||||
| } | } | ||||
| pub fn add_event(&mut self, event: EventItem) { | |||||
| pub(crate) fn add_event(&mut self, event: EventItem) { | |||||
| let event_id = match &event { | let event_id = match &event { | ||||
| EventItem::NodeEvent { | EventItem::NodeEvent { | ||||
| event: | event: | ||||
| @@ -63,7 +125,7 @@ impl Scheduler { | |||||
| } | } | ||||
| } | } | ||||
| pub fn next(&mut self) -> Option<EventItem> { | |||||
| pub(crate) fn next(&mut self) -> Option<EventItem> { | |||||
| // Retrieve message from the non input event first that have priority over input message. | // Retrieve message from the non input event first that have priority over input message. | ||||
| if let Some((_size, queue)) = self | if let Some((_size, queue)) = self | ||||
| .event_queues | .event_queues | ||||
| @@ -89,7 +151,7 @@ impl Scheduler { | |||||
| None | None | ||||
| } | } | ||||
| pub fn is_empty(&self) -> bool { | |||||
| pub(crate) fn is_empty(&self) -> bool { | |||||
| self.event_queues | self.event_queues | ||||
| .iter() | .iter() | ||||
| .all(|(_id, (_size, queue))| queue.is_empty()) | .all(|(_id, (_size, queue))| queue.is_empty()) | ||||
| @@ -84,7 +84,7 @@ pub use dora_message::{ | |||||
| metadata::{Metadata, MetadataParameters, Parameter}, | metadata::{Metadata, MetadataParameters, Parameter}, | ||||
| DataflowId, | DataflowId, | ||||
| }; | }; | ||||
| pub use event_stream::{merged, Event, EventStream, StopCause}; |||||
| pub use event_stream::{merged, Event, EventScheduler, EventStream, StopCause}; | |||||
| pub use flume::Receiver; | pub use flume::Receiver; | ||||
| pub use futures; | pub use futures; | ||||
| pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD}; | pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD}; | ||||
| @@ -1,7 +1,11 @@ | |||||
| //! Utility functions for converting Arrow arrays to/from raw data. | |||||
| //! | |||||
| use arrow::array::{ArrayData, BufferSpec}; | use arrow::array::{ArrayData, BufferSpec}; | ||||
| use dora_message::metadata::{ArrowTypeInfo, BufferOffset}; | use dora_message::metadata::{ArrowTypeInfo, BufferOffset}; | ||||
| use eyre::Context; | use eyre::Context; | ||||
| /// Calculates the data size in bytes required for storing a contiguous copy of the given Arrow |||||
| /// array. | |||||
| pub fn required_data_size(array: &ArrayData) -> usize { | pub fn required_data_size(array: &ArrayData) -> usize { | ||||
| let mut next_offset = 0; | let mut next_offset = 0; | ||||
| required_data_size_inner(array, &mut next_offset); | required_data_size_inner(array, &mut next_offset); | ||||
| @@ -21,6 +25,12 @@ fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) { | |||||
| } | } | ||||
| } | } | ||||
| /// Copy the given Arrow array into the provided buffer. | |||||
| /// | |||||
| /// If the Arrow array consists of multiple buffers, they are placed contiguously in the target |||||
| /// buffer (there might be some padding for alignment). |||||
| /// | |||||
| /// Panics if the buffer is not large enough. | |||||
| pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo { | pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo { | ||||
| let mut next_offset = 0; | let mut next_offset = 0; | ||||
| copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array) | copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array) | ||||
| @@ -71,6 +81,9 @@ fn copy_array_into_sample_inner( | |||||
| } | } | ||||
| } | } | ||||
| /// Tries to convert the given raw Arrow buffer into an Arrow array. | |||||
| /// | |||||
| /// The `type_info` is required for decoding the `raw_buffer` correctly. | |||||
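| /// |||||
| /// A round-trip sketch, using [`required_data_size`] and [`copy_array_into_sample`] to produce |||||
| /// the raw buffer and type information first: |||||
| /// |||||
| /// ```no_run |||||
| /// use arrow::array::{Array, Int32Array}; |||||
| /// use dora_node_api::arrow_utils::{ |||||
| ///     buffer_into_arrow_array, copy_array_into_sample, required_data_size, |||||
| /// }; |||||
| /// |||||
| /// let array = Int32Array::from(vec![1, 2, 3]).into_data(); |||||
| /// let mut buffer = vec![0u8; required_data_size(&array)]; |||||
| /// let type_info = copy_array_into_sample(&mut buffer, &array); |||||
| /// |||||
| /// let raw = arrow::buffer::Buffer::from(buffer); |||||
| /// let decoded = buffer_into_arrow_array(&raw, &type_info).expect("failed to decode"); |||||
| /// assert_eq!(decoded, array); |||||
| /// ``` |||||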
| pub fn buffer_into_arrow_array( | pub fn buffer_into_arrow_array( | ||||
| raw_buffer: &arrow::buffer::Buffer, | raw_buffer: &arrow::buffer::Buffer, | ||||
| type_info: &ArrowTypeInfo, | type_info: &ArrowTypeInfo, | ||||
| @@ -63,6 +63,10 @@ enum TokioRuntime { | |||||
| Handle(Handle), | Handle(Handle), | ||||
| } | } | ||||
| /// Allows sending outputs and retrieving node information. | |||||
| /// | |||||
| /// The main purpose of this struct is to send outputs via Dora. There are also functions available | |||||
| /// for retrieving the node configuration. | |||||
| pub struct DoraNode { | pub struct DoraNode { | ||||
| id: NodeId, | id: NodeId, | ||||
| dataflow_id: DataflowId, | dataflow_id: DataflowId, | ||||
| @@ -80,7 +84,11 @@ pub struct DoraNode { | |||||
| } | } | ||||
| impl DoraNode { | impl DoraNode { | ||||
| /// Initiate a node from environment variables set by `dora-coordinator` | |||||
| /// Initiate a node from environment variables set by the Dora daemon. | |||||
| /// | |||||
| /// This is the recommended initialization function for Dora nodes, which are spawned by | |||||
| /// Dora daemon instances. | |||||
| /// | |||||
| /// | /// | ||||
| /// ```no_run | /// ```no_run | ||||
| /// use dora_node_api::DoraNode; | /// use dora_node_api::DoraNode; | ||||
| @@ -107,6 +115,8 @@ impl DoraNode { | |||||
| /// Initiate a node from a dataflow id and a node id. | /// Initiate a node from a dataflow id and a node id. | ||||
| /// | /// | ||||
| /// This initialization function should be used for [_dynamic nodes_](index.html#dynamic-nodes). | |||||
| /// | |||||
| /// ```no_run | /// ```no_run | ||||
| /// use dora_node_api::DoraNode; | /// use dora_node_api::DoraNode; | ||||
| /// use dora_node_api::dora_core::config::NodeId; | /// use dora_node_api::dora_core::config::NodeId; | ||||
| @@ -139,6 +149,11 @@ impl DoraNode { | |||||
| } | } | ||||
| } | } | ||||
| /// Flexible initialization function for nodes that are sometimes run as dynamic nodes. |||||
| /// | |||||
| /// This function first tries initializing the traditional way through | |||||
| /// [`init_from_env`][Self::init_from_env]. If this fails, it falls back to | |||||
| /// [`init_from_node_id`][Self::init_from_node_id]. | |||||
| pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> { | pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> { | ||||
| if std::env::var("DORA_NODE_CONFIG").is_ok() { | if std::env::var("DORA_NODE_CONFIG").is_ok() { | ||||
| info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`"); | info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`"); | ||||
| @@ -148,6 +163,8 @@ impl DoraNode { | |||||
| } | } | ||||
| } | } | ||||
| /// Internal initialization routine that should not be used outside of Dora. | |||||
| #[doc(hidden)] | |||||
| #[tracing::instrument] | #[tracing::instrument] | ||||
| pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> { | pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> { | ||||
| let NodeConfig { | let NodeConfig { | ||||
| @@ -232,7 +249,8 @@ impl DoraNode { | |||||
| } | } | ||||
| } | } | ||||
| /// Send data from the node to the other nodes. | |||||
| /// Send raw data from the node to the other nodes. | |||||
| /// | |||||
| /// We take a closure as an input to enable zero copy on send. | /// We take a closure as an input to enable zero copy on send. | ||||
| /// | /// | ||||
| /// ```no_run | /// ```no_run | ||||
| @@ -255,6 +273,8 @@ impl DoraNode { | |||||
| /// }).expect("Could not send output"); | /// }).expect("Could not send output"); | ||||
| /// ``` | /// ``` | ||||
| /// | /// | ||||
| /// Ignores the output if the given `output_id` is not specified as a node output in the dataflow |||||
| /// configuration file. | |||||
| pub fn send_output_raw<F>( | pub fn send_output_raw<F>( | ||||
| &mut self, | &mut self, | ||||
| output_id: DataId, | output_id: DataId, | ||||
| @@ -276,6 +296,14 @@ impl DoraNode { | |||||
| self.send_output_sample(output_id, type_info, parameters, Some(sample)) | self.send_output_sample(output_id, type_info, parameters, Some(sample)) | ||||
| } | } | ||||
| /// Sends the given Arrow array as an output message. |||||
| /// | |||||
| /// Uses shared memory for efficient data transfer if suitable. | |||||
| /// | |||||
| /// This method might copy the message once to move it to shared memory. | |||||
| /// | |||||
| /// Ignores the output if the given `output_id` is not specified as a node output in the dataflow |||||
| /// configuration file. | |||||
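| /// |||||
| /// A minimal sketch ("counter" is a placeholder output ID that must be declared in the |||||
| /// dataflow configuration file): |||||
| /// |||||
| /// ```no_run |||||
| /// use dora_node_api::{DoraNode, IntoArrow}; |||||
| /// use dora_node_api::dora_core::config::DataId; |||||
| /// |||||
| /// let (mut node, _events) = DoraNode::init_from_env().expect("init failed"); |||||
| /// let output_id = DataId::from("counter".to_owned()); |||||
| /// node.send_output(output_id, Default::default(), 42u64.into_arrow()) |||||
| ///     .expect("failed to send output"); |||||
| /// ``` |||||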
| pub fn send_output( | pub fn send_output( | ||||
| &mut self, | &mut self, | ||||
| output_id: DataId, | output_id: DataId, | ||||
| @@ -299,6 +327,12 @@ impl DoraNode { | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| /// Send the given raw byte data as output. | |||||
| /// | |||||
| /// Might copy the data once to move it into shared memory. | |||||
| /// | |||||
| /// Ignores the output if the given `output_id` is not specified as a node output in the dataflow |||||
| /// configuration file. | |||||
| pub fn send_output_bytes( | pub fn send_output_bytes( | ||||
| &mut self, | &mut self, | ||||
| output_id: DataId, | output_id: DataId, | ||||
| @@ -314,6 +348,12 @@ impl DoraNode { | |||||
| }) | }) | ||||
| } | } | ||||
| /// Send the given raw byte data with the provided type information. |||||
| /// | |||||
| /// It is recommended to use a function like [`send_output`][Self::send_output] instead. | |||||
| /// | |||||
| /// Ignores the output if the given `output_id` is not specified as a node output in the dataflow |||||
| /// configuration file. | |||||
| pub fn send_typed_output<F>( | pub fn send_typed_output<F>( | ||||
| &mut self, | &mut self, | ||||
| output_id: DataId, | output_id: DataId, | ||||
| @@ -335,6 +375,12 @@ impl DoraNode { | |||||
| self.send_output_sample(output_id, type_info, parameters, Some(sample)) | self.send_output_sample(output_id, type_info, parameters, Some(sample)) | ||||
| } | } | ||||
| /// Sends the given [`DataSample`] as output, combined with the given type information. | |||||
| /// | |||||
| /// It is recommended to use a function like [`send_output`][Self::send_output] instead. | |||||
| /// | |||||
| /// Ignores the output if the given `output_id` is not specified as a node output in the dataflow |||||
| /// configuration file. | |||||
| pub fn send_output_sample( | pub fn send_output_sample( | ||||
| &mut self, | &mut self, | ||||
| output_id: DataId, | output_id: DataId, | ||||
| @@ -363,32 +409,46 @@ impl DoraNode { | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| pub fn close_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> { | |||||
| for output_id in &outputs { | |||||
| /// Report the given output IDs as closed. |||||
| /// | |||||
| /// The node is not allowed to send more outputs with the closed IDs. | |||||
| /// | |||||
| /// Closing outputs early can be helpful to receivers, as they are notified that no further |||||
| /// inputs will arrive for the corresponding input IDs. |||||
| pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> eyre::Result<()> { | |||||
| for output_id in &outputs_ids { | |||||
| if !self.node_config.outputs.remove(output_id) { | if !self.node_config.outputs.remove(output_id) { | ||||
| eyre::bail!("unknown output {output_id}"); | eyre::bail!("unknown output {output_id}"); | ||||
| } | } | ||||
| } | } | ||||
| self.control_channel | self.control_channel | ||||
| .report_closed_outputs(outputs) | |||||
| .report_closed_outputs(outputs_ids) | |||||
| .wrap_err("failed to report closed outputs to daemon")?; | .wrap_err("failed to report closed outputs to daemon")?; | ||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| /// Returns the ID of the node as specified in the dataflow configuration file. | |||||
| pub fn id(&self) -> &NodeId { | pub fn id(&self) -> &NodeId { | ||||
| &self.id | &self.id | ||||
| } | } | ||||
| /// Returns the unique identifier for the running dataflow instance. | |||||
| /// | |||||
| /// Dora assigns each dataflow instance a random identifier when started. | |||||
| pub fn dataflow_id(&self) -> &DataflowId { | pub fn dataflow_id(&self) -> &DataflowId { | ||||
| &self.dataflow_id | &self.dataflow_id | ||||
| } | } | ||||
| /// Returns the input and output configuration of this node. | |||||
| pub fn node_config(&self) -> &NodeRunConfig { | pub fn node_config(&self) -> &NodeRunConfig { | ||||
| &self.node_config | &self.node_config | ||||
| } | } | ||||
| /// Allocates a [`DataSample`] of the specified size. | |||||
| /// | |||||
| /// The data sample will use shared memory when suitable to enable efficient data transfer | |||||
| /// when sending an output message. | |||||
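| /// |||||
| /// A sketch of the manual send path, roughly what [`send_output`][Self::send_output] does |||||
| /// internally ("image" is a placeholder output ID): |||||
| /// |||||
| /// ```no_run |||||
| /// use arrow::array::{Array, UInt8Array}; |||||
| /// use dora_node_api::DoraNode; |||||
| /// use dora_node_api::arrow_utils::{copy_array_into_sample, required_data_size}; |||||
| /// use dora_node_api::dora_core::config::DataId; |||||
| /// |||||
| /// let (mut node, _events) = DoraNode::init_from_env().expect("init failed"); |||||
| /// let array = UInt8Array::from(vec![0u8; 640 * 480]).into_data(); |||||
| /// let mut sample = node |||||
| ///     .allocate_data_sample(required_data_size(&array)) |||||
| ///     .expect("failed to allocate sample"); |||||
| /// let type_info = copy_array_into_sample(&mut sample, &array); |||||
| /// node.send_output_sample( |||||
| ///     DataId::from("image".to_owned()), |||||
| ///     type_info, |||||
| ///     Default::default(), |||||
| ///     Some(sample), |||||
| /// ) |||||
| /// .expect("failed to send output"); |||||
| /// ``` |||||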
| pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> { | pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> { | ||||
| let data = if data_len >= ZERO_COPY_THRESHOLD { | let data = if data_len >= ZERO_COPY_THRESHOLD { | ||||
| // create shared memory region | // create shared memory region | ||||
| @@ -527,6 +587,11 @@ impl Drop for DoraNode { | |||||
| } | } | ||||
| } | } | ||||
| /// A data region suitable for sending as an output message. | |||||
| /// | |||||
| /// The region is stored in shared memory when suitable to enable efficient data transfer. | |||||
| /// | |||||
| /// `DataSample` implements the [`Deref`] and [`DerefMut`] traits to read and write the mapped data. | |||||
| pub struct DataSample { | pub struct DataSample { | ||||
| inner: DataSampleInner, | inner: DataSampleInner, | ||||
| len: usize, | len: usize, | ||||
| @@ -1,3 +1,7 @@ | |||||
| //! Provides functions for converting between Apache Arrow arrays and Rust data types. | |||||
| #![warn(missing_docs)] | |||||
| use arrow::array::{ | use arrow::array::{ | ||||
| Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, | Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, | ||||
| UInt32Array, UInt8Array, | UInt32Array, UInt8Array, | ||||
| @@ -10,12 +14,16 @@ use std::ops::{Deref, DerefMut}; | |||||
| mod from_impls; | mod from_impls; | ||||
| mod into_impls; | mod into_impls; | ||||
| /// Data that can be converted to an Arrow array. | |||||
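| /// |||||
| /// A small sketch, assuming one of the provided implementations (e.g. for `u64`) and the |||||
| /// `dora_arrow_convert` crate name: |||||
| /// |||||
| /// ```no_run |||||
| /// use dora_arrow_convert::IntoArrow; |||||
| /// |||||
| /// let array = 42u64.into_arrow(); |||||
| /// // `array` is an Arrow array and can be passed to functions like `DoraNode::send_output`. |||||
| /// ``` |||||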
| pub trait IntoArrow { | pub trait IntoArrow { | ||||
| /// The Array type that the data can be converted to. | |||||
| type A: Array; | type A: Array; | ||||
| /// Convert the data into an Arrow array. | |||||
| fn into_arrow(self) -> Self::A; | fn into_arrow(self) -> Self::A; | ||||
| } | } | ||||
| /// Wrapper type for an Arrow [`ArrayRef`](arrow::array::ArrayRef). | |||||
| #[derive(Debug)] | #[derive(Debug)] | ||||
| pub struct ArrowData(pub arrow::array::ArrayRef); | pub struct ArrowData(pub arrow::array::ArrayRef); | ||||
| @@ -35,6 +43,7 @@ impl DerefMut for ArrowData { | |||||
| macro_rules! register_array_handlers { | macro_rules! register_array_handlers { | ||||
| ($(($variant:path, $array_type:ty, $type_name:expr)),* $(,)?) => { | ($(($variant:path, $array_type:ty, $type_name:expr)),* $(,)?) => { | ||||
| /// Tries to convert the given Arrow array into a `Vec` of integers or floats. | |||||
| pub fn into_vec<T>(data: &ArrowData) -> Result<Vec<T>> | pub fn into_vec<T>(data: &ArrowData) -> Result<Vec<T>> | ||||
| where | where | ||||
| T: Copy + NumCast + 'static, | T: Copy + NumCast + 'static, | ||||
| @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; | |||||
| pub use crate::id::{DataId, NodeId, OperatorId}; | pub use crate::id::{DataId, NodeId, OperatorId}; | ||||
| /// Contains the input and output configuration of the node. | |||||
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] | #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] | ||||
| pub struct NodeRunConfig { | pub struct NodeRunConfig { | ||||
| /// Inputs for the nodes as a map from input ID to `node_id/output_id`. | /// Inputs for the nodes as a map from input ID to `node_id/output_id`. | ||||
| @@ -26,6 +26,9 @@ pub use arrow_data; | |||||
| pub use arrow_schema; | pub use arrow_schema; | ||||
| use uuid::{Timestamp, Uuid}; | use uuid::{Timestamp, Uuid}; | ||||
| /// Unique identifier for a dataflow instance. | |||||
| /// | |||||
| /// Dora assigns each dataflow instance a unique ID on start. | |||||
| pub type DataflowId = uuid::Uuid; | pub type DataflowId = uuid::Uuid; | ||||
| #[derive( | #[derive( | ||||
| @@ -3,6 +3,9 @@ use std::collections::BTreeMap; | |||||
| use arrow_schema::DataType; | use arrow_schema::DataType; | ||||
| use serde::{Deserialize, Serialize}; | use serde::{Deserialize, Serialize}; | ||||
| /// Additional data that is sent as part of output messages. | |||||
| /// | |||||
| /// Includes a timestamp, type information, and additional user-provided parameters. |||||
| #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] | #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] | ||||
| pub struct Metadata { | pub struct Metadata { | ||||
| metadata_version: u16, | metadata_version: u16, | ||||
| @@ -42,6 +45,7 @@ impl Metadata { | |||||
| } | } | ||||
| } | } | ||||
| /// Additional metadata that can be sent as part of output messages. | |||||
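| /// |||||
| /// For example, to attach a custom flag to an output (sketch; `"is_keyframe"` is an arbitrary, |||||
| /// user-chosen key): |||||
| /// |||||
| /// ```no_run |||||
| /// use dora_message::metadata::{MetadataParameters, Parameter}; |||||
| /// |||||
| /// let mut parameters = MetadataParameters::new(); |||||
| /// parameters.insert("is_keyframe".to_owned(), Parameter::Bool(true)); |||||
| /// ``` |||||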
| pub type MetadataParameters = BTreeMap<String, Parameter>; | pub type MetadataParameters = BTreeMap<String, Parameter>; | ||||
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] | ||||
| @@ -55,6 +59,7 @@ pub struct ArrowTypeInfo { | |||||
| pub child_data: Vec<ArrowTypeInfo>, | pub child_data: Vec<ArrowTypeInfo>, | ||||
| } | } | ||||
| /// A metadata parameter that can be sent as part of output messages. | |||||
| #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] | #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] | ||||
| pub enum Parameter { | pub enum Parameter { | ||||
| Bool(bool), | Bool(bool), | ||||