Browse Source

Expose all input closed message as a stop message

add-vision-to-openai-server
haixuantao 7 months ago
parent
commit
b2474de9a3
3 changed files with 9 additions and 12 deletions
  1. +1
    -7
      apis/rust/node/src/event_stream/mod.rs
  2. +7
    -4
      apis/rust/node/src/event_stream/thread.rs
  3. +1
    -1
      binaries/daemon/src/spawn.rs

+ 1
- 7
apis/rust/node/src/event_stream/mod.rs View File

@@ -234,13 +234,7 @@ impl EventStream {
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => {
let err = eyre!(
"received `AllInputsClosed` event, which should be handled by background task"
);
tracing::error!("{err:?}");
Event::Error(err.wrap_err("internal error").to_string())
}
NodeEvent::AllInputsClosed => Event::Stop,
},

EventItem::FatalError(err) => {


+ 7
- 4
apis/rust/node/src/event_stream/thread.rs View File

@@ -92,6 +92,7 @@ fn event_stream_loop(
clock: Arc<uhlc::HLC>,
) {
let mut tx = Some(tx);
let mut close_tx = false;
let mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)> = Vec::new();
let mut drop_tokens = Vec::new();

@@ -135,10 +136,8 @@ fn event_stream_loop(
data: Some(data), ..
} => data.drop_token(),
NodeEvent::AllInputsClosed => {
// close the event stream
tx = None;
// skip this internal event
continue;
close_tx = true;
None
}
_ => None,
};
@@ -166,6 +165,10 @@ fn event_stream_loop(
} else {
tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`");
}

if close_tx {
tx = None;
};
}
};
if let Err(err) = result {


+ 1
- 1
binaries/daemon/src/spawn.rs View File

@@ -540,7 +540,7 @@ pub async fn spawn_node(
// If log is an output, we're sending the logs to the dataflow
if let Some(stdout_output_name) = &send_stdout_to {
// Convert logs to DataMessage
let array = message.into_arrow();
let array = message.clone().into_arrow();

let array: ArrayData = array.into();
let total_len = required_data_size(&array);


Loading…
Cancel
Save