From 9cec8df1ccabf7baf8e181085deba22ddee3a3a6 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Sun, 21 Jul 2024 15:48:28 +0200 Subject: [PATCH] Delay dropping of `DoraNode` in Python until all event data is freed When dropping the `DoraNode`, it waits for the remaining drop tokens. This only works if all the dora events were already dropped before. With the Python GC, this is not guaranteed as some events might still be live on the heap (the user might even use them later). In such cases, we waited until we ran into a timeout, which resulted in very long exit times (see https://github.com/dora-rs/dora/issues/598). This commit fixes this issue by adding a reference-counted copy of the `DoraNode` and `EventStream` to every event given to Python. This way, we can ensure that the underlying `DoraNode` is only dropped after the last event reference has been freed. --- Cargo.lock | 2 + apis/python/node/src/lib.rs | 80 ++++++++++++++------- apis/python/operator/Cargo.toml | 2 + apis/python/operator/src/lib.rs | 85 ++++++++++++++++++----- apis/rust/node/src/event_stream/thread.rs | 4 +- apis/rust/node/src/node/mod.rs | 2 +- binaries/daemon/src/lib.rs | 2 +- binaries/runtime/src/operator/python.rs | 11 +-- 8 files changed, 140 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 08eb932a..fee0b0c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2526,6 +2526,8 @@ dependencies = [ "dora-node-api", "eyre", "flume 0.10.14", + "futures", + "futures-concurrency", "pyo3", "serde_yaml 0.8.26", ] diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index b1d678f8..bb9f5e0e 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,12 +1,14 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] +use std::sync::Arc; use std::time::Duration; use arrow::pyarrow::{FromPyArrow, ToPyArrow}; use dora_node_api::dora_core::config::NodeId; +use dora_node_api::dora_core::daemon_messages::DataflowId; use dora_node_api::merged::{MergeExternalSend, MergedEvent}; use dora_node_api::{DoraNode, EventStream}; -use dora_operator_api_python::{pydict_to_metadata, PyEvent}; +use dora_operator_api_python::{pydict_to_metadata, DelayedCleanup, NodeCleanupHandle, PyEvent}; use dora_ros2_bridge_python::Ros2Subscription; use eyre::Context; use futures::{Stream, StreamExt}; @@ -30,8 +32,10 @@ use pyo3_special_method_derive::{Dict, Dir, Repr, Str}; #[derive(Dir, Dict, Str, Repr)] pub struct Node { events: Events, - #[pyo3_smd(skip)] - pub node: DoraNode, + node: DelayedCleanup, + + dataflow_id: DataflowId, + node_id: NodeId, } #[pymethods] @@ -45,8 +49,20 @@ impl Node { DoraNode::init_from_env().context("Couldn not initiate node from environment variable. For dynamic node, please add a node id in the initialization function.")? }; + let dataflow_id = *node.dataflow_id(); + let node_id = node.id().clone(); + let node = DelayedCleanup::new(node); + let events = DelayedCleanup::new(events); + let cleanup_handle = NodeCleanupHandle { + _handles: Arc::new((node.handle(), events.handle())), + }; Ok(Node { - events: Events::Dora(events), + events: Events { + inner: EventsInner::Dora(events), + cleanup_handle, + }, + dataflow_id, + node_id, node, }) } @@ -148,10 +164,11 @@ impl Node { if let Ok(py_bytes) = data.downcast_bound::(py) { let data = py_bytes.as_bytes(); self.node + .get_mut() .send_output_bytes(output_id.into(), parameters, data.len(), data) .wrap_err("failed to send output")?; } else if let Ok(arrow_array) = arrow::array::ArrayData::from_pyarrow_bound(data.bind(py)) { - self.node.send_output( + self.node.get_mut().send_output( output_id.into(), parameters, arrow::array::make_array(arrow_array), @@ -168,15 +185,18 @@ impl Node { /// This method returns the parsed dataflow YAML file. /// /// :rtype: dict - pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result { - pythonize::pythonize(py, self.node.dataflow_descriptor()) + pub fn dataflow_descriptor(&mut self, py: Python) -> eyre::Result { + Ok(pythonize::pythonize( + py, + self.node.get_mut().dataflow_descriptor(), + )?) } /// Returns the dataflow id. /// /// :rtype: str pub fn dataflow_id(&self) -> String { - self.node.dataflow_id().to_string() + self.dataflow_id.to_string() } /// Merge an external event stream with dora main loop. @@ -207,34 +227,46 @@ impl Node { // take out the event stream and temporarily replace it with a dummy let events = std::mem::replace( - &mut self.events, - Events::Merged(Box::new(futures::stream::empty())), + &mut self.events.inner, + EventsInner::Merged(Box::new(futures::stream::empty())), ); // update self.events with the merged stream - self.events = Events::Merged(events.merge_external_send(Box::pin(stream))); + self.events.inner = EventsInner::Merged(events.merge_external_send(Box::pin(stream))); Ok(()) } } -enum Events { - Dora(EventStream), - Merged(Box> + Unpin + Send>), +struct Events { + inner: EventsInner, + cleanup_handle: NodeCleanupHandle, } impl Events { fn recv(&mut self, timeout: Option) -> Option { - match self { - Events::Dora(events) => match timeout { - Some(timeout) => events.recv_timeout(timeout).map(PyEvent::from), - None => events.recv().map(PyEvent::from), + let event = match &mut self.inner { + EventsInner::Dora(events) => match timeout { + Some(timeout) => events + .get_mut() + .recv_timeout(timeout) + .map(MergedEvent::Dora), + None => events.get_mut().recv().map(MergedEvent::Dora), }, - Events::Merged(events) => futures::executor::block_on(events.next()).map(PyEvent::from), - } + EventsInner::Merged(events) => futures::executor::block_on(events.next()), + }; + event.map(|event| PyEvent { + event, + _cleanup: Some(self.cleanup_handle.clone()), + }) } } -impl<'a> MergeExternalSend<'a, PyObject> for Events { +enum EventsInner { + Dora(DelayedCleanup), + Merged(Box> + Unpin + Send>), +} + +impl<'a> MergeExternalSend<'a, PyObject> for EventsInner { type Item = MergedEvent; fn merge_external_send( @@ -242,8 +274,8 @@ impl<'a> MergeExternalSend<'a, PyObject> for Events { external_events: impl Stream + Unpin + Send + 'a, ) -> Box + Unpin + Send + 'a> { match self { - Events::Dora(events) => events.merge_external_send(external_events), - Events::Merged(events) => { + EventsInner::Dora(events) => events.merge_external_send(external_events), + EventsInner::Merged(events) => { let merged = events.merge_external_send(external_events); Box::new(merged.map(|event| match event { MergedEvent::Dora(e) => MergedEvent::Dora(e), @@ -256,7 +288,7 @@ impl<'a> MergeExternalSend<'a, PyObject> for Events { impl Node { pub fn id(&self) -> String { - self.node.id().to_string() + self.node_id.to_string() } } diff --git a/apis/python/operator/Cargo.toml b/apis/python/operator/Cargo.toml index 07cef1f0..a6dd2180 100644 --- a/apis/python/operator/Cargo.toml +++ b/apis/python/operator/Cargo.toml @@ -18,3 +18,5 @@ flume = "0.10.14" arrow = { workspace = true, features = ["pyarrow"] } arrow-schema = { workspace = true } aligned-vec = "0.5.0" +futures = "0.3.28" +futures-concurrency = "7.3.0" diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 2c027e9a..481b0be9 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -1,8 +1,16 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; use arrow::pyarrow::ToPyArrow; -use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters}; +use dora_node_api::{ + merged::{MergeExternalSend, MergedEvent}, + DoraNode, Event, EventStream, Metadata, MetadataParameters, +}; use eyre::{Context, Result}; +use futures::{Stream, StreamExt}; +use futures_concurrency::stream::Merge as _; use pyo3::{ prelude::*, pybacked::PyBackedStr, @@ -10,11 +18,64 @@ use pyo3::{ }; /// Dora Event -#[derive(Debug)] pub struct PyEvent { - event: MergedEvent, + pub event: MergedEvent, + pub _cleanup: Option, +} + +/// Keeps the dora node alive until all event objects have been dropped. +#[derive(Clone)] +#[pyclass] +pub struct NodeCleanupHandle { + pub _handles: Arc<(CleanupHandle, CleanupHandle)>, } +/// Owned type with delayed cleanup (using `handle` method). +pub struct DelayedCleanup(Arc>); + +impl DelayedCleanup { + pub fn new(value: T) -> Self { + Self(Arc::new(Mutex::new(value))) + } + + pub fn handle(&self) -> CleanupHandle { + CleanupHandle(self.0.clone()) + } + + pub fn get_mut(&mut self) -> std::sync::MutexGuard { + self.0.try_lock().expect("failed to lock DelayedCleanup") + } +} + +impl Stream for DelayedCleanup { + type Item = Event; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.get_mut().poll_next_unpin(cx) + } +} + +impl<'a, E> MergeExternalSend<'a, E> for DelayedCleanup +where + E: 'static, +{ + type Item = MergedEvent; + + fn merge_external_send( + self, + external_events: impl Stream + Unpin + Send + 'a, + ) -> Box + Unpin + Send + 'a> { + let dora = self.map(MergedEvent::Dora); + let external = external_events.map(MergedEvent::External); + Box::new((dora, external).merge()) + } +} + +pub struct CleanupHandle(Arc>); + impl PyEvent { pub fn to_py_dict(self, py: Python<'_>) -> PyResult> { let mut pydict = HashMap::new(); @@ -44,6 +105,10 @@ impl PyEvent { } } + if let Some(cleanup) = self._cleanup.clone() { + pydict.insert("_cleanup", cleanup.into_py(py)); + } + Ok(pydict.into_py_dict_bound(py).unbind()) } @@ -92,18 +157,6 @@ impl PyEvent { } } -impl From for PyEvent { - fn from(event: Event) -> Self { - Self::from(MergedEvent::Dora(event)) - } -} - -impl From> for PyEvent { - fn from(event: MergedEvent) -> Self { - Self { event } - } -} - pub fn pydict_to_metadata(dict: Option>) -> Result { let mut default_metadata = MetadataParameters::default(); if let Some(metadata) = dict { diff --git a/apis/rust/node/src/event_stream/thread.rs b/apis/rust/node/src/event_stream/thread.rs index d34e2ab8..bee8cc22 100644 --- a/apis/rust/node/src/event_stream/thread.rs +++ b/apis/rust/node/src/event_stream/thread.rs @@ -220,14 +220,14 @@ fn report_remaining_drop_tokens( let mut still_pending = Vec::new(); for (token, rx, since, _) in pending_drop_tokens.drain(..) { - match rx.recv_timeout(Duration::from_millis(50)) { + match rx.recv_timeout(Duration::from_millis(100)) { Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), Err(flume::RecvTimeoutError::Disconnected) => { // the event was dropped -> add the drop token to the list drop_tokens.push(token); } Err(flume::RecvTimeoutError::Timeout) => { - let duration = Duration::from_millis(200); + let duration = Duration::from_secs(30); if since.elapsed() > duration { tracing::warn!( "timeout: node finished, but token {token:?} was still not \ diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 277084d3..9eb4b18e 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -401,7 +401,7 @@ impl Drop for DoraNode { ); } - match self.drop_stream.recv_timeout(Duration::from_millis(500)) { + match self.drop_stream.recv_timeout(Duration::from_secs(10)) { Ok(token) => { self.sent_out_shared_memory.remove(&token); } diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 54d7284e..9f41a379 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1600,7 +1600,7 @@ impl RunningDataflow { let running_nodes = self.running_nodes.clone(); let grace_duration_kills = self.grace_duration_kills.clone(); tokio::spawn(async move { - let duration = grace_duration.unwrap_or(Duration::from_millis(2000)); + let duration = grace_duration.unwrap_or(Duration::from_millis(15000)); tokio::time::sleep(duration).await; let mut system = sysinfo::System::new(); system.refresh_processes(); diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index fa488388..fd436f1a 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -6,7 +6,7 @@ use dora_core::{ descriptor::{source_is_url, Descriptor, PythonSource}, }; use dora_download::download_file; -use dora_node_api::Event; +use dora_node_api::{merged::MergedEvent, Event}; use dora_operator_api_python::PyEvent; use dora_operator_api_types::DoraStatus; use eyre::{bail, eyre, Context, Result}; @@ -208,9 +208,12 @@ pub fn run( metadata.parameters.open_telemetry_context = string_cx; } - let py_event = PyEvent::from(event) - .to_py_dict(py) - .context("Could not convert event to pydict bound")?; + let py_event = PyEvent { + event: MergedEvent::Dora(event), + _cleanup: None, + } + .to_py_dict(py) + .context("Could not convert event to pydict bound")?; let status_enum = operator .call_method1(py, "on_event", (py_event, send_output.clone()))