Also includes a bit of reorganization to streamline the API.
@@ -289,7 +289,7 @@ pub fn metadata_to_pydict<'a>(
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
+    use std::{ptr::NonNull, sync::Arc};
     use aligned_vec::{AVec, ConstAlign};
     use arrow::{
@@ -301,9 +301,8 @@ mod tests {
     };
     use arrow_schema::{DataType, Field};
-    use dora_node_api::{
-        arrow_utils::{copy_array_into_sample, required_data_size},
-        RawData,
+    use dora_node_api::arrow_utils::{
+        buffer_into_arrow_array, copy_array_into_sample, required_data_size,
     };
     use eyre::{Context, Result};
@@ -313,9 +312,16 @@ mod tests {
         let info = copy_array_into_sample(&mut sample, arrow_array);
-        let serialized_deserialized_arrow_array = RawData::Vec(sample)
-            .into_arrow_array(&info)
-            .context("Could not create arrow array")?;
+        let serialized_deserialized_arrow_array = {
+            let ptr = NonNull::new(sample.as_ptr() as *mut _).unwrap();
+            let len = sample.len();
+            let raw_buffer = unsafe {
+                arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(sample))
+            };
+            buffer_into_arrow_array(&raw_buffer, &info)?
+        };
         assert_eq!(arrow_array, &serialized_deserialized_arrow_array);
         Ok(())
@@ -0,0 +1,80 @@
+use std::{ptr::NonNull, sync::Arc};
+
+use aligned_vec::{AVec, ConstAlign};
+use dora_arrow_convert::IntoArrow;
+use dora_message::metadata::ArrowTypeInfo;
+use eyre::Context;
+use shared_memory_server::{Shmem, ShmemConf};
+
+use crate::arrow_utils::buffer_into_arrow_array;
+
+pub enum RawData {
+    Empty,
+    Vec(AVec<u8, ConstAlign<128>>),
+    SharedMemory(SharedMemoryData),
+}
+
+impl RawData {
+    pub fn into_arrow_array(
+        self,
+        type_info: &ArrowTypeInfo,
+    ) -> eyre::Result<arrow::array::ArrayData> {
+        let raw_buffer = match self {
+            RawData::Empty => return Ok(().into_arrow().into()),
+            RawData::Vec(data) => {
+                let ptr = NonNull::new(data.as_ptr() as *mut _).unwrap();
+                let len = data.len();
+                unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
+            }
+            RawData::SharedMemory(data) => {
+                let ptr = NonNull::new(data.data.as_ptr() as *mut _).unwrap();
+                let len = data.data.len();
+                unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
+            }
+        };
+        buffer_into_arrow_array(&raw_buffer, type_info)
+    }
+}
+
+impl std::fmt::Debug for RawData {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Data").finish_non_exhaustive()
+    }
+}
+
+pub struct SharedMemoryData {
+    pub data: MappedInputData,
+    pub _drop: flume::Sender<()>,
+}
+
+pub struct MappedInputData {
+    memory: Box<Shmem>,
+    len: usize,
+}
+
+impl MappedInputData {
+    pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result<Self> {
+        let memory = Box::new(
+            ShmemConf::new()
+                .os_id(shared_memory_id)
+                .writable(false)
+                .open()
+                .wrap_err("failed to map shared memory input")?,
+        );
+        Ok(MappedInputData { memory, len })
+    }
+}
+
+impl std::ops::Deref for MappedInputData {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        unsafe { &self.memory.as_slice()[..self.len] }
+    }
+}
+
+unsafe impl Send for MappedInputData {}
+unsafe impl Sync for MappedInputData {}
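For orientation, a minimal sketch of how the new `RawData` type might be used to decode a raw buffer into an Arrow array. The `decode` helper and its inputs are hypothetical; in practice the aligned buffer and `ArrowTypeInfo` are produced by Dora itself:

```rust
use aligned_vec::{AVec, ConstAlign};
use dora_message::metadata::ArrowTypeInfo;

// Hypothetical helper: turn a raw, 128-byte-aligned buffer plus its type
// description into an Arrow array without copying the underlying data.
fn decode(data: AVec<u8, ConstAlign<128>>, type_info: &ArrowTypeInfo) -> eyre::Result<()> {
    let array = RawData::Vec(data).into_arrow_array(type_info)?;
    println!("decoded an Arrow array of {} elements", array.len());
    Ok(())
}
```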
@@ -1,134 +1,91 @@
-use std::{ptr::NonNull, sync::Arc};
-
-use aligned_vec::{AVec, ConstAlign};
-use dora_arrow_convert::{ArrowData, IntoArrow};
+use dora_arrow_convert::ArrowData;
 use dora_core::config::{DataId, OperatorId};
-use dora_message::metadata::{ArrowTypeInfo, BufferOffset, Metadata};
-use eyre::{Context, Result};
-use shared_memory_extended::{Shmem, ShmemConf};
+use dora_message::metadata::Metadata;
 
+/// Represents an incoming Dora event.
+///
+/// Events might be triggered by other nodes, by Dora itself, or by some external user input.
+///
+/// It's safe to ignore event types that are not relevant to the node.
+///
+/// This enum is marked as `non_exhaustive` because we might add additional
+/// variants in the future. Please ignore unknown event types instead of throwing an
+/// error to avoid breakage when updating Dora.
 #[derive(Debug)]
 #[non_exhaustive]
 #[allow(clippy::large_enum_variant)]
 pub enum Event {
-    Stop(StopCause),
-    Reload {
-        operator_id: Option<OperatorId>,
-    },
+    /// An input was received from another node.
+    ///
+    /// This event corresponds to one of the `inputs` of the node as specified
+    /// in the dataflow YAML file.
     Input {
+        /// The input ID, as specified in the YAML file.
+        ///
+        /// Note that this is not the output ID of the sender, but the ID
+        /// assigned to the input in the YAML file.
         id: DataId,
+        /// Meta information about this input, e.g. the timestamp.
         metadata: Metadata,
+        /// The actual data in the Apache Arrow data format.
        data: ArrowData,
    },
+    /// An input was closed by the sender.
+    ///
+    /// The sending node mapped to an input exited, so this input will receive
+    /// no more data.
     InputClosed {
+        /// The ID of the input that was closed, as specified in the YAML file.
+        ///
+        /// Note that this is not the output ID of the sender, but the ID
+        /// assigned to the input in the YAML file.
         id: DataId,
     },
+    /// Notification that the event stream is about to close.
+    ///
+    /// The [`StopCause`] field contains the reason for the event stream closure.
+    ///
+    /// Typically, nodes should exit once the event stream closes. One notable
+    /// exception is nodes with no inputs, which will receive an
+    /// `Event::Stop(StopCause::AllInputsClosed)` right at startup. Source nodes
+    /// might still want to keep producing outputs. (There is currently an open
+    /// discussion of changing this behavior and not sending `AllInputsClosed`
+    /// to nodes without inputs.)
+    ///
+    /// Note: Stop events with `StopCause::Manual` indicate a manual stop operation
+    /// issued through `dora stop` or a `ctrl-c`. Nodes **must exit** after receiving
+    /// such a stop event, otherwise they will be killed by Dora.
+    Stop(StopCause),
+    /// Instructs the node to reload itself or one of its operators.
+    ///
+    /// This event is currently only used for reloading Python operators that are
+    /// started by a `dora runtime` process. So this event should not be sent to normal
+    /// nodes yet.
+    Reload {
+        /// The ID of the operator that should be reloaded.
+        ///
+        /// There is currently no case where `operator_id` is `None`.
+        operator_id: Option<OperatorId>,
+    },
+    /// Notifies the node about an unexpected error that happened inside Dora.
+    ///
+    /// It's a good idea to output or log this error for debugging.
     Error(String),
 }
 
+/// The reason for closing the event stream.
+///
+/// This enum is marked as `non_exhaustive` because we might add additional
+/// variants in the future.
 #[derive(Debug, Clone)]
 #[non_exhaustive]
 pub enum StopCause {
+    /// The dataflow is stopped early after a `dora stop` command (or on `ctrl-c`).
+    ///
+    /// Nodes should exit as soon as possible if they receive a stop event of
+    /// this type. Dora will kill nodes that keep running for too long after
+    /// receiving such a stop event.
     Manual,
+    /// The event stream is closed because all of the node's inputs were closed.
     AllInputsClosed,
 }
-
-pub enum RawData {
-    Empty,
-    Vec(AVec<u8, ConstAlign<128>>),
-    SharedMemory(SharedMemoryData),
-}
-
-impl RawData {
-    pub fn into_arrow_array(self, type_info: &ArrowTypeInfo) -> Result<arrow::array::ArrayData> {
-        let raw_buffer = match self {
-            RawData::Empty => return Ok(().into_arrow().into()),
-            RawData::Vec(data) => {
-                let ptr = NonNull::new(data.as_ptr() as *mut _).unwrap();
-                let len = data.len();
-                unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
-            }
-            RawData::SharedMemory(data) => {
-                let ptr = NonNull::new(data.data.as_ptr() as *mut _).unwrap();
-                let len = data.data.len();
-                unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
-            }
-        };
-        buffer_into_arrow_array(&raw_buffer, type_info)
-    }
-}
-
-pub struct SharedMemoryData {
-    pub data: MappedInputData,
-    pub _drop: flume::Sender<()>,
-}
-
-fn buffer_into_arrow_array(
-    raw_buffer: &arrow::buffer::Buffer,
-    type_info: &ArrowTypeInfo,
-) -> eyre::Result<arrow::array::ArrayData> {
-    if raw_buffer.is_empty() {
-        return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
-    }
-
-    let mut buffers = Vec::new();
-    for BufferOffset { offset, len } in &type_info.buffer_offsets {
-        buffers.push(raw_buffer.slice_with_length(*offset, *len));
-    }
-
-    let mut child_data = Vec::new();
-    for child_type_info in &type_info.child_data {
-        child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
-    }
-
-    arrow::array::ArrayData::try_new(
-        type_info.data_type.clone(),
-        type_info.len,
-        type_info
-            .validity
-            .clone()
-            .map(arrow::buffer::Buffer::from_vec),
-        type_info.offset,
-        buffers,
-        child_data,
-    )
-    .context("Error creating Arrow array")
-}
-
-impl std::fmt::Debug for RawData {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("Data").finish_non_exhaustive()
-    }
-}
-
-pub struct MappedInputData {
-    memory: Box<Shmem>,
-    len: usize,
-}
-
-impl MappedInputData {
-    pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result<Self> {
-        let memory = Box::new(
-            ShmemConf::new()
-                .os_id(shared_memory_id)
-                .writable(false)
-                .open()
-                .wrap_err("failed to map shared memory input")?,
-        );
-        Ok(MappedInputData { memory, len })
-    }
-}
-
-impl std::ops::Deref for MappedInputData {
-    type Target = [u8];
-
-    fn deref(&self) -> &Self::Target {
-        unsafe { &self.memory.as_slice()[..self.len] }
-    }
-}
-
-unsafe impl Send for MappedInputData {}
-unsafe impl Sync for MappedInputData {}
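Given the new `Event` documentation above, a typical receive loop might look like the following sketch (assuming `events` is an `EventStream`; the enum is `non_exhaustive`, so unknown variants are ignored):

```rust
while let Some(event) = events.recv() {
    match event {
        Event::Input { id, metadata: _, data } => {
            println!("input `{id}`: {data:?}");
        }
        // Exit promptly; Dora kills nodes that linger after StopCause::Manual.
        Event::Stop(_) => break,
        // Ignore event types that are not relevant to this node.
        _ => {}
    }
}
```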
@@ -1,18 +1,32 @@
+//! Merge external stream into an [`EventStream`][super::EventStream].
+//!
+//! Sometimes nodes need to listen to external events, in addition to Dora events.
+//! This module provides support for that by providing the [`MergeExternal`] trait.
+
 use futures::{Stream, StreamExt};
 use futures_concurrency::stream::Merge;
 
+/// A Dora event or an event from an external source.
 #[derive(Debug)]
 pub enum MergedEvent<E> {
+    /// A Dora event
     Dora(super::Event),
+    /// An external event
+    ///
+    /// Yielded by the stream that was merged into the Dora [`EventStream`][super::EventStream].
     External(E),
 }
 
+/// A general enum to represent a value of two possible types.
 pub enum Either<A, B> {
+    /// Value is of the first type, type `A`.
     First(A),
+    /// Value is of the second type, type `B`.
     Second(B),
 }
 
 impl<A> Either<A, A> {
+    /// Unwraps an `Either` instance where both types are identical.
     pub fn flatten(self) -> A {
         match self {
             Either::First(a) => a,
@@ -21,19 +35,33 @@ impl<A> Either<A, A> {
     }
 }
 
+/// Allows merging an external event stream into an existing event stream.
 // TODO: use impl trait return type once stable
 pub trait MergeExternal<'a, E> {
+    /// The item type yielded from the merged stream.
     type Item;
 
+    /// Merge the given stream into an existing event stream.
+    ///
+    /// Returns a new event stream that yields items from both streams.
+    /// The ordering between the two streams is not guaranteed.
     fn merge_external(
         self,
         external_events: impl Stream<Item = E> + Unpin + 'a,
    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>;
 }
 
+/// Allows merging a sendable external event stream into an existing (sendable) event stream.
+///
+/// By implementing [`Send`], the streams can be sent to different threads.
 pub trait MergeExternalSend<'a, E> {
+    /// The item type yielded from the merged stream.
    type Item;
 
+    /// Merge the given stream into an existing event stream.
+    ///
+    /// Returns a new event stream that yields items from both streams.
+    /// The ordering between the two streams is not guaranteed.
     fn merge_external_send(
         self,
         external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a,
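A hedged usage sketch for `MergeExternal`, assuming the trait is implemented for `EventStream` with `Item = MergedEvent<E>` (as the re-exports elsewhere in this diff suggest) and using a `futures` channel as the external source:

```rust
use dora_node_api::{merged::{MergeExternal, MergedEvent}, DoraNode};
use futures::StreamExt;

fn main() -> eyre::Result<()> {
    let (mut _node, events) = DoraNode::init_from_env()?;
    let (_tx, external) = futures::channel::mpsc::unbounded::<String>();

    // Combine the Dora event stream with the external channel stream.
    let mut merged = events.merge_external(external);
    futures::executor::block_on(async {
        while let Some(event) = merged.next().await {
            match event {
                MergedEvent::Dora(_dora_event) => { /* handle Dora events */ }
                MergedEvent::External(msg) => println!("external: {msg}"),
            }
        }
    });
    Ok(())
}
```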
@@ -11,7 +11,7 @@ use dora_message::{
     node_to_daemon::{DaemonRequest, Timestamped},
     DataflowId,
 };
-pub use event::{Event, MappedInputData, RawData, StopCause};
+pub use event::{Event, StopCause};
 use futures::{
     future::{select, Either},
     Stream, StreamExt,
@@ -19,22 +19,44 @@ use futures::{
 use futures_timer::Delay;
 use scheduler::{Scheduler, NON_INPUT_EVENT};
 
-use self::{
-    event::SharedMemoryData,
-    thread::{EventItem, EventStreamThreadHandle},
+use self::thread::{EventItem, EventStreamThreadHandle};
+use crate::{
+    daemon_connection::DaemonChannel,
+    event_stream::data_conversion::{MappedInputData, RawData, SharedMemoryData},
 };
-use crate::daemon_connection::DaemonChannel;
 use dora_core::{
     config::{Input, NodeId},
     uhlc,
 };
 use eyre::{eyre, Context};
+pub use scheduler::Scheduler as EventScheduler;
 
+mod data_conversion;
 mod event;
 pub mod merged;
 mod scheduler;
 mod thread;
 
+/// Asynchronous iterator over the incoming [`Event`]s destined for this node.
+///
+/// This struct [implements](#impl-Stream-for-EventStream) the [`Stream`] trait,
+/// so you can use methods of the [`StreamExt`] trait
+/// on this struct. A common pattern is `while let Some(event) = event_stream.next().await`.
+///
+/// Nodes should iterate over this event stream and react to events that they are interested in.
+/// Typically, the most important event type is [`Event::Input`].
+/// You don't need to handle all events; it's fine to ignore events that are not relevant to your node.
+///
+/// The event stream will close itself after an [`Event::Stop`] was received.
+/// A manual `break` on [`Event::Stop`] is typically not needed.
+/// _(You probably do need to use a manual `break` on stop events when using the
+/// [`StreamExt::merge`][`futures_concurrency::stream::StreamExt::merge`] implementation on
+/// [`EventStream`] to combine the stream with an external one.)_
+///
+/// Once the event stream has finished, nodes should exit.
+/// Note that Dora kills nodes that don't exit quickly after an [`Event::Stop`] of type
+/// [`StopCause::Manual`] was received.
 pub struct EventStream {
     node_id: NodeId,
     receiver: flume::r#async::RecvStream<'static, EventItem>,
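The `Stream`-based usage pattern described in the new `EventStream` docs might look like this in a node's main function (a sketch):

```rust
use dora_node_api::{DoraNode, Event};
use futures::StreamExt;

fn main() -> eyre::Result<()> {
    let (mut _node, mut events) = DoraNode::init_from_env()?;
    futures::executor::block_on(async {
        // `next` preserves chronological order (no EventScheduler reordering).
        while let Some(event) = events.next().await {
            if let Event::Input { id, .. } = event {
                println!("received input `{id}`");
            }
        }
    });
    Ok(())
}
```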
@@ -157,16 +179,61 @@ impl EventStream {
         })
     }
 
-    /// wait for the next event on the events stream.
+    /// Synchronously waits for the next event.
+    ///
+    /// Blocks the thread until the next event arrives.
+    /// Returns [`None`] once the event stream is closed.
+    ///
+    /// For an asynchronous variant of this method see [`recv_async`][Self::recv_async].
+    ///
+    /// ## Event Reordering
+    ///
+    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
+    /// events might be returned in a different order than they occurred. For details, check the
+    /// documentation of the [`EventScheduler`] struct.
+    ///
+    /// If you want to receive the events in their original chronological order, use the
+    /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the
+    /// [`Stream`] trait).
     pub fn recv(&mut self) -> Option<Event> {
         futures::executor::block_on(self.recv_async())
     }
 
-    /// wait for the next event on the events stream until timeout
+    /// Receives the next incoming [`Event`] synchronously with a timeout.
+    ///
+    /// Blocks the thread until the next event arrives or the timeout is reached.
+    /// Returns an [`Event::Error`] if no event was received within the given duration.
+    ///
+    /// Returns [`None`] once the event stream is closed.
+    ///
+    /// For an asynchronous variant of this method see [`recv_async_timeout`][Self::recv_async_timeout].
+    ///
+    /// ## Event Reordering
+    ///
+    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
+    /// events might be returned in a different order than they occurred. For details, check the
+    /// documentation of the [`EventScheduler`] struct.
+    ///
+    /// If you want to receive the events in their original chronological order, use the
+    /// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the
+    /// [`Stream`] trait).
     pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
         futures::executor::block_on(self.recv_async_timeout(dur))
     }
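For the blocking variants, a brief sketch (the 100 ms timeout is illustrative; per the docs above, a timeout yields an `Event::Error` rather than ending the stream):

```rust
use std::time::Duration;

// `events` is an EventStream; the loop ends when the stream closes.
while let Some(event) = events.recv_timeout(Duration::from_millis(100)) {
    match event {
        dora_node_api::Event::Error(e) => eprintln!("timeout or error: {e}"),
        other => println!("{other:?}"),
    }
}
```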
+    /// Receives the next incoming [`Event`] asynchronously, using an [`EventScheduler`] for fairness.
+    ///
+    /// Returns [`None`] once the event stream is closed.
+    ///
+    /// ## Event Reordering
+    ///
+    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
+    /// events might be returned in a different order than they occurred. For details, check the
+    /// documentation of the [`EventScheduler`] struct.
+    ///
+    /// If you want to receive the events in their original chronological order, use the
+    /// [`StreamExt::next`] method with a custom timeout future instead
+    /// ([`EventStream`] implements the [`Stream`] trait).
     pub async fn recv_async(&mut self) -> Option<Event> {
         loop {
             if self.scheduler.is_empty() {
@@ -187,6 +254,21 @@ impl EventStream {
         event.map(Self::convert_event_item)
     }
 
+    /// Receives the next incoming [`Event`] asynchronously with a timeout.
+    ///
+    /// Returns an [`Event::Error`] if no event was received within the given duration.
+    ///
+    /// Returns [`None`] once the event stream is closed.
+    ///
+    /// ## Event Reordering
+    ///
+    /// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
+    /// events might be returned in a different order than they occurred. For details, check the
+    /// documentation of the [`EventScheduler`] struct.
+    ///
+    /// If you want to receive the events in their original chronological order, use the
+    /// [`StreamExt::next`] method with a custom timeout future instead
+    /// ([`EventStream`] implements the [`Stream`] trait).
     pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
         match select(Delay::new(dur), pin!(self.recv_async())).await {
             Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError(
@@ -3,27 +3,89 @@ use std::collections::{HashMap, VecDeque};
 use dora_message::{daemon_to_node::NodeEvent, id::DataId};
 use super::thread::EventItem;
 
-pub const NON_INPUT_EVENT: &str = "dora/non_input_event";
+pub(crate) const NON_INPUT_EVENT: &str = "dora/non_input_event";
 
-// This scheduler will make sure that there is fairness between
-// inputs.
-//
-// It is going to always make sure that the input that has not been used for the longest period is the first one to be used next.
-//
-// Ex:
-// In the case that one input has a very high frequency and another one with a very slow frequency.\
-//
-// The Node will always alternate between the two inputs when each input is available
-// Avoiding one input to be overwhelmingly present.
-//
+/// This scheduler will make sure that there is fairness between inputs.
+///
+/// The scheduler reorders events in the following way:
+///
+/// - **Non-input events are prioritized**
+///
+///   If the node received any events that are not input events, they are returned first. The
+///   intention of this reordering is that the nodes can react quickly to dataflow-related events
+///   even when their input queues are very full.
+///
+///   This reordering has some side effects that might be unexpected:
+///   - An [`InputClosed`][super::Event::InputClosed] event might be yielded before the last
+///     input events of that ID.
+///
+///     Usually, an `InputClosed` event indicates that there won't be any subsequent inputs
+///     of a certain ID. This invariant does not hold anymore for a scheduled event stream.
+///   - The [`Stop`][super::Event::Stop] event might not be the last event of the stream anymore.
+///
+///     Usually, the `Stop` event is the last event that is sent to a node before the event stream
+///     is closed. Because of the reordering, the stream might return more events after a `Stop`
+///     event.
+/// - **Input events are grouped by ID** and yielded in a **least-recently used order (by ID)**.
+///
+///   The scheduler keeps a separate queue for each input ID, where the incoming input events are
+///   placed in their chronological order. When yielding the next event, the scheduler iterates over
+///   these queues in least-recently used order. This means that the queue corresponding to the
+///   last yielded event will be checked last. The scheduler will return the oldest event from the
+///   first non-empty queue.
+///
+///   The side effect of this change is that input events of different IDs are no longer in their
+///   chronological order. This might lead to unexpected results for input events that are caused by
+///   each other.
+///
+/// ## Example 1
+/// Consider the case that one input has a very high frequency and another one has a very low
+/// frequency. The event stream will always alternate between the two inputs when both inputs are
+/// available.
+/// Without the scheduling, the high-frequency input would be returned much more often.
+///
+/// ## Example 2
+/// Again, let's consider the case that one input has a very high frequency and the other has a
+/// very low frequency. This time, we define a small maximum queue size for the low-frequency
+/// input, but a large queue size for the high-frequency one.
+/// Using the scheduler, the event stream will always alternate between high and low-frequency
+/// inputs as long as inputs of both types are available.
+///
+/// Without scheduling, the low-frequency input might never be yielded before
+/// it's dropped because there is almost always an older high-frequency input available that is
+/// yielded first. Once the low-frequency input would be the next one chronologically, it might
+/// have been dropped already because the node received newer low-frequency inputs in the
+/// meantime (the queue length is small). At this point, the next-oldest input is a high-frequency
+/// input again.
+///
+/// ## Example 3
+/// Consider a high-frequency camera input and a low-frequency bounding box input, which is based
+/// on the latest camera image. The dataflow YAML file specifies a large queue size for the camera
+/// input and a small queue size for the bounding box input.
+///
+/// With scheduling, the number of
+/// buffered camera inputs might grow over time. As a result, the camera inputs yielded from the
+/// stream (in oldest-first order) are not synchronized with the bounding box inputs anymore. So
+/// the node receives an up-to-date bounding box, but a considerably outdated image.
+///
+/// Without scheduling, the events are returned in chronological order. This time, the bounding
+/// box might be slightly outdated if the camera sent new images before the bounding box was
+/// ready. However, the time difference between the two input types is independent of the
+/// queue size this time.
+///
+/// (If a perfectly matching bounding box is required, we recommend forwarding the input image as
+/// part of the bounding box output. This way, the receiving node only needs to subscribe to one
+/// input, so no mismatches can happen.)
 #[derive(Debug)]
 pub struct Scheduler {
-    last_used: VecDeque<DataId>, // Tracks the last-used event ID
-    event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>, // Tracks events per ID
+    /// Tracks the last-used event ID
+    last_used: VecDeque<DataId>,
+    /// Tracks events per ID
+    event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>,
 }
 
 impl Scheduler {
-    pub fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self {
+    pub(crate) fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self {
         let topic = VecDeque::from_iter(
             event_queues
                 .keys()
@@ -36,7 +98,7 @@ impl Scheduler {
         }
     }
 
-    pub fn add_event(&mut self, event: EventItem) {
+    pub(crate) fn add_event(&mut self, event: EventItem) {
         let event_id = match &event {
             EventItem::NodeEvent {
                 event:
@@ -63,7 +125,7 @@ impl Scheduler {
         }
     }
 
-    pub fn next(&mut self) -> Option<EventItem> {
+    pub(crate) fn next(&mut self) -> Option<EventItem> {
         // Retrieve message from the non input event first that have priority over input message.
         if let Some((_size, queue)) = self
             .event_queues
@@ -89,7 +151,7 @@ impl Scheduler {
         None
     }
 
-    pub fn is_empty(&self) -> bool {
+    pub(crate) fn is_empty(&self) -> bool {
         self.event_queues
             .iter()
             .all(|(_id, (_size, queue))| queue.is_empty())
@@ -1,18 +1,85 @@
-//! The custom node API allow you to integrate `dora` into your application.
-//! It allows you to retrieve input and send output in any fashion you want.
+//! This crate enables you to create nodes for the [Dora] dataflow framework.
 //!
-//! Try it out with:
+//! [Dora]: https://dora-rs.ai/
 //!
-//! ```bash
-//! dora new node --kind node
-//! ```
+//! ## The Dora Framework
+//!
+//! Dora is a dataflow framework that models applications as a directed graph, with nodes
+//! representing operations and edges representing data transfer.
+//! The layout of the dataflow graph is defined through a YAML file in Dora.
+//! For details, see our [Dataflow Specification](https://dora-rs.ai/docs/api/dataflow-config/)
+//! chapter.
+//!
+//! Dora nodes are typically spawned by the Dora framework, instead of spawning them manually.
+//! If you want to spawn a node manually, define it as a [_dynamic_ node](#dynamic-nodes).
 //!
-//! You can also generate a dora rust project with
+//! ## Normal Usage
 //!
-//! ```bash
-//! dora new project_xyz --kind dataflow
+//! In order to connect your executable to Dora, you need to initialize a [`DoraNode`].
+//! For standard nodes, the recommended initialization function is [`init_from_env`][`DoraNode::init_from_env`].
+//! This function will return two values: a [`DoraNode`] instance and an [`EventStream`].
+//!
+//! ```no_run
+//! use dora_node_api::DoraNode;
+//!
+//! let (mut node, mut events) = DoraNode::init_from_env()?;
+//! # Ok::<(), eyre::Report>(())
 //! ```
 //!
+//! You can use the `node` instance to send outputs and retrieve information about the node and
+//! the dataflow. The `events` stream yields the inputs that the node defines in the dataflow
+//! YAML file and other incoming events.
+//!
+//! ### Sending Outputs
+//!
+//! The [`DoraNode`] instance enables you to send outputs in different formats.
+//! For best performance, use the [Arrow](https://arrow.apache.org/docs/index.html) data format
+//! and one of the output functions that utilizes shared memory.
+//!
+//! ### Receiving Events
+//!
+//! The [`EventStream`] is an [`AsyncIterator`][std::async_iter::AsyncIterator] that yields the incoming [`Event`]s.
+//!
+//! Nodes should iterate over this event stream and react to events that they are interested in.
+//! Typically, the most important event type is [`Event::Input`].
+//! You don't need to handle all events; it's fine to ignore events that are not relevant to your node.
+//!
+//! The event stream will close itself after an [`Event::Stop`] was received.
+//! A manual `break` on [`Event::Stop`] is typically not needed.
+//! _(You probably do need to use a manual `break` on stop events when using the
+//! [`StreamExt::merge`][`futures_concurrency::stream::StreamExt::merge`] implementation on
+//! [`EventStream`] to combine the stream with an external one.)_
+//!
+//! Once the event stream has finished, nodes should exit.
+//! Note that Dora kills nodes that don't exit quickly after an [`Event::Stop`] of type
+//! [`StopCause::Manual`] was received.
+//!
+//! ## Dynamic Nodes
+//!
+//! <div class="warning">
+//!
+//! Dynamic nodes have certain [limitations](#limitations). Use with care.
+//!
+//! </div>
+//!
+//! Nodes can be defined as `dynamic` by setting their `path` attribute to `dynamic` in the
+//! dataflow YAML file. Dynamic nodes are not spawned by the Dora framework and need to be started
+//! manually.
+//!
+//! Dynamic nodes cannot use the [`DoraNode::init_from_env`] function for initialization.
+//! Instead, they can be initialized through the [`DoraNode::init_from_node_id`] function.
+//!
+//! ### Limitations
+//!
+//! - Dynamic nodes **don't work with `dora run`**.
+//! - As dynamic nodes are identified by their node ID, this **ID must be unique**
+//!   across all running dataflows.
+//! - For distributed dataflows, nodes need to be manually spawned on the correct machine.
+
+#![warn(missing_docs)]
+
 pub use arrow;
 pub use dora_arrow_convert::*;
 pub use dora_core::{self, uhlc};
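To illustrate the "Sending Outputs" section above, a minimal sketch. The output ID `counter` is hypothetical and must be declared under the node's `outputs` in the dataflow YAML; the `send_output` parameter list is taken from the API visible later in this diff:

```rust
use dora_node_api::{dora_core::config::DataId, DoraNode, IntoArrow};

fn main() -> eyre::Result<()> {
    let (mut node, _events) = DoraNode::init_from_env()?;
    // Convert a plain Rust value into an Arrow array and send it as output.
    node.send_output(
        DataId::from("counter".to_owned()),
        Default::default(), // no extra metadata parameters
        42u64.into_arrow(),
    )?;
    Ok(())
}
```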
@@ -20,8 +87,9 @@ pub use dora_message::{
     metadata::{Metadata, MetadataParameters, Parameter},
     DataflowId,
 };
-pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData, StopCause};
+pub use event_stream::{merged, Event, EventScheduler, EventStream, StopCause};
 pub use flume::Receiver;
+pub use futures;
 pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD};
 
 mod daemon_connection;
@@ -1,6 +1,11 @@
+//! Utility functions for converting Arrow arrays to/from raw data.
+//!
 use arrow::array::{ArrayData, BufferSpec};
 use dora_message::metadata::{ArrowTypeInfo, BufferOffset};
+use eyre::Context;
 
+/// Calculates the data size in bytes required for storing a continuous copy of the given Arrow
+/// array.
 pub fn required_data_size(array: &ArrayData) -> usize {
     let mut next_offset = 0;
     required_data_size_inner(array, &mut next_offset);
@@ -10,8 +15,8 @@ fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) {
     let layout = arrow::array::layout(array.data_type());
     for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
         // consider alignment padding
-        if let BufferSpec::FixedWidth { alignment, .. } = spec {
-            *next_offset = (*next_offset).div_ceil(*alignment) * alignment;
+        if let BufferSpec::FixedWidth { alignment, .. } = *spec {
+            *next_offset = (*next_offset).div_ceil(alignment) * alignment;
         }
         *next_offset += buffer.len();
     }
@@ -20,6 +25,12 @@ fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) {
     }
 }
 
+/// Copy the given Arrow array into the provided buffer.
+///
+/// If the Arrow array consists of multiple buffers, they are placed continuously in the target
+/// buffer (there might be some padding for alignment).
+///
+/// Panics if the buffer is not large enough.
 pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo {
     let mut next_offset = 0;
     copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array)
@@ -41,8 +52,8 @@ fn copy_array_into_sample_inner(
         *next_offset,
     );
     // add alignment padding
-    if let BufferSpec::FixedWidth { alignment, .. } = spec {
-        *next_offset = (*next_offset).div_ceil(*alignment) * alignment;
+    if let BufferSpec::FixedWidth { alignment, .. } = *spec {
+        *next_offset = (*next_offset).div_ceil(alignment) * alignment;
     }
 
     target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice());
@@ -69,3 +80,38 @@ fn copy_array_into_sample_inner(
         child_data,
     }
 }
+
+/// Tries to convert the given raw Arrow buffer into an Arrow array.
+///
+/// The `type_info` is required for decoding the `raw_buffer` correctly.
+pub fn buffer_into_arrow_array(
+    raw_buffer: &arrow::buffer::Buffer,
+    type_info: &ArrowTypeInfo,
+) -> eyre::Result<arrow::array::ArrayData> {
+    if raw_buffer.is_empty() {
+        return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
+    }
+
+    let mut buffers = Vec::new();
+    for BufferOffset { offset, len } in &type_info.buffer_offsets {
+        buffers.push(raw_buffer.slice_with_length(*offset, *len));
+    }
+
+    let mut child_data = Vec::new();
+    for child_type_info in &type_info.child_data {
+        child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
+    }
+
+    arrow::array::ArrayData::try_new(
+        type_info.data_type.clone(),
+        type_info.len,
+        type_info
+            .validity
+            .clone()
+            .map(arrow::buffer::Buffer::from_vec),
+        type_info.offset,
+        buffers,
+        child_data,
+    )
+    .context("Error creating Arrow array")
+}
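Taken together, the three helpers support the round trip exercised by the updated test at the top of this diff; roughly like the following sketch (assuming `AVec` supports the `Vec`-like `new`/`resize` API):

```rust
use std::{ptr::NonNull, sync::Arc};

use aligned_vec::{AVec, ConstAlign};
use arrow::array::{Array, Int32Array};

fn round_trip() -> eyre::Result<()> {
    let array = Int32Array::from(vec![1, 2, 3]).to_data();

    // Allocate an aligned buffer and copy the array into it contiguously.
    let mut sample: AVec<u8, ConstAlign<128>> = AVec::new(128);
    sample.resize(required_data_size(&array), 0);
    let type_info = copy_array_into_sample(&mut sample, &array);

    // Wrap the buffer without copying and decode it back into an array.
    let ptr = NonNull::new(sample.as_ptr() as *mut _).unwrap();
    let len = sample.len();
    let buffer =
        unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(sample)) };
    let decoded = buffer_into_arrow_array(&buffer, &type_info)?;

    assert_eq!(array, decoded);
    Ok(())
}
```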
@@ -42,6 +42,20 @@ pub mod arrow_utils;
 mod control_channel;
 mod drop_stream;
 
+/// The data size threshold at which we start using shared memory.
+///
+/// Shared memory works by sharing memory pages. This means that the smallest
+/// memory region that can be shared is one memory page, which is typically
+/// 4KiB.
+///
+/// Using shared memory for messages smaller than the page size still requires
+/// sharing a full page, so we have some memory overhead. We also have some
+/// performance overhead because we need to issue multiple syscalls. For small
+/// messages it is faster to send them over a traditional TCP stream (or similar).
+///
+/// This hardcoded threshold value specifies which messages are sent through
+/// shared memory. Messages that are smaller than this threshold are sent through
+/// TCP.
 pub const ZERO_COPY_THRESHOLD: usize = 4096;
 
 #[allow(dead_code)]
@@ -50,6 +64,10 @@ enum TokioRuntime {
     Handle(Handle),
 }
 
+/// Allows sending outputs and retrieving node information.
+///
+/// The main purpose of this struct is to send outputs via Dora. There are also functions available
+/// for retrieving the node configuration.
 pub struct DoraNode {
     id: NodeId,
     dataflow_id: DataflowId,
@@ -67,7 +85,11 @@ pub struct DoraNode {
 }
 
 impl DoraNode {
-    /// Initiate a node from environment variables set by `dora-coordinator`
+    /// Initiate a node from environment variables set by the Dora daemon.
+    ///
+    /// This is the recommended initialization function for Dora nodes, which are spawned by
+    /// Dora daemon instances.
+    ///
     ///
     /// ```no_run
     /// use dora_node_api::DoraNode;
@@ -94,6 +116,8 @@ impl DoraNode {
     /// Initiate a node from a dataflow id and a node id.
     ///
+    /// This initialization function should be used for [_dynamic nodes_](index.html#dynamic-nodes).
+    ///
     /// ```no_run
     /// use dora_node_api::DoraNode;
     /// use dora_node_api::dora_core::config::NodeId;
@@ -126,6 +150,11 @@ impl DoraNode {
         }
     }
 
+    /// Flexible initialization function for nodes that are sometimes used as dynamic nodes.
+    ///
+    /// This function first tries initializing the traditional way through
+    /// [`init_from_env`][Self::init_from_env]. If this fails, it falls back to
+    /// [`init_from_node_id`][Self::init_from_node_id].
     pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
         if std::env::var("DORA_NODE_CONFIG").is_ok() {
             info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`");
@@ -135,6 +164,8 @@ impl DoraNode {
         }
     }
 
+    /// Internal initialization routine that should not be used outside of Dora.
+    #[doc(hidden)]
     #[tracing::instrument]
     pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
         let NodeConfig {
@@ -219,7 +250,8 @@ impl DoraNode {
         }
     }
 
-    /// Send data from the node to the other nodes.
+    /// Send raw data from the node to the other nodes.
+    ///
     /// We take a closure as an input to enable zero copy on send.
     ///
     /// ```no_run
@@ -242,6 +274,8 @@ impl DoraNode {
     ///     }).expect("Could not send output");
     /// ```
     ///
+    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
+    /// configuration file.
     pub fn send_output_raw<F>(
         &mut self,
         output_id: DataId,
@@ -263,6 +297,14 @@ impl DoraNode {
         self.send_output_sample(output_id, type_info, parameters, Some(sample))
     }
 
+    /// Sends the given Arrow array as an output message.
+    ///
+    /// Uses shared memory for efficient data transfer if suitable.
+    ///
+    /// This method might copy the message once to move it to shared memory.
+    ///
+    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
+    /// configuration file.
     pub fn send_output(
         &mut self,
         output_id: DataId,
@@ -286,6 +328,12 @@ impl DoraNode {
         Ok(())
     }
 
+    /// Send the given raw byte data as output.
+    ///
+    /// Might copy the data once to move it into shared memory.
+    ///
+    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
+    /// configuration file.
     pub fn send_output_bytes(
         &mut self,
         output_id: DataId,
@@ -301,6 +349,12 @@ impl DoraNode {
         })
     }
 
+    /// Send the given raw byte data with the provided type information.
+    ///
+    /// It is recommended to use a function like [`send_output`][Self::send_output] instead.
+    ///
+    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
+    /// configuration file.
     pub fn send_typed_output<F>(
         &mut self,
         output_id: DataId,
@@ -322,6 +376,12 @@ impl DoraNode {
         self.send_output_sample(output_id, type_info, parameters, Some(sample))
     }
 
+    /// Sends the given [`DataSample`] as output, combined with the given type information.
+    ///
+    /// It is recommended to use a function like [`send_output`][Self::send_output] instead.
+    ///
+    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
+    /// configuration file.
     pub fn send_output_sample(
         &mut self,
         output_id: DataId,
@@ -350,32 +410,46 @@ impl DoraNode {
         Ok(())
     }
 
-    pub fn close_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> {
-        for output_id in &outputs {
+    /// Report the given output IDs as closed.
+    ///
+    /// The node is not allowed to send more outputs with the closed IDs.
+    ///
+    /// Closing outputs early can be helpful to receivers.
+    pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> eyre::Result<()> {
+        for output_id in &outputs_ids {
             if !self.node_config.outputs.remove(output_id) {
                 eyre::bail!("unknown output {output_id}");
             }
         }
 
         self.control_channel
-            .report_closed_outputs(outputs)
+            .report_closed_outputs(outputs_ids)
             .wrap_err("failed to report closed outputs to daemon")?;
 
         Ok(())
     }
 
+    /// Returns the ID of the node as specified in the dataflow configuration file.
     pub fn id(&self) -> &NodeId {
         &self.id
     }
 
+    /// Returns the unique identifier for the running dataflow instance.
+    ///
+    /// Dora assigns each dataflow instance a random identifier when started.
     pub fn dataflow_id(&self) -> &DataflowId {
         &self.dataflow_id
     }
 
+    /// Returns the input and output configuration of this node.
     pub fn node_config(&self) -> &NodeRunConfig {
         &self.node_config
     }
 
+    /// Allocates a [`DataSample`] of the specified size.
+    ///
+    /// The data sample will use shared memory when suitable to enable efficient data transfer
+    /// when sending an output message.
     pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
         let data = if data_len >= ZERO_COPY_THRESHOLD {
             // create shared memory region
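A hedged sketch of manual sample handling with `allocate_data_sample`, mirroring what `send_output` appears to do internally (the `node`, `array`, and `output_id` values are placeholders; the `send_output_sample` argument order is taken from the calls visible in this diff):

```rust
use dora_node_api::arrow_utils::{copy_array_into_sample, required_data_size};

// `node` is a DoraNode, `array` an arrow::array::ArrayData, `output_id` a DataId.
let mut sample = node.allocate_data_sample(required_data_size(&array))?;
let type_info = copy_array_into_sample(&mut sample, &array);
node.send_output_sample(output_id, type_info, Default::default(), Some(sample))?;
```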
@@ -514,6 +588,11 @@ impl Drop for DoraNode {
     }
 }
 
+/// A data region suitable for sending as an output message.
+///
+/// The region is stored in shared memory when suitable to enable efficient data transfer.
+///
+/// `DataSample` implements the [`Deref`] and [`DerefMut`] traits to read and write the mapped data.
 pub struct DataSample {
     inner: DataSampleInner,
     len: usize,
@@ -1,3 +1,52 @@
+//! Provides the `dora build` command.
+//!
+//! The `dora build` command works like this:
+//!
+//! - Dataflows can specify a `build` command for each node in their YAML definition
+//! - Dora will run the `build` command when `dora build` is invoked
+//! - If the dataflow is distributed across multiple machines, each `build` command will be run on the target machine of the corresponding node
+//!   - i.e. the machine specified under the `deploy` key
+//!   - this requires a connection to the dora coordinator, so you need to specify the coordinator IP/port for this
+//!   - to run the build commands of all nodes _locally_, you can use `dora build --local`
+//! - If the dataflow does not specify any `deploy` keys, all build commands will be run locally (i.e. `dora build` behaves like `dora build --local`)
+//!
+//! #### Git Source
+//!
+//! - Nodes can have a git repository as source
+//!   - set the `git` config key to the URL of the repository
+//!   - by default, the default branch is used
+//!   - you can also specify a specific `branch` name
+//!   - alternatively, you can specify a `tag` name or a `rev` key with the commit hash
+//!   - you can only specify one of `branch`, `tag`, and `rev`, otherwise an error will occur
+//! - Dora will automatically clone and checkout the requested branch/tag/commit on `dora build`
+//!   - the `build` command will be run after cloning
+//!   - for distributed dataflows, the clone/checkout will happen on the target machine
+//! - subsequent `dora build` commands will automatically fetch the latest changes for nodes
+//!   - not when using `tag` or `rev`, because these are not expected to change
+//!   - after fetching changes, the `build` command will be executed again
+//!   - _tip:_ use a build tool that supports incremental builds (e.g. `cargo`) to make this rebuild faster
+//!
+//! The **working directory** will be set to the git repository.
+//! This means that both the `build` command and the `path` key are resolved relative to this folder.
+//! This allows you to use relative paths.
+//!
+//! #### Example
+//!
+//! ```yml
+//! nodes:
+//!   - id: rust-node
+//!     # URL of your repository
+//!     git: https://github.com/dora-rs/dora.git
+//!     # the build command that should be invoked after cloning
+//!     build: cargo build -p rust-dataflow-example-node
+//!     # path to the executable that should be run on start
+//!     path: target/debug/rust-dataflow-example-node
+//!     inputs:
+//!       tick: dora/timer/millis/10
+//!     outputs:
+//!       - random
+//! ```
+
 use communication_layer_request_reply::TcpRequestReplyConnection;
 use dora_core::{
     descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt},
@@ -1,3 +1,10 @@
+//! The `dora run` command is a quick and easy way to run a dataflow locally.
+//! It does not support distributed dataflows and will throw an error if there are any `deploy` keys in the YAML file.
+//!
+//! The `dora run` command does not interact with any `dora coordinator` or `dora daemon` instances, or with any other parallel `dora run` commands.
+//!
+//! Use `dora build --local` or manual build commands to build your nodes.
+
 use super::Executable;
 use crate::{
     common::{handle_dataflow_result, resolve_dataflow},
@@ -1,3 +1,7 @@
+//! The `dora start` command is used to spawn a dataflow in a pre-existing _dora network_. To create a dora network, spawn a `dora coordinator` and one or multiple `dora daemon` instances.
+//!
+//! The `dora start` command does not run any build commands, nor update git dependencies or similar. Use `dora build` for that.
+
 use super::{default_tracing, Executable};
 use crate::{
     command::start::attach::attach_dataflow,
@@ -1,3 +1,7 @@
+//! Provides functions for converting between Apache Arrow arrays and Rust data types.
+
+#![warn(missing_docs)]
+
 use arrow::array::{
     Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
     UInt32Array, UInt8Array,
@@ -10,12 +14,16 @@ use std::ops::{Deref, DerefMut};
 mod from_impls;
 mod into_impls;
 
+/// Data that can be converted to an Arrow array.
 pub trait IntoArrow {
+    /// The Array type that the data can be converted to.
     type A: Array;
 
+    /// Convert the data into an Arrow array.
     fn into_arrow(self) -> Self::A;
 }
 
+/// Wrapper type for an Arrow [`ArrayRef`](arrow::array::ArrayRef).
 #[derive(Debug)]
 pub struct ArrowData(pub arrow::array::ArrayRef);
@@ -35,6 +43,7 @@ impl DerefMut for ArrowData {
 macro_rules! register_array_handlers {
     ($(($variant:path, $array_type:ty, $type_name:expr)),* $(,)?) => {
+        /// Tries to convert the given Arrow array into a `Vec` of integers or floats.
         pub fn into_vec<T>(data: &ArrowData) -> Result<Vec<T>>
         where
             T: Copy + NumCast + 'static,
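As a usage sketch, `into_vec` converts an `ArrowData` array into a plain `Vec`, with `NumCast` used to convert the element type:

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use dora_arrow_convert::{into_vec, ArrowData};

fn main() {
    // Wrap an Arrow array and extract its values as f64.
    let data = ArrowData(Arc::new(Int32Array::from(vec![1, 2, 3])));
    let values: Vec<f64> = into_vec(&data).expect("unsupported array type");
    assert_eq!(values, vec![1.0, 2.0, 3.0]);
}
```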
@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
 
 pub use crate::id::{DataId, NodeId, OperatorId};
 
+/// Contains the input and output configuration of the node.
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
 pub struct NodeRunConfig {
     /// Inputs for the nodes as a map from input ID to `node_id/output_id`.
@@ -26,6 +26,9 @@ pub use arrow_data;
 pub use arrow_schema;
 use uuid::{Timestamp, Uuid};
 
+/// Unique identifier for a dataflow instance.
+///
+/// Dora assigns each dataflow instance a unique ID on start.
 pub type DataflowId = uuid::Uuid;
 
 #[derive(
@@ -3,6 +3,9 @@ use std::collections::BTreeMap;
 use arrow_schema::DataType;
 use serde::{Deserialize, Serialize};
 
+/// Additional data that is sent as part of output messages.
+///
+/// Includes a timestamp, type information, and additional user-provided parameters.
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct Metadata {
     metadata_version: u16,
@@ -42,6 +45,7 @@ impl Metadata {
     }
 }
 
+/// Additional metadata that can be sent as part of output messages.
 pub type MetadataParameters = BTreeMap<String, Parameter>;
 
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -55,6 +59,7 @@ pub struct ArrowTypeInfo {
     pub child_data: Vec<ArrowTypeInfo>,
 }
 
+/// A metadata parameter that can be sent as part of output messages.
 #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
 pub enum Parameter {
     Bool(bool),
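Finally, a sketch of attaching user metadata to an output via these types. Only the `Bool` variant of `Parameter` is visible in this diff, and the exact call shape of `send_output_bytes` is assumed; `node` and the `raw` output ID are placeholders:

```rust
use dora_node_api::{dora_core::config::DataId, MetadataParameters, Parameter};

// `node` is a DoraNode. Attach a custom key-value parameter to a message.
let mut params = MetadataParameters::new();
params.insert("debug".to_owned(), Parameter::Bool(true));

let bytes = [1u8, 2, 3];
node.send_output_bytes(
    DataId::from("raw".to_owned()),
    params,
    bytes.len(),
    &bytes,
)?;
```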