diff --git a/Cargo.lock b/Cargo.lock index 2c7a8268..b3211fb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1330,6 +1330,13 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "dora-arrow-convert" +version = "0.2.6" +dependencies = [ + "arrow", +] + [[package]] name = "dora-cli" version = "0.2.6" @@ -1483,6 +1490,7 @@ dependencies = [ "arrow-schema", "bincode", "capnp", + "dora-arrow-convert", "dora-core", "dora-tracing", "eyre", @@ -1543,6 +1551,7 @@ dependencies = [ name = "dora-operator-api" version = "0.2.6" dependencies = [ + "dora-arrow-convert", "dora-operator-api-macros", "dora-operator-api-types", ] diff --git a/Cargo.toml b/Cargo.toml index 4069d98b..c9621572 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ members = [ "examples/rust-ros2-dataflow/*", "examples/benchmark/*", "examples/multiple-daemons/*", + "libraries/arrow-convert", "libraries/communication-layer/*", "libraries/core", "libraries/message", @@ -45,6 +46,7 @@ dora-operator-api-python = { version = "0.2.6", path = "apis/python/operator" } dora-operator-api-c = { version = "0.2.6", path = "apis/c/operator" } dora-node-api-c = { version = "0.2.6", path = "apis/c/node" } dora-core = { version = "0.2.6", path = "libraries/core" } +dora-arrow-convert = { version = "0.2.6", path = "libraries/arrow-convert" } dora-tracing = { version = "0.2.6", path = "libraries/extensions/telemetry/tracing" } dora-metrics = { version = "0.2.6", path = "libraries/extensions/telemetry/metrics" } dora-download = { version = "0.2.6", path = "libraries/extensions/download" } diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index 16e27bad..77fd3197 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -1,4 +1,8 @@ -use dora_node_api::{self, Event, EventStream}; +use dora_node_api::{ + self, + arrow::array::{AsArray, BinaryArray}, + Event, EventStream, +}; use eyre::bail; #[cxx::bridge] @@ -81,9 +85,10 @@ fn event_as_input(event: Box) -> eyre::Result { let Some(Event::Input { id, metadata: _, data }) = event.0 else { bail!("not an input event"); }; + let data: Option<&BinaryArray> = data.as_binary_opt(); Ok(ffi::DoraInput { id: id.into(), - data: data.map(|d| d.to_owned()).unwrap_or_default(), + data: data.map(|d| d.value(0).to_owned()).unwrap_or_default(), }) } diff --git a/apis/c++/operator/src/lib.rs b/apis/c++/operator/src/lib.rs index 511b4470..1d4d776c 100644 --- a/apis/c++/operator/src/lib.rs +++ b/apis/c++/operator/src/lib.rs @@ -2,7 +2,9 @@ #![warn(unsafe_op_in_unsafe_fn)] use dora_operator_api::{ - self, register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event, + self, register_operator, + types::arrow::array::{AsArray, BinaryArray}, + DoraOperator, DoraOutputSender, DoraStatus, Event, }; use ffi::DoraSendOutputResult; @@ -45,7 +47,7 @@ pub struct OutputSender<'a, 'b>(&'a mut DoraOutputSender<'b>); fn send_output(sender: &mut OutputSender, id: &str, data: &[u8]) -> DoraSendOutputResult { let error = sender .0 - .send(id.into(), data.to_owned()) + .send(id.into(), data.to_owned().into()) .err() .unwrap_or_default(); DoraSendOutputResult { error } @@ -75,6 +77,7 @@ impl DoraOperator for OperatorWrapper { Event::Input { id, data } => { let operator = self.operator.as_mut().unwrap(); let mut output_sender = OutputSender(output_sender); + let data: &BinaryArray = data.as_binary(); let result = ffi::on_input(operator, id, data, &mut output_sender); if result.error.is_empty() { diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index d15ebc1e..698f4f65 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -30,6 +30,7 @@ arrow = "45.0.0" arrow-schema = "45.0.0" futures = "0.3.28" futures-concurrency = "7.3.0" +dora-arrow-convert = { workspace = true } [dev-dependencies] tokio = { version = "1.24.2", features = ["rt"] } diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index c53f72d0..ae192408 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -14,6 +14,7 @@ //! ``` //! pub use arrow; +pub use dora_arrow_convert::*; pub use dora_core; pub use dora_core::message::{uhlc, Metadata, MetadataParameters}; pub use event_stream::{merged, Event, EventStream, MappedInputData}; diff --git a/apis/rust/operator/Cargo.toml b/apis/rust/operator/Cargo.toml index 21a26d8b..6391df39 100644 --- a/apis/rust/operator/Cargo.toml +++ b/apis/rust/operator/Cargo.toml @@ -11,3 +11,4 @@ license.workspace = true [dependencies] dora-operator-api-macros = { workspace = true } dora-operator-api-types = { workspace = true } +dora-arrow-convert = { workspace = true } diff --git a/apis/rust/operator/src/lib.rs b/apis/rust/operator/src/lib.rs index 3542a061..3e698206 100644 --- a/apis/rust/operator/src/lib.rs +++ b/apis/rust/operator/src/lib.rs @@ -18,6 +18,7 @@ #![warn(unsafe_op_in_unsafe_fn)] #![allow(clippy::missing_safety_doc)] +pub use dora_arrow_convert::*; pub use dora_operator_api_macros::register_operator; pub use dora_operator_api_types as types; pub use types::DoraStatus; diff --git a/examples/multiple-daemons/node/src/main.rs b/examples/multiple-daemons/node/src/main.rs index 4642120e..36f42d57 100644 --- a/examples/multiple-daemons/node/src/main.rs +++ b/examples/multiple-daemons/node/src/main.rs @@ -1,11 +1,4 @@ -use std::iter; - -use dora_node_api::{ - self, - arrow::{array::PrimitiveArray, datatypes::UInt64Type}, - dora_core::config::DataId, - DoraNode, Event, -}; +use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event, IntoArrow}; fn main() -> eyre::Result<()> { println!("hello"); @@ -29,8 +22,7 @@ fn main() -> eyre::Result<()> { "tick" => { let random: u64 = rand::random(); println!("tick {i}, sending {random:#x}"); - let data: PrimitiveArray = iter::once(random).collect(); - node.send_output(output.clone(), metadata.parameters, data)?; + node.send_output(output.clone(), metadata.parameters, random.into_arrow())?; } other => eprintln!("Ignoring unexpected input `{other}`"), }, diff --git a/examples/multiple-daemons/operator/src/lib.rs b/examples/multiple-daemons/operator/src/lib.rs index dd13ee5d..ccc98872 100644 --- a/examples/multiple-daemons/operator/src/lib.rs +++ b/examples/multiple-daemons/operator/src/lib.rs @@ -2,11 +2,8 @@ use dora_operator_api::{ register_operator, - types::arrow::{ - array::{AsArray, StringArray}, - datatypes::UInt64Type, - }, - DoraOperator, DoraOutputSender, DoraStatus, Event, + types::arrow::{array::AsArray, datatypes::UInt64Type}, + DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow, }; register_operator!(ExampleOperator); @@ -33,11 +30,11 @@ impl DoraOperator for ExampleOperator { .ok_or_else(|| "expected u64 value".to_owned())? .value(0); - let output = StringArray::from_iter(std::iter::once(Some(format!( + let output = format!( "operator received random value {data:#x} after {} ticks", self.ticks - )))); - output_sender.send("status".into(), output)?; + ); + output_sender.send("status".into(), output.into_arrow())?; } other => eprintln!("ignoring unexpected input {other}"), }, diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index b90ed47e..36f42d57 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -1,9 +1,4 @@ -use dora_node_api::{ - self, - arrow::{array::PrimitiveBuilder, datatypes::UInt64Type}, - dora_core::config::DataId, - DoraNode, Event, -}; +use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event, IntoArrow}; fn main() -> eyre::Result<()> { println!("hello"); @@ -27,12 +22,7 @@ fn main() -> eyre::Result<()> { "tick" => { let random: u64 = rand::random(); println!("tick {i}, sending {random:#x}"); - let data = { - let mut builder: PrimitiveBuilder = PrimitiveBuilder::new(); - builder.append_value(random); - builder.finish() - }; - node.send_output(output.clone(), metadata.parameters, data)?; + node.send_output(output.clone(), metadata.parameters, random.into_arrow())?; } other => eprintln!("Ignoring unexpected input `{other}`"), }, diff --git a/examples/rust-dataflow/operator/src/lib.rs b/examples/rust-dataflow/operator/src/lib.rs index fe544049..fb48e6b7 100644 --- a/examples/rust-dataflow/operator/src/lib.rs +++ b/examples/rust-dataflow/operator/src/lib.rs @@ -3,10 +3,10 @@ use dora_operator_api::{ register_operator, types::arrow::{ - array::{AsArray, PrimitiveArray, StringBuilder}, + array::{AsArray, PrimitiveArray}, datatypes::UInt64Type, }, - DoraOperator, DoraOutputSender, DoraStatus, Event, + DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow, }; register_operator!(ExampleOperator); @@ -36,12 +36,7 @@ impl DoraOperator for ExampleOperator { "operator received random value {value:#x} after {} ticks", self.ticks ); - let output_data = { - let mut builder = StringBuilder::new(); - builder.append_value(output); - builder.finish() - }; - output_sender.send("status".into(), output_data)?; + output_sender.send("status".into(), output.into_arrow())?; } other => eprintln!("ignoring unexpected input {other}"), }, diff --git a/libraries/arrow-convert/Cargo.toml b/libraries/arrow-convert/Cargo.toml new file mode 100644 index 00000000..30d5e2c2 --- /dev/null +++ b/libraries/arrow-convert/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "dora-arrow-convert" +version.workspace = true +edition = "2021" +documentation.workspace = true +description.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +arrow = { version = "45.0.0" } diff --git a/libraries/arrow-convert/src/lib.rs b/libraries/arrow-convert/src/lib.rs new file mode 100644 index 00000000..ad7a5946 --- /dev/null +++ b/libraries/arrow-convert/src/lib.rs @@ -0,0 +1,82 @@ +use arrow::array::{Array, PrimitiveArray, StringArray}; + +pub trait IntoArrow { + type A: Array; + + fn into_arrow(self) -> Self::A; +} + +impl IntoArrow for bool { + type A = arrow::array::BooleanArray; + fn into_arrow(self) -> Self::A { + std::iter::once(Some(self)).collect() + } +} + +impl IntoArrow for u8 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for u16 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for u32 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for u64 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for i8 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for i16 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for i32 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for i64 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for f32 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} +impl IntoArrow for f64 { + type A = PrimitiveArray; + fn into_arrow(self) -> Self::A { + std::iter::once(self).collect() + } +} + +impl IntoArrow for &str { + type A = StringArray; + fn into_arrow(self) -> Self::A { + std::iter::once(Some(self)).collect() + } +}