From 5988a65ea2c925342bafa73635d99f5b23784639 Mon Sep 17 00:00:00 2001 From: haixuantao Date: Tue, 17 Jun 2025 17:56:01 +0200 Subject: [PATCH 1/5] Expose all input closed message as a stop message --- apis/rust/node/src/event_stream/mod.rs | 8 +------- apis/rust/node/src/event_stream/thread.rs | 11 +++++++---- binaries/daemon/src/spawn.rs | 2 +- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 15c40e33..af8c42e6 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -234,13 +234,7 @@ impl EventStream { Err(err) => Event::Error(format!("{err:?}")), } } - NodeEvent::AllInputsClosed => { - let err = eyre!( - "received `AllInputsClosed` event, which should be handled by background task" - ); - tracing::error!("{err:?}"); - Event::Error(err.wrap_err("internal error").to_string()) - } + NodeEvent::AllInputsClosed => Event::Stop, }, EventItem::FatalError(err) => { diff --git a/apis/rust/node/src/event_stream/thread.rs b/apis/rust/node/src/event_stream/thread.rs index 5e982f74..a9dbba27 100644 --- a/apis/rust/node/src/event_stream/thread.rs +++ b/apis/rust/node/src/event_stream/thread.rs @@ -92,6 +92,7 @@ fn event_stream_loop( clock: Arc, ) { let mut tx = Some(tx); + let mut close_tx = false; let mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)> = Vec::new(); let mut drop_tokens = Vec::new(); @@ -135,10 +136,8 @@ fn event_stream_loop( data: Some(data), .. } => data.drop_token(), NodeEvent::AllInputsClosed => { - // close the event stream - tx = None; - // skip this internal event - continue; + close_tx = true; + None } _ => None, }; @@ -166,6 +165,10 @@ fn event_stream_loop( } else { tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`"); } + + if close_tx { + tx = None; + }; } }; if let Err(err) = result { diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 9087a4ec..1e5b5bf7 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -540,7 +540,7 @@ pub async fn spawn_node( // If log is an output, we're sending the logs to the dataflow if let Some(stdout_output_name) = &send_stdout_to { // Convert logs to DataMessage - let array = message.into_arrow(); + let array = message.clone().into_arrow(); let array: ArrayData = array.into(); let total_len = required_data_size(&array); From c64d6642afd1d12b9ab67a55dc23fed4938dc537 Mon Sep 17 00:00:00 2001 From: haixuantao Date: Wed, 18 Jun 2025 12:00:01 +0200 Subject: [PATCH 2/5] fix string message --- binaries/daemon/src/spawn.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 1e5b5bf7..955a1dc7 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -540,7 +540,7 @@ pub async fn spawn_node( // If log is an output, we're sending the logs to the dataflow if let Some(stdout_output_name) = &send_stdout_to { // Convert logs to DataMessage - let array = message.clone().into_arrow(); + let array = message.as_str().into_arrow(); let array: ArrayData = array.into(); let total_len = required_data_size(&array); From f72991b09ce92ed6ba6e1bb01e0cda77422ac8b5 Mon Sep 17 00:00:00 2001 From: haixuantao Date: Mon, 23 Jun 2025 10:53:14 +0200 Subject: [PATCH 3/5] Add Cause to stpo --- apis/c/node/src/lib.rs | 2 +- apis/python/operator/src/lib.rs | 9 +++++++-- apis/rust/node/src/event_stream/event.rs | 9 ++++++++- apis/rust/node/src/event_stream/mod.rs | 6 +++--- apis/rust/node/src/lib.rs | 2 +- binaries/runtime/src/lib.rs | 4 ++-- binaries/runtime/src/operator/shared_lib.rs | 2 +- 7 files changed, 23 insertions(+), 11 deletions(-) diff --git a/apis/c/node/src/lib.rs b/apis/c/node/src/lib.rs index 9d8762b0..8720c795 100644 --- a/apis/c/node/src/lib.rs +++ b/apis/c/node/src/lib.rs @@ -91,7 +91,7 @@ pub unsafe extern "C" fn dora_next_event(context: *mut c_void) -> *mut c_void { pub unsafe extern "C" fn read_dora_event_type(event: *const ()) -> EventType { let event: &Event = unsafe { &*event.cast() }; match event { - Event::Stop => EventType::Stop, + Event::Stop(_) => EventType::Stop, Event::Input { .. } => EventType::Input, Event::InputClosed { .. } => EventType::InputClosed, Event::Error(_) => EventType::Error, diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index cfc1e2bd..ea34b3b4 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -6,7 +6,7 @@ use std::{ use arrow::pyarrow::ToPyArrow; use dora_node_api::{ merged::{MergeExternalSend, MergedEvent}, - DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter, + DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter, StopCause, }; use eyre::{Context, Result}; use futures::{Stream, StreamExt}; @@ -146,7 +146,7 @@ impl PyEvent { fn ty(event: &Event) -> &str { match event { - Event::Stop => "STOP", + Event::Stop(_) => "STOP", Event::Input { .. } => "INPUT", Event::InputClosed { .. } => "INPUT_CLOSED", Event::Error(_) => "ERROR", @@ -158,6 +158,11 @@ impl PyEvent { match event { Event::Input { id, .. } => Some(id), Event::InputClosed { id } => Some(id), + Event::Stop(cause) => match cause { + StopCause::Manual => Some("MANUAL"), + StopCause::AllInputsClosed => Some("ALL_INPUTS_CLOSED"), + &_ => None, + }, _ => None, } } diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs index b5d0e9b8..22997f4b 100644 --- a/apis/rust/node/src/event_stream/event.rs +++ b/apis/rust/node/src/event_stream/event.rs @@ -10,7 +10,7 @@ use shared_memory_extended::{Shmem, ShmemConf}; #[derive(Debug)] #[non_exhaustive] pub enum Event { - Stop, + Stop(StopCause), Reload { operator_id: Option, }, @@ -25,6 +25,13 @@ pub enum Event { Error(String), } +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum StopCause { + Manual, + AllInputsClosed, +} + pub enum RawData { Empty, Vec(AVec>), diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index af8c42e6..565f8713 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -11,7 +11,7 @@ use dora_message::{ node_to_daemon::{DaemonRequest, Timestamped}, DataflowId, }; -pub use event::{Event, MappedInputData, RawData}; +pub use event::{Event, MappedInputData, RawData, StopCause}; use futures::{ future::{select, Either}, Stream, StreamExt, @@ -199,7 +199,7 @@ impl EventStream { fn convert_event_item(item: EventItem) -> Event { match item { EventItem::NodeEvent { event, ack_channel } => match event { - NodeEvent::Stop => Event::Stop, + NodeEvent::Stop => Event::Stop(event::StopCause::Manual), NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, NodeEvent::InputClosed { id } => Event::InputClosed { id }, NodeEvent::Input { id, metadata, data } => { @@ -234,7 +234,7 @@ impl EventStream { Err(err) => Event::Error(format!("{err:?}")), } } - NodeEvent::AllInputsClosed => Event::Stop, + NodeEvent::AllInputsClosed => Event::Stop(event::StopCause::AllInputsClosed), }, EventItem::FatalError(err) => { diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 9836ab7b..e1b17b6f 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -20,7 +20,7 @@ pub use dora_message::{ metadata::{Metadata, MetadataParameters, Parameter}, DataflowId, }; -pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData}; +pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData, StopCause}; pub use flume::Receiver; pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD}; diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index ea949bf4..d4df4b4b 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -232,10 +232,10 @@ async fn run( } } } - RuntimeEvent::Event(Event::Stop) => { + RuntimeEvent::Event(Event::Stop(cause)) => { // forward stop event to all operators and close the event channels for (_, channel) in operator_channels.drain() { - let _ = channel.send_async(Event::Stop).await; + let _ = channel.send_async(Event::Stop(cause.clone())).await; } } RuntimeEvent::Event(Event::Reload { diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index b7795470..e2920a2a 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -182,7 +182,7 @@ impl<'lib> SharedLibraryOperator<'lib> { } let mut operator_event = match event { - Event::Stop => dora_operator_api_types::RawEvent { + Event::Stop(_) => dora_operator_api_types::RawEvent { input: None, input_closed: None, stop: true, From c92600c97b1dd8231815a7c8866cfba46ca3c0ab Mon Sep 17 00:00:00 2001 From: haixuantao Date: Mon, 23 Jun 2025 11:10:24 +0200 Subject: [PATCH 4/5] Fix stop event in openai-proxy-server --- node-hub/openai-proxy-server/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node-hub/openai-proxy-server/src/main.rs b/node-hub/openai-proxy-server/src/main.rs index c0714886..e35c0c3d 100644 --- a/node-hub/openai-proxy-server/src/main.rs +++ b/node-hub/openai-proxy-server/src/main.rs @@ -165,7 +165,7 @@ async fn main() -> eyre::Result<()> { _ => eyre::bail!("unexpected input id: {}", id), }; } - Event::Stop => { + Event::Stop(_) => { break; } event => { From d155ee789a4a75777330f118ba188cf9bae04aab Mon Sep 17 00:00:00 2001 From: haixuantao Date: Mon, 23 Jun 2025 11:17:34 +0200 Subject: [PATCH 5/5] Fix stop in example nodes --- apis/c++/node/src/lib.rs | 2 +- examples/multiple-daemons/node/src/main.rs | 2 +- examples/multiple-daemons/sink/src/main.rs | 4 ++-- examples/rust-dataflow/node/src/main.rs | 2 +- examples/rust-dataflow/sink-dynamic/src/main.rs | 4 ++-- examples/rust-dataflow/sink/src/main.rs | 4 ++-- examples/rust-dataflow/status-node/src/main.rs | 2 +- examples/rust-ros2-dataflow/node/src/main.rs | 2 +- node-hub/dora-keyboard/dora_keyboard/main.py | 2 ++ node-hub/dora-microphone/dora_microphone/main.py | 1 + node-hub/dora-mistral-rs/src/main.rs | 4 ++-- 11 files changed, 16 insertions(+), 13 deletions(-) diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index a7ec233a..4e06e11a 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -144,7 +144,7 @@ pub struct DoraEvent(Option); fn event_type(event: &DoraEvent) -> ffi::DoraEventType { match &event.0 { Some(event) => match event { - Event::Stop => ffi::DoraEventType::Stop, + Event::Stop(_) => ffi::DoraEventType::Stop, Event::Input { .. } => ffi::DoraEventType::Input, Event::InputClosed { .. } => ffi::DoraEventType::InputClosed, Event::Error(_) => ffi::DoraEventType::Error, diff --git a/examples/multiple-daemons/node/src/main.rs b/examples/multiple-daemons/node/src/main.rs index 36f42d57..bf1cd424 100644 --- a/examples/multiple-daemons/node/src/main.rs +++ b/examples/multiple-daemons/node/src/main.rs @@ -26,7 +26,7 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, - Event::Stop => println!("Received manual stop"), + Event::Stop(_) => println!("Received stop"), other => eprintln!("Received unexpected input: {other:?}"), } } diff --git a/examples/multiple-daemons/sink/src/main.rs b/examples/multiple-daemons/sink/src/main.rs index e180af08..59316aba 100644 --- a/examples/multiple-daemons/sink/src/main.rs +++ b/examples/multiple-daemons/sink/src/main.rs @@ -24,8 +24,8 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, - Event::Stop => { - println!("Received manual stop"); + Event::Stop(_) => { + println!("Received stop"); } Event::InputClosed { id } => { println!("Input `{id}` was closed"); diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index 36f42d57..bf1cd424 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -26,7 +26,7 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, - Event::Stop => println!("Received manual stop"), + Event::Stop(_) => println!("Received stop"), other => eprintln!("Received unexpected input: {other:?}"), } } diff --git a/examples/rust-dataflow/sink-dynamic/src/main.rs b/examples/rust-dataflow/sink-dynamic/src/main.rs index 58f36e41..db5164b7 100644 --- a/examples/rust-dataflow/sink-dynamic/src/main.rs +++ b/examples/rust-dataflow/sink-dynamic/src/main.rs @@ -25,8 +25,8 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, - Event::Stop => { - println!("Received manual stop"); + Event::Stop(_) => { + println!("Received stop"); } Event::InputClosed { id } => { println!("Input `{id}` was closed"); diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index e180af08..59316aba 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -24,8 +24,8 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, - Event::Stop => { - println!("Received manual stop"); + Event::Stop(_) => { + println!("Received stop"); } Event::InputClosed { id } => { println!("Input `{id}` was closed"); diff --git a/examples/rust-dataflow/status-node/src/main.rs b/examples/rust-dataflow/status-node/src/main.rs index 09de8184..ff97b97e 100644 --- a/examples/rust-dataflow/status-node/src/main.rs +++ b/examples/rust-dataflow/status-node/src/main.rs @@ -29,7 +29,7 @@ fn main() -> eyre::Result<()> { } other => eprintln!("ignoring unexpected input {other}"), }, - Event::Stop => {} + Event::Stop(_) => {} Event::InputClosed { id } => { println!("input `{id}` was closed"); if *id == "random" { diff --git a/examples/rust-ros2-dataflow/node/src/main.rs b/examples/rust-ros2-dataflow/node/src/main.rs index 395a5e34..32ad5970 100644 --- a/examples/rust-ros2-dataflow/node/src/main.rs +++ b/examples/rust-ros2-dataflow/node/src/main.rs @@ -119,7 +119,7 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, - Event::Stop => println!("Received manual stop"), + Event::Stop(_) => println!("Received stop"), other => eprintln!("Received unexpected input: {other:?}"), }, MergedEvent::External(pose) => { diff --git a/node-hub/dora-keyboard/dora_keyboard/main.py b/node-hub/dora-keyboard/dora_keyboard/main.py index f3858fe0..644c10c5 100644 --- a/node-hub/dora-keyboard/dora_keyboard/main.py +++ b/node-hub/dora-keyboard/dora_keyboard/main.py @@ -11,6 +11,8 @@ def main(): node = Node() always_none = node.next(timeout=0.001) is None + always_none = node.next(timeout=0.001) is None + print("Always None:", always_none) with keyboard.Events() as events: while True: if not always_none: diff --git a/node-hub/dora-microphone/dora_microphone/main.py b/node-hub/dora-microphone/dora_microphone/main.py index b45756bc..aec0eeab 100644 --- a/node-hub/dora-microphone/dora_microphone/main.py +++ b/node-hub/dora-microphone/dora_microphone/main.py @@ -19,6 +19,7 @@ def main(): start_recording_time = tm.time() node = Node() + always_none = node.next(timeout=0.001) is None always_none = node.next(timeout=0.001) is None finished = False diff --git a/node-hub/dora-mistral-rs/src/main.rs b/node-hub/dora-mistral-rs/src/main.rs index bb451e1e..9b123113 100644 --- a/node-hub/dora-mistral-rs/src/main.rs +++ b/node-hub/dora-mistral-rs/src/main.rs @@ -46,8 +46,8 @@ async fn main() -> eyre::Result<()> { } other => eprintln!("Received input `{other}`"), }, - Event::Stop => { - println!("Received manual stop") + Event::Stop(_) => { + println!("Received command"); } Event::InputClosed { id } => { println!("input `{id}` was closed");