diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index ad676943..02840c9e 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -23,6 +23,7 @@ flume = "0.10.14" dora-runtime = { workspace = true, features = ["tracing", "python"] } arrow = { version = "35.0.0", features = ["pyarrow"] } pythonize = "0.18.0" +futures = "0.3.28" [lib] name = "dora" diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index af77bf08..9a669c89 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,8 +1,10 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] -use dora_node_api::{DoraNode, EventStream}; +use dora_node_api::merged::MergedEvent; +use dora_node_api::{merged::MergeExternal, DoraNode, EventStream}; use dora_operator_api_python::{process_python_output, pydict_to_metadata, PyEvent}; -use eyre::Context; +use eyre::{Context, ContextCompat}; +use futures::{Stream, StreamExt}; use pyo3::prelude::*; use pyo3::types::PyDict; @@ -19,7 +21,7 @@ use pyo3::types::PyDict; /// #[pyclass] pub struct Node { - events: EventStream, + events: Events, node: DoraNode, } @@ -29,7 +31,10 @@ impl Node { pub fn new() -> eyre::Result { let (node, events) = DoraNode::init_from_env()?; - Ok(Node { events, node }) + Ok(Node { + events: Events::Dora(events), + node, + }) } /// `.next()` gives you the next input that the node has received. @@ -52,7 +57,7 @@ impl Node { pub fn __next__(&mut self, py: Python) -> PyResult> { let event = py.allow_threads(|| self.events.recv()); - Ok(event.map(PyEvent::from)) + Ok(event) } fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { @@ -90,6 +95,60 @@ impl Node { pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result { pythonize::pythonize(py, self.node.dataflow_descriptor()) } + + pub fn merge_external_events( + &mut self, + external_events: &mut ExternalEventStream, + ) -> eyre::Result<()> { + // take out the event stream and temporarily replace it with a dummy + let events = std::mem::replace( + &mut self.events, + Events::Merged(Box::new(futures::stream::empty())), + ); + // update self.events with the merged stream + self.events = Events::Merged(events.merge_external(Box::pin( + external_events.0.take().context("stream already taken")?, + ))); + + Ok(()) + } +} + +#[pyclass] +pub struct ExternalEventStream(pub Option + Unpin + Send>>); + +enum Events { + Dora(EventStream), + Merged(Box> + Unpin + Send>), +} + +impl Events { + fn recv(&mut self) -> Option { + match self { + Events::Dora(events) => events.recv().map(PyEvent::from), + Events::Merged(events) => futures::executor::block_on(events.next()).map(PyEvent::from), + } + } +} + +impl<'a> MergeExternal<'a, PyObject> for Events { + type Item = MergedEvent; + + fn merge_external( + self, + external_events: impl Stream + Send + Unpin + 'a, + ) -> Box + Send + Unpin + 'a> { + match self { + Events::Dora(events) => events.merge_external(external_events), + Events::Merged(events) => { + let merged = events.merge_external(external_events); + Box::new(merged.map(|event| match event { + MergedEvent::Dora(e) => MergedEvent::Dora(e), + MergedEvent::External(e) => MergedEvent::External(e.flatten()), + })) + } + } + } } impl Node { diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 38b26893..5e2091e9 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -1,7 +1,7 @@ use std::{borrow::Cow, sync::Arc}; use arrow::pyarrow::PyArrowConvert; -use dora_node_api::{Data, Event, Metadata, MetadataParameters}; +use dora_node_api::{merged::MergedEvent, Data, Event, Metadata, MetadataParameters}; use eyre::{Context, Result}; use pyo3::{ exceptions::PyLookupError, @@ -11,33 +11,38 @@ use pyo3::{ #[pyclass] pub struct PyEvent { - event: Event, + event: MergedEvent, data: Option>, } #[pymethods] impl PyEvent { pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult> { - let value = match key { - "type" => Some(self.ty().to_object(py)), - "id" => self.id().map(|v| v.to_object(py)), - "data" => self.data(py), - "value" => self.value(py)?, - "metadata" => self.metadata(py), - "error" => self.error().map(|v| v.to_object(py)), - other => { - return Err(PyLookupError::new_err(format!( - "event has no property `{other}`" - ))) + match &self.event { + MergedEvent::Dora(event) => { + let value = match key { + "type" => Some(Self::ty(event).to_object(py)), + "id" => Self::id(event).map(|v| v.to_object(py)), + "data" => self.data(py), + "value" => self.value(py)?, + "metadata" => Self::metadata(event, py), + "error" => Self::error(event).map(|v| v.to_object(py)), + other => { + return Err(PyLookupError::new_err(format!( + "event has no property `{other}`" + ))) + } + }; + Ok(value) } - }; - Ok(value) + MergedEvent::External(event) => event.call_method0(py, "__getitem__").map(Some), + } } } impl PyEvent { - fn ty(&self) -> &str { - match &self.event { + fn ty(event: &Event) -> &str { + match event { Event::Stop => "STOP", Event::Input { .. } => "INPUT", Event::InputClosed { .. } => "INPUT_CLOSED", @@ -46,8 +51,8 @@ impl PyEvent { } } - fn id(&self) -> Option<&str> { - match &self.event { + fn id(event: &Event) -> Option<&str> { + match event { Event::Input { id, .. } => Some(id), Event::InputClosed { id } => Some(id), _ => None, @@ -74,15 +79,15 @@ impl PyEvent { Ok(None) } - fn metadata(&self, py: Python<'_>) -> Option { - match &self.event { + fn metadata(event: &Event, py: Python<'_>) -> Option { + match event { Event::Input { metadata, .. } => Some(metadata_to_pydict(metadata, py).to_object(py)), _ => None, } } - fn error(&self) -> Option<&str> { - match &self.event { + fn error(event: &Event) -> Option<&str> { + match event { Event::Error(error) => Some(error), _other => None, } @@ -90,8 +95,14 @@ impl PyEvent { } impl From for PyEvent { - fn from(mut event: Event) -> Self { - let data = if let Event::Input { data, .. } = &mut event { + fn from(event: Event) -> Self { + Self::from(MergedEvent::Dora(event)) + } +} + +impl From> for PyEvent { + fn from(mut event: MergedEvent) -> Self { + let data = if let MergedEvent::Dora(Event::Input { data, .. }) = &mut event { data.take().map(Arc::new) } else { None diff --git a/apis/rust/node/src/event_stream/merged.rs b/apis/rust/node/src/event_stream/merged.rs index b9a10aa2..e622c856 100644 --- a/apis/rust/node/src/event_stream/merged.rs +++ b/apis/rust/node/src/event_stream/merged.rs @@ -11,13 +11,22 @@ pub enum Either { Second(B), } +impl Either { + pub fn flatten(self) -> A { + match self { + Either::First(a) => a, + Either::Second(a) => a, + } + } +} + pub trait MergeExternal<'a, E> { type Item; fn merge_external( self, - external_events: impl Stream + Unpin + 'a, - ) -> Box + Unpin + 'a>; + external_events: impl Stream + Send + Unpin + 'a, + ) -> Box + Send + Unpin + 'a>; } impl<'a, E> MergeExternal<'a, E> for super::EventStream @@ -28,8 +37,8 @@ where fn merge_external( self, - external_events: impl Stream + Unpin + 'a, - ) -> Box + Unpin + 'a> { + external_events: impl Stream + Send + Unpin + 'a, + ) -> Box + Send + Unpin + 'a> { let dora = self.map(MergedEvent::Dora); let external = external_events.map(MergedEvent::External); Box::new((dora, external).merge()) @@ -38,7 +47,7 @@ where impl<'a, E, F, S> MergeExternal<'a, F> for S where - S: Stream> + Unpin + 'a, + S: Stream> + Send + Unpin + 'a, E: 'a, F: 'a, { @@ -46,8 +55,8 @@ where fn merge_external( self, - external_events: impl Stream + Unpin + 'a, - ) -> Box + Unpin + 'a> { + external_events: impl Stream + Send + Unpin + 'a, + ) -> Box + Send + Unpin + 'a> { let first = self.map(|e| match e { MergedEvent::Dora(d) => MergedEvent::Dora(d), MergedEvent::External(e) => MergedEvent::External(Either::First(e)),