diff --git a/Cargo.lock b/Cargo.lock index fd2731b7..9fb197e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2526,6 +2526,8 @@ dependencies = [ "dora-node-api", "eyre", "flume 0.10.14", + "futures", + "futures-concurrency", "pyo3", "serde_yaml 0.8.26", ] diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index b1d678f8..bb9f5e0e 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,12 +1,14 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] +use std::sync::Arc; use std::time::Duration; use arrow::pyarrow::{FromPyArrow, ToPyArrow}; use dora_node_api::dora_core::config::NodeId; +use dora_node_api::dora_core::daemon_messages::DataflowId; use dora_node_api::merged::{MergeExternalSend, MergedEvent}; use dora_node_api::{DoraNode, EventStream}; -use dora_operator_api_python::{pydict_to_metadata, PyEvent}; +use dora_operator_api_python::{pydict_to_metadata, DelayedCleanup, NodeCleanupHandle, PyEvent}; use dora_ros2_bridge_python::Ros2Subscription; use eyre::Context; use futures::{Stream, StreamExt}; @@ -30,8 +32,10 @@ use pyo3_special_method_derive::{Dict, Dir, Repr, Str}; #[derive(Dir, Dict, Str, Repr)] pub struct Node { events: Events, - #[pyo3_smd(skip)] - pub node: DoraNode, + node: DelayedCleanup, + + dataflow_id: DataflowId, + node_id: NodeId, } #[pymethods] @@ -45,8 +49,20 @@ impl Node { DoraNode::init_from_env().context("Couldn not initiate node from environment variable. For dynamic node, please add a node id in the initialization function.")? }; + let dataflow_id = *node.dataflow_id(); + let node_id = node.id().clone(); + let node = DelayedCleanup::new(node); + let events = DelayedCleanup::new(events); + let cleanup_handle = NodeCleanupHandle { + _handles: Arc::new((node.handle(), events.handle())), + }; Ok(Node { - events: Events::Dora(events), + events: Events { + inner: EventsInner::Dora(events), + cleanup_handle, + }, + dataflow_id, + node_id, node, }) } @@ -148,10 +164,11 @@ impl Node { if let Ok(py_bytes) = data.downcast_bound::(py) { let data = py_bytes.as_bytes(); self.node + .get_mut() .send_output_bytes(output_id.into(), parameters, data.len(), data) .wrap_err("failed to send output")?; } else if let Ok(arrow_array) = arrow::array::ArrayData::from_pyarrow_bound(data.bind(py)) { - self.node.send_output( + self.node.get_mut().send_output( output_id.into(), parameters, arrow::array::make_array(arrow_array), @@ -168,15 +185,18 @@ impl Node { /// This method returns the parsed dataflow YAML file. /// /// :rtype: dict - pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result { - pythonize::pythonize(py, self.node.dataflow_descriptor()) + pub fn dataflow_descriptor(&mut self, py: Python) -> eyre::Result { + Ok(pythonize::pythonize( + py, + self.node.get_mut().dataflow_descriptor(), + )?) } /// Returns the dataflow id. /// /// :rtype: str pub fn dataflow_id(&self) -> String { - self.node.dataflow_id().to_string() + self.dataflow_id.to_string() } /// Merge an external event stream with dora main loop. @@ -207,34 +227,46 @@ impl Node { // take out the event stream and temporarily replace it with a dummy let events = std::mem::replace( - &mut self.events, - Events::Merged(Box::new(futures::stream::empty())), + &mut self.events.inner, + EventsInner::Merged(Box::new(futures::stream::empty())), ); // update self.events with the merged stream - self.events = Events::Merged(events.merge_external_send(Box::pin(stream))); + self.events.inner = EventsInner::Merged(events.merge_external_send(Box::pin(stream))); Ok(()) } } -enum Events { - Dora(EventStream), - Merged(Box> + Unpin + Send>), +struct Events { + inner: EventsInner, + cleanup_handle: NodeCleanupHandle, } impl Events { fn recv(&mut self, timeout: Option) -> Option { - match self { - Events::Dora(events) => match timeout { - Some(timeout) => events.recv_timeout(timeout).map(PyEvent::from), - None => events.recv().map(PyEvent::from), + let event = match &mut self.inner { + EventsInner::Dora(events) => match timeout { + Some(timeout) => events + .get_mut() + .recv_timeout(timeout) + .map(MergedEvent::Dora), + None => events.get_mut().recv().map(MergedEvent::Dora), }, - Events::Merged(events) => futures::executor::block_on(events.next()).map(PyEvent::from), - } + EventsInner::Merged(events) => futures::executor::block_on(events.next()), + }; + event.map(|event| PyEvent { + event, + _cleanup: Some(self.cleanup_handle.clone()), + }) } } -impl<'a> MergeExternalSend<'a, PyObject> for Events { +enum EventsInner { + Dora(DelayedCleanup), + Merged(Box> + Unpin + Send>), +} + +impl<'a> MergeExternalSend<'a, PyObject> for EventsInner { type Item = MergedEvent; fn merge_external_send( @@ -242,8 +274,8 @@ impl<'a> MergeExternalSend<'a, PyObject> for Events { external_events: impl Stream + Unpin + Send + 'a, ) -> Box + Unpin + Send + 'a> { match self { - Events::Dora(events) => events.merge_external_send(external_events), - Events::Merged(events) => { + EventsInner::Dora(events) => events.merge_external_send(external_events), + EventsInner::Merged(events) => { let merged = events.merge_external_send(external_events); Box::new(merged.map(|event| match event { MergedEvent::Dora(e) => MergedEvent::Dora(e), @@ -256,7 +288,7 @@ impl<'a> MergeExternalSend<'a, PyObject> for Events { impl Node { pub fn id(&self) -> String { - self.node.id().to_string() + self.node_id.to_string() } } diff --git a/apis/python/operator/Cargo.toml b/apis/python/operator/Cargo.toml index 07cef1f0..a6dd2180 100644 --- a/apis/python/operator/Cargo.toml +++ b/apis/python/operator/Cargo.toml @@ -18,3 +18,5 @@ flume = "0.10.14" arrow = { workspace = true, features = ["pyarrow"] } arrow-schema = { workspace = true } aligned-vec = "0.5.0" +futures = "0.3.28" +futures-concurrency = "7.3.0" diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 2c027e9a..1929bc36 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -1,8 +1,16 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; use arrow::pyarrow::ToPyArrow; -use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters}; +use dora_node_api::{ + merged::{MergeExternalSend, MergedEvent}, + DoraNode, Event, EventStream, Metadata, MetadataParameters, +}; use eyre::{Context, Result}; +use futures::{Stream, StreamExt}; +use futures_concurrency::stream::Merge as _; use pyo3::{ prelude::*, pybacked::PyBackedStr, @@ -10,11 +18,65 @@ use pyo3::{ }; /// Dora Event -#[derive(Debug)] pub struct PyEvent { - event: MergedEvent, + pub event: MergedEvent, + pub _cleanup: Option, +} + +/// Keeps the dora node alive until all event objects have been dropped. +#[derive(Clone)] +#[pyclass] +pub struct NodeCleanupHandle { + pub _handles: Arc<(CleanupHandle, CleanupHandle)>, } +/// Owned type with delayed cleanup (using `handle` method). +pub struct DelayedCleanup(Arc>); + +impl DelayedCleanup { + pub fn new(value: T) -> Self { + Self(Arc::new(Mutex::new(value))) + } + + pub fn handle(&self) -> CleanupHandle { + CleanupHandle(self.0.clone()) + } + + pub fn get_mut(&mut self) -> std::sync::MutexGuard { + self.0.try_lock().expect("failed to lock DelayedCleanup") + } +} + +impl Stream for DelayedCleanup { + type Item = Event; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut inner: std::sync::MutexGuard<'_, EventStream> = self.get_mut().get_mut(); + inner.poll_next_unpin(cx) + } +} + +impl<'a, E> MergeExternalSend<'a, E> for DelayedCleanup +where + E: 'static, +{ + type Item = MergedEvent; + + fn merge_external_send( + self, + external_events: impl Stream + Unpin + Send + 'a, + ) -> Box + Unpin + Send + 'a> { + let dora = self.map(MergedEvent::Dora); + let external = external_events.map(MergedEvent::External); + Box::new((dora, external).merge()) + } +} + +pub struct CleanupHandle(Arc>); + impl PyEvent { pub fn to_py_dict(self, py: Python<'_>) -> PyResult> { let mut pydict = HashMap::new(); @@ -44,6 +106,10 @@ impl PyEvent { } } + if let Some(cleanup) = self._cleanup.clone() { + pydict.insert("_cleanup", cleanup.into_py(py)); + } + Ok(pydict.into_py_dict_bound(py).unbind()) } @@ -92,18 +158,6 @@ impl PyEvent { } } -impl From for PyEvent { - fn from(event: Event) -> Self { - Self::from(MergedEvent::Dora(event)) - } -} - -impl From> for PyEvent { - fn from(event: MergedEvent) -> Self { - Self { event } - } -} - pub fn pydict_to_metadata(dict: Option>) -> Result { let mut default_metadata = MetadataParameters::default(); if let Some(metadata) = dict { diff --git a/apis/rust/node/src/event_stream/thread.rs b/apis/rust/node/src/event_stream/thread.rs index d34e2ab8..bee8cc22 100644 --- a/apis/rust/node/src/event_stream/thread.rs +++ b/apis/rust/node/src/event_stream/thread.rs @@ -220,14 +220,14 @@ fn report_remaining_drop_tokens( let mut still_pending = Vec::new(); for (token, rx, since, _) in pending_drop_tokens.drain(..) { - match rx.recv_timeout(Duration::from_millis(50)) { + match rx.recv_timeout(Duration::from_millis(100)) { Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), Err(flume::RecvTimeoutError::Disconnected) => { // the event was dropped -> add the drop token to the list drop_tokens.push(token); } Err(flume::RecvTimeoutError::Timeout) => { - let duration = Duration::from_millis(200); + let duration = Duration::from_secs(30); if since.elapsed() > duration { tracing::warn!( "timeout: node finished, but token {token:?} was still not \ diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 277084d3..9eb4b18e 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -401,7 +401,7 @@ impl Drop for DoraNode { ); } - match self.drop_stream.recv_timeout(Duration::from_millis(500)) { + match self.drop_stream.recv_timeout(Duration::from_secs(10)) { Ok(token) => { self.sent_out_shared_memory.remove(&token); } diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 85cff217..de799ad6 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1600,7 +1600,7 @@ impl RunningDataflow { let running_nodes = self.running_nodes.clone(); let grace_duration_kills = self.grace_duration_kills.clone(); tokio::spawn(async move { - let duration = grace_duration.unwrap_or(Duration::from_millis(2000)); + let duration = grace_duration.unwrap_or(Duration::from_millis(15000)); tokio::time::sleep(duration).await; let mut system = sysinfo::System::new(); system.refresh_processes(); diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index fa488388..fd436f1a 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -6,7 +6,7 @@ use dora_core::{ descriptor::{source_is_url, Descriptor, PythonSource}, }; use dora_download::download_file; -use dora_node_api::Event; +use dora_node_api::{merged::MergedEvent, Event}; use dora_operator_api_python::PyEvent; use dora_operator_api_types::DoraStatus; use eyre::{bail, eyre, Context, Result}; @@ -208,9 +208,12 @@ pub fn run( metadata.parameters.open_telemetry_context = string_cx; } - let py_event = PyEvent::from(event) - .to_py_dict(py) - .context("Could not convert event to pydict bound")?; + let py_event = PyEvent { + event: MergedEvent::Dora(event), + _cleanup: None, + } + .to_py_dict(py) + .context("Could not convert event to pydict bound")?; let status_enum = operator .call_method1(py, "on_event", (py_event, send_output.clone()))