|
|
|
@@ -1,13 +1,13 @@ |
|
|
|
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] |
|
|
|
|
|
|
|
use arrow2::array::PrimitiveArray; |
|
|
|
use arrow2::{array::Array, datatypes::Field, ffi}; |
|
|
|
use dora_node_api::{DoraNode, Event, EventStream}; |
|
|
|
use dora_operator_api_python::{metadata_to_pydict, pydict_to_metadata}; |
|
|
|
use eyre::{Context, Result}; |
|
|
|
use pyo3::{ |
|
|
|
prelude::*, |
|
|
|
types::{PyBytes, PyDict}, |
|
|
|
}; |
|
|
|
|
|
|
|
use eyre::{Context, ContextCompat, Result}; |
|
|
|
use pyo3::ffi::Py_uintptr_t; |
|
|
|
use pyo3::prelude::*; |
|
|
|
use pyo3::types::PyDict; |
|
|
|
#[pyclass] |
|
|
|
pub struct Node { |
|
|
|
events: EventStream, |
|
|
|
@@ -26,12 +26,12 @@ impl IntoPy<PyObject> for PyInput<'_> { |
|
|
|
dict.set_item("id", id.to_string()) |
|
|
|
.wrap_err("failed to add input ID") |
|
|
|
.unwrap(); |
|
|
|
dict.set_item( |
|
|
|
"data", |
|
|
|
PyBytes::new(py, data.as_deref().unwrap_or_default()), |
|
|
|
) |
|
|
|
.wrap_err("failed to add input data") |
|
|
|
.unwrap(); |
|
|
|
let array = |
|
|
|
unsafe { arrow2::ffi::mmap::slice(&data.as_deref().unwrap_or_default()) }; |
|
|
|
let array_data = to_py_array(Box::new(array), py).unwrap(); |
|
|
|
dict.set_item("data", array_data) |
|
|
|
.wrap_err("failed to add input data") |
|
|
|
.unwrap(); |
|
|
|
dict.set_item("metadata", metadata_to_pydict(&metadata, py)) |
|
|
|
.wrap_err("failed to add input metadata") |
|
|
|
.unwrap(); |
|
|
|
@@ -85,14 +85,21 @@ impl Node { |
|
|
|
pub fn send_output( |
|
|
|
&mut self, |
|
|
|
output_id: String, |
|
|
|
data: &PyBytes, |
|
|
|
data: PyObject, |
|
|
|
metadata: Option<&PyDict>, |
|
|
|
py: Python, |
|
|
|
) -> Result<()> { |
|
|
|
let data = data.as_bytes(); |
|
|
|
let buffer = to_rust_array(data, py).unwrap(); |
|
|
|
let data = buffer |
|
|
|
.as_any() |
|
|
|
.downcast_ref::<PrimitiveArray<u8>>() |
|
|
|
.wrap_err("Could not cast sent output to arrow uint8 array") |
|
|
|
.unwrap() |
|
|
|
.values(); |
|
|
|
let metadata = pydict_to_metadata(metadata)?; |
|
|
|
self.node |
|
|
|
.send_output(output_id.into(), metadata, data.len(), |out| { |
|
|
|
out.copy_from_slice(data); |
|
|
|
out.copy_from_slice(&data); |
|
|
|
}) |
|
|
|
.wrap_err("Could not send output") |
|
|
|
} |
|
|
|
@@ -101,6 +108,56 @@ impl Node { |
|
|
|
self.node.id().to_string() |
|
|
|
} |
|
|
|
} |
|
|
|
// Taken from arrow2/example: https://github.com/jorgecarleitao/arrow2/blob/main/arrow-pyarrow-integration-testing/src/lib.rs |
|
|
|
fn to_py_array(array: Box<dyn Array>, py: Python) -> PyResult<PyObject> { |
|
|
|
let schema = Box::new(ffi::export_field_to_c(&Field::new( |
|
|
|
"", |
|
|
|
array.data_type().clone(), |
|
|
|
true, |
|
|
|
))); |
|
|
|
let array = Box::new(ffi::export_array_to_c(array)); |
|
|
|
|
|
|
|
let schema_ptr: *const arrow2::ffi::ArrowSchema = &*schema; |
|
|
|
let array_ptr: *const arrow2::ffi::ArrowArray = &*array; |
|
|
|
|
|
|
|
let pa = py.import("pyarrow")?; |
|
|
|
|
|
|
|
let array = pa.getattr("Array")?.call_method1( |
|
|
|
"_import_from_c", |
|
|
|
(array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t), |
|
|
|
)?; |
|
|
|
|
|
|
|
Ok(array.to_object(py)) |
|
|
|
} |
|
|
|
|
|
|
|
// Taken from arrow2/example: https://github.com/jorgecarleitao/arrow2/blob/main/arrow-pyarrow-integration-testing/src/lib.rs |
|
|
|
fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Box<dyn Array>> { |
|
|
|
// prepare a pointer to receive the Array struct |
|
|
|
let array = Box::new(ffi::ArrowArray::empty()); |
|
|
|
let schema = Box::new(ffi::ArrowSchema::empty()); |
|
|
|
|
|
|
|
let array_ptr = &*array as *const ffi::ArrowArray; |
|
|
|
let schema_ptr = &*schema as *const ffi::ArrowSchema; |
|
|
|
|
|
|
|
// make the conversion through PyArrow's private API |
|
|
|
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds |
|
|
|
ob.call_method1( |
|
|
|
py, |
|
|
|
"_export_to_c", |
|
|
|
(array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t), |
|
|
|
)?; |
|
|
|
|
|
|
|
let field = unsafe { |
|
|
|
ffi::import_field_from_c(schema.as_ref()).wrap_err("Could not parse output array")? |
|
|
|
}; |
|
|
|
|
|
|
|
let array = unsafe { |
|
|
|
ffi::import_array_from_c(*array, field.data_type) |
|
|
|
.wrap_err("Could not parse output array")? |
|
|
|
}; |
|
|
|
|
|
|
|
Ok(array) |
|
|
|
} |
|
|
|
|
|
|
|
#[pyfunction] |
|
|
|
fn start_runtime() -> Result<()> { |
|
|
|
|