Delay dropping of `DoraNode` in Python until all event data is freed
tags/v0.3.6-rc0
| @@ -2526,6 +2526,8 @@ dependencies = [ | |||
| "dora-node-api", | |||
| "eyre", | |||
| "flume 0.10.14", | |||
| "futures", | |||
| "futures-concurrency", | |||
| "pyo3", | |||
| "serde_yaml 0.8.26", | |||
| ] | |||
| @@ -1,12 +1,14 @@ | |||
| #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] | |||
| use std::sync::Arc; | |||
| use std::time::Duration; | |||
| use arrow::pyarrow::{FromPyArrow, ToPyArrow}; | |||
| use dora_node_api::dora_core::config::NodeId; | |||
| use dora_node_api::dora_core::daemon_messages::DataflowId; | |||
| use dora_node_api::merged::{MergeExternalSend, MergedEvent}; | |||
| use dora_node_api::{DoraNode, EventStream}; | |||
| use dora_operator_api_python::{pydict_to_metadata, PyEvent}; | |||
| use dora_operator_api_python::{pydict_to_metadata, DelayedCleanup, NodeCleanupHandle, PyEvent}; | |||
| use dora_ros2_bridge_python::Ros2Subscription; | |||
| use eyre::Context; | |||
| use futures::{Stream, StreamExt}; | |||
| @@ -30,8 +32,10 @@ use pyo3_special_method_derive::{Dict, Dir, Repr, Str}; | |||
| #[derive(Dir, Dict, Str, Repr)] | |||
| pub struct Node { | |||
| events: Events, | |||
| #[pyo3_smd(skip)] | |||
| pub node: DoraNode, | |||
| node: DelayedCleanup<DoraNode>, | |||
| dataflow_id: DataflowId, | |||
| node_id: NodeId, | |||
| } | |||
| #[pymethods] | |||
| @@ -45,8 +49,20 @@ impl Node { | |||
| DoraNode::init_from_env().context("Couldn not initiate node from environment variable. For dynamic node, please add a node id in the initialization function.")? | |||
| }; | |||
| let dataflow_id = *node.dataflow_id(); | |||
| let node_id = node.id().clone(); | |||
| let node = DelayedCleanup::new(node); | |||
| let events = DelayedCleanup::new(events); | |||
| let cleanup_handle = NodeCleanupHandle { | |||
| _handles: Arc::new((node.handle(), events.handle())), | |||
| }; | |||
| Ok(Node { | |||
| events: Events::Dora(events), | |||
| events: Events { | |||
| inner: EventsInner::Dora(events), | |||
| cleanup_handle, | |||
| }, | |||
| dataflow_id, | |||
| node_id, | |||
| node, | |||
| }) | |||
| } | |||
| @@ -148,10 +164,11 @@ impl Node { | |||
| if let Ok(py_bytes) = data.downcast_bound::<PyBytes>(py) { | |||
| let data = py_bytes.as_bytes(); | |||
| self.node | |||
| .get_mut() | |||
| .send_output_bytes(output_id.into(), parameters, data.len(), data) | |||
| .wrap_err("failed to send output")?; | |||
| } else if let Ok(arrow_array) = arrow::array::ArrayData::from_pyarrow_bound(data.bind(py)) { | |||
| self.node.send_output( | |||
| self.node.get_mut().send_output( | |||
| output_id.into(), | |||
| parameters, | |||
| arrow::array::make_array(arrow_array), | |||
| @@ -168,15 +185,18 @@ impl Node { | |||
| /// This method returns the parsed dataflow YAML file. | |||
| /// | |||
| /// :rtype: dict | |||
| pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result<PyObject> { | |||
| pythonize::pythonize(py, self.node.dataflow_descriptor()) | |||
| pub fn dataflow_descriptor(&mut self, py: Python) -> eyre::Result<PyObject> { | |||
| Ok(pythonize::pythonize( | |||
| py, | |||
| self.node.get_mut().dataflow_descriptor(), | |||
| )?) | |||
| } | |||
| /// Returns the dataflow id. | |||
| /// | |||
| /// :rtype: str | |||
| pub fn dataflow_id(&self) -> String { | |||
| self.node.dataflow_id().to_string() | |||
| self.dataflow_id.to_string() | |||
| } | |||
| /// Merge an external event stream with dora main loop. | |||
| @@ -207,34 +227,46 @@ impl Node { | |||
| // take out the event stream and temporarily replace it with a dummy | |||
| let events = std::mem::replace( | |||
| &mut self.events, | |||
| Events::Merged(Box::new(futures::stream::empty())), | |||
| &mut self.events.inner, | |||
| EventsInner::Merged(Box::new(futures::stream::empty())), | |||
| ); | |||
| // update self.events with the merged stream | |||
| self.events = Events::Merged(events.merge_external_send(Box::pin(stream))); | |||
| self.events.inner = EventsInner::Merged(events.merge_external_send(Box::pin(stream))); | |||
| Ok(()) | |||
| } | |||
| } | |||
| enum Events { | |||
| Dora(EventStream), | |||
| Merged(Box<dyn Stream<Item = MergedEvent<PyObject>> + Unpin + Send>), | |||
| struct Events { | |||
| inner: EventsInner, | |||
| cleanup_handle: NodeCleanupHandle, | |||
| } | |||
| impl Events { | |||
| fn recv(&mut self, timeout: Option<Duration>) -> Option<PyEvent> { | |||
| match self { | |||
| Events::Dora(events) => match timeout { | |||
| Some(timeout) => events.recv_timeout(timeout).map(PyEvent::from), | |||
| None => events.recv().map(PyEvent::from), | |||
| let event = match &mut self.inner { | |||
| EventsInner::Dora(events) => match timeout { | |||
| Some(timeout) => events | |||
| .get_mut() | |||
| .recv_timeout(timeout) | |||
| .map(MergedEvent::Dora), | |||
| None => events.get_mut().recv().map(MergedEvent::Dora), | |||
| }, | |||
| Events::Merged(events) => futures::executor::block_on(events.next()).map(PyEvent::from), | |||
| } | |||
| EventsInner::Merged(events) => futures::executor::block_on(events.next()), | |||
| }; | |||
| event.map(|event| PyEvent { | |||
| event, | |||
| _cleanup: Some(self.cleanup_handle.clone()), | |||
| }) | |||
| } | |||
| } | |||
| impl<'a> MergeExternalSend<'a, PyObject> for Events { | |||
| enum EventsInner { | |||
| Dora(DelayedCleanup<EventStream>), | |||
| Merged(Box<dyn Stream<Item = MergedEvent<PyObject>> + Unpin + Send>), | |||
| } | |||
| impl<'a> MergeExternalSend<'a, PyObject> for EventsInner { | |||
| type Item = MergedEvent<PyObject>; | |||
| fn merge_external_send( | |||
| @@ -242,8 +274,8 @@ impl<'a> MergeExternalSend<'a, PyObject> for Events { | |||
| external_events: impl Stream<Item = PyObject> + Unpin + Send + 'a, | |||
| ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + 'a> { | |||
| match self { | |||
| Events::Dora(events) => events.merge_external_send(external_events), | |||
| Events::Merged(events) => { | |||
| EventsInner::Dora(events) => events.merge_external_send(external_events), | |||
| EventsInner::Merged(events) => { | |||
| let merged = events.merge_external_send(external_events); | |||
| Box::new(merged.map(|event| match event { | |||
| MergedEvent::Dora(e) => MergedEvent::Dora(e), | |||
| @@ -256,7 +288,7 @@ impl<'a> MergeExternalSend<'a, PyObject> for Events { | |||
| impl Node { | |||
| pub fn id(&self) -> String { | |||
| self.node.id().to_string() | |||
| self.node_id.to_string() | |||
| } | |||
| } | |||
| @@ -18,3 +18,5 @@ flume = "0.10.14" | |||
| arrow = { workspace = true, features = ["pyarrow"] } | |||
| arrow-schema = { workspace = true } | |||
| aligned-vec = "0.5.0" | |||
| futures = "0.3.28" | |||
| futures-concurrency = "7.3.0" | |||
| @@ -1,8 +1,16 @@ | |||
| use std::collections::HashMap; | |||
| use std::{ | |||
| collections::HashMap, | |||
| sync::{Arc, Mutex}, | |||
| }; | |||
| use arrow::pyarrow::ToPyArrow; | |||
| use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters}; | |||
| use dora_node_api::{ | |||
| merged::{MergeExternalSend, MergedEvent}, | |||
| DoraNode, Event, EventStream, Metadata, MetadataParameters, | |||
| }; | |||
| use eyre::{Context, Result}; | |||
| use futures::{Stream, StreamExt}; | |||
| use futures_concurrency::stream::Merge as _; | |||
| use pyo3::{ | |||
| prelude::*, | |||
| pybacked::PyBackedStr, | |||
| @@ -10,11 +18,65 @@ use pyo3::{ | |||
| }; | |||
| /// Dora Event | |||
| #[derive(Debug)] | |||
| pub struct PyEvent { | |||
| event: MergedEvent<PyObject>, | |||
| pub event: MergedEvent<PyObject>, | |||
| pub _cleanup: Option<NodeCleanupHandle>, | |||
| } | |||
| /// Keeps the dora node alive until all event objects have been dropped. | |||
| #[derive(Clone)] | |||
| #[pyclass] | |||
| pub struct NodeCleanupHandle { | |||
| pub _handles: Arc<(CleanupHandle<DoraNode>, CleanupHandle<EventStream>)>, | |||
| } | |||
| /// Owned type with delayed cleanup (using `handle` method). | |||
| pub struct DelayedCleanup<T>(Arc<Mutex<T>>); | |||
| impl<T> DelayedCleanup<T> { | |||
| pub fn new(value: T) -> Self { | |||
| Self(Arc::new(Mutex::new(value))) | |||
| } | |||
| pub fn handle(&self) -> CleanupHandle<T> { | |||
| CleanupHandle(self.0.clone()) | |||
| } | |||
| pub fn get_mut(&mut self) -> std::sync::MutexGuard<T> { | |||
| self.0.try_lock().expect("failed to lock DelayedCleanup") | |||
| } | |||
| } | |||
| impl Stream for DelayedCleanup<EventStream> { | |||
| type Item = Event; | |||
| fn poll_next( | |||
| self: std::pin::Pin<&mut Self>, | |||
| cx: &mut std::task::Context<'_>, | |||
| ) -> std::task::Poll<Option<Self::Item>> { | |||
| let mut inner: std::sync::MutexGuard<'_, EventStream> = self.get_mut().get_mut(); | |||
| inner.poll_next_unpin(cx) | |||
| } | |||
| } | |||
| impl<'a, E> MergeExternalSend<'a, E> for DelayedCleanup<EventStream> | |||
| where | |||
| E: 'static, | |||
| { | |||
| type Item = MergedEvent<E>; | |||
| fn merge_external_send( | |||
| self, | |||
| external_events: impl Stream<Item = E> + Unpin + Send + 'a, | |||
| ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + 'a> { | |||
| let dora = self.map(MergedEvent::Dora); | |||
| let external = external_events.map(MergedEvent::External); | |||
| Box::new((dora, external).merge()) | |||
| } | |||
| } | |||
| pub struct CleanupHandle<T>(Arc<Mutex<T>>); | |||
| impl PyEvent { | |||
| pub fn to_py_dict(self, py: Python<'_>) -> PyResult<Py<PyDict>> { | |||
| let mut pydict = HashMap::new(); | |||
| @@ -44,6 +106,10 @@ impl PyEvent { | |||
| } | |||
| } | |||
| if let Some(cleanup) = self._cleanup.clone() { | |||
| pydict.insert("_cleanup", cleanup.into_py(py)); | |||
| } | |||
| Ok(pydict.into_py_dict_bound(py).unbind()) | |||
| } | |||
| @@ -92,18 +158,6 @@ impl PyEvent { | |||
| } | |||
| } | |||
| impl From<Event> for PyEvent { | |||
| fn from(event: Event) -> Self { | |||
| Self::from(MergedEvent::Dora(event)) | |||
| } | |||
| } | |||
| impl From<MergedEvent<PyObject>> for PyEvent { | |||
| fn from(event: MergedEvent<PyObject>) -> Self { | |||
| Self { event } | |||
| } | |||
| } | |||
| pub fn pydict_to_metadata(dict: Option<Bound<'_, PyDict>>) -> Result<MetadataParameters> { | |||
| let mut default_metadata = MetadataParameters::default(); | |||
| if let Some(metadata) = dict { | |||
| @@ -220,14 +220,14 @@ fn report_remaining_drop_tokens( | |||
| let mut still_pending = Vec::new(); | |||
| for (token, rx, since, _) in pending_drop_tokens.drain(..) { | |||
| match rx.recv_timeout(Duration::from_millis(50)) { | |||
| match rx.recv_timeout(Duration::from_millis(100)) { | |||
| Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), | |||
| Err(flume::RecvTimeoutError::Disconnected) => { | |||
| // the event was dropped -> add the drop token to the list | |||
| drop_tokens.push(token); | |||
| } | |||
| Err(flume::RecvTimeoutError::Timeout) => { | |||
| let duration = Duration::from_millis(200); | |||
| let duration = Duration::from_secs(30); | |||
| if since.elapsed() > duration { | |||
| tracing::warn!( | |||
| "timeout: node finished, but token {token:?} was still not \ | |||
| @@ -401,7 +401,7 @@ impl Drop for DoraNode { | |||
| ); | |||
| } | |||
| match self.drop_stream.recv_timeout(Duration::from_millis(500)) { | |||
| match self.drop_stream.recv_timeout(Duration::from_secs(10)) { | |||
| Ok(token) => { | |||
| self.sent_out_shared_memory.remove(&token); | |||
| } | |||
| @@ -1600,7 +1600,7 @@ impl RunningDataflow { | |||
| let running_nodes = self.running_nodes.clone(); | |||
| let grace_duration_kills = self.grace_duration_kills.clone(); | |||
| tokio::spawn(async move { | |||
| let duration = grace_duration.unwrap_or(Duration::from_millis(2000)); | |||
| let duration = grace_duration.unwrap_or(Duration::from_millis(15000)); | |||
| tokio::time::sleep(duration).await; | |||
| let mut system = sysinfo::System::new(); | |||
| system.refresh_processes(); | |||
| @@ -6,7 +6,7 @@ use dora_core::{ | |||
| descriptor::{source_is_url, Descriptor, PythonSource}, | |||
| }; | |||
| use dora_download::download_file; | |||
| use dora_node_api::Event; | |||
| use dora_node_api::{merged::MergedEvent, Event}; | |||
| use dora_operator_api_python::PyEvent; | |||
| use dora_operator_api_types::DoraStatus; | |||
| use eyre::{bail, eyre, Context, Result}; | |||
| @@ -208,9 +208,12 @@ pub fn run( | |||
| metadata.parameters.open_telemetry_context = string_cx; | |||
| } | |||
| let py_event = PyEvent::from(event) | |||
| .to_py_dict(py) | |||
| .context("Could not convert event to pydict bound")?; | |||
| let py_event = PyEvent { | |||
| event: MergedEvent::Dora(event), | |||
| _cleanup: None, | |||
| } | |||
| .to_py_dict(py) | |||
| .context("Could not convert event to pydict bound")?; | |||
| let status_enum = operator | |||
| .call_method1(py, "on_event", (py_event, send_output.clone())) | |||