Browse Source

Delay dropping of `DoraNode` in Python until all event data is freed

When dropping the `DoraNode`, it waits for the remaining drop tokens. This only works if all the dora events were already dropped before. With the Python GC, this is not guaranteed as some events might still be live on the heap (the user might even use them later). In such cases, we waited until we ran into a timeout, which resulted in very long exit times (see https://github.com/dora-rs/dora/issues/598).

This commit fixes this issue by adding a reference-counted copy of the `DoraNode` and `EventStream` to every event given to Python. This way, we can ensure that the underlying `DoraNode` is only dropped after the last event reference has been freed.
tags/v0.3.6-rc0
Philipp Oppermann 1 year ago
parent
commit
9cec8df1cc
Failed to extract signature
8 changed files with 140 additions and 48 deletions
  1. +2
    -0
      Cargo.lock
  2. +56
    -24
      apis/python/node/src/lib.rs
  3. +2
    -0
      apis/python/operator/Cargo.toml
  4. +69
    -16
      apis/python/operator/src/lib.rs
  5. +2
    -2
      apis/rust/node/src/event_stream/thread.rs
  6. +1
    -1
      apis/rust/node/src/node/mod.rs
  7. +1
    -1
      binaries/daemon/src/lib.rs
  8. +7
    -4
      binaries/runtime/src/operator/python.rs

+ 2
- 0
Cargo.lock View File

@@ -2526,6 +2526,8 @@ dependencies = [
"dora-node-api",
"eyre",
"flume 0.10.14",
"futures",
"futures-concurrency",
"pyo3",
"serde_yaml 0.8.26",
]


+ 56
- 24
apis/python/node/src/lib.rs View File

@@ -1,12 +1,14 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use std::sync::Arc;
use std::time::Duration;

use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_node_api::dora_core::config::NodeId;
use dora_node_api::dora_core::daemon_messages::DataflowId;
use dora_node_api::merged::{MergeExternalSend, MergedEvent};
use dora_node_api::{DoraNode, EventStream};
use dora_operator_api_python::{pydict_to_metadata, PyEvent};
use dora_operator_api_python::{pydict_to_metadata, DelayedCleanup, NodeCleanupHandle, PyEvent};
use dora_ros2_bridge_python::Ros2Subscription;
use eyre::Context;
use futures::{Stream, StreamExt};
@@ -30,8 +32,10 @@ use pyo3_special_method_derive::{Dict, Dir, Repr, Str};
#[derive(Dir, Dict, Str, Repr)]
pub struct Node {
events: Events,
#[pyo3_smd(skip)]
pub node: DoraNode,
node: DelayedCleanup<DoraNode>,

dataflow_id: DataflowId,
node_id: NodeId,
}

#[pymethods]
@@ -45,8 +49,20 @@ impl Node {
DoraNode::init_from_env().context("Couldn not initiate node from environment variable. For dynamic node, please add a node id in the initialization function.")?
};

let dataflow_id = *node.dataflow_id();
let node_id = node.id().clone();
let node = DelayedCleanup::new(node);
let events = DelayedCleanup::new(events);
let cleanup_handle = NodeCleanupHandle {
_handles: Arc::new((node.handle(), events.handle())),
};
Ok(Node {
events: Events::Dora(events),
events: Events {
inner: EventsInner::Dora(events),
cleanup_handle,
},
dataflow_id,
node_id,
node,
})
}
@@ -148,10 +164,11 @@ impl Node {
if let Ok(py_bytes) = data.downcast_bound::<PyBytes>(py) {
let data = py_bytes.as_bytes();
self.node
.get_mut()
.send_output_bytes(output_id.into(), parameters, data.len(), data)
.wrap_err("failed to send output")?;
} else if let Ok(arrow_array) = arrow::array::ArrayData::from_pyarrow_bound(data.bind(py)) {
self.node.send_output(
self.node.get_mut().send_output(
output_id.into(),
parameters,
arrow::array::make_array(arrow_array),
@@ -168,15 +185,18 @@ impl Node {
/// This method returns the parsed dataflow YAML file.
///
/// :rtype: dict
pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result<PyObject> {
pythonize::pythonize(py, self.node.dataflow_descriptor())
pub fn dataflow_descriptor(&mut self, py: Python) -> eyre::Result<PyObject> {
Ok(pythonize::pythonize(
py,
self.node.get_mut().dataflow_descriptor(),
)?)
}

/// Returns the dataflow id.
///
/// :rtype: str
pub fn dataflow_id(&self) -> String {
self.node.dataflow_id().to_string()
self.dataflow_id.to_string()
}

/// Merge an external event stream with dora main loop.
@@ -207,34 +227,46 @@ impl Node {

// take out the event stream and temporarily replace it with a dummy
let events = std::mem::replace(
&mut self.events,
Events::Merged(Box::new(futures::stream::empty())),
&mut self.events.inner,
EventsInner::Merged(Box::new(futures::stream::empty())),
);
// update self.events with the merged stream
self.events = Events::Merged(events.merge_external_send(Box::pin(stream)));
self.events.inner = EventsInner::Merged(events.merge_external_send(Box::pin(stream)));

Ok(())
}
}

enum Events {
Dora(EventStream),
Merged(Box<dyn Stream<Item = MergedEvent<PyObject>> + Unpin + Send>),
struct Events {
inner: EventsInner,
cleanup_handle: NodeCleanupHandle,
}

impl Events {
fn recv(&mut self, timeout: Option<Duration>) -> Option<PyEvent> {
match self {
Events::Dora(events) => match timeout {
Some(timeout) => events.recv_timeout(timeout).map(PyEvent::from),
None => events.recv().map(PyEvent::from),
let event = match &mut self.inner {
EventsInner::Dora(events) => match timeout {
Some(timeout) => events
.get_mut()
.recv_timeout(timeout)
.map(MergedEvent::Dora),
None => events.get_mut().recv().map(MergedEvent::Dora),
},
Events::Merged(events) => futures::executor::block_on(events.next()).map(PyEvent::from),
}
EventsInner::Merged(events) => futures::executor::block_on(events.next()),
};
event.map(|event| PyEvent {
event,
_cleanup: Some(self.cleanup_handle.clone()),
})
}
}

impl<'a> MergeExternalSend<'a, PyObject> for Events {
enum EventsInner {
Dora(DelayedCleanup<EventStream>),
Merged(Box<dyn Stream<Item = MergedEvent<PyObject>> + Unpin + Send>),
}

impl<'a> MergeExternalSend<'a, PyObject> for EventsInner {
type Item = MergedEvent<PyObject>;

fn merge_external_send(
@@ -242,8 +274,8 @@ impl<'a> MergeExternalSend<'a, PyObject> for Events {
external_events: impl Stream<Item = PyObject> + Unpin + Send + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + 'a> {
match self {
Events::Dora(events) => events.merge_external_send(external_events),
Events::Merged(events) => {
EventsInner::Dora(events) => events.merge_external_send(external_events),
EventsInner::Merged(events) => {
let merged = events.merge_external_send(external_events);
Box::new(merged.map(|event| match event {
MergedEvent::Dora(e) => MergedEvent::Dora(e),
@@ -256,7 +288,7 @@ impl<'a> MergeExternalSend<'a, PyObject> for Events {

impl Node {
pub fn id(&self) -> String {
self.node.id().to_string()
self.node_id.to_string()
}
}



+ 2
- 0
apis/python/operator/Cargo.toml View File

@@ -18,3 +18,5 @@ flume = "0.10.14"
arrow = { workspace = true, features = ["pyarrow"] }
arrow-schema = { workspace = true }
aligned-vec = "0.5.0"
futures = "0.3.28"
futures-concurrency = "7.3.0"

+ 69
- 16
apis/python/operator/src/lib.rs View File

@@ -1,8 +1,16 @@
use std::collections::HashMap;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use arrow::pyarrow::ToPyArrow;
use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters};
use dora_node_api::{
merged::{MergeExternalSend, MergedEvent},
DoraNode, Event, EventStream, Metadata, MetadataParameters,
};
use eyre::{Context, Result};
use futures::{Stream, StreamExt};
use futures_concurrency::stream::Merge as _;
use pyo3::{
prelude::*,
pybacked::PyBackedStr,
@@ -10,11 +18,64 @@ use pyo3::{
};

/// Dora Event
#[derive(Debug)]
pub struct PyEvent {
event: MergedEvent<PyObject>,
pub event: MergedEvent<PyObject>,
pub _cleanup: Option<NodeCleanupHandle>,
}

/// Keeps the dora node alive until all event objects have been dropped.
#[derive(Clone)]
#[pyclass]
pub struct NodeCleanupHandle {
pub _handles: Arc<(CleanupHandle<DoraNode>, CleanupHandle<EventStream>)>,
}

/// Owned type with delayed cleanup (using `handle` method).
pub struct DelayedCleanup<T>(Arc<Mutex<T>>);

impl<T> DelayedCleanup<T> {
pub fn new(value: T) -> Self {
Self(Arc::new(Mutex::new(value)))
}

pub fn handle(&self) -> CleanupHandle<T> {
CleanupHandle(self.0.clone())
}

pub fn get_mut(&mut self) -> std::sync::MutexGuard<T> {
self.0.try_lock().expect("failed to lock DelayedCleanup")
}
}

impl Stream for DelayedCleanup<EventStream> {
type Item = Event;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.get_mut().poll_next_unpin(cx)
}
}

impl<'a, E> MergeExternalSend<'a, E> for DelayedCleanup<EventStream>
where
E: 'static,
{
type Item = MergedEvent<E>;

fn merge_external_send(
self,
external_events: impl Stream<Item = E> + Unpin + Send + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + 'a> {
let dora = self.map(MergedEvent::Dora);
let external = external_events.map(MergedEvent::External);
Box::new((dora, external).merge())
}
}

pub struct CleanupHandle<T>(Arc<Mutex<T>>);

impl PyEvent {
pub fn to_py_dict(self, py: Python<'_>) -> PyResult<Py<PyDict>> {
let mut pydict = HashMap::new();
@@ -44,6 +105,10 @@ impl PyEvent {
}
}

if let Some(cleanup) = self._cleanup.clone() {
pydict.insert("_cleanup", cleanup.into_py(py));
}

Ok(pydict.into_py_dict_bound(py).unbind())
}

@@ -92,18 +157,6 @@ impl PyEvent {
}
}

impl From<Event> for PyEvent {
fn from(event: Event) -> Self {
Self::from(MergedEvent::Dora(event))
}
}

impl From<MergedEvent<PyObject>> for PyEvent {
fn from(event: MergedEvent<PyObject>) -> Self {
Self { event }
}
}

pub fn pydict_to_metadata(dict: Option<Bound<'_, PyDict>>) -> Result<MetadataParameters> {
let mut default_metadata = MetadataParameters::default();
if let Some(metadata) = dict {


+ 2
- 2
apis/rust/node/src/event_stream/thread.rs View File

@@ -220,14 +220,14 @@ fn report_remaining_drop_tokens(

let mut still_pending = Vec::new();
for (token, rx, since, _) in pending_drop_tokens.drain(..) {
match rx.recv_timeout(Duration::from_millis(50)) {
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::RecvTimeoutError::Disconnected) => {
// the event was dropped -> add the drop token to the list
drop_tokens.push(token);
}
Err(flume::RecvTimeoutError::Timeout) => {
let duration = Duration::from_millis(200);
let duration = Duration::from_secs(30);
if since.elapsed() > duration {
tracing::warn!(
"timeout: node finished, but token {token:?} was still not \


+ 1
- 1
apis/rust/node/src/node/mod.rs View File

@@ -401,7 +401,7 @@ impl Drop for DoraNode {
);
}

match self.drop_stream.recv_timeout(Duration::from_millis(500)) {
match self.drop_stream.recv_timeout(Duration::from_secs(10)) {
Ok(token) => {
self.sent_out_shared_memory.remove(&token);
}


+ 1
- 1
binaries/daemon/src/lib.rs View File

@@ -1600,7 +1600,7 @@ impl RunningDataflow {
let running_nodes = self.running_nodes.clone();
let grace_duration_kills = self.grace_duration_kills.clone();
tokio::spawn(async move {
let duration = grace_duration.unwrap_or(Duration::from_millis(2000));
let duration = grace_duration.unwrap_or(Duration::from_millis(15000));
tokio::time::sleep(duration).await;
let mut system = sysinfo::System::new();
system.refresh_processes();


+ 7
- 4
binaries/runtime/src/operator/python.rs View File

@@ -6,7 +6,7 @@ use dora_core::{
descriptor::{source_is_url, Descriptor, PythonSource},
};
use dora_download::download_file;
use dora_node_api::Event;
use dora_node_api::{merged::MergedEvent, Event};
use dora_operator_api_python::PyEvent;
use dora_operator_api_types::DoraStatus;
use eyre::{bail, eyre, Context, Result};
@@ -208,9 +208,12 @@ pub fn run(
metadata.parameters.open_telemetry_context = string_cx;
}

let py_event = PyEvent::from(event)
.to_py_dict(py)
.context("Could not convert event to pydict bound")?;
let py_event = PyEvent {
event: MergedEvent::Dora(event),
_cleanup: None,
}
.to_py_dict(py)
.context("Could not convert event to pydict bound")?;

let status_enum = operator
.call_method1(py, "on_event", (py_event, send_output.clone()))


Loading…
Cancel
Save