
Merge branch 'main' into fix-all-clippy-warning

pull/1076/head
Chrislearn Young 6 months ago
parent commit bfedbe4b62
16 changed files with 649 additions and 163 deletions
  1. apis/python/operator/src/lib.rs (+13 -7)
  2. apis/rust/node/src/event_stream/data_conversion.rs (+80 -0)
  3. apis/rust/node/src/event_stream/event.rs (+69 -112)
  4. apis/rust/node/src/event_stream/merged.rs (+28 -0)
  5. apis/rust/node/src/event_stream/mod.rs (+89 -7)
  6. apis/rust/node/src/event_stream/scheduler.rs (+80 -18)
  7. apis/rust/node/src/lib.rs (+78 -10)
  8. apis/rust/node/src/node/arrow_utils.rs (+50 -4)
  9. apis/rust/node/src/node/mod.rs (+84 -5)
  10. binaries/cli/src/command/build/mod.rs (+49 -0)
  11. binaries/cli/src/command/run.rs (+7 -0)
  12. binaries/cli/src/command/start/mod.rs (+4 -0)
  13. libraries/arrow-convert/src/lib.rs (+9 -0)
  14. libraries/message/src/config.rs (+1 -0)
  15. libraries/message/src/lib.rs (+3 -0)
  16. libraries/message/src/metadata.rs (+5 -0)

apis/python/operator/src/lib.rs (+13 -7)

@@ -277,7 +277,7 @@ pub fn metadata_to_pydict<'a>(

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{ptr::NonNull, sync::Arc};

use aligned_vec::{AVec, ConstAlign};
use arrow::{
@@ -289,9 +289,8 @@ mod tests {
};

use arrow_schema::{DataType, Field};
use dora_node_api::{
arrow_utils::{copy_array_into_sample, required_data_size},
RawData,
use dora_node_api::arrow_utils::{
buffer_into_arrow_array, copy_array_into_sample, required_data_size,
};
use eyre::{Context, Result};

@@ -301,9 +300,16 @@ mod tests {

let info = copy_array_into_sample(&mut sample, arrow_array);

let serialized_deserialized_arrow_array = RawData::Vec(sample)
.into_arrow_array(&info)
.context("Could not create arrow array")?;
let serialized_deserialized_arrow_array = {
let ptr = NonNull::new(sample.as_ptr() as *mut _).unwrap();
let len = sample.len();

let raw_buffer = unsafe {
arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(sample))
};
buffer_into_arrow_array(&raw_buffer, &info)?
};

assert_eq!(arrow_array, &serialized_deserialized_arrow_array);

Ok(())


apis/rust/node/src/event_stream/data_conversion.rs (+80 -0)

@@ -0,0 +1,80 @@
use std::{ptr::NonNull, sync::Arc};

use aligned_vec::{AVec, ConstAlign};
use dora_arrow_convert::IntoArrow;
use dora_message::metadata::ArrowTypeInfo;
use eyre::Context;
use shared_memory_server::{Shmem, ShmemConf};

use crate::arrow_utils::buffer_into_arrow_array;

pub enum RawData {
Empty,
Vec(AVec<u8, ConstAlign<128>>),
SharedMemory(SharedMemoryData),
}

impl RawData {
pub fn into_arrow_array(
self,
type_info: &ArrowTypeInfo,
) -> eyre::Result<arrow::array::ArrayData> {
let raw_buffer = match self {
RawData::Empty => return Ok(().into_arrow().into()),
RawData::Vec(data) => {
let ptr = NonNull::new(data.as_ptr() as *mut _).unwrap();
let len = data.len();

unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
}
RawData::SharedMemory(data) => {
let ptr = NonNull::new(data.data.as_ptr() as *mut _).unwrap();
let len = data.data.len();

unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
}
};

buffer_into_arrow_array(&raw_buffer, type_info)
}
}

impl std::fmt::Debug for RawData {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Data").finish_non_exhaustive()
}
}

pub struct SharedMemoryData {
pub data: MappedInputData,
pub _drop: flume::Sender<()>,
}

pub struct MappedInputData {
memory: Box<Shmem>,
len: usize,
}

impl MappedInputData {
pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result<Self> {
let memory = Box::new(
ShmemConf::new()
.os_id(shared_memory_id)
.writable(false)
.open()
.wrap_err("failed to map shared memory input")?,
);
Ok(MappedInputData { memory, len })
}
}

impl std::ops::Deref for MappedInputData {
type Target = [u8];

fn deref(&self) -> &Self::Target {
unsafe { &self.memory.as_slice()[..self.len] }
}
}

unsafe impl Send for MappedInputData {}
unsafe impl Sync for MappedInputData {}

apis/rust/node/src/event_stream/event.rs (+69 -112)

@@ -1,134 +1,91 @@
use std::{ptr::NonNull, sync::Arc};

use aligned_vec::{AVec, ConstAlign};
use dora_arrow_convert::{ArrowData, IntoArrow};
use dora_arrow_convert::ArrowData;
use dora_core::config::{DataId, OperatorId};
use dora_message::metadata::{ArrowTypeInfo, BufferOffset, Metadata};
use eyre::{Context, Result};
use shared_memory_extended::{Shmem, ShmemConf};

use dora_message::metadata::Metadata;

/// Represents an incoming Dora event.
///
/// Events might be triggered by other nodes, by Dora itself, or by some external user input.
///
/// It's safe to ignore event types that are not relevant to the node.
///
/// This enum is marked as `non_exhaustive` because we might add additional
/// variants in the future. Please ignore unknown event types instead of throwing an
/// error to avoid breakage when updating Dora.
#[derive(Debug)]
#[non_exhaustive]
#[allow(clippy::large_enum_variant)]
pub enum Event {
Stop(StopCause),
Reload {
operator_id: Option<OperatorId>,
},
/// An input was received from another node.
///
/// This event corresponds to one of the `inputs` of the node as specified
/// in the dataflow YAML file.
Input {
/// The input ID, as specified in the YAML file.
///
/// Note that this is not the output ID of the sender, but the ID
/// assigned to the input in the YAML file.
id: DataId,
/// Meta information about this input, e.g. the timestamp.
metadata: Metadata,
/// The actual data in the Apache Arrow data format.
data: ArrowData,
},
/// An input was closed by the sender.
///
/// The sending node mapped to an input exited, so this input will receive
/// no more data.
InputClosed {
/// The ID of the input that was closed, as specified in the YAML file.
///
/// Note that this is not the output ID of the sender, but the ID
/// assigned to the input in the YAML file.
id: DataId,
},
/// Notification that the event stream is about to close.
///
/// The [`StopCause`] field contains the reason for the event stream closure.
///
/// Typically, nodes should exit once the event stream closes. One notable
/// exception are nodes with no inputs, which will receive an
/// `Event::Stop(StopCause::AllInputsClosed)` right at startup. Source nodes
/// might still want to keep producing outputs. (There is currently an open
/// discussion of changing this behavior and not sending `AllInputsClosed`
/// to nodes without inputs.)
///
/// Note: Stop events with `StopCause::Manual` indicate a manual stop operation
/// issued through `dora stop` or a `ctrl-c`. Nodes **must exit** once receiving
/// such a stop event, otherwise they will be killed by Dora.
Stop(StopCause),
/// Instructs the node to reload itself or one of its operators.
///
/// This event is currently only used for reloading Python operators that are
/// started by a `dora runtime` process. So this event should not be sent to normal
/// nodes yet.
Reload {
/// The ID of the operator that should be reloaded.
///
/// There is currently no case where `operator_id` is `None`.
operator_id: Option<OperatorId>,
},
/// Notifies the node about an unexpected error that happened inside Dora.
///
/// It's a good idea to output or log this error for debugging.
Error(String),
}

/// The reason for closing the event stream.
///
/// This enum is marked as `non_exhaustive` because we might add additional
/// variants in the future.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum StopCause {
/// The dataflow is stopped early after a `dora stop` command (or on `ctrl-c`).
///
/// Nodes should exit as soon as possible if they receive a stop event of
/// this type. Dora will kill nodes that keep running for too long after
/// receiving such a stop event.
Manual,
/// The event stream is closed because all of the node's inputs were closed.
AllInputsClosed,
}

pub enum RawData {
Empty,
Vec(AVec<u8, ConstAlign<128>>),
SharedMemory(SharedMemoryData),
}

impl RawData {
pub fn into_arrow_array(self, type_info: &ArrowTypeInfo) -> Result<arrow::array::ArrayData> {
let raw_buffer = match self {
RawData::Empty => return Ok(().into_arrow().into()),
RawData::Vec(data) => {
let ptr = NonNull::new(data.as_ptr() as *mut _).unwrap();
let len = data.len();

unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
}
RawData::SharedMemory(data) => {
let ptr = NonNull::new(data.data.as_ptr() as *mut _).unwrap();
let len = data.data.len();

unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
}
};

buffer_into_arrow_array(&raw_buffer, type_info)
}
}

pub struct SharedMemoryData {
pub data: MappedInputData,
pub _drop: flume::Sender<()>,
}

fn buffer_into_arrow_array(
raw_buffer: &arrow::buffer::Buffer,
type_info: &ArrowTypeInfo,
) -> eyre::Result<arrow::array::ArrayData> {
if raw_buffer.is_empty() {
return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
}

let mut buffers = Vec::new();
for BufferOffset { offset, len } in &type_info.buffer_offsets {
buffers.push(raw_buffer.slice_with_length(*offset, *len));
}

let mut child_data = Vec::new();
for child_type_info in &type_info.child_data {
child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
}

arrow::array::ArrayData::try_new(
type_info.data_type.clone(),
type_info.len,
type_info
.validity
.clone()
.map(arrow::buffer::Buffer::from_vec),
type_info.offset,
buffers,
child_data,
)
.context("Error creating Arrow array")
}

impl std::fmt::Debug for RawData {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Data").finish_non_exhaustive()
}
}

pub struct MappedInputData {
memory: Box<Shmem>,
len: usize,
}

impl MappedInputData {
pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result<Self> {
let memory = Box::new(
ShmemConf::new()
.os_id(shared_memory_id)
.writable(false)
.open()
.wrap_err("failed to map shared memory input")?,
);
Ok(MappedInputData { memory, len })
}
}

impl std::ops::Deref for MappedInputData {
type Target = [u8];

fn deref(&self) -> &Self::Target {
unsafe { &self.memory.as_slice()[..self.len] }
}
}

unsafe impl Send for MappedInputData {}
unsafe impl Sync for MappedInputData {}
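
To make the `Event` and `StopCause` variants above concrete, here is a minimal event-loop sketch (illustrative only; it assumes the `dora-node-api` and `eyre` crates and is not part of this commit):

```rust
use dora_node_api::{DoraNode, Event, StopCause};

fn main() -> eyre::Result<()> {
    let (_node, mut events) = DoraNode::init_from_env()?;

    while let Some(event) = events.recv() {
        match event {
            Event::Input { id, metadata: _, data } => {
                println!("input `{id}`: {} elements", data.len());
            }
            Event::InputClosed { id } => println!("input `{id}` closed"),
            // nodes must exit quickly on a manual stop, otherwise Dora kills them
            Event::Stop(StopCause::Manual) => break,
            Event::Error(e) => eprintln!("dora error: {e}"),
            // ignore unknown variants; the enum is `non_exhaustive`
            _ => {}
        }
    }
    Ok(())
}
```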

apis/rust/node/src/event_stream/merged.rs (+28 -0)

@@ -1,19 +1,33 @@
//! Merge external stream into an [`EventStream`][super::EventStream].
//!
//! Sometimes nodes need to listen to external events, in addition to Dora events.
//! This module provides support for that by providing the [`MergeExternal`] trait.

use futures::{Stream, StreamExt};
use futures_concurrency::stream::Merge;

/// A Dora event or an event from an external source.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum MergedEvent<E> {
/// A Dora event
Dora(super::Event),
/// An external event
///
/// Yielded by the stream that was merged into the Dora [`EventStream`][super::EventStream].
External(E),
}

/// A general enum to represent a value of two possible types.
pub enum Either<A, B> {
/// Value is of the first type, type `A`.
First(A),
/// Value is of the second type, type `B`.
Second(B),
}

impl<A> Either<A, A> {
/// Unwraps an `Either` instance where both types are identical.
pub fn flatten(self) -> A {
match self {
Either::First(a) => a,
@@ -22,19 +36,33 @@ impl<A> Either<A, A> {
}
}

/// Allows merging an external event stream into an existing event stream.
// TODO: use impl trait return type once stable
pub trait MergeExternal<'a, E> {
/// The item type yielded from the merged stream.
type Item;

/// Merge the given stream into an existing event stream.
///
/// Returns a new event stream that yields items from both streams.
/// The ordering between the two streams is not guaranteed.
fn merge_external(
self,
external_events: impl Stream<Item = E> + Unpin + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>;
}

/// Allows merging a sendable external event stream into an existing (sendable) event stream.
///
/// By implementing [`Send`], the streams can be sent to different threads.
pub trait MergeExternalSend<'a, E> {
/// The item type yielded from the merged stream.
type Item;

/// Merge the given stream into an existing event stream.
///
/// Returns a new event stream that yields items from both streams.
/// The ordering between the two streams is not guaranteed.
fn merge_external_send(
self,
external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a,
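
A sketch of merging an external stream, assuming `EventStream` implements `MergeExternal` with `Item = MergedEvent<E>` (a `flume` channel stands in for the external source; illustrative only):

```rust
use dora_node_api::{
    merged::{MergeExternal, MergedEvent},
    DoraNode, Event,
};
use futures::StreamExt;

fn main() -> eyre::Result<()> {
    let (_node, events) = DoraNode::init_from_env()?;

    // external event source; any `Stream + Unpin` works here
    let (tx, rx) = flume::unbounded::<String>();
    tx.send("external tick".to_owned()).ok();

    let mut merged = events.merge_external(rx.into_stream());
    futures::executor::block_on(async {
        while let Some(event) = merged.next().await {
            match event {
                // with a merged stream, `break` manually on stop events
                MergedEvent::Dora(Event::Stop(_)) => break,
                MergedEvent::Dora(other) => println!("dora event: {other:?}"),
                MergedEvent::External(msg) => println!("external: {msg}"),
            }
        }
    });
    Ok(())
}
```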


apis/rust/node/src/event_stream/mod.rs (+89 -7)

@@ -11,7 +11,7 @@ use dora_message::{
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId,
};
pub use event::{Event, MappedInputData, RawData, StopCause};
pub use event::{Event, StopCause};
use futures::{
future::{select, Either},
Stream, StreamExt,
@@ -19,22 +19,44 @@ use futures::{
use futures_timer::Delay;
use scheduler::{Scheduler, NON_INPUT_EVENT};

use self::{
event::SharedMemoryData,
thread::{EventItem, EventStreamThreadHandle},
use self::thread::{EventItem, EventStreamThreadHandle};
use crate::{
daemon_connection::DaemonChannel,
event_stream::data_conversion::{MappedInputData, RawData, SharedMemoryData},
};
use crate::daemon_connection::DaemonChannel;
use dora_core::{
config::{Input, NodeId},
uhlc,
};
use eyre::{eyre, Context};

pub use scheduler::Scheduler as EventScheduler;

mod data_conversion;
mod event;
pub mod merged;
mod scheduler;
mod thread;

/// Asynchronous iterator over the incoming [`Event`]s destined for this node.
///
/// This struct [implements](#impl-Stream-for-EventStream) the [`Stream`] trait,
/// so you can use methods of the [`StreamExt`] trait
/// on this struct. A common pattern is `while let Some(event) = event_stream.next().await`.
///
/// Nodes should iterate over this event stream and react to events that they are interested in.
/// Typically, the most important event type is [`Event::Input`].
/// You don't need to handle all events; it's fine to ignore events that are not relevant to your node.
///
/// The event stream will close itself after a [`Event::Stop`] was received.
/// A manual `break` on [`Event::Stop`] is typically not needed.
/// _(You probably do need to use a manual `break` on stop events when using the
/// [`StreamExt::merge`][`futures_concurrency::stream::StreamExt::merge`] implementation on
/// [`EventStream`] to combine the stream with an external one.)_
///
/// Once the event stream has finished, nodes should exit.
/// Note that Dora kills nodes that don't exit quickly after a [`Event::Stop`] of type
/// [`StopCause::Manual`] was received.
pub struct EventStream {
node_id: NodeId,
receiver: flume::r#async::RecvStream<'static, EventItem>,
@@ -157,16 +179,61 @@ impl EventStream {
})
}

/// wait for the next event on the events stream.
/// Synchronously waits for the next event.
///
/// Blocks the thread until the next event arrives.
/// Returns [`None`] once the event stream is closed.
///
/// For an asynchronous variant of this method see [`recv_async`][Self::recv_async].
///
/// ## Event Reordering
///
/// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
/// events might be returned in a different order than they occurred. For details, check the
/// documentation of the [`EventScheduler`] struct.
///
/// If you want to receive the events in their original chronological order, use the
/// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the
/// [`Stream`] trait).
pub fn recv(&mut self) -> Option<Event> {
futures::executor::block_on(self.recv_async())
}

/// wait for the next event on the events stream until timeout
/// Receives the next incoming [`Event`] synchronously with a timeout.
///
/// Blocks the thread until the next event arrives or the timeout is reached.
/// Returns an [`Event::Error`] if no event was received within the given duration.
///
/// Returns [`None`] once the event stream is closed.
///
/// For an asynchronous variant of this method see [`recv_async_timeout`][Self::recv_async_timeout].
///
/// ## Event Reordering
///
/// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
/// events might be returned in a different order than they occurred. For details, check the
/// documentation of the [`EventScheduler`] struct.
///
/// If you want to receive the events in their original chronological order, use the
/// asynchronous [`StreamExt::next`] method instead ([`EventStream`] implements the
/// [`Stream`] trait).
pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
futures::executor::block_on(self.recv_async_timeout(dur))
}

/// Receives the next incoming [`Event`] asynchronously, using an [`EventScheduler`] for fairness.
///
/// Returns [`None`] once the event stream is closed.
///
/// ## Event Reordering
///
/// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
/// events might be returned in a different order than they occurred. For details, check the
/// documentation of the [`EventScheduler`] struct.
///
/// If you want to receive the events in their original chronological order, use the
/// [`StreamExt::next`] method instead
/// ([`EventStream`] implements the [`Stream`] trait).
pub async fn recv_async(&mut self) -> Option<Event> {
loop {
if self.scheduler.is_empty() {
@@ -187,6 +254,21 @@ impl EventStream {
event.map(Self::convert_event_item)
}

/// Receives the next incoming [`Event`] asynchronously with a timeout.
///
/// Returns an [`Event::Error`] if no event was received within the given duration.
///
/// Returns [`None`] once the event stream is closed.
///
/// ## Event Reordering
///
/// This method uses an [`EventScheduler`] internally to **reorder events**. This means that the
/// events might be returned in a different order than they occurred. For details, check the
/// documentation of the [`EventScheduler`] struct.
///
/// If you want to receive the events in their original chronological order, use the
/// [`StreamExt::next`] method with a custom timeout future instead
/// ([`EventStream`] implements the [`Stream`] trait).
pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
match select(Delay::new(dur), pin!(self.recv_async())).await {
Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError(
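
Where chronological order matters, the docs above point to the `Stream` implementation instead of `recv`; a short, hedged sketch (assuming the `futures` re-export shown in `lib.rs`):

```rust
use dora_node_api::{DoraNode, Event};
use futures::StreamExt; // `EventStream` implements `Stream`

fn main() -> eyre::Result<()> {
    let (_node, mut events) = DoraNode::init_from_env()?;

    futures::executor::block_on(async {
        // `StreamExt::next` bypasses the `EventScheduler`, so events are
        // yielded in their original chronological order
        while let Some(event) = events.next().await {
            if let Event::Input { id, .. } = event {
                println!("input `{id}`");
            }
        }
    });
    Ok(())
}
```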


apis/rust/node/src/event_stream/scheduler.rs (+80 -18)

@@ -3,27 +3,89 @@ use std::collections::{HashMap, VecDeque};
use dora_message::{daemon_to_node::NodeEvent, id::DataId};

use super::thread::EventItem;
pub const NON_INPUT_EVENT: &str = "dora/non_input_event";
pub(crate) const NON_INPUT_EVENT: &str = "dora/non_input_event";

// This scheduler will make sure that there is fairness between
// inputs.
//
// It is going to always make sure that the input that has not been used for the longest period is the first one to be used next.
//
// Ex:
// In the case that one input has a very high frequency and another one with a very slow frequency.\
//
// The Node will always alternate between the two inputs when each input is available
// Avoiding one input to be overwhelmingly present.
//
/// This scheduler will make sure that there is fairness between inputs.
///
/// The scheduler reorders events in the following way:
///
/// - **Non-input events are prioritized**
///
/// If the node received any events that are not input events, they are returned first. The
/// intention of this reordering is that the nodes can react quickly to dataflow-related events
/// even when their input queues are very full.
///
/// This reordering has some side effects that might be unexpected:
/// - An [`InputClosed`][super::Event::InputClosed] event might be yielded before the last
/// input events of that ID.
///
/// Usually, an `InputClosed` event indicates that there won't be any subsequent inputs
/// of a certain ID. This invariant does not hold anymore for a scheduled event stream.
/// - The [`Stop`][super::Event::Stop] event might not be the last event of the stream anymore.
///
/// Usually, the `Stop` event is the last event that is sent to a node before the event stream
/// is closed. Because of the reordering, the stream might return more events after a `Stop`
/// event.
/// - **Input events are grouped by ID** and yielded in a **least-recently used order (by ID)**.
///
/// The scheduler keeps a separate queue for each input ID, where the incoming input events are
/// placed in their chronological order. When yielding the next event, the scheduler iterates over
/// these queues in least-recently used order. This means that the queue corresponding to the
/// last yielded event will be checked last. The scheduler will return the oldest event from the
/// first non-empty queue.
///
/// The side effect of this change is that input events of different IDs are no longer in their
/// chronological order. This might lead to unexpected results for input events that are caused by
/// each other.
///
/// ## Example 1
/// Consider the case that one input has a very high frequency and another one has a very low
/// frequency. The event stream will always alternate between the two inputs when each input is
/// available.
/// Without the scheduling, the high-frequency input would be returned much more often.
///
/// ## Example 2
/// Again, let's consider the case that one input has a very high frequency and the other has a
/// very low frequency. This time, we define a small maximum queue size for the low-frequency
/// input, but a large queue size for the high-frequency one.
/// Using the scheduler, the event stream will always alternate between high- and low-frequency
/// inputs as long as inputs of both types are available.
///
/// Without scheduling, the low-frequency input might never be yielded before
/// it's dropped because there is almost always an older high-frequency input available that is
/// yielded first. Once the low-frequency input would be the next one chronologically, it might
/// have been dropped already because the node received newer low-frequency inputs in the
/// meantime (the queue length is small). At this point, the next-oldest input is a high-frequency
/// input again.
///
/// ## Example 3
/// Consider a high-frequency camera input and a low-frequency bounding box input, which is based
/// on the latest camera image. The dataflow YAML file specifies a large queue size for the camera
/// input and a small queue size for the bounding box input.
///
/// With scheduling, the number of
/// buffered camera inputs might grow over time. As a result, the camera inputs yielded from the
/// stream (in oldest-first order) are not synchronized with the bounding box inputs anymore. So
/// the node receives an up-to-date bounding box, but a considerably outdated image.
///
/// Without scheduling, the events are returned in chronological order. This time, the bounding
/// box might be slightly outdated if the camera sent new images before the bounding box was
/// ready. However, the time difference between the two input types is independent of the
/// queue size this time.
///
/// (If a perfectly matching bounding box is required, we recommend forwarding the input image as
/// part of the bounding box output. This way, the receiving node only needs to subscribe to one
/// input, so no mismatches can happen.)
#[derive(Debug)]
pub struct Scheduler {
last_used: VecDeque<DataId>, // Tracks the last-used event ID
event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>, // Tracks events per ID
/// Tracks the last-used event ID
last_used: VecDeque<DataId>,
/// Tracks events per ID
event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>,
}

impl Scheduler {
pub fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self {
pub(crate) fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self {
let topic = VecDeque::from_iter(
event_queues
.keys()
@@ -36,7 +98,7 @@ impl Scheduler {
}
}

pub fn add_event(&mut self, event: EventItem) {
pub(crate) fn add_event(&mut self, event: EventItem) {
let event_id = match &event {
EventItem::NodeEvent {
event:
@@ -63,7 +125,7 @@ impl Scheduler {
}
}

pub fn next(&mut self) -> Option<EventItem> {
pub(crate) fn next(&mut self) -> Option<EventItem> {
// Retrieve non-input events first; they have priority over input messages.
if let Some((_size, queue)) = self
.event_queues
@@ -89,7 +151,7 @@ impl Scheduler {
None
}

pub fn is_empty(&self) -> bool {
pub(crate) fn is_empty(&self) -> bool {
self.event_queues
.iter()
.all(|(_id, (_size, queue))| queue.is_empty())
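
The least-recently-used selection described above can be modeled in a few lines (a toy sketch only; this is not the internal `Scheduler`, which is `pub(crate)`):

```rust
use std::collections::{HashMap, VecDeque};

// Pops the oldest event from the least-recently-used non-empty queue.
fn next_event(
    last_used: &mut VecDeque<String>,
    queues: &mut HashMap<String, VecDeque<u64>>,
) -> Option<u64> {
    for i in 0..last_used.len() {
        let id = last_used[i].clone();
        if let Some(event) = queues.get_mut(&id).and_then(|q| q.pop_front()) {
            // mark this ID as most-recently used
            last_used.remove(i);
            last_used.push_back(id);
            return Some(event);
        }
    }
    None
}

fn main() {
    let mut last_used = VecDeque::from(["high".to_owned(), "low".to_owned()]);
    let mut queues = HashMap::from([
        ("high".to_owned(), VecDeque::from([1, 2, 3])),
        ("low".to_owned(), VecDeque::from([10])),
    ]);
    // alternates while both inputs are available: prints 1 10 2 3
    while let Some(event) = next_event(&mut last_used, &mut queues) {
        print!("{event} ");
    }
}
```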


apis/rust/node/src/lib.rs (+78 -10)

@@ -1,18 +1,85 @@
//! The custom node API allow you to integrate `dora` into your application.
//! It allows you to retrieve input and send output in any fashion you want.
//! This crate enables you to create nodes for the [Dora] dataflow framework.
//!
//! Try it out with:
//! [Dora]: https://dora-rs.ai/
//!
//! ```bash
//! dora new node --kind node
//! ```
//! ## The Dora Framework
//!
//! Dora is a dataflow framework that models applications as a directed graph, with nodes
//! representing operations and edges representing data transfer.
//! The layout of the dataflow graph is defined through a YAML file in Dora.
//! For details, see our [Dataflow Specification](https://dora-rs.ai/docs/api/dataflow-config/)
//! chapter.
//!
//! Dora nodes are typically spawned by the Dora framework, instead of spawning them manually.
//! If you want to spawn a node manually, define it as a [_dynamic_ node](#dynamic-nodes).
//!
//! You can also generate a dora rust project with
//! ## Normal Usage
//!
//! ```bash
//! dora new project_xyz --kind dataflow
//! In order to connect your executable to Dora, you need to initialize a [`DoraNode`].
//! For standard nodes, the recommended initialization function is [`init_from_env`][`DoraNode::init_from_env`].
//! This function will return two values, a [`DoraNode`] instance and an [`EventStream`]:
//!
//! ```no_run
//! use dora_node_api::DoraNode;
//!
//! let (mut node, mut events) = DoraNode::init_from_env()?;
//! # Ok::<(), eyre::Report>(())
//! ```
//!
//! You can use the `node` instance to send outputs and retrieve information about the node and
//! the dataflow. The `events` stream yields the inputs that the node defines in the dataflow
//! YAML file and other incoming events.
//!
//! ### Sending Outputs
//!
//! The [`DoraNode`] instance enables you to send outputs in different formats.
//! For best performance, use the [Arrow](https://arrow.apache.org/docs/index.html) data format
//! and one of the output functions that utilizes shared memory.
//!
//! ### Receiving Events
//!
//! The [`EventStream`] is an [`AsyncIterator`][std::async_iter::AsyncIterator] that yields the incoming [`Event`]s.
//!
//! Nodes should iterate over this event stream and react to events that they are interested in.
//! Typically, the most important event type is [`Event::Input`].
//! You don't need to handle all events; it's fine to ignore events that are not relevant to your node.
//!
//! The event stream will close itself after a [`Event::Stop`] was received.
//! A manual `break` on [`Event::Stop`] is typically not needed.
//! _(You probably do need to use a manual `break` on stop events when using the
//! [`StreamExt::merge`][`futures_concurrency::stream::StreamExt::merge`] implementation on
//! [`EventStream`] to combine the stream with an external one.)_
//!
//! Once the event stream has finished, nodes should exit.
//! Note that Dora kills nodes that don't exit quickly after a [`Event::Stop`] of type
//! [`StopCause::Manual`] was received.
//!
//! ## Dynamic Nodes
//!
//! <div class="warning">
//!
//! Dynamic nodes have certain [limitations](#limitations). Use with care.
//!
//! </div>
//!
//! Nodes can be defined as `dynamic` by setting their `path` attribute to `dynamic` in the
//! dataflow YAML file. Dynamic nodes are not spawned by the Dora framework and need to be started
//! manually.
//!
//! Dynamic nodes cannot use the [`DoraNode::init_from_env`] function for initialization.
//! Instead, they can be initialized through the [`DoraNode::init_from_node_id`] function.
//!
//! ### Limitations
//!
//! - Dynamic nodes **don't work with `dora run`**.
//! - As dynamic nodes are identified by their node ID, this **ID must be unique**
//! across all running dataflows.
//! - For distributed dataflows, nodes need to be manually spawned on the correct machine.

#![warn(missing_docs)]

pub use arrow;
pub use dora_arrow_convert::*;
pub use dora_core::{self, uhlc};
@@ -20,8 +87,9 @@ pub use dora_message::{
metadata::{Metadata, MetadataParameters, Parameter},
DataflowId,
};
pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData, StopCause};
pub use event_stream::{merged, Event, EventScheduler, EventStream, StopCause};
pub use flume::Receiver;
pub use futures;
pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD};

mod daemon_connection;
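
A minimal sketch of the dynamic-node path described above (illustrative only; the node ID is hypothetical and must match the dataflow YAML):

```rust
use dora_node_api::{dora_core::config::NodeId, DoraNode};

fn main() -> eyre::Result<()> {
    // dynamic nodes are spawned manually, so they identify themselves by
    // node ID instead of reading `DORA_NODE_CONFIG` from the environment
    let node_id = NodeId::from("my-dynamic-node".to_owned());
    let (node, mut events) = DoraNode::init_from_node_id(node_id)?;
    println!("running as `{}` in dataflow {}", node.id(), node.dataflow_id());

    while let Some(_event) = events.recv() {
        // handle events as usual
    }
    Ok(())
}
```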


apis/rust/node/src/node/arrow_utils.rs (+50 -4)

@@ -1,6 +1,11 @@
//! Utility functions for converting Arrow arrays to/from raw data.
//!
use arrow::array::{ArrayData, BufferSpec};
use dora_message::metadata::{ArrowTypeInfo, BufferOffset};
use eyre::Context;

/// Calculates the data size in bytes required for storing a contiguous copy of the given Arrow
/// array.
pub fn required_data_size(array: &ArrayData) -> usize {
let mut next_offset = 0;
required_data_size_inner(array, &mut next_offset);
@@ -10,8 +15,8 @@ fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) {
let layout = arrow::array::layout(array.data_type());
for (buffer, spec) in array.buffers().iter().zip(&layout.buffers) {
// consider alignment padding
if let BufferSpec::FixedWidth { alignment, .. } = spec {
*next_offset = (*next_offset).div_ceil(*alignment) * alignment;
if let BufferSpec::FixedWidth { alignment, .. } = *spec {
*next_offset = (*next_offset).div_ceil(alignment) * alignment;
}
*next_offset += buffer.len();
}
@@ -20,6 +25,12 @@ fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) {
}
}

/// Copy the given Arrow array into the provided buffer.
///
/// If the Arrow array consists of multiple buffers, they are placed contiguously in the target
/// buffer (there might be some padding for alignment).
///
/// Panics if the buffer is not large enough.
pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo {
let mut next_offset = 0;
copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array)
@@ -41,8 +52,8 @@ fn copy_array_into_sample_inner(
*next_offset,
);
// add alignment padding
if let BufferSpec::FixedWidth { alignment, .. } = spec {
*next_offset = (*next_offset).div_ceil(*alignment) * alignment;
if let BufferSpec::FixedWidth { alignment, .. } = *spec {
*next_offset = (*next_offset).div_ceil(alignment) * alignment;
}

target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice());
@@ -69,3 +80,38 @@ fn copy_array_into_sample_inner(
child_data,
}
}

/// Tries to convert the given raw Arrow buffer into an Arrow array.
///
/// The `type_info` is required for decoding the `raw_buffer` correctly.
pub fn buffer_into_arrow_array(
raw_buffer: &arrow::buffer::Buffer,
type_info: &ArrowTypeInfo,
) -> eyre::Result<arrow::array::ArrayData> {
if raw_buffer.is_empty() {
return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
}

let mut buffers = Vec::new();
for BufferOffset { offset, len } in &type_info.buffer_offsets {
buffers.push(raw_buffer.slice_with_length(*offset, *len));
}

let mut child_data = Vec::new();
for child_type_info in &type_info.child_data {
child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
}

arrow::array::ArrayData::try_new(
type_info.data_type.clone(),
type_info.len,
type_info
.validity
.clone()
.map(arrow::buffer::Buffer::from_vec),
type_info.offset,
buffers,
child_data,
)
.context("Error creating Arrow array")
}
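
Putting the three helpers together, a round-trip sketch (a plain `Vec<u8>` stands in for the 128-byte-aligned `AVec` that Dora uses internally, which is fine for a `UInt8Array` with no alignment requirement):

```rust
use std::{ptr::NonNull, sync::Arc};

use arrow::array::{Array, ArrayData, UInt8Array};
use dora_node_api::arrow_utils::{
    buffer_into_arrow_array, copy_array_into_sample, required_data_size,
};

fn main() -> eyre::Result<()> {
    let array: ArrayData = UInt8Array::from(vec![1u8, 2, 3]).into_data();

    // serialize: copy all Arrow buffers into one contiguous byte buffer
    let mut sample = vec![0u8; required_data_size(&array)];
    let type_info = copy_array_into_sample(&mut sample, &array);

    // deserialize: wrap the bytes in an Arrow `Buffer` and rebuild the array
    let ptr = NonNull::new(sample.as_ptr() as *mut u8).unwrap();
    let len = sample.len();
    let buffer =
        unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(sample)) };
    let roundtripped = buffer_into_arrow_array(&buffer, &type_info)?;

    assert_eq!(array, roundtripped);
    Ok(())
}
```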

apis/rust/node/src/node/mod.rs (+84 -5)

@@ -42,6 +42,20 @@ pub mod arrow_utils;
mod control_channel;
mod drop_stream;

/// The data size threshold at which we start using shared memory.
///
/// Shared memory works by sharing memory pages. This means that the smallest
/// memory region that can be shared is one memory page, which is typically
/// 4KiB.
///
/// Using shared memory for messages smaller than the page size still requires
/// sharing a full page, so we have some memory overhead. We also have some
/// performance overhead because we need to issue multiple syscalls. For small
/// messages it is faster to send them over a traditional TCP stream (or similar).
///
/// This hardcoded threshold value specifies which messages are sent through
/// shared memory. Messages that are smaller than this threshold are sent through
/// TCP.
pub const ZERO_COPY_THRESHOLD: usize = 4096;

#[allow(dead_code)]
@@ -50,6 +64,10 @@ enum TokioRuntime {
Handle(Handle),
}

/// Allows sending outputs and retrieving node information.
///
/// The main purpose of this struct is to send outputs via Dora. There are also functions available
/// for retrieving the node configuration.
pub struct DoraNode {
id: NodeId,
dataflow_id: DataflowId,
@@ -67,7 +85,11 @@ pub struct DoraNode {
}

impl DoraNode {
/// Initiate a node from environment variables set by `dora-coordinator`
/// Initializes a node from environment variables set by the Dora daemon.
///
/// This is the recommended initialization function for Dora nodes, which are spawned by
/// Dora daemon instances.
///
/// ```no_run
/// use dora_node_api::DoraNode;
@@ -94,6 +116,8 @@ impl DoraNode {

/// Initializes a node from a dataflow ID and a node ID.
///
/// This initialization function should be used for [_dynamic nodes_](index.html#dynamic-nodes).
///
/// ```no_run
/// use dora_node_api::DoraNode;
/// use dora_node_api::dora_core::config::NodeId;
@@ -126,6 +150,11 @@ impl DoraNode {
}
}

/// Dynamic initialization function for nodes that are sometimes used as dynamic nodes.
///
/// This function first tries initializing the traditional way through
/// [`init_from_env`][Self::init_from_env]. If this fails, it falls back to
/// [`init_from_node_id`][Self::init_from_node_id].
pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
if std::env::var("DORA_NODE_CONFIG").is_ok() {
info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`");
@@ -135,6 +164,8 @@ impl DoraNode {
}
}

/// Internal initialization routine that should not be used outside of Dora.
#[doc(hidden)]
#[tracing::instrument]
pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
let NodeConfig {
@@ -219,7 +250,8 @@ impl DoraNode {
}
}

/// Send data from the node to the other nodes.
/// Send raw data from the node to the other nodes.
///
/// We take a closure as an input to enable zero copy on send.
///
/// ```no_run
@@ -242,6 +274,8 @@ impl DoraNode {
/// }).expect("Could not send output");
/// ```
///
/// Ignores the output if the given `output_id` is not specified as node output in the dataflow
/// configuration file.
pub fn send_output_raw<F>(
&mut self,
output_id: DataId,
@@ -263,6 +297,14 @@ impl DoraNode {
self.send_output_sample(output_id, type_info, parameters, Some(sample))
}

/// Sends the given Arrow array as an output message.
///
/// Uses shared memory for efficient data transfer if suitable.
///
/// This method might copy the message once to move it to shared memory.
///
/// Ignores the output if the given `output_id` is not specified as node output in the dataflow
/// configuration file.
pub fn send_output(
&mut self,
output_id: DataId,
@@ -286,6 +328,12 @@ impl DoraNode {
Ok(())
}

/// Send the given raw byte data as output.
///
/// Might copy the data once to move it into shared memory.
///
/// Ignores the output if the given `output_id` is not specified as node output in the dataflow
/// configuration file.
pub fn send_output_bytes(
&mut self,
output_id: DataId,
@@ -301,6 +349,12 @@ impl DoraNode {
})
}

/// Sends the given raw byte data with the provided type information.
///
/// It is recommended to use a function like [`send_output`][Self::send_output] instead.
///
/// Ignores the output if the given `output_id` is not specified as node output in the dataflow
/// configuration file.
pub fn send_typed_output<F>(
&mut self,
output_id: DataId,
@@ -322,6 +376,12 @@ impl DoraNode {
self.send_output_sample(output_id, type_info, parameters, Some(sample))
}

/// Sends the given [`DataSample`] as output, combined with the given type information.
///
/// It is recommended to use a function like [`send_output`][Self::send_output] instead.
///
/// Ignores the output if the given `output_id` is not specified as node output in the dataflow
/// configuration file.
pub fn send_output_sample(
&mut self,
output_id: DataId,
@@ -350,32 +410,46 @@ impl DoraNode {
Ok(())
}

pub fn close_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> {
for output_id in &outputs {
/// Reports the given output IDs as closed.
///
/// The node is not allowed to send more outputs with the closed IDs.
///
/// Closing outputs early can be helpful to receivers.
pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> eyre::Result<()> {
for output_id in &outputs_ids {
if !self.node_config.outputs.remove(output_id) {
eyre::bail!("unknown output {output_id}");
}
}

self.control_channel
.report_closed_outputs(outputs)
.report_closed_outputs(outputs_ids)
.wrap_err("failed to report closed outputs to daemon")?;

Ok(())
}

/// Returns the ID of the node as specified in the dataflow configuration file.
pub fn id(&self) -> &NodeId {
&self.id
}

/// Returns the unique identifier for the running dataflow instance.
///
/// Dora assigns each dataflow instance a random identifier when started.
pub fn dataflow_id(&self) -> &DataflowId {
&self.dataflow_id
}

/// Returns the input and output configuration of this node.
pub fn node_config(&self) -> &NodeRunConfig {
&self.node_config
}

/// Allocates a [`DataSample`] of the specified size.
///
/// The data sample will use shared memory when suitable to enable efficient data transfer
/// when sending an output message.
pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
let data = if data_len >= ZERO_COPY_THRESHOLD {
// create shared memory region
@@ -514,6 +588,11 @@ impl Drop for DoraNode {
}
}

/// A data region suitable for sending as an output message.
///
/// The region is stored in shared memory when suitable to enable efficient data transfer.
///
/// `DataSample` implements the [`Deref`] and [`DerefMut`] traits to read and write the mapped data.
pub struct DataSample {
inner: DataSampleInner,
len: usize,
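
A sketch of the send path described above, mirroring Dora's Rust example nodes (illustrative only; the `random` output ID must be declared in the dataflow YAML):

```rust
use dora_node_api::{dora_core::config::DataId, DoraNode, IntoArrow, MetadataParameters};

fn main() -> eyre::Result<()> {
    let (mut node, _events) = DoraNode::init_from_env()?;

    let output = DataId::from("random".to_owned());
    let value: u64 = 42;
    // `send_output` copies the Arrow array into a data sample and switches
    // to shared memory once the payload reaches `ZERO_COPY_THRESHOLD` bytes
    node.send_output(output, MetadataParameters::default(), value.into_arrow())?;
    Ok(())
}
```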


binaries/cli/src/command/build/mod.rs (+49 -0)

@@ -1,3 +1,52 @@
//! Provides the `dora build` command.
//!
//! The `dora build` command works like this:
//!
//! - Dataflows can specify a `build` command for each node in their YAML definition
//! - Dora will run the `build` command when `dora build` is invoked
//! - If the dataflow is distributed across multiple machines, each `build` command will be run on the target machine of the corresponding node.
//! - i.e. the machine specified under the `deploy` key
//! - this requires a connection to the dora coordinator, so you need to specify the coordinator IP/port for this
//! - to run the build commands of all nodes _locally_, you can use `dora build --local`
//! - If the dataflow does not specify any `deploy` keys, all build commands will be run locally (i.e. `dora build` behaves like `dora build --local`)
//!
//! #### Git Source
//!
//! - Nodes can have a git repository as source
//! - set the `git` config key to the URL of the repository
//! - by default, the default branch is used
//! - you can also specify a specific `branch` name
//! - alternatively, you can specify a `tag` name or a `rev` key with the commit hash
//! - you can only specify one of `branch`, `tag`, and `rev`, otherwise an error will occur
//! - Dora will automatically clone and checkout the requested branch/tag/commit on `dora build`
//! - the `build` command will be run after cloning
//! - for distributed dataflows, the clone/checkout will happen on the target machine
//! - subsequent `dora build` commands will automatically fetch the latest changes for nodes
//! - except when using `tag` or `rev`, because these are not expected to change
//! - after fetching changes, the `build` command will be executed again
//! - _tip:_ use a build tool that supports incremental builds (e.g. `cargo`) to make this rebuild faster
//!
//! The **working directory** will be set to the git repository.
//! This means that both the `build` command and the `path` key are resolved relative to this folder.
//! This allows you to use relative paths.
//!
//! #### Example
//!
//! ```yml
//! nodes:
//! - id: rust-node
//! # URL of your repository
//! git: https://github.com/dora-rs/dora.git
//! # the build command that should be invoked after cloning
//! build: cargo build -p rust-dataflow-example-node
//! # path to the executable that should be run on start
//! path: target/debug/rust-dataflow-example-node
//! inputs:
//! tick: dora/timer/millis/10
//! outputs:
//! - random
//! ```

use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::{
descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt},


binaries/cli/src/command/run.rs (+7 -0)

@@ -1,3 +1,10 @@
//! The `dora run` command is a quick and easy way to run a dataflow locally.
//! It does not support distributed dataflows and will throw an error if there are any `deploy` keys in the YAML file.
//!
//! The `dora run` command does not interact with any `dora coordinator` or `dora daemon` instances, or with any other parallel `dora run` commands.
//!
//! Use `dora build --local` or manual build commands to build your nodes.

use super::Executable;
use crate::{
common::{handle_dataflow_result, resolve_dataflow},


binaries/cli/src/command/start/mod.rs (+4 -0)

@@ -1,3 +1,7 @@
//! The `dora start` command is used to spawn a dataflow in a pre-existing _dora network_. To create a dora network, spawn a `dora coordinator` and one or multiple `dora daemon` instances.
//!
//! The `dora start` command does not run any build commands, nor update git dependencies or similar. Use `dora build` for that.

use super::{default_tracing, Executable};
use crate::{
command::start::attach::attach_dataflow,


libraries/arrow-convert/src/lib.rs (+9 -0)

@@ -1,3 +1,7 @@
//! Provides functions for converting between Apache Arrow arrays and Rust data types.

#![warn(missing_docs)]

use arrow::array::{
Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array,
UInt32Array, UInt8Array,
@@ -10,12 +14,16 @@ use std::ops::{Deref, DerefMut};
mod from_impls;
mod into_impls;

/// Data that can be converted to an Arrow array.
pub trait IntoArrow {
/// The Array type that the data can be converted to.
type A: Array;

/// Convert the data into an Arrow array.
fn into_arrow(self) -> Self::A;
}

/// Wrapper type for an Arrow [`ArrayRef`](arrow::array::ArrayRef).
#[derive(Debug)]
pub struct ArrowData(pub arrow::array::ArrayRef);

@@ -35,6 +43,7 @@ impl DerefMut for ArrowData {

macro_rules! register_array_handlers {
($(($variant:path, $array_type:ty, $type_name:expr)),* $(,)?) => {
/// Tries to convert the given Arrow array into a `Vec` of integers or floats.
pub fn into_vec<T>(data: &ArrowData) -> Result<Vec<T>>
where
T: Copy + NumCast + 'static,
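
A small round-trip sketch for `IntoArrow` and `into_vec` (assuming the crate's scalar impls, as used by `().into_arrow()` elsewhere in this commit):

```rust
use std::sync::Arc;

use dora_arrow_convert::{into_vec, ArrowData, IntoArrow};

fn main() -> eyre::Result<()> {
    // scalar -> single-element Arrow array
    let array = 42u8.into_arrow();

    // wrap it as `ArrowData`, then convert back with a widening numeric cast
    let data = ArrowData(Arc::new(array));
    let values: Vec<u16> = into_vec(&data)?;
    assert_eq!(values, vec![42]);
    Ok(())
}
```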


libraries/message/src/config.rs (+1 -0)

@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};

pub use crate::id::{DataId, NodeId, OperatorId};

/// Contains the input and output configuration of the node.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct NodeRunConfig {
/// Inputs for the nodes as a map from input ID to `node_id/output_id`.


libraries/message/src/lib.rs (+3 -0)

@@ -26,6 +26,9 @@ pub use arrow_data;
pub use arrow_schema;
use uuid::{Timestamp, Uuid};

/// Unique identifier for a dataflow instance.
///
/// Dora assigns each dataflow instance a unique ID on start.
pub type DataflowId = uuid::Uuid;

#[derive(


libraries/message/src/metadata.rs (+5 -0)

@@ -3,6 +3,9 @@ use std::collections::BTreeMap;
use arrow_schema::DataType;
use serde::{Deserialize, Serialize};

/// Additional data that is sent as part of output messages.
///
/// Includes a timestamp, type information, and additional user-provided parameters.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Metadata {
metadata_version: u16,
@@ -42,6 +45,7 @@ impl Metadata {
}
}

/// Additional metadata that can be sent as part of output messages.
pub type MetadataParameters = BTreeMap<String, Parameter>;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -55,6 +59,7 @@ pub struct ArrowTypeInfo {
pub child_data: Vec<ArrowTypeInfo>,
}

/// A metadata parameter that can be sent as part of output messages.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum Parameter {
Bool(bool),
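
A sketch of attaching user parameters (only the `Bool` variant is visible in this hunk; other variants follow the same pattern):

```rust
use std::collections::BTreeMap;

use dora_message::metadata::{MetadataParameters, Parameter};

fn main() {
    // `MetadataParameters` is a plain `BTreeMap<String, Parameter>`, so
    // user-defined parameters can be attached to any output message
    let mut params: MetadataParameters = BTreeMap::new();
    params.insert("is_keyframe".to_owned(), Parameter::Bool(true));
    assert_eq!(params.get("is_keyframe"), Some(&Parameter::Bool(true)));
}
```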

