Browse Source

Update Rust APIs and rust-dataflow-example to use arrow types

tags/v0.3.0-rc
Philipp Oppermann 2 years ago
parent
commit
6154415cbc
Failed to extract signature
30 changed files with 275 additions and 254 deletions
  1. +6
    -4
      Cargo.lock
  2. +1
    -1
      apis/c++/node/src/lib.rs
  3. +1
    -0
      apis/c/node/Cargo.toml
  4. +7
    -5
      apis/c/node/src/lib.rs
  5. +1
    -1
      apis/c/operator/operator_api.h
  6. +9
    -27
      apis/c/operator/operator_types.h
  7. +6
    -11
      apis/python/node/src/lib.rs
  8. +7
    -85
      apis/python/operator/src/lib.rs
  9. +2
    -53
      apis/rust/node/src/event_stream/event.rs
  10. +7
    -2
      apis/rust/node/src/event_stream/mod.rs
  11. +3
    -2
      apis/rust/node/src/lib.rs
  12. +60
    -0
      apis/rust/node/src/node/arrow_utils.rs
  13. +28
    -3
      apis/rust/node/src/node/mod.rs
  14. +1
    -1
      apis/rust/operator/macros/src/lib.rs
  15. +14
    -4
      apis/rust/operator/src/lib.rs
  16. +22
    -7
      apis/rust/operator/src/raw.rs
  17. +4
    -1
      apis/rust/operator/types/Cargo.toml
  18. +10
    -7
      apis/rust/operator/types/src/lib.rs
  19. +1
    -1
      binaries/cli/src/template/c/operator/operator-template.c
  20. +1
    -1
      binaries/runtime/Cargo.toml
  21. +1
    -1
      binaries/runtime/src/lib.rs
  22. +4
    -3
      binaries/runtime/src/operator/python.rs
  23. +34
    -13
      binaries/runtime/src/operator/shared_lib.rs
  24. +2
    -2
      examples/benchmark/node/src/main.rs
  25. +1
    -1
      examples/c++-dataflow/operator-c-api/operator.cc
  26. +1
    -1
      examples/c-dataflow/operator.c
  27. +1
    -1
      examples/multiple-daemons/node/src/main.rs
  28. +12
    -5
      examples/rust-dataflow/node/src/main.rs
  29. +19
    -8
      examples/rust-dataflow/operator/src/lib.rs
  30. +9
    -3
      examples/rust-dataflow/sink/src/main.rs

+ 6
- 4
Cargo.lock View File

@@ -1511,6 +1511,7 @@ dependencies = [
name = "dora-node-api-c"
version = "0.2.4"
dependencies = [
"arrow-array",
"dora-node-api",
"eyre",
"flume",
@@ -1594,6 +1595,7 @@ dependencies = [
name = "dora-operator-api-types"
version = "0.2.4"
dependencies = [
"arrow",
"safer-ffi",
]

@@ -4618,9 +4620,9 @@ checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072"

[[package]]
name = "safer-ffi"
version = "0.1.0"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62fd645a8c0b4a71f0883dce1bc48e358fcc02b99c83613f62ede5660b3572c1"
checksum = "4f47f1d2f33598dab2baa9517fffa1cf722f2e3a30633f2a230f20f9da67c564"
dependencies = [
"inventory",
"libc",
@@ -4635,9 +4637,9 @@ dependencies = [

[[package]]
name = "safer_ffi-proc_macros"
version = "0.1.0"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d39be56c8f7e5878594154dd0d2c03e2b12c66fa5a5ba3fe302412bd89bad774"
checksum = "b08f58cf71a58bda5734758eb20051cdb66c06c9243badbc45092ced1be834df"
dependencies = [
"macro_rules_attribute",
"prettyplease",


+ 1
- 1
apis/c++/node/src/lib.rs View File

@@ -92,7 +92,7 @@ pub struct OutputSender(dora_node_api::DoraNode);
fn send_output(sender: &mut Box<OutputSender>, id: String, data: &[u8]) -> ffi::DoraResult {
let result = sender
.0
.send_output(id.into(), Default::default(), data.len(), |out| {
.send_output_raw(id.into(), Default::default(), data.len(), |out| {
out.copy_from_slice(data)
});
let error = match result {


+ 1
- 0
apis/c/node/Cargo.toml View File

@@ -21,6 +21,7 @@ tracing = ["dora-node-api/tracing"]
eyre = "0.6.8"
flume = "0.10.14"
tracing = "0.1.33"
arrow-array = "45.0.0"

[dependencies.dora-node-api]
workspace = true

+ 7
- 5
apis/c/node/src/lib.rs View File

@@ -1,6 +1,7 @@
#![deny(unsafe_op_in_unsafe_fn)]

use dora_node_api::{DoraNode, Event, EventStream};
use arrow_array::BinaryArray;
use dora_node_api::{arrow::array::AsArray, DoraNode, Event, EventStream};
use eyre::Context;
use std::{ffi::c_void, ptr, slice};

@@ -170,9 +171,10 @@ pub unsafe extern "C" fn read_dora_input_data(
let event: &Event = unsafe { &*event.cast() };
match event {
Event::Input { data, .. } => {
if let Ok(data) = data.as_byte_slice() {
let ptr = data.as_ptr();
let len = data.len();
let data: Option<&BinaryArray> = data.as_binary_opt();
if let Some(data) = data {
let ptr = data.value(0).as_ptr();
let len = data.value(0).len();
unsafe {
*out_ptr = ptr;
*out_len = len;
@@ -250,7 +252,7 @@ unsafe fn try_send_output(
let data = unsafe { slice::from_raw_parts(data_ptr, data_len) };
context
.node
.send_output(output_id, Default::default(), data.len(), |out| {
.send_output_raw(output_id, Default::default(), data.len(), |out| {
out.copy_from_slice(data);
})
}

+ 1
- 1
apis/c/operator/operator_api.h View File

@@ -19,7 +19,7 @@ extern "C"
EXPORT DoraResult_t dora_drop_operator(void *operator_context);

EXPORT OnEventResult_t dora_on_event(
const RawEvent_t *event,
RawEvent_t *event,
const SendOutput_t *send_output,
void *operator_context);



+ 9
- 27
apis/c/operator/operator_types.h View File

@@ -86,22 +86,7 @@ typedef struct OnEventResult {
} OnEventResult_t;

/** <No documentation available> */
typedef struct Metadata {
/** <No documentation available> */
Vec_uint8_t open_telemetry_context;
} Metadata_t;

/** <No documentation available> */
typedef struct Input {
/** <No documentation available> */
Vec_uint8_t id;

/** <No documentation available> */
Vec_uint8_t data;

/** <No documentation available> */
Metadata_t metadata;
} Input_t;
typedef struct Input Input_t;


#include <stdbool.h>
@@ -122,16 +107,7 @@ typedef struct RawEvent {
} RawEvent_t;

/** <No documentation available> */
typedef struct Output {
/** <No documentation available> */
Vec_uint8_t id;

/** <No documentation available> */
Vec_uint8_t data;

/** <No documentation available> */
Metadata_t metadata;
} Output_t;
typedef struct Output Output_t;

/** \brief
* `Arc<dyn Send + Sync + Fn(A1) -> Ret>`
@@ -159,9 +135,15 @@ typedef struct SendOutput {
/** <No documentation available> */
typedef struct DoraOnEvent {
/** <No documentation available> */
OnEventResult_t (*on_event)(RawEvent_t const *, SendOutput_t const *, void *);
OnEventResult_t (*on_event)(RawEvent_t *, SendOutput_t const *, void *);
} DoraOnEvent_t;

/** <No documentation available> */
typedef struct Metadata {
/** <No documentation available> */
Vec_uint8_t open_telemetry_context;
} Metadata_t;


#ifdef __cplusplus
} /* extern \"C\" */


+ 6
- 11
apis/python/node/src/lib.rs View File

@@ -3,9 +3,7 @@
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_node_api::merged::{MergeExternalSend, MergedEvent};
use dora_node_api::{DoraNode, EventStream};
use dora_operator_api_python::{
copy_array_into_sample, pydict_to_metadata, required_data_size, PyEvent,
};
use dora_operator_api_python::{pydict_to_metadata, PyEvent};
use dora_ros2_bridge_python::Ros2Subscription;
use eyre::Context;
use futures::{Stream, StreamExt};
@@ -100,14 +98,11 @@ impl Node {
.send_output_bytes(output_id.into(), parameters, data.len(), data)
.wrap_err("failed to send output")?;
} else if let Ok(arrow_array) = arrow::array::ArrayData::from_pyarrow(data.as_ref(py)) {
let total_len = required_data_size(&arrow_array);

let mut sample = self.node.allocate_data_sample(total_len)?;
let type_info = copy_array_into_sample(&mut sample, &arrow_array)?;

self.node
.send_output_sample(output_id.into(), type_info, parameters, Some(sample))
.wrap_err("failed to send output")?;
self.node.send_output(
output_id.into(),
parameters,
arrow::array::make_array(arrow_array),
)?;
} else {
eyre::bail!("invalid `data` type, must by `PyBytes` or arrow array")
}


+ 7
- 85
apis/python/operator/src/lib.rs View File

@@ -1,22 +1,12 @@
use std::sync::Arc;

use arrow::{array::ArrayData, pyarrow::ToPyArrow};
use dora_node_api::{
dora_core::message::{ArrowTypeInfo, BufferOffset},
merged::MergedEvent,
Data, Event, Metadata, MetadataParameters,
};
use arrow::{array::ArrayRef, pyarrow::ToPyArrow};
use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters};
use eyre::{Context, Result};
use pyo3::{
exceptions::PyLookupError,
prelude::*,
types::{PyBytes, PyDict},
};
use pyo3::{exceptions::PyLookupError, prelude::*, types::PyDict};

#[pyclass]
pub struct PyEvent {
event: MergedEvent<PyObject>,
data: Option<Arc<Data>>,
data: Option<ArrayRef>,
}

#[pymethods]
@@ -34,7 +24,6 @@ impl PyEvent {
let value = match key {
"type" => Some(Self::ty(event).to_object(py)),
"id" => Self::id(event).map(|v| v.to_object(py)),
"data" => self.data(py),
"value" => self.value(py)?,
"metadata" => Self::metadata(event, py),
"error" => Self::error(event).map(|v| v.to_object(py)),
@@ -77,21 +66,12 @@ impl PyEvent {
}
}

/// Returns the payload of an input event as a `PyBytes` object (if any).
fn data(&self, py: Python<'_>) -> Option<PyObject> {
self.data.as_ref().map(|data| PyBytes::new(py, data).into())
}

/// Returns the payload of an input event as an arrow array (if any).
fn value(&self, py: Python<'_>) -> PyResult<Option<PyObject>> {
match (&self.event, &self.data) {
(MergedEvent::Dora(Event::Input { metadata, .. }), Some(data)) => {
let array = data
.clone()
.into_arrow_array(&metadata.type_info)
.context("Could not create arrow array")?;
(MergedEvent::Dora(Event::Input { .. }), Some(data)) => {
// TODO: Does this call leak data?
let array_data = array.to_pyarrow(py)?;
let array_data = data.to_data().to_pyarrow(py)?;
Ok(Some(array_data))
}
_ => Ok(None),
@@ -122,7 +102,7 @@ impl From<Event> for PyEvent {
impl From<MergedEvent<PyObject>> for PyEvent {
fn from(mut event: MergedEvent<PyObject>) -> Self {
let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event {
data.take().map(Arc::new)
Some(data.clone())
} else {
None
};
@@ -166,61 +146,3 @@ pub fn metadata_to_pydict<'a>(metadata: &'a Metadata, py: Python<'a>) -> &'a PyD
.unwrap();
dict
}

pub fn copy_array_into_sample(
target_buffer: &mut [u8],
arrow_array: &ArrayData,
) -> eyre::Result<ArrowTypeInfo> {
let mut next_offset = 0;
copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array)
}

fn copy_array_into_sample_inner(
target_buffer: &mut [u8],
next_offset: &mut usize,
arrow_array: &ArrayData,
) -> eyre::Result<ArrowTypeInfo> {
let mut buffer_offsets = Vec::new();
for buffer in arrow_array.buffers().iter() {
let len = buffer.len();
assert!(
target_buffer[*next_offset..].len() >= len,
"target buffer too small (total_len: {}, offset: {}, required_len: {len})",
target_buffer.len(),
*next_offset,
);
target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice());
buffer_offsets.push(BufferOffset {
offset: *next_offset,
len,
});
*next_offset += len;
}

let mut child_data = Vec::new();
for child in arrow_array.child_data() {
let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child)?;
child_data.push(child_type_info);
}

Ok(ArrowTypeInfo {
data_type: arrow_array.data_type().clone(),
len: arrow_array.len(),
null_count: arrow_array.null_count(),
validity: arrow_array.nulls().map(|b| b.validity().to_owned()),
offset: arrow_array.offset(),
buffer_offsets,
child_data,
})
}

pub fn required_data_size(array: &ArrayData) -> usize {
let mut size = 0;
for buffer in array.buffers() {
size += buffer.len();
}
for child in array.child_data() {
size += required_data_size(child);
}
size
}

+ 2
- 53
apis/rust/node/src/event_stream/event.rs View File

@@ -1,11 +1,10 @@
use std::{ptr::NonNull, sync::Arc};

use arrow_schema::DataType;
use dora_core::{
config::{DataId, OperatorId},
message::{ArrowTypeInfo, BufferOffset, Metadata},
};
use eyre::{Context, ContextCompat, Result};
use eyre::{Context, Result};
use shared_memory_extended::{Shmem, ShmemConf};

#[derive(Debug)]
@@ -18,7 +17,7 @@ pub enum Event {
Input {
id: DataId,
metadata: Metadata,
data: ArrowData,
data: arrow::array::ArrayRef,
},
InputClosed {
id: DataId,
@@ -26,56 +25,6 @@ pub enum Event {
Error(String),
}

#[derive(Debug)]
pub struct ArrowData(arrow::array::ArrayData);

impl ArrowData {
pub(super) fn new(data: Option<Data>, metadata: &Metadata) -> eyre::Result<ArrowData> {
let raw_data = Arc::new(data.unwrap_or(Data::Vec(Vec::new())));
raw_data.into_arrow_array(&metadata.type_info).map(Self)
}

pub fn as_byte_slice(&self) -> eyre::Result<&[u8]> {
let first_buffer = self.0.buffers().iter().next().context("no buffers")?;
if self.0.buffers().len() != 1 {
eyre::bail!(
"must have exactly one buffer, has {} buffers",
self.0.buffers().len()
);
}
if !self.0.child_data().is_empty() {
eyre::bail!(
"should have no child data, has {}",
self.0.child_data().len()
);
}

if !matches!(self.0.data_type(), DataType::UInt8) {
eyre::bail!("should have DataType::UInt8, has {}", self.0.data_type());
}
if self.0.len() != first_buffer.len() {
eyre::bail!(
"len ({}) should match len of first buffer ({})",
self.0.len(),
first_buffer.len()
);
}

if self.0.null_count() != 0 {
eyre::bail!("should have no nulls, has {}", self.0.null_count());
}

if self.0.offset() != 0 {
eyre::bail!(
"should start at offset 0, starts at offset {}",
self.0.offset()
);
}

Ok(first_buffer.as_slice())
}
}

pub(super) enum Data {
Vec(Vec<u8>),
SharedMemory {


+ 7
- 2
apis/rust/node/src/event_stream/mod.rs View File

@@ -1,6 +1,6 @@
use std::sync::Arc;

pub use event::{ArrowData, Event, MappedInputData};
pub use event::{Event, MappedInputData};
use futures::{Stream, StreamExt};

use self::{
@@ -134,7 +134,12 @@ impl EventStream {
})
},
};
let data = data.and_then(|data| ArrowData::new(data, &metadata));
let data = data.and_then(|data| {
let raw_data = Arc::new(data.unwrap_or(Data::Vec(Vec::new())));
raw_data
.into_arrow_array(&metadata.type_info)
.map(arrow::array::make_array)
});
match data {
Ok(data) => Event::Input { id, metadata, data },
Err(err) => Event::Error(format!("{err:?}")),


+ 3
- 2
apis/rust/node/src/lib.rs View File

@@ -13,11 +13,12 @@
//! dora new project_xyz --kind dataflow
//! ```
//!
pub use arrow;
pub use dora_core;
pub use dora_core::message::{uhlc, Metadata, MetadataParameters};
pub use event_stream::{merged, ArrowData, Event, EventStream, MappedInputData};
pub use event_stream::{merged, Event, EventStream, MappedInputData};
pub use flume::Receiver;
pub use node::{DataSample, DoraNode, ZERO_COPY_THRESHOLD};
pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD};

mod daemon_connection;
mod event_stream;


+ 60
- 0
apis/rust/node/src/node/arrow_utils.rs View File

@@ -0,0 +1,60 @@
use arrow::array::ArrayData;
use dora_core::message::{ArrowTypeInfo, BufferOffset};

pub fn required_data_size(array: &ArrayData) -> usize {
let mut size = 0;
for buffer in array.buffers() {
size += buffer.len();
}
for child in array.child_data() {
size += required_data_size(child);
}
size
}

pub fn copy_array_into_sample(
target_buffer: &mut [u8],
arrow_array: &ArrayData,
) -> eyre::Result<ArrowTypeInfo> {
let mut next_offset = 0;
copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array)
}

fn copy_array_into_sample_inner(
target_buffer: &mut [u8],
next_offset: &mut usize,
arrow_array: &ArrayData,
) -> eyre::Result<ArrowTypeInfo> {
let mut buffer_offsets = Vec::new();
for buffer in arrow_array.buffers().iter() {
let len = buffer.len();
assert!(
target_buffer[*next_offset..].len() >= len,
"target buffer too small (total_len: {}, offset: {}, required_len: {len})",
target_buffer.len(),
*next_offset,
);
target_buffer[*next_offset..][..len].copy_from_slice(buffer.as_slice());
buffer_offsets.push(BufferOffset {
offset: *next_offset,
len,
});
*next_offset += len;
}

let mut child_data = Vec::new();
for child in arrow_array.child_data() {
let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child)?;
child_data.push(child_type_info);
}

Ok(ArrowTypeInfo {
data_type: arrow_array.data_type().clone(),
len: arrow_array.len(),
null_count: arrow_array.null_count(),
validity: arrow_array.nulls().map(|b| b.validity().to_owned()),
offset: arrow_array.offset(),
buffer_offsets,
child_data,
})
}

+ 28
- 3
apis/rust/node/src/node/mod.rs View File

@@ -1,6 +1,11 @@
use crate::EventStream;

use self::{control_channel::ControlChannel, drop_stream::DropStream};
use self::{
arrow_utils::{copy_array_into_sample, required_data_size},
control_channel::ControlChannel,
drop_stream::DropStream,
};
use arrow::array::Array;
use dora_core::{
config::{DataId, NodeId, NodeRunConfig},
daemon_messages::{Data, DropToken, NodeConfig},
@@ -19,6 +24,7 @@ use std::{
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;

pub mod arrow_utils;
mod control_channel;
mod drop_stream;

@@ -117,7 +123,7 @@ impl DoraNode {
/// }).expect("Could not send output");
/// ```
///
pub fn send_output<F>(
pub fn send_output_raw<F>(
&mut self,
output_id: DataId,
parameters: MetadataParameters,
@@ -135,6 +141,25 @@ impl DoraNode {
self.send_output_sample(output_id, type_info, parameters, Some(sample))
}

pub fn send_output(
&mut self,
output_id: DataId,
parameters: MetadataParameters,
data: impl Array,
) -> eyre::Result<()> {
let arrow_array = data.to_data();

let total_len = required_data_size(&arrow_array);

let mut sample = self.allocate_data_sample(total_len)?;
let type_info = copy_array_into_sample(&mut sample, &arrow_array)?;

self.send_output_sample(output_id, type_info, parameters, Some(sample))
.wrap_err("failed to send output")?;

Ok(())
}

pub fn send_output_bytes(
&mut self,
output_id: DataId,
@@ -142,7 +167,7 @@ impl DoraNode {
data_len: usize,
data: &[u8],
) -> eyre::Result<()> {
self.send_output(output_id, parameters, data_len, |sample| {
self.send_output_raw(output_id, parameters, data_len, |sample| {
sample.copy_from_slice(data)
})
}


+ 1
- 1
apis/rust/operator/macros/src/lib.rs View File

@@ -52,7 +52,7 @@ fn register_operator_impl(item: &TokenStream2) -> syn::Result<TokenStream2> {
let on_event = quote! {
#[no_mangle]
pub unsafe extern "C" fn dora_on_event(
event: &dora_operator_api::types::RawEvent,
event: &mut dora_operator_api::types::RawEvent,
send_output: &dora_operator_api::types::SendOutput,
operator_context: *mut std::ffi::c_void,
) -> dora_operator_api::types::OnEventResult {


+ 14
- 4
apis/rust/operator/src/lib.rs View File

@@ -21,14 +21,21 @@
pub use dora_operator_api_macros::register_operator;
pub use dora_operator_api_types as types;
pub use types::DoraStatus;
use types::{Metadata, Output, SendOutput};
use types::{
arrow::{
self,
array::{Array, ArrayRef},
},
Metadata, Output, SendOutput,
};

pub mod raw;

#[derive(Debug)]
#[non_exhaustive]
pub enum Event<'a> {
Input { id: &'a str, data: &'a [u8] },
Input { id: &'a str, data: ArrayRef },
InputParseError { id: &'a str, error: String },
InputClosed { id: &'a str },
Stop,
}
@@ -48,10 +55,13 @@ impl DoraOutputSender<'_> {
/// Send an output from the operator:
/// - `id` is the `output_id` as defined in your dataflow.
/// - `data` is the data that should be sent
pub fn send(&mut self, id: String, data: Vec<u8>) -> Result<(), String> {
pub fn send(&mut self, id: String, data: impl Array) -> Result<(), String> {
let (data_array, schema) =
arrow::ffi::to_ffi(&data.into_data()).map_err(|err| err.to_string())?;
let result = self.0.send_output.call(Output {
id: id.into(),
data: data.into(),
data_array,
schema,
metadata: Metadata {
open_telemetry_context: String::new().into(), // TODO
},


+ 22
- 7
apis/rust/operator/src/raw.rs View File

@@ -1,5 +1,7 @@
use crate::{DoraOperator, DoraOutputSender, DoraStatus, Event};
use dora_operator_api_types::{DoraInitResult, DoraResult, OnEventResult, RawEvent, SendOutput};
use dora_operator_api_types::{
arrow, DoraInitResult, DoraResult, OnEventResult, RawEvent, SendOutput,
};
use std::ffi::c_void;

pub type OutputFnRaw = unsafe extern "C" fn(
@@ -27,7 +29,7 @@ pub unsafe fn dora_drop_operator<O>(operator_context: *mut c_void) -> DoraResult
}

pub unsafe fn dora_on_event<O: DoraOperator>(
event: &RawEvent,
event: &mut RawEvent,
send_output: &SendOutput,
operator_context: *mut std::ffi::c_void,
) -> OnEventResult {
@@ -35,11 +37,24 @@ pub unsafe fn dora_on_event<O: DoraOperator>(

let operator: &mut O = unsafe { &mut *operator_context.cast() };

let event_variant = if let Some(input) = &event.input {
let data = input.data.as_ref().as_slice();
Event::Input {
id: &input.id,
data,
let event_variant = if let Some(input) = &mut event.input {
let Some(data_array) = input.data_array.take() else {
return OnEventResult {
result: DoraResult { error: Some("data already taken".to_string().into()) },
status: DoraStatus::Continue,
};
};
let data = arrow::ffi::from_ffi(data_array, &input.schema);

match data {
Ok(data) => Event::Input {
id: &input.id,
data: arrow::array::make_array(data),
},
Err(err) => Event::InputParseError {
id: &input.id,
error: format!("{err}"),
},
}
} else if let Some(input_id) = &event.input_closed {
Event::InputClosed { id: input_id }


+ 4
- 1
apis/rust/operator/types/Cargo.toml View File

@@ -8,6 +8,9 @@ license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "45.0.0", features = ["ffi"] }

[dependencies.safer-ffi]
version = "0.1.0-rc1"
version = "0.1.2"
features = ["headers"]

+ 10
- 7
apis/rust/operator/types/src/lib.rs View File

@@ -1,6 +1,9 @@
#![deny(elided_lifetimes_in_paths)] // required for safer-ffi

pub use arrow;
pub use safer_ffi;

use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use safer_ffi::{closure::ArcDynFn1, derive_ReprC, ffi_export};
use std::path::Path;

@@ -46,7 +49,7 @@ pub struct DoraOnEvent {
#[repr(transparent)]
pub struct OnEventFn(
pub unsafe extern "C" fn(
event: &RawEvent,
event: &mut RawEvent,
send_output: &SendOutput,
operator_context: *mut std::ffi::c_void,
) -> OnEventResult,
@@ -64,12 +67,12 @@ pub struct RawEvent {
}

#[derive_ReprC]
#[ffi_export]
#[repr(C)]
#[repr(opaque)]
#[derive(Debug)]
pub struct Input {
pub id: safer_ffi::String,
pub data: safer_ffi::Vec<u8>,
pub data_array: Option<FFI_ArrowArray>,
pub schema: FFI_ArrowSchema,
pub metadata: Metadata,
}

@@ -89,12 +92,12 @@ pub struct SendOutput {
}

#[derive_ReprC]
#[ffi_export]
#[repr(C)]
#[repr(opaque)]
#[derive(Debug)]
pub struct Output {
pub id: safer_ffi::String,
pub data: safer_ffi::Vec<u8>,
pub data_array: FFI_ArrowArray,
pub schema: FFI_ArrowSchema,
pub metadata: Metadata,
}



+ 1
- 1
binaries/cli/src/template/c/operator/operator-template.c View File

@@ -23,7 +23,7 @@ DoraResult_t dora_drop_operator(void *operator_context)
}

OnEventResult_t dora_on_event(
const RawEvent_t *event,
RawEvent_t *event,
const SendOutput_t *send_output,
void *operator_context)
{


+ 1
- 1
binaries/runtime/Cargo.toml View File

@@ -37,7 +37,7 @@ clap = { version = "4.0.3", features = ["derive"] }
tracing-opentelemetry = { version = "0.18.0", optional = true }
pythonize = { version = "0.19.0", optional = true }
arrow-schema = "45.0.0"
arrow = { version = "45.0.0" }
arrow = { version = "45.0.0", features = ["ffi"] }

[features]
default = ["tracing"]


+ 1
- 1
binaries/runtime/src/lib.rs View File

@@ -218,7 +218,7 @@ async fn run(
}
OperatorEvent::Output {
output_id,
type_info: type_info,
type_info,
parameters,
data,
} => {


+ 4
- 3
binaries/runtime/src/operator/python.rs View File

@@ -271,10 +271,11 @@ mod callback_impl {
use super::SendOutputCallback;
use arrow::{array::ArrayData, pyarrow::FromPyArrow};
use dora_core::message::ArrowTypeInfo;
use dora_node_api::ZERO_COPY_THRESHOLD;
use dora_operator_api_python::{
copy_array_into_sample, pydict_to_metadata, required_data_size,
use dora_node_api::{
arrow_utils::{copy_array_into_sample, required_data_size},
ZERO_COPY_THRESHOLD,
};
use dora_operator_api_python::pydict_to_metadata;
use eyre::{eyre, Context, Result};
use pyo3::{
pymethods,


+ 34
- 13
binaries/runtime/src/operator/shared_lib.rs View File

@@ -3,10 +3,12 @@ use dora_core::{
adjust_shared_library_path,
config::{DataId, NodeId, OperatorId},
descriptor::source_is_url,
message::ArrowTypeInfo,
};
use dora_download::download_file;
use dora_node_api::{Event, MetadataParameters};
use dora_node_api::{
arrow_utils::{copy_array_into_sample, required_data_size},
Event, MetadataParameters,
};
use dora_operator_api_types::{
safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnEvent,
DoraResult, DoraStatus, Metadata, OnEventResult, Output, SendOutput,
@@ -110,7 +112,8 @@ impl<'lib> SharedLibraryOperator<'lib> {
let send_output_closure = Arc::new(move |output: Output| {
let Output {
id: output_id,
data,
data_array,
schema,
metadata: Metadata {
open_telemetry_context,
},
@@ -120,11 +123,31 @@ impl<'lib> SharedLibraryOperator<'lib> {
..Default::default()
};

let arrow_array = match arrow::ffi::from_ffi(data_array, &schema) {
Ok(a) => a,
Err(err) => {
return DoraResult {
error: Some(err.to_string().into()),
}
}
};

let total_len = required_data_size(&arrow_array);
let mut sample = vec![0; total_len];
let type_info = match copy_array_into_sample(&mut sample, &arrow_array) {
Ok(t) => t,
Err(err) => {
return DoraResult {
error: Some(err.to_string().into()),
}
}
};

let event = OperatorEvent::Output {
output_id: DataId::from(String::from(output_id)),
type_info: ArrowTypeInfo::byte_array(data.len()),
type_info,
parameters,
data: Some(data.to_owned().into()),
data: Some(sample.into()),
};

let result = self
@@ -168,7 +191,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
metadata.parameters.open_telemetry_context = string_cx;
}

let operator_event = match event {
let mut operator_event = match event {
Event::Stop => dora_operator_api_types::RawEvent {
input: None,
input_closed: None,
@@ -180,14 +203,12 @@ impl<'lib> SharedLibraryOperator<'lib> {
metadata,
data,
} => {
let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?;

let operator_input = dora_operator_api_types::Input {
id: String::from(input_id).into(),
data: data
.as_byte_slice()
.map(|d| d.to_vec())
// TODO: don't silence error
.unwrap_or_default()
.into(),
data_array: Some(data_array),
schema,
metadata: Metadata {
open_telemetry_context: metadata
.parameters
@@ -232,7 +253,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
status,
} = unsafe {
(self.bindings.on_event.on_event)(
&operator_event,
&mut operator_event,
&send_output,
operator_context.raw,
)


+ 2
- 2
examples/benchmark/node/src/main.rs View File

@@ -31,7 +31,7 @@ fn main() -> eyre::Result<()> {
.sample_iter(rand::distributions::Standard)
.take(size)
.collect();
node.send_output(latency.clone(), Default::default(), data.len(), |out| {
node.send_output_raw(latency.clone(), Default::default(), data.len(), |out| {
out.copy_from_slice(&data);
})?;

@@ -50,7 +50,7 @@ fn main() -> eyre::Result<()> {
.sample_iter(rand::distributions::Standard)
.take(size)
.collect();
node.send_output(throughput.clone(), Default::default(), data.len(), |out| {
node.send_output_raw(throughput.clone(), Default::default(), data.len(), |out| {
out.copy_from_slice(&data);
})?;
}


+ 1
- 1
examples/c++-dataflow/operator-c-api/operator.cc View File

@@ -31,7 +31,7 @@ extern "C" DoraResult_t dora_drop_operator(void *operator_context)
}

extern "C" OnEventResult_t dora_on_event(
const RawEvent_t *event,
RawEvent_t *event,
const SendOutput_t *send_output,
void *operator_context)
{


+ 1
- 1
examples/c-dataflow/operator.c View File

@@ -23,7 +23,7 @@ DoraResult_t dora_drop_operator(void *operator_context)
}

OnEventResult_t dora_on_event(
const RawEvent_t *event,
RawEvent_t *event,
const SendOutput_t *send_output,
void *operator_context)
{


+ 1
- 1
examples/multiple-daemons/node/src/main.rs View File

@@ -23,7 +23,7 @@ fn main() -> eyre::Result<()> {
let random: u64 = rand::random();
println!("tick {i}, sending {random:#x}");
let data: &[u8] = &random.to_le_bytes();
node.send_output(output.clone(), metadata.parameters, data.len(), |out| {
node.send_output_raw(output.clone(), metadata.parameters, data.len(), |out| {
out.copy_from_slice(data);
})?;
}


+ 12
- 5
examples/rust-dataflow/node/src/main.rs View File

@@ -1,4 +1,9 @@
use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event};
use dora_node_api::{
self,
arrow::{array::PrimitiveBuilder, datatypes::UInt64Type},
dora_core::config::DataId,
DoraNode, Event,
};

fn main() -> eyre::Result<()> {
println!("hello");
@@ -22,10 +27,12 @@ fn main() -> eyre::Result<()> {
"tick" => {
let random: u64 = rand::random();
println!("tick {i}, sending {random:#x}");
let data: &[u8] = &random.to_le_bytes();
node.send_output(output.clone(), metadata.parameters, data.len(), |out| {
out.copy_from_slice(data);
})?;
let data = {
let mut builder: PrimitiveBuilder<UInt64Type> = PrimitiveBuilder::new();
builder.append_value(random);
builder.finish()
};
node.send_output(output.clone(), metadata.parameters, data)?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},


+ 19
- 8
examples/rust-dataflow/operator/src/lib.rs View File

@@ -1,6 +1,13 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};
use dora_operator_api::{
register_operator,
types::arrow::{
array::{AsArray, PrimitiveArray, StringBuilder},
datatypes::UInt64Type,
},
DoraOperator, DoraOutputSender, DoraStatus, Event,
};

register_operator!(ExampleOperator);

@@ -21,16 +28,20 @@ impl DoraOperator for ExampleOperator {
self.ticks += 1;
}
"random" => {
let parsed = {
let data: [u8; 8] =
(*data).try_into().map_err(|_| "unexpected random data")?;
u64::from_le_bytes(data)
};
let primitive_array: &PrimitiveArray<UInt64Type> =
data.as_primitive_opt().ok_or("expected primitive array")?;
let value = primitive_array.value(0);

let output = format!(
"operator received random value {parsed:#x} after {} ticks",
"operator received random value {value:#x} after {} ticks",
self.ticks
);
output_sender.send("status".into(), output.into_bytes())?;
let output_data = {
let mut builder = StringBuilder::new();
builder.append_value(output);
builder.finish()
};
output_sender.send("status".into(), output_data)?;
}
other => eprintln!("ignoring unexpected input {other}"),
},


+ 9
- 3
examples/rust-dataflow/sink/src/main.rs View File

@@ -1,5 +1,9 @@
use dora_node_api::{self, DoraNode, Event};
use eyre::{bail, Context};
use dora_node_api::{
self,
arrow::array::{AsArray, StringArray},
DoraNode, Event,
};
use eyre::{bail, ContextCompat};

fn main() -> eyre::Result<()> {
let (_node, mut events) = DoraNode::init_from_env()?;
@@ -8,8 +12,10 @@ fn main() -> eyre::Result<()> {
match event {
Event::Input { id, metadata, data } => match id.as_str() {
"message" => {
let received_string = std::str::from_utf8(data.as_byte_slice()?)
let string_array: &StringArray = data
.as_string_opt()
.wrap_err("received message was not utf8-encoded")?;
let received_string = string_array.value(0);
println!("sink received message: {}", received_string);
if !received_string.starts_with("operator received random value ") {
bail!("unexpected message format (should start with 'operator received random value')")


Loading…
Cancel
Save