Browse Source

Implement event stream merging for Python

tags/v0.2.5-alpha
Philipp Oppermann 2 years ago
parent
commit
7a21d965a9
Failed to extract signature
4 changed files with 117 additions and 37 deletions
  1. +1
    -0
      apis/python/node/Cargo.toml
  2. +64
    -5
      apis/python/node/src/lib.rs
  3. +36
    -25
      apis/python/operator/src/lib.rs
  4. +16
    -7
      apis/rust/node/src/event_stream/merged.rs

+ 1
- 0
apis/python/node/Cargo.toml View File

@@ -23,6 +23,7 @@ flume = "0.10.14"
dora-runtime = { workspace = true, features = ["tracing", "python"] }
arrow = { version = "35.0.0", features = ["pyarrow"] }
pythonize = "0.18.0"
futures = "0.3.28"

[lib]
name = "dora"


+ 64
- 5
apis/python/node/src/lib.rs View File

@@ -1,8 +1,10 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use dora_node_api::{DoraNode, EventStream};
use dora_node_api::merged::MergedEvent;
use dora_node_api::{merged::MergeExternal, DoraNode, EventStream};
use dora_operator_api_python::{process_python_output, pydict_to_metadata, PyEvent};
use eyre::Context;
use eyre::{Context, ContextCompat};
use futures::{Stream, StreamExt};
use pyo3::prelude::*;
use pyo3::types::PyDict;

@@ -19,7 +21,7 @@ use pyo3::types::PyDict;
///
#[pyclass]
pub struct Node {
events: EventStream,
events: Events,
node: DoraNode,
}

@@ -29,7 +31,10 @@ impl Node {
pub fn new() -> eyre::Result<Self> {
let (node, events) = DoraNode::init_from_env()?;

Ok(Node { events, node })
Ok(Node {
events: Events::Dora(events),
node,
})
}

/// `.next()` gives you the next input that the node has received.
@@ -52,7 +57,7 @@ impl Node {

pub fn __next__(&mut self, py: Python) -> PyResult<Option<PyEvent>> {
let event = py.allow_threads(|| self.events.recv());
Ok(event.map(PyEvent::from))
Ok(event)
}

fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
@@ -90,6 +95,60 @@ impl Node {
pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result<PyObject> {
pythonize::pythonize(py, self.node.dataflow_descriptor())
}

pub fn merge_external_events(
&mut self,
external_events: &mut ExternalEventStream,
) -> eyre::Result<()> {
// take out the event stream and temporarily replace it with a dummy
let events = std::mem::replace(
&mut self.events,
Events::Merged(Box::new(futures::stream::empty())),
);
// update self.events with the merged stream
self.events = Events::Merged(events.merge_external(Box::pin(
external_events.0.take().context("stream already taken")?,
)));

Ok(())
}
}

#[pyclass]
pub struct ExternalEventStream(pub Option<Box<dyn Stream<Item = PyObject> + Unpin + Send>>);

enum Events {
Dora(EventStream),
Merged(Box<dyn Stream<Item = MergedEvent<PyObject>> + Unpin + Send>),
}

impl Events {
fn recv(&mut self) -> Option<PyEvent> {
match self {
Events::Dora(events) => events.recv().map(PyEvent::from),
Events::Merged(events) => futures::executor::block_on(events.next()).map(PyEvent::from),
}
}
}

impl<'a> MergeExternal<'a, PyObject> for Events {
type Item = MergedEvent<PyObject>;

fn merge_external(
self,
external_events: impl Stream<Item = PyObject> + Send + Unpin + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Send + Unpin + 'a> {
match self {
Events::Dora(events) => events.merge_external(external_events),
Events::Merged(events) => {
let merged = events.merge_external(external_events);
Box::new(merged.map(|event| match event {
MergedEvent::Dora(e) => MergedEvent::Dora(e),
MergedEvent::External(e) => MergedEvent::External(e.flatten()),
}))
}
}
}
}

impl Node {


+ 36
- 25
apis/python/operator/src/lib.rs View File

@@ -1,7 +1,7 @@
use std::{borrow::Cow, sync::Arc};

use arrow::pyarrow::PyArrowConvert;
use dora_node_api::{Data, Event, Metadata, MetadataParameters};
use dora_node_api::{merged::MergedEvent, Data, Event, Metadata, MetadataParameters};
use eyre::{Context, Result};
use pyo3::{
exceptions::PyLookupError,
@@ -11,33 +11,38 @@ use pyo3::{

#[pyclass]
pub struct PyEvent {
event: Event,
event: MergedEvent<PyObject>,
data: Option<Arc<Data>>,
}

#[pymethods]
impl PyEvent {
pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult<Option<PyObject>> {
let value = match key {
"type" => Some(self.ty().to_object(py)),
"id" => self.id().map(|v| v.to_object(py)),
"data" => self.data(py),
"value" => self.value(py)?,
"metadata" => self.metadata(py),
"error" => self.error().map(|v| v.to_object(py)),
other => {
return Err(PyLookupError::new_err(format!(
"event has no property `{other}`"
)))
match &self.event {
MergedEvent::Dora(event) => {
let value = match key {
"type" => Some(Self::ty(event).to_object(py)),
"id" => Self::id(event).map(|v| v.to_object(py)),
"data" => self.data(py),
"value" => self.value(py)?,
"metadata" => Self::metadata(event, py),
"error" => Self::error(event).map(|v| v.to_object(py)),
other => {
return Err(PyLookupError::new_err(format!(
"event has no property `{other}`"
)))
}
};
Ok(value)
}
};
Ok(value)
MergedEvent::External(event) => event.call_method0(py, "__getitem__").map(Some),
}
}
}

impl PyEvent {
fn ty(&self) -> &str {
match &self.event {
fn ty(event: &Event) -> &str {
match event {
Event::Stop => "STOP",
Event::Input { .. } => "INPUT",
Event::InputClosed { .. } => "INPUT_CLOSED",
@@ -46,8 +51,8 @@ impl PyEvent {
}
}

fn id(&self) -> Option<&str> {
match &self.event {
fn id(event: &Event) -> Option<&str> {
match event {
Event::Input { id, .. } => Some(id),
Event::InputClosed { id } => Some(id),
_ => None,
@@ -74,15 +79,15 @@ impl PyEvent {
Ok(None)
}

fn metadata(&self, py: Python<'_>) -> Option<PyObject> {
match &self.event {
fn metadata(event: &Event, py: Python<'_>) -> Option<PyObject> {
match event {
Event::Input { metadata, .. } => Some(metadata_to_pydict(metadata, py).to_object(py)),
_ => None,
}
}

fn error(&self) -> Option<&str> {
match &self.event {
fn error(event: &Event) -> Option<&str> {
match event {
Event::Error(error) => Some(error),
_other => None,
}
@@ -90,8 +95,14 @@ impl PyEvent {
}

impl From<Event> for PyEvent {
fn from(mut event: Event) -> Self {
let data = if let Event::Input { data, .. } = &mut event {
fn from(event: Event) -> Self {
Self::from(MergedEvent::Dora(event))
}
}

impl From<MergedEvent<PyObject>> for PyEvent {
fn from(mut event: MergedEvent<PyObject>) -> Self {
let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event {
data.take().map(Arc::new)
} else {
None


+ 16
- 7
apis/rust/node/src/event_stream/merged.rs View File

@@ -11,13 +11,22 @@ pub enum Either<A, B> {
Second(B),
}

impl<A> Either<A, A> {
pub fn flatten(self) -> A {
match self {
Either::First(a) => a,
Either::Second(a) => a,
}
}
}

pub trait MergeExternal<'a, E> {
type Item;

fn merge_external(
self,
external_events: impl Stream<Item = E> + Unpin + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>;
external_events: impl Stream<Item = E> + Send + Unpin + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Send + Unpin + 'a>;
}

impl<'a, E> MergeExternal<'a, E> for super::EventStream
@@ -28,8 +37,8 @@ where

fn merge_external(
self,
external_events: impl Stream<Item = E> + Unpin + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a> {
external_events: impl Stream<Item = E> + Send + Unpin + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Send + Unpin + 'a> {
let dora = self.map(MergedEvent::Dora);
let external = external_events.map(MergedEvent::External);
Box::new((dora, external).merge())
@@ -38,7 +47,7 @@ where

impl<'a, E, F, S> MergeExternal<'a, F> for S
where
S: Stream<Item = MergedEvent<E>> + Unpin + 'a,
S: Stream<Item = MergedEvent<E>> + Send + Unpin + 'a,
E: 'a,
F: 'a,
{
@@ -46,8 +55,8 @@ where

fn merge_external(
self,
external_events: impl Stream<Item = F> + Unpin + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a> {
external_events: impl Stream<Item = F> + Send + Unpin + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Send + Unpin + 'a> {
let first = self.map(|e| match e {
MergedEvent::Dora(d) => MergedEvent::Dora(d),
MergedEvent::External(e) => MergedEvent::External(Either::First(e)),


Loading…
Cancel
Save