Browse Source

Simplify arrow array construction through new `IntoArrow` trait

tags/v0.3.0-rc
Philipp Oppermann 2 years ago
parent
commit
89f980e753
Failed to extract signature
14 changed files with 133 additions and 42 deletions
  1. +9
    -0
      Cargo.lock
  2. +2
    -0
      Cargo.toml
  3. +7
    -2
      apis/c++/node/src/lib.rs
  4. +5
    -2
      apis/c++/operator/src/lib.rs
  5. +1
    -0
      apis/rust/node/Cargo.toml
  6. +1
    -0
      apis/rust/node/src/lib.rs
  7. +1
    -0
      apis/rust/operator/Cargo.toml
  8. +1
    -0
      apis/rust/operator/src/lib.rs
  9. +2
    -10
      examples/multiple-daemons/node/src/main.rs
  10. +5
    -8
      examples/multiple-daemons/operator/src/lib.rs
  11. +2
    -12
      examples/rust-dataflow/node/src/main.rs
  12. +3
    -8
      examples/rust-dataflow/operator/src/lib.rs
  13. +12
    -0
      libraries/arrow-convert/Cargo.toml
  14. +82
    -0
      libraries/arrow-convert/src/lib.rs

+ 9
- 0
Cargo.lock View File

@@ -1330,6 +1330,13 @@ dependencies = [
"windows-sys 0.48.0",
]

[[package]]
name = "dora-arrow-convert"
version = "0.2.6"
dependencies = [
"arrow",
]

[[package]]
name = "dora-cli"
version = "0.2.6"
@@ -1483,6 +1490,7 @@ dependencies = [
"arrow-schema",
"bincode",
"capnp",
"dora-arrow-convert",
"dora-core",
"dora-tracing",
"eyre",
@@ -1543,6 +1551,7 @@ dependencies = [
name = "dora-operator-api"
version = "0.2.6"
dependencies = [
"dora-arrow-convert",
"dora-operator-api-macros",
"dora-operator-api-types",
]


+ 2
- 0
Cargo.toml View File

@@ -15,6 +15,7 @@ members = [
"examples/rust-ros2-dataflow/*",
"examples/benchmark/*",
"examples/multiple-daemons/*",
"libraries/arrow-convert",
"libraries/communication-layer/*",
"libraries/core",
"libraries/message",
@@ -45,6 +46,7 @@ dora-operator-api-python = { version = "0.2.6", path = "apis/python/operator" }
dora-operator-api-c = { version = "0.2.6", path = "apis/c/operator" }
dora-node-api-c = { version = "0.2.6", path = "apis/c/node" }
dora-core = { version = "0.2.6", path = "libraries/core" }
dora-arrow-convert = { version = "0.2.6", path = "libraries/arrow-convert" }
dora-tracing = { version = "0.2.6", path = "libraries/extensions/telemetry/tracing" }
dora-metrics = { version = "0.2.6", path = "libraries/extensions/telemetry/metrics" }
dora-download = { version = "0.2.6", path = "libraries/extensions/download" }


+ 7
- 2
apis/c++/node/src/lib.rs View File

@@ -1,4 +1,8 @@
use dora_node_api::{self, Event, EventStream};
use dora_node_api::{
self,
arrow::array::{AsArray, BinaryArray},
Event, EventStream,
};
use eyre::bail;

#[cxx::bridge]
@@ -81,9 +85,10 @@ fn event_as_input(event: Box<DoraEvent>) -> eyre::Result<ffi::DoraInput> {
let Some(Event::Input { id, metadata: _, data }) = event.0 else {
bail!("not an input event");
};
let data: Option<&BinaryArray> = data.as_binary_opt();
Ok(ffi::DoraInput {
id: id.into(),
data: data.map(|d| d.to_owned()).unwrap_or_default(),
data: data.map(|d| d.value(0).to_owned()).unwrap_or_default(),
})
}



+ 5
- 2
apis/c++/operator/src/lib.rs View File

@@ -2,7 +2,9 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{
self, register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event,
self, register_operator,
types::arrow::array::{AsArray, BinaryArray},
DoraOperator, DoraOutputSender, DoraStatus, Event,
};
use ffi::DoraSendOutputResult;

@@ -45,7 +47,7 @@ pub struct OutputSender<'a, 'b>(&'a mut DoraOutputSender<'b>);
fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> DoraSendOutputResult {
let error = sender
.0
.send(id.into(), data.to_owned())
.send(id.into(), data.to_owned().into())
.err()
.unwrap_or_default();
DoraSendOutputResult { error }
@@ -75,6 +77,7 @@ impl DoraOperator for OperatorWrapper {
Event::Input { id, data } => {
let operator = self.operator.as_mut().unwrap();
let mut output_sender = OutputSender(output_sender);
let data: &BinaryArray = data.as_binary();

let result = ffi::on_input(operator, id, data, &mut output_sender);
if result.error.is_empty() {


+ 1
- 0
apis/rust/node/Cargo.toml View File

@@ -30,6 +30,7 @@ arrow = "45.0.0"
arrow-schema = "45.0.0"
futures = "0.3.28"
futures-concurrency = "7.3.0"
dora-arrow-convert = { workspace = true }

[dev-dependencies]
tokio = { version = "1.24.2", features = ["rt"] }

+ 1
- 0
apis/rust/node/src/lib.rs View File

@@ -14,6 +14,7 @@
//! ```
//!
pub use arrow;
pub use dora_arrow_convert::*;
pub use dora_core;
pub use dora_core::message::{uhlc, Metadata, MetadataParameters};
pub use event_stream::{merged, Event, EventStream, MappedInputData};


+ 1
- 0
apis/rust/operator/Cargo.toml View File

@@ -11,3 +11,4 @@ license.workspace = true
[dependencies]
dora-operator-api-macros = { workspace = true }
dora-operator-api-types = { workspace = true }
dora-arrow-convert = { workspace = true }

+ 1
- 0
apis/rust/operator/src/lib.rs View File

@@ -18,6 +18,7 @@
#![warn(unsafe_op_in_unsafe_fn)]
#![allow(clippy::missing_safety_doc)]

pub use dora_arrow_convert::*;
pub use dora_operator_api_macros::register_operator;
pub use dora_operator_api_types as types;
pub use types::DoraStatus;


+ 2
- 10
examples/multiple-daemons/node/src/main.rs View File

@@ -1,11 +1,4 @@
use std::iter;

use dora_node_api::{
self,
arrow::{array::PrimitiveArray, datatypes::UInt64Type},
dora_core::config::DataId,
DoraNode, Event,
};
use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event, IntoArrow};

fn main() -> eyre::Result<()> {
println!("hello");
@@ -29,8 +22,7 @@ fn main() -> eyre::Result<()> {
"tick" => {
let random: u64 = rand::random();
println!("tick {i}, sending {random:#x}");
let data: PrimitiveArray<UInt64Type> = iter::once(random).collect();
node.send_output(output.clone(), metadata.parameters, data)?;
node.send_output(output.clone(), metadata.parameters, random.into_arrow())?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},


+ 5
- 8
examples/multiple-daemons/operator/src/lib.rs View File

@@ -2,11 +2,8 @@

use dora_operator_api::{
register_operator,
types::arrow::{
array::{AsArray, StringArray},
datatypes::UInt64Type,
},
DoraOperator, DoraOutputSender, DoraStatus, Event,
types::arrow::{array::AsArray, datatypes::UInt64Type},
DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow,
};

register_operator!(ExampleOperator);
@@ -33,11 +30,11 @@ impl DoraOperator for ExampleOperator {
.ok_or_else(|| "expected u64 value".to_owned())?
.value(0);

let output = StringArray::from_iter(std::iter::once(Some(format!(
let output = format!(
"operator received random value {data:#x} after {} ticks",
self.ticks
))));
output_sender.send("status".into(), output)?;
);
output_sender.send("status".into(), output.into_arrow())?;
}
other => eprintln!("ignoring unexpected input {other}"),
},


+ 2
- 12
examples/rust-dataflow/node/src/main.rs View File

@@ -1,9 +1,4 @@
use dora_node_api::{
self,
arrow::{array::PrimitiveBuilder, datatypes::UInt64Type},
dora_core::config::DataId,
DoraNode, Event,
};
use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event, IntoArrow};

fn main() -> eyre::Result<()> {
println!("hello");
@@ -27,12 +22,7 @@ fn main() -> eyre::Result<()> {
"tick" => {
let random: u64 = rand::random();
println!("tick {i}, sending {random:#x}");
let data = {
let mut builder: PrimitiveBuilder<UInt64Type> = PrimitiveBuilder::new();
builder.append_value(random);
builder.finish()
};
node.send_output(output.clone(), metadata.parameters, data)?;
node.send_output(output.clone(), metadata.parameters, random.into_arrow())?;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},


+ 3
- 8
examples/rust-dataflow/operator/src/lib.rs View File

@@ -3,10 +3,10 @@
use dora_operator_api::{
register_operator,
types::arrow::{
array::{AsArray, PrimitiveArray, StringBuilder},
array::{AsArray, PrimitiveArray},
datatypes::UInt64Type,
},
DoraOperator, DoraOutputSender, DoraStatus, Event,
DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow,
};

register_operator!(ExampleOperator);
@@ -36,12 +36,7 @@ impl DoraOperator for ExampleOperator {
"operator received random value {value:#x} after {} ticks",
self.ticks
);
let output_data = {
let mut builder = StringBuilder::new();
builder.append_value(output);
builder.finish()
};
output_sender.send("status".into(), output_data)?;
output_sender.send("status".into(), output.into_arrow())?;
}
other => eprintln!("ignoring unexpected input {other}"),
},


+ 12
- 0
libraries/arrow-convert/Cargo.toml View File

@@ -0,0 +1,12 @@
[package]
name = "dora-arrow-convert"
version.workspace = true
edition = "2021"
documentation.workspace = true
description.workspace = true
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "45.0.0" }

+ 82
- 0
libraries/arrow-convert/src/lib.rs View File

@@ -0,0 +1,82 @@
use arrow::array::{Array, PrimitiveArray, StringArray};

pub trait IntoArrow {
type A: Array;

fn into_arrow(self) -> Self::A;
}

impl IntoArrow for bool {
type A = arrow::array::BooleanArray;
fn into_arrow(self) -> Self::A {
std::iter::once(Some(self)).collect()
}
}

impl IntoArrow for u8 {
type A = PrimitiveArray<arrow::datatypes::UInt8Type>;
fn into_arrow(self) -> Self::A {
std::iter::once(self).collect()
}
}
impl IntoArrow for u16 {
type A = PrimitiveArray<arrow::datatypes::UInt16Type>;
fn into_arrow(self) -> Self::A {
std::iter::once(self).collect()
}
}
impl IntoArrow for u32 {
type A = PrimitiveArray<arrow::datatypes::UInt32Type>;
fn into_arrow(self) -> Self::A {
std::iter::once(self).collect()
}
}
impl IntoArrow for u64 {
type A = PrimitiveArray<arrow::datatypes::UInt64Type>;
fn into_arrow(self) -> Self::A {
std::iter::once(self).collect()
}
}
impl IntoArrow for i8 {
type A = PrimitiveArray<arrow::datatypes::Int8Type>;
fn into_arrow(self) -> Self::A {
std::iter::once(self).collect()
}
}
impl IntoArrow for i16 {
type A = PrimitiveArray<arrow::datatypes::Int16Type>;
fn into_arrow(self) -> Self::A {
std::iter::once(self).collect()
}
}
impl IntoArrow for i32 {
type A = PrimitiveArray<arrow::datatypes::Int32Type>;
fn into_arrow(self) -> Self::A {
std::iter::once(self).collect()
}
}
impl IntoArrow for i64 {
type A = PrimitiveArray<arrow::datatypes::Int64Type>;
fn into_arrow(self) -> Self::A {
std::iter::once(self).collect()
}
}
impl IntoArrow for f32 {
type A = PrimitiveArray<arrow::datatypes::Float32Type>;
fn into_arrow(self) -> Self::A {
std::iter::once(self).collect()
}
}
impl IntoArrow for f64 {
type A = PrimitiveArray<arrow::datatypes::Float64Type>;
fn into_arrow(self) -> Self::A {
std::iter::once(self).collect()
}
}

impl IntoArrow for &str {
type A = StringArray;
fn into_arrow(self) -> Self::A {
std::iter::once(Some(self)).collect()
}
}

Loading…
Cancel
Save