@@ -1,69 +1,53 @@
use arrow::{array::ArrayRef, pyarrow::ToPyArrow};
use std::collections::HashMap;
use arrow::pyarrow::ToPyArrow;
use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters};
use eyre::{Context, Result};
use pyo3::{exceptions::PyLookupError, prelude::*, pybacked::PyBackedStr, types::PyDict};
use pyo3::{
prelude::*,
pybacked::PyBackedStr,
types::{IntoPyDict, PyDict},
};
/// Dora Event
#[pyclass]
#[derive(Debug)]
pub struct PyEvent {
event: MergedEvent<PyObject>,
data: Option<ArrayRef>,
}
// Dora Event
#[pymethods]
impl PyEvent {
///
/// :rtype: dora.PyObject
pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult<Option<PyObject>> {
if key == "kind" {
let kind = match &self.event {
MergedEvent::Dora(_) => "dora",
MergedEvent::External(_) => "external",
};
return Ok(Some(kind.to_object(py)));
}
pub fn to_py_dict_bound(self, py: Python<'_>) -> PyResult<Py<PyDict>> {
let mut pydict = HashMap::new();
match &self.event {
MergedEvent::Dora(event) => {
let value = match key {
"type" => Some(Self::ty(event).to_object(py)),
"id" => Self::id(event).map(|v| v.to_object(py)),
"value" => self.value(py)?,
"metadata" => Self::metadata(event, py),
"error" => Self::error(event).map(|v| v.to_object(py)),
other => {
return Err(PyLookupError::new_err(format!(
"event has no property `{other}`"
)))
}
if let Some(id) = Self::id(event) {
pydict.insert("id", id.into_py(py));
}
pydict.insert("type", Self::ty(event).to_object(py));
match &self.event {
MergedEvent::Dora(_) => pydict.insert("kind", "dora".to_object(py)),
MergedEvent::External(_) => pydict.insert("kind", "external".to_object(py)),
};
Ok(value)
if let Some(value) = self.value(py)? {
pydict.insert("value", value);
}
if let Some(metadata) = Self::metadata(event, py) {
pydict.insert("metadata", metadata);
}
if let Some(error) = Self::error(event) {
pydict.insert("error", error.to_object(py));
}
}
MergedEvent::External(event) => {
let value = match key {
"value" => event,
_ => todo!(),
};
Ok(Some(value.clone()))
pydict.insert("value", event.clone());
}
}
}
pub fn inner(&mut self) -> Option<&PyObject> {
match &self.event {
MergedEvent::Dora(_) => None,
MergedEvent::External(event) => Some(event),
}
Ok(pydict.into_py_dict_bound(py).unbind())
}
fn __str__(&self) -> PyResult<String> {
Ok(format!("{:#?}", &self.event))
}
}
impl PyEvent {
fn ty(event: &Event) -> &str {
match event {
Event::Stop => "STOP",
@@ -84,9 +68,9 @@ impl PyEvent {
/// Returns the payload of an input event as an arrow array (if any).
fn value(&self, py: Python<'_>) -> PyResult<Option<PyObject>> {
match ( &self.event, &self.data) {
( MergedEvent::Dora(Event::Input { .. }), Some(data) ) => {
// TODO: Does this call leak data?
match &self.event {
MergedEvent::Dora(Event::Input { data, .. }) => {
// TODO: Does this call leak data?&
let array_data = data.to_data().to_pyarrow(py)?;
Ok(Some(array_data))
}
@@ -116,13 +100,8 @@ impl From<Event> for PyEvent {
}
impl From<MergedEvent<PyObject>> for PyEvent {
fn from(mut event: MergedEvent<PyObject>) -> Self {
let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event {
Some(data.clone())
} else {
None
};
Self { event, data }
fn from(event: MergedEvent<PyObject>) -> Self {
Self { event }
}
}