Browse Source

Expose AllInputClosed message as a Stop message (#1026)

This PR makes it possible for nodes to stop when all input is closed.
This is necessary because when merging two different event stream and we
want to stop when all event from dora finishes we can rely on the stop
messages to stop.

It also removes the complexity of the event stream code.
tags/v0.3.12-rc0
Haixuan Xavier Tao GitHub 7 months ago
parent
commit
561a319d6c
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
21 changed files with 48 additions and 36 deletions
  1. +1
    -1
      apis/c++/node/src/lib.rs
  2. +1
    -1
      apis/c/node/src/lib.rs
  3. +7
    -2
      apis/python/operator/src/lib.rs
  4. +8
    -1
      apis/rust/node/src/event_stream/event.rs
  5. +3
    -9
      apis/rust/node/src/event_stream/mod.rs
  6. +7
    -4
      apis/rust/node/src/event_stream/thread.rs
  7. +1
    -1
      apis/rust/node/src/lib.rs
  8. +1
    -1
      binaries/daemon/src/spawn.rs
  9. +2
    -2
      binaries/runtime/src/lib.rs
  10. +1
    -1
      binaries/runtime/src/operator/shared_lib.rs
  11. +1
    -1
      examples/multiple-daemons/node/src/main.rs
  12. +2
    -2
      examples/multiple-daemons/sink/src/main.rs
  13. +1
    -1
      examples/rust-dataflow/node/src/main.rs
  14. +2
    -2
      examples/rust-dataflow/sink-dynamic/src/main.rs
  15. +2
    -2
      examples/rust-dataflow/sink/src/main.rs
  16. +1
    -1
      examples/rust-dataflow/status-node/src/main.rs
  17. +1
    -1
      examples/rust-ros2-dataflow/node/src/main.rs
  18. +2
    -0
      node-hub/dora-keyboard/dora_keyboard/main.py
  19. +1
    -0
      node-hub/dora-microphone/dora_microphone/main.py
  20. +2
    -2
      node-hub/dora-mistral-rs/src/main.rs
  21. +1
    -1
      node-hub/openai-proxy-server/src/main.rs

+ 1
- 1
apis/c++/node/src/lib.rs View File

@@ -144,7 +144,7 @@ pub struct DoraEvent(Option<Event>);
fn event_type(event: &DoraEvent) -> ffi::DoraEventType {
match &event.0 {
Some(event) => match event {
Event::Stop => ffi::DoraEventType::Stop,
Event::Stop(_) => ffi::DoraEventType::Stop,
Event::Input { .. } => ffi::DoraEventType::Input,
Event::InputClosed { .. } => ffi::DoraEventType::InputClosed,
Event::Error(_) => ffi::DoraEventType::Error,


+ 1
- 1
apis/c/node/src/lib.rs View File

@@ -91,7 +91,7 @@ pub unsafe extern "C" fn dora_next_event(context: *mut c_void) -> *mut c_void {
pub unsafe extern "C" fn read_dora_event_type(event: *const ()) -> EventType {
let event: &Event = unsafe { &*event.cast() };
match event {
Event::Stop => EventType::Stop,
Event::Stop(_) => EventType::Stop,
Event::Input { .. } => EventType::Input,
Event::InputClosed { .. } => EventType::InputClosed,
Event::Error(_) => EventType::Error,


+ 7
- 2
apis/python/operator/src/lib.rs View File

@@ -6,7 +6,7 @@ use std::{
use arrow::pyarrow::ToPyArrow;
use dora_node_api::{
merged::{MergeExternalSend, MergedEvent},
DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter,
DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter, StopCause,
};
use eyre::{Context, Result};
use futures::{Stream, StreamExt};
@@ -146,7 +146,7 @@ impl PyEvent {

fn ty(event: &Event) -> &str {
match event {
Event::Stop => "STOP",
Event::Stop(_) => "STOP",
Event::Input { .. } => "INPUT",
Event::InputClosed { .. } => "INPUT_CLOSED",
Event::Error(_) => "ERROR",
@@ -158,6 +158,11 @@ impl PyEvent {
match event {
Event::Input { id, .. } => Some(id),
Event::InputClosed { id } => Some(id),
Event::Stop(cause) => match cause {
StopCause::Manual => Some("MANUAL"),
StopCause::AllInputsClosed => Some("ALL_INPUTS_CLOSED"),
&_ => None,
},
_ => None,
}
}


+ 8
- 1
apis/rust/node/src/event_stream/event.rs View File

@@ -10,7 +10,7 @@ use shared_memory_extended::{Shmem, ShmemConf};
#[derive(Debug)]
#[non_exhaustive]
pub enum Event {
Stop,
Stop(StopCause),
Reload {
operator_id: Option<OperatorId>,
},
@@ -25,6 +25,13 @@ pub enum Event {
Error(String),
}

#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum StopCause {
Manual,
AllInputsClosed,
}

pub enum RawData {
Empty,
Vec(AVec<u8, ConstAlign<128>>),


+ 3
- 9
apis/rust/node/src/event_stream/mod.rs View File

@@ -11,7 +11,7 @@ use dora_message::{
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId,
};
pub use event::{Event, MappedInputData, RawData};
pub use event::{Event, MappedInputData, RawData, StopCause};
use futures::{
future::{select, Either},
Stream, StreamExt,
@@ -199,7 +199,7 @@ impl EventStream {
fn convert_event_item(item: EventItem) -> Event {
match item {
EventItem::NodeEvent { event, ack_channel } => match event {
NodeEvent::Stop => Event::Stop,
NodeEvent::Stop => Event::Stop(event::StopCause::Manual),
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
@@ -234,13 +234,7 @@ impl EventStream {
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => {
let err = eyre!(
"received `AllInputsClosed` event, which should be handled by background task"
);
tracing::error!("{err:?}");
Event::Error(err.wrap_err("internal error").to_string())
}
NodeEvent::AllInputsClosed => Event::Stop(event::StopCause::AllInputsClosed),
},

EventItem::FatalError(err) => {


+ 7
- 4
apis/rust/node/src/event_stream/thread.rs View File

@@ -92,6 +92,7 @@ fn event_stream_loop(
clock: Arc<uhlc::HLC>,
) {
let mut tx = Some(tx);
let mut close_tx = false;
let mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)> = Vec::new();
let mut drop_tokens = Vec::new();

@@ -135,10 +136,8 @@ fn event_stream_loop(
data: Some(data), ..
} => data.drop_token(),
NodeEvent::AllInputsClosed => {
// close the event stream
tx = None;
// skip this internal event
continue;
close_tx = true;
None
}
_ => None,
};
@@ -166,6 +165,10 @@ fn event_stream_loop(
} else {
tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`");
}

if close_tx {
tx = None;
};
}
};
if let Err(err) = result {


+ 1
- 1
apis/rust/node/src/lib.rs View File

@@ -20,7 +20,7 @@ pub use dora_message::{
metadata::{Metadata, MetadataParameters, Parameter},
DataflowId,
};
pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData};
pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData, StopCause};
pub use flume::Receiver;
pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD};



+ 1
- 1
binaries/daemon/src/spawn.rs View File

@@ -540,7 +540,7 @@ pub async fn spawn_node(
// If log is an output, we're sending the logs to the dataflow
if let Some(stdout_output_name) = &send_stdout_to {
// Convert logs to DataMessage
let array = message.into_arrow();
let array = message.as_str().into_arrow();

let array: ArrayData = array.into();
let total_len = required_data_size(&array);


+ 2
- 2
binaries/runtime/src/lib.rs View File

@@ -232,10 +232,10 @@ async fn run(
}
}
}
RuntimeEvent::Event(Event::Stop) => {
RuntimeEvent::Event(Event::Stop(cause)) => {
// forward stop event to all operators and close the event channels
for (_, channel) in operator_channels.drain() {
let _ = channel.send_async(Event::Stop).await;
let _ = channel.send_async(Event::Stop(cause.clone())).await;
}
}
RuntimeEvent::Event(Event::Reload {


+ 1
- 1
binaries/runtime/src/operator/shared_lib.rs View File

@@ -182,7 +182,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
}

let mut operator_event = match event {
Event::Stop => dora_operator_api_types::RawEvent {
Event::Stop(_) => dora_operator_api_types::RawEvent {
input: None,
input_closed: None,
stop: true,


+ 1
- 1
examples/multiple-daemons/node/src/main.rs View File

@@ -26,7 +26,7 @@ fn main() -> eyre::Result<()> {
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
Event::Stop => println!("Received manual stop"),
Event::Stop(_) => println!("Received stop"),
other => eprintln!("Received unexpected input: {other:?}"),
}
}


+ 2
- 2
examples/multiple-daemons/sink/src/main.rs View File

@@ -24,8 +24,8 @@ fn main() -> eyre::Result<()> {
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
Event::Stop => {
println!("Received manual stop");
Event::Stop(_) => {
println!("Received stop");
}
Event::InputClosed { id } => {
println!("Input `{id}` was closed");


+ 1
- 1
examples/rust-dataflow/node/src/main.rs View File

@@ -26,7 +26,7 @@ fn main() -> eyre::Result<()> {
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
Event::Stop => println!("Received manual stop"),
Event::Stop(_) => println!("Received stop"),
other => eprintln!("Received unexpected input: {other:?}"),
}
}


+ 2
- 2
examples/rust-dataflow/sink-dynamic/src/main.rs View File

@@ -25,8 +25,8 @@ fn main() -> eyre::Result<()> {
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
Event::Stop => {
println!("Received manual stop");
Event::Stop(_) => {
println!("Received stop");
}
Event::InputClosed { id } => {
println!("Input `{id}` was closed");


+ 2
- 2
examples/rust-dataflow/sink/src/main.rs View File

@@ -24,8 +24,8 @@ fn main() -> eyre::Result<()> {
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
Event::Stop => {
println!("Received manual stop");
Event::Stop(_) => {
println!("Received stop");
}
Event::InputClosed { id } => {
println!("Input `{id}` was closed");


+ 1
- 1
examples/rust-dataflow/status-node/src/main.rs View File

@@ -29,7 +29,7 @@ fn main() -> eyre::Result<()> {
}
other => eprintln!("ignoring unexpected input {other}"),
},
Event::Stop => {}
Event::Stop(_) => {}
Event::InputClosed { id } => {
println!("input `{id}` was closed");
if *id == "random" {


+ 1
- 1
examples/rust-ros2-dataflow/node/src/main.rs View File

@@ -119,7 +119,7 @@ fn main() -> eyre::Result<()> {
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
Event::Stop => println!("Received manual stop"),
Event::Stop(_) => println!("Received stop"),
other => eprintln!("Received unexpected input: {other:?}"),
},
MergedEvent::External(pose) => {


+ 2
- 0
node-hub/dora-keyboard/dora_keyboard/main.py View File

@@ -11,6 +11,8 @@ def main():
node = Node()

always_none = node.next(timeout=0.001) is None
always_none = node.next(timeout=0.001) is None
print("Always None:", always_none)
with keyboard.Events() as events:
while True:
if not always_none:


+ 1
- 0
node-hub/dora-microphone/dora_microphone/main.py View File

@@ -19,6 +19,7 @@ def main():
start_recording_time = tm.time()
node = Node()

always_none = node.next(timeout=0.001) is None
always_none = node.next(timeout=0.001) is None
finished = False



+ 2
- 2
node-hub/dora-mistral-rs/src/main.rs View File

@@ -46,8 +46,8 @@ async fn main() -> eyre::Result<()> {
}
other => eprintln!("Received input `{other}`"),
},
Event::Stop => {
println!("Received manual stop")
Event::Stop(_) => {
println!("Received command");
}
Event::InputClosed { id } => {
println!("input `{id}` was closed");


+ 1
- 1
node-hub/openai-proxy-server/src/main.rs View File

@@ -165,7 +165,7 @@ async fn main() -> eyre::Result<()> {
_ => eyre::bail!("unexpected input id: {}", id),
};
}
Event::Stop => {
Event::Stop(_) => {
break;
}
event => {


Loading…
Cancel
Save