From 5988a65ea2c925342bafa73635d99f5b23784639 Mon Sep 17 00:00:00 2001 From: haixuantao Date: Tue, 17 Jun 2025 17:56:01 +0200 Subject: [PATCH 1/7] Expose all input closed message as a stop message --- apis/rust/node/src/event_stream/mod.rs | 8 +------- apis/rust/node/src/event_stream/thread.rs | 11 +++++++---- binaries/daemon/src/spawn.rs | 2 +- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 15c40e33..af8c42e6 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -234,13 +234,7 @@ impl EventStream { Err(err) => Event::Error(format!("{err:?}")), } } - NodeEvent::AllInputsClosed => { - let err = eyre!( - "received `AllInputsClosed` event, which should be handled by background task" - ); - tracing::error!("{err:?}"); - Event::Error(err.wrap_err("internal error").to_string()) - } + NodeEvent::AllInputsClosed => Event::Stop, }, EventItem::FatalError(err) => { diff --git a/apis/rust/node/src/event_stream/thread.rs b/apis/rust/node/src/event_stream/thread.rs index 5e982f74..a9dbba27 100644 --- a/apis/rust/node/src/event_stream/thread.rs +++ b/apis/rust/node/src/event_stream/thread.rs @@ -92,6 +92,7 @@ fn event_stream_loop( clock: Arc, ) { let mut tx = Some(tx); + let mut close_tx = false; let mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)> = Vec::new(); let mut drop_tokens = Vec::new(); @@ -135,10 +136,8 @@ fn event_stream_loop( data: Some(data), .. } => data.drop_token(), NodeEvent::AllInputsClosed => { - // close the event stream - tx = None; - // skip this internal event - continue; + close_tx = true; + None } _ => None, }; @@ -166,6 +165,10 @@ fn event_stream_loop( } else { tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`"); } + + if close_tx { + tx = None; + }; } }; if let Err(err) = result { diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 9087a4ec..1e5b5bf7 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -540,7 +540,7 @@ pub async fn spawn_node( // If log is an output, we're sending the logs to the dataflow if let Some(stdout_output_name) = &send_stdout_to { // Convert logs to DataMessage - let array = message.into_arrow(); + let array = message.clone().into_arrow(); let array: ArrayData = array.into(); let total_len = required_data_size(&array); From c64d6642afd1d12b9ab67a55dc23fed4938dc537 Mon Sep 17 00:00:00 2001 From: haixuantao Date: Wed, 18 Jun 2025 12:00:01 +0200 Subject: [PATCH 2/7] fix string message --- binaries/daemon/src/spawn.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 1e5b5bf7..955a1dc7 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -540,7 +540,7 @@ pub async fn spawn_node( // If log is an output, we're sending the logs to the dataflow if let Some(stdout_output_name) = &send_stdout_to { // Convert logs to DataMessage - let array = message.clone().into_arrow(); + let array = message.as_str().into_arrow(); let array: ArrayData = array.into(); let total_len = required_data_size(&array); From f72991b09ce92ed6ba6e1bb01e0cda77422ac8b5 Mon Sep 17 00:00:00 2001 From: haixuantao Date: Mon, 23 Jun 2025 10:53:14 +0200 Subject: [PATCH 3/7] Add Cause to stpo --- apis/c/node/src/lib.rs | 2 +- apis/python/operator/src/lib.rs | 9 +++++++-- apis/rust/node/src/event_stream/event.rs | 9 ++++++++- apis/rust/node/src/event_stream/mod.rs | 6 +++--- apis/rust/node/src/lib.rs | 2 +- binaries/runtime/src/lib.rs | 4 ++-- binaries/runtime/src/operator/shared_lib.rs | 2 +- 7 files changed, 23 insertions(+), 11 deletions(-) diff --git a/apis/c/node/src/lib.rs b/apis/c/node/src/lib.rs index 9d8762b0..8720c795 100644 --- a/apis/c/node/src/lib.rs +++ b/apis/c/node/src/lib.rs @@ -91,7 +91,7 @@ pub unsafe extern "C" fn dora_next_event(context: *mut c_void) -> *mut c_void { pub unsafe extern "C" fn read_dora_event_type(event: *const ()) -> EventType { let event: &Event = unsafe { &*event.cast() }; match event { - Event::Stop => EventType::Stop, + Event::Stop(_) => EventType::Stop, Event::Input { .. } => EventType::Input, Event::InputClosed { .. } => EventType::InputClosed, Event::Error(_) => EventType::Error, diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index cfc1e2bd..ea34b3b4 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -6,7 +6,7 @@ use std::{ use arrow::pyarrow::ToPyArrow; use dora_node_api::{ merged::{MergeExternalSend, MergedEvent}, - DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter, + DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter, StopCause, }; use eyre::{Context, Result}; use futures::{Stream, StreamExt}; @@ -146,7 +146,7 @@ impl PyEvent { fn ty(event: &Event) -> &str { match event { - Event::Stop => "STOP", + Event::Stop(_) => "STOP", Event::Input { .. } => "INPUT", Event::InputClosed { .. } => "INPUT_CLOSED", Event::Error(_) => "ERROR", @@ -158,6 +158,11 @@ impl PyEvent { match event { Event::Input { id, .. } => Some(id), Event::InputClosed { id } => Some(id), + Event::Stop(cause) => match cause { + StopCause::Manual => Some("MANUAL"), + StopCause::AllInputsClosed => Some("ALL_INPUTS_CLOSED"), + &_ => None, + }, _ => None, } } diff --git a/apis/rust/node/src/event_stream/event.rs b/apis/rust/node/src/event_stream/event.rs index b5d0e9b8..22997f4b 100644 --- a/apis/rust/node/src/event_stream/event.rs +++ b/apis/rust/node/src/event_stream/event.rs @@ -10,7 +10,7 @@ use shared_memory_extended::{Shmem, ShmemConf}; #[derive(Debug)] #[non_exhaustive] pub enum Event { - Stop, + Stop(StopCause), Reload { operator_id: Option, }, @@ -25,6 +25,13 @@ pub enum Event { Error(String), } +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum StopCause { + Manual, + AllInputsClosed, +} + pub enum RawData { Empty, Vec(AVec>), diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index af8c42e6..565f8713 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -11,7 +11,7 @@ use dora_message::{ node_to_daemon::{DaemonRequest, Timestamped}, DataflowId, }; -pub use event::{Event, MappedInputData, RawData}; +pub use event::{Event, MappedInputData, RawData, StopCause}; use futures::{ future::{select, Either}, Stream, StreamExt, @@ -199,7 +199,7 @@ impl EventStream { fn convert_event_item(item: EventItem) -> Event { match item { EventItem::NodeEvent { event, ack_channel } => match event { - NodeEvent::Stop => Event::Stop, + NodeEvent::Stop => Event::Stop(event::StopCause::Manual), NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, NodeEvent::InputClosed { id } => Event::InputClosed { id }, NodeEvent::Input { id, metadata, data } => { @@ -234,7 +234,7 @@ impl EventStream { Err(err) => Event::Error(format!("{err:?}")), } } - NodeEvent::AllInputsClosed => Event::Stop, + NodeEvent::AllInputsClosed => Event::Stop(event::StopCause::AllInputsClosed), }, EventItem::FatalError(err) => { diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 9836ab7b..e1b17b6f 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -20,7 +20,7 @@ pub use dora_message::{ metadata::{Metadata, MetadataParameters, Parameter}, DataflowId, }; -pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData}; +pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData, StopCause}; pub use flume::Receiver; pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD}; diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index ea949bf4..d4df4b4b 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -232,10 +232,10 @@ async fn run( } } } - RuntimeEvent::Event(Event::Stop) => { + RuntimeEvent::Event(Event::Stop(cause)) => { // forward stop event to all operators and close the event channels for (_, channel) in operator_channels.drain() { - let _ = channel.send_async(Event::Stop).await; + let _ = channel.send_async(Event::Stop(cause.clone())).await; } } RuntimeEvent::Event(Event::Reload { diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index b7795470..e2920a2a 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -182,7 +182,7 @@ impl<'lib> SharedLibraryOperator<'lib> { } let mut operator_event = match event { - Event::Stop => dora_operator_api_types::RawEvent { + Event::Stop(_) => dora_operator_api_types::RawEvent { input: None, input_closed: None, stop: true, From c92600c97b1dd8231815a7c8866cfba46ca3c0ab Mon Sep 17 00:00:00 2001 From: haixuantao Date: Mon, 23 Jun 2025 11:10:24 +0200 Subject: [PATCH 4/7] Fix stop event in openai-proxy-server --- node-hub/openai-proxy-server/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node-hub/openai-proxy-server/src/main.rs b/node-hub/openai-proxy-server/src/main.rs index c0714886..e35c0c3d 100644 --- a/node-hub/openai-proxy-server/src/main.rs +++ b/node-hub/openai-proxy-server/src/main.rs @@ -165,7 +165,7 @@ async fn main() -> eyre::Result<()> { _ => eyre::bail!("unexpected input id: {}", id), }; } - Event::Stop => { + Event::Stop(_) => { break; } event => { From d155ee789a4a75777330f118ba188cf9bae04aab Mon Sep 17 00:00:00 2001 From: haixuantao Date: Mon, 23 Jun 2025 11:17:34 +0200 Subject: [PATCH 5/7] Fix stop in example nodes --- apis/c++/node/src/lib.rs | 2 +- examples/multiple-daemons/node/src/main.rs | 2 +- examples/multiple-daemons/sink/src/main.rs | 4 ++-- examples/rust-dataflow/node/src/main.rs | 2 +- examples/rust-dataflow/sink-dynamic/src/main.rs | 4 ++-- examples/rust-dataflow/sink/src/main.rs | 4 ++-- examples/rust-dataflow/status-node/src/main.rs | 2 +- examples/rust-ros2-dataflow/node/src/main.rs | 2 +- node-hub/dora-keyboard/dora_keyboard/main.py | 2 ++ node-hub/dora-microphone/dora_microphone/main.py | 1 + node-hub/dora-mistral-rs/src/main.rs | 4 ++-- 11 files changed, 16 insertions(+), 13 deletions(-) diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index a7ec233a..4e06e11a 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -144,7 +144,7 @@ pub struct DoraEvent(Option); fn event_type(event: &DoraEvent) -> ffi::DoraEventType { match &event.0 { Some(event) => match event { - Event::Stop => ffi::DoraEventType::Stop, + Event::Stop(_) => ffi::DoraEventType::Stop, Event::Input { .. } => ffi::DoraEventType::Input, Event::InputClosed { .. } => ffi::DoraEventType::InputClosed, Event::Error(_) => ffi::DoraEventType::Error, diff --git a/examples/multiple-daemons/node/src/main.rs b/examples/multiple-daemons/node/src/main.rs index 36f42d57..bf1cd424 100644 --- a/examples/multiple-daemons/node/src/main.rs +++ b/examples/multiple-daemons/node/src/main.rs @@ -26,7 +26,7 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, - Event::Stop => println!("Received manual stop"), + Event::Stop(_) => println!("Received stop"), other => eprintln!("Received unexpected input: {other:?}"), } } diff --git a/examples/multiple-daemons/sink/src/main.rs b/examples/multiple-daemons/sink/src/main.rs index e180af08..59316aba 100644 --- a/examples/multiple-daemons/sink/src/main.rs +++ b/examples/multiple-daemons/sink/src/main.rs @@ -24,8 +24,8 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, - Event::Stop => { - println!("Received manual stop"); + Event::Stop(_) => { + println!("Received stop"); } Event::InputClosed { id } => { println!("Input `{id}` was closed"); diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index 36f42d57..bf1cd424 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -26,7 +26,7 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, - Event::Stop => println!("Received manual stop"), + Event::Stop(_) => println!("Received stop"), other => eprintln!("Received unexpected input: {other:?}"), } } diff --git a/examples/rust-dataflow/sink-dynamic/src/main.rs b/examples/rust-dataflow/sink-dynamic/src/main.rs index 58f36e41..db5164b7 100644 --- a/examples/rust-dataflow/sink-dynamic/src/main.rs +++ b/examples/rust-dataflow/sink-dynamic/src/main.rs @@ -25,8 +25,8 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, - Event::Stop => { - println!("Received manual stop"); + Event::Stop(_) => { + println!("Received stop"); } Event::InputClosed { id } => { println!("Input `{id}` was closed"); diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index e180af08..59316aba 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -24,8 +24,8 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, - Event::Stop => { - println!("Received manual stop"); + Event::Stop(_) => { + println!("Received stop"); } Event::InputClosed { id } => { println!("Input `{id}` was closed"); diff --git a/examples/rust-dataflow/status-node/src/main.rs b/examples/rust-dataflow/status-node/src/main.rs index 09de8184..ff97b97e 100644 --- a/examples/rust-dataflow/status-node/src/main.rs +++ b/examples/rust-dataflow/status-node/src/main.rs @@ -29,7 +29,7 @@ fn main() -> eyre::Result<()> { } other => eprintln!("ignoring unexpected input {other}"), }, - Event::Stop => {} + Event::Stop(_) => {} Event::InputClosed { id } => { println!("input `{id}` was closed"); if *id == "random" { diff --git a/examples/rust-ros2-dataflow/node/src/main.rs b/examples/rust-ros2-dataflow/node/src/main.rs index 395a5e34..32ad5970 100644 --- a/examples/rust-ros2-dataflow/node/src/main.rs +++ b/examples/rust-ros2-dataflow/node/src/main.rs @@ -119,7 +119,7 @@ fn main() -> eyre::Result<()> { } other => eprintln!("Ignoring unexpected input `{other}`"), }, - Event::Stop => println!("Received manual stop"), + Event::Stop(_) => println!("Received stop"), other => eprintln!("Received unexpected input: {other:?}"), }, MergedEvent::External(pose) => { diff --git a/node-hub/dora-keyboard/dora_keyboard/main.py b/node-hub/dora-keyboard/dora_keyboard/main.py index f3858fe0..644c10c5 100644 --- a/node-hub/dora-keyboard/dora_keyboard/main.py +++ b/node-hub/dora-keyboard/dora_keyboard/main.py @@ -11,6 +11,8 @@ def main(): node = Node() always_none = node.next(timeout=0.001) is None + always_none = node.next(timeout=0.001) is None + print("Always None:", always_none) with keyboard.Events() as events: while True: if not always_none: diff --git a/node-hub/dora-microphone/dora_microphone/main.py b/node-hub/dora-microphone/dora_microphone/main.py index b45756bc..aec0eeab 100644 --- a/node-hub/dora-microphone/dora_microphone/main.py +++ b/node-hub/dora-microphone/dora_microphone/main.py @@ -19,6 +19,7 @@ def main(): start_recording_time = tm.time() node = Node() + always_none = node.next(timeout=0.001) is None always_none = node.next(timeout=0.001) is None finished = False diff --git a/node-hub/dora-mistral-rs/src/main.rs b/node-hub/dora-mistral-rs/src/main.rs index bb451e1e..9b123113 100644 --- a/node-hub/dora-mistral-rs/src/main.rs +++ b/node-hub/dora-mistral-rs/src/main.rs @@ -46,8 +46,8 @@ async fn main() -> eyre::Result<()> { } other => eprintln!("Received input `{other}`"), }, - Event::Stop => { - println!("Received manual stop") + Event::Stop(_) => { + println!("Received command"); } Event::InputClosed { id } => { println!("input `{id}` was closed"); From c6ce84167b6b103011c586f8619300e134d209fe Mon Sep 17 00:00:00 2001 From: haixuantao Date: Mon, 23 Jun 2025 16:29:25 +0200 Subject: [PATCH 6/7] Adding vision to openai server --- examples/openai-server/dataflow-rust.yml | 6 +- examples/openai-server/openai_api_client.py | 60 +++++++- examples/openai-server/qwenvl.yml | 16 ++ libraries/arrow-convert/src/into_impls.rs | 14 ++ node-hub/dora-mistral-rs/src/main.rs | 2 +- .../dora-qwen2-5-vl/dora_qwen2_5_vl/main.py | 143 ++++++++++++++---- node-hub/openai-proxy-server/src/main.rs | 124 +++++++-------- node-hub/openai-proxy-server/src/message.rs | 44 ++++++ 8 files changed, 304 insertions(+), 105 deletions(-) create mode 100644 examples/openai-server/qwenvl.yml diff --git a/examples/openai-server/dataflow-rust.yml b/examples/openai-server/dataflow-rust.yml index 8c6a1d8d..85668b5a 100644 --- a/examples/openai-server/dataflow-rust.yml +++ b/examples/openai-server/dataflow-rust.yml @@ -3,14 +3,14 @@ nodes: build: cargo build -p dora-openai-proxy-server --release path: ../../target/release/dora-openai-proxy-server outputs: - - chat_completion_request + - text inputs: - completion_reply: dora-echo/echo + text: dora-echo/echo - id: dora-echo build: pip install -e ../../node-hub/dora-echo path: dora-echo inputs: - echo: dora-openai-server/chat_completion_request + echo: dora-openai-server/text outputs: - echo diff --git a/examples/openai-server/openai_api_client.py b/examples/openai-server/openai_api_client.py index 0a88d5b1..4580d652 100644 --- a/examples/openai-server/openai_api_client.py +++ b/examples/openai-server/openai_api_client.py @@ -32,11 +32,69 @@ def test_chat_completion(user_input): print(f"Error in chat completion: {e}") +def test_chat_completion_image_url(user_input): + """TODO: Add docstring.""" + try: + response = client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[ + { + "role": "user", + "content": [ + {"type": "text", "text": "What is in this image?"}, + { + "type": "image_url", + "image_url": { + "url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg" + }, + }, + ], + } + ], + ) + print("Chat Completion Response:") + print(response.choices[0].message.content) + except Exception as e: + print(f"Error in chat completion: {e}") + + +def test_chat_completion_image_base64(user_input): + """TODO: Add docstring.""" + try: + response = client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[ + { + "role": "user", + "content": [ + {"type": "text", "text": "What is in this image?"}, + { + "type": "image_url", + "image_url": { + "url": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAABgAAAAYCAYAAADgdz34AAAABHNCSVQICAgIfAhkiAAAAAlwSFlzAAAApgAAAKYB3X3/OAAAABl0RVh0U29mdHdhcmUAd3d3Lmlua3NjYXBlLm9yZ5vuPBoAAANCSURBVEiJtZZPbBtFFMZ/M7ubXdtdb1xSFyeilBapySVU8h8OoFaooFSqiihIVIpQBKci6KEg9Q6H9kovIHoCIVQJJCKE1ENFjnAgcaSGC6rEnxBwA04Tx43t2FnvDAfjkNibxgHxnWb2e/u992bee7tCa00YFsffekFY+nUzFtjW0LrvjRXrCDIAaPLlW0nHL0SsZtVoaF98mLrx3pdhOqLtYPHChahZcYYO7KvPFxvRl5XPp1sN3adWiD1ZAqD6XYK1b/dvE5IWryTt2udLFedwc1+9kLp+vbbpoDh+6TklxBeAi9TL0taeWpdmZzQDry0AcO+jQ12RyohqqoYoo8RDwJrU+qXkjWtfi8Xxt58BdQuwQs9qC/afLwCw8tnQbqYAPsgxE1S6F3EAIXux2oQFKm0ihMsOF71dHYx+f3NND68ghCu1YIoePPQN1pGRABkJ6Bus96CutRZMydTl+TvuiRW1m3n0eDl0vRPcEysqdXn+jsQPsrHMquGeXEaY4Yk4wxWcY5V/9scqOMOVUFthatyTy8QyqwZ+kDURKoMWxNKr2EeqVKcTNOajqKoBgOE28U4tdQl5p5bwCw7BWquaZSzAPlwjlithJtp3pTImSqQRrb2Z8PHGigD4RZuNX6JYj6wj7O4TFLbCO/Mn/m8R+h6rYSUb3ekokRY6f/YukArN979jcW+V/S8g0eT/N3VN3kTqWbQ428m9/8k0P/1aIhF36PccEl6EhOcAUCrXKZXXWS3XKd2vc/TRBG9O5ELC17MmWubD2nKhUKZa26Ba2+D3P+4/MNCFwg59oWVeYhkzgN/JDR8deKBoD7Y+ljEjGZ0sosXVTvbc6RHirr2reNy1OXd6pJsQ+gqjk8VWFYmHrwBzW/n+uMPFiRwHB2I7ih8ciHFxIkd/3Omk5tCDV1t+2nNu5sxxpDFNx+huNhVT3/zMDz8usXC3ddaHBj1GHj/As08fwTS7Kt1HBTmyN29vdwAw+/wbwLVOJ3uAD1wi/dUH7Qei66PfyuRj4Ik9is+hglfbkbfR3cnZm7chlUWLdwmprtCohX4HUtlOcQjLYCu+fzGJH2QRKvP3UNz8bWk1qMxjGTOMThZ3kvgLI5AzFfo379UAAAAASUVORK5CYII=" + }, + }, + ], + } + ], + ) + print("Chat Completion Response:") + print(response.choices[0].message.content) + except Exception as e: + print(f"Error in chat completion: {e}") + + if __name__ == "__main__": print("Testing API endpoints...") - test_list_models() + # test_list_models() print("\n" + "=" * 50 + "\n") chat_input = input("Enter a message for chat completion: ") test_chat_completion(chat_input) + + print("\n" + "=" * 50 + "\n") + + test_chat_completion_image_url(chat_input) + print("\n" + "=" * 50 + "\n") + test_chat_completion_image_base64(chat_input) print("\n" + "=" * 50 + "\n") diff --git a/examples/openai-server/qwenvl.yml b/examples/openai-server/qwenvl.yml new file mode 100644 index 00000000..b737b3be --- /dev/null +++ b/examples/openai-server/qwenvl.yml @@ -0,0 +1,16 @@ +nodes: + - id: dora-openai-server + build: cargo build -p dora-openai-proxy-server --release + path: ../../target/release/dora-openai-proxy-server + outputs: + - text + inputs: + text: dora-qwen2.5-vl/text + + - id: dora-qwen2.5-vl + build: pip install -e ../../node-hub/dora-qwen2-5-vl + path: dora-qwen2-5-vl + inputs: + text: dora-openai-server/text + outputs: + - text diff --git a/libraries/arrow-convert/src/into_impls.rs b/libraries/arrow-convert/src/into_impls.rs index a8434694..b2174146 100644 --- a/libraries/arrow-convert/src/into_impls.rs +++ b/libraries/arrow-convert/src/into_impls.rs @@ -81,6 +81,20 @@ impl IntoArrow for NaiveTime { } } +impl IntoArrow for String { + type A = StringArray; + fn into_arrow(self) -> Self::A { + std::iter::once(Some(self)).collect() + } +} + +impl IntoArrow for Vec { + type A = StringArray; + fn into_arrow(self) -> Self::A { + StringArray::from(self) + } +} + impl IntoArrow for NaiveDateTime { type A = arrow::array::TimestampNanosecondArray; fn into_arrow(self) -> Self::A { diff --git a/node-hub/dora-mistral-rs/src/main.rs b/node-hub/dora-mistral-rs/src/main.rs index 9b123113..e9227633 100644 --- a/node-hub/dora-mistral-rs/src/main.rs +++ b/node-hub/dora-mistral-rs/src/main.rs @@ -41,7 +41,7 @@ async fn main() -> eyre::Result<()> { node.send_output( mistral_output.clone(), metadata.parameters, - output.into_arrow(), + output.as_str().into_arrow(), )?; } other => eprintln!("Received input `{other}`"), diff --git a/node-hub/dora-qwen2-5-vl/dora_qwen2_5_vl/main.py b/node-hub/dora-qwen2-5-vl/dora_qwen2_5_vl/main.py index 3125858c..898b444d 100644 --- a/node-hub/dora-qwen2-5-vl/dora_qwen2_5_vl/main.py +++ b/node-hub/dora-qwen2-5-vl/dora_qwen2_5_vl/main.py @@ -62,29 +62,118 @@ if ADAPTER_PATH != "": processor = AutoProcessor.from_pretrained(MODEL_NAME_OR_PATH, use_fast=True) -def generate(frames: dict, question, history, past_key_values=None, image_id=None): +def generate( + frames: dict, texts: list[str], history, past_key_values=None, image_id=None +): """Generate the response to the question given the image using Qwen2 model.""" if image_id is not None: images = [frames[image_id]] else: images = list(frames.values()) - messages = [ - { - "role": "user", - "content": [ + + messages = [] + + for text in texts: + if text.startswith("<|system|>\n"): + messages.append( { - "type": "image", - "image": image, - "resized_height": image.size[1] * IMAGE_RESIZE_RATIO, - "resized_width": image.size[0] * IMAGE_RESIZE_RATIO, + "role": "system", + "content": [ + {"type": "text", "text": text.replace("<|system|>\n", "")}, + ], } - for image in images - ] - + [ - {"type": "text", "text": question}, - ], - }, - ] + ) + elif text.startswith("<|assistant|>\n"): + messages.append( + { + "role": "assistant", + "content": [ + {"type": "text", "text": text.replace("<|assistant|>\n", "")}, + ], + } + ) + elif text.startswith("<|tool|>\n"): + messages.append( + { + "role": "tool", + "content": [ + {"type": "text", "text": text.replace("<|tool|>\n", "")}, + ], + } + ) + elif text.startswith("<|user|>\n<|im_start|>\n"): + messages.append( + { + "role": "user", + "content": [ + { + "type": "text", + "text": text.replace("<|user|>\n<|im_start|>\n", ""), + }, + ], + } + ) + elif text.startswith("<|user|>\n<|vision_start|>\n"): + # Handle the case where the text starts with <|user|>\n<|vision_start|> + image_url = text.replace("<|user|>\n<|vision_start|>\n", "") + + # If the last message was from the user, append the image URL to it + if messages[-1]["role"] == "user": + messages[-1]["content"].append( + { + "type": "image", + "image": image_url, + } + ) + else: + messages.append( + { + "role": "user", + "content": [ + { + "type": "image", + "image": image_url, + }, + ], + } + ) + else: + messages.append( + { + "role": "user", + "content": [ + {"type": "text", "text": text}, + ], + } + ) + + # If the last message was from the user, append the image URL to it + if messages[-1]["role"] == "user": + messages[-1]["content"] += [ + { + "type": "image", + "image": image, + "resized_height": image.size[1] * IMAGE_RESIZE_RATIO, + "resized_width": image.size[0] * IMAGE_RESIZE_RATIO, + } + for image in images + ] + else: + messages.append( + { + "role": "user", + "content": [ + { + "type": "image", + "image": image, + "resized_height": image.size[1] * IMAGE_RESIZE_RATIO, + "resized_width": image.size[0] * IMAGE_RESIZE_RATIO, + } + for image in images + ], + } + ) + tmp_history = history + messages # Preparation for inference text = processor.apply_chat_template( @@ -120,19 +209,13 @@ def generate(frames: dict, question, history, past_key_values=None, image_id=Non clean_up_tokenization_spaces=False, ) if HISTORY: - history += [ - { - "role": "user", - "content": [ - {"type": "text", "text": question}, - ], - }, + history = tmp_history + [ { "role": "assistant", "content": [ {"type": "text", "text": output_text[0]}, ], - }, + } ] return output_text[0], history, past_key_values @@ -207,24 +290,22 @@ def main(): elif "text" in event_id: if len(event["value"]) > 0: - text = event["value"][0].as_py() + texts = event["value"].to_pylist() image_id = event["metadata"].get("image_id", None) else: - text = cached_text - words = text.split() + texts = cached_text + words = texts[-1].split() if len(ACTIVATION_WORDS) > 0 and all( word not in ACTIVATION_WORDS for word in words ): continue - cached_text = text + cached_text = texts - if len(frames.keys()) == 0: - continue # set the max number of tiles in `max_num` response, history, past_key_values = generate( frames, - text, + texts, history, past_key_values, image_id, diff --git a/node-hub/openai-proxy-server/src/main.rs b/node-hub/openai-proxy-server/src/main.rs index e35c0c3d..50e73880 100644 --- a/node-hub/openai-proxy-server/src/main.rs +++ b/node-hub/openai-proxy-server/src/main.rs @@ -1,4 +1,10 @@ -use dora_node_api::{self, dora_core::config::DataId, merged::MergeExternalSend, DoraNode, Event}; +use dora_node_api::{ + self, + arrow::array::{AsArray, StringArray}, + dora_core::config::DataId, + merged::MergeExternalSend, + DoraNode, Event, +}; use eyre::{Context, ContextCompat}; use futures::{ @@ -14,7 +20,7 @@ use hyper::{ }; use message::{ ChatCompletionObject, ChatCompletionObjectChoice, ChatCompletionObjectMessage, - ChatCompletionRequest, ChatCompletionRequestMessage, Usage, + ChatCompletionRequest, Usage, }; use std::{ collections::VecDeque, @@ -71,7 +77,7 @@ async fn main() -> eyre::Result<()> { let merged = events.merge_external_send(server_events); let events = futures::executor::block_on_stream(merged); - let output_id = DataId::from("chat_completion_request".to_owned()); + let output_id = DataId::from("text".to_owned()); let mut reply_channels = VecDeque::new(); for event in events { @@ -82,45 +88,15 @@ async fn main() -> eyre::Result<()> { break; } ServerEvent::ChatCompletionRequest { request, reply } => { - let message = request - .messages - .into_iter() - .find_map(|m| match m { - ChatCompletionRequestMessage::User(message) => Some(message), - _ => None, - }) - .context("no user message found"); - match message { - Ok(message) => match message.content() { - message::ChatCompletionUserMessageContent::Text(content) => { - node.send_output_bytes( - output_id.clone(), - Default::default(), - content.len(), - content.as_bytes(), - ) - .context("failed to send dora output")?; - reply_channels.push_back(( - reply, - content.as_bytes().len() as u64, - request.model, - )); - } - message::ChatCompletionUserMessageContent::Parts(_) => { - if reply - .send(Err(eyre::eyre!("unsupported message content"))) - .is_err() - { - tracing::warn!("failed to send chat completion reply because channel closed early"); - }; - } - }, - Err(err) => { - if reply.send(Err(err)).is_err() { - tracing::warn!("failed to send chat completion reply error because channel closed early"); - } - } - } + let texts = request.to_texts(); + node.send_output( + output_id.clone(), + Default::default(), + StringArray::from(texts), + ) + .context("failed to send dora output")?; + + reply_channels.push_back((reply, 0 as u64, request.model)); } }, dora_node_api::merged::MergedEvent::Dora(event) => match event { @@ -130,35 +106,42 @@ async fn main() -> eyre::Result<()> { metadata: _, } => { match id.as_str() { - "completion_reply" => { + "text" => { let (reply_channel, prompt_tokens, model) = reply_channels.pop_front().context("no reply channel")?; - let data = TryFrom::try_from(&data) - .with_context(|| format!("invalid reply data: {data:?}")) - .map(|s: &[u8]| ChatCompletionObject { - id: format!("completion-{}", uuid::Uuid::new_v4()), - object: "chat.completion".to_string(), - created: chrono::Utc::now().timestamp() as u64, - model: model.unwrap_or_default(), - choices: vec![ChatCompletionObjectChoice { - index: 0, - message: ChatCompletionObjectMessage { - role: message::ChatCompletionRole::Assistant, - content: Some(String::from_utf8_lossy(s).to_string()), - tool_calls: Vec::new(), - function_call: None, - }, - finish_reason: message::FinishReason::stop, - logprobs: None, - }], - usage: Usage { - prompt_tokens, - completion_tokens: s.len() as u64, - total_tokens: prompt_tokens + s.len() as u64, + let data = data.as_string::(); + let string = data.iter().fold("".to_string(), |mut acc, s| { + if let Some(s) = s { + acc.push_str("\n"); + acc.push_str(s); + } + acc + }); + + let data = ChatCompletionObject { + id: format!("completion-{}", uuid::Uuid::new_v4()), + object: "chat.completion".to_string(), + created: chrono::Utc::now().timestamp() as u64, + model: model.unwrap_or_default(), + choices: vec![ChatCompletionObjectChoice { + index: 0, + message: ChatCompletionObjectMessage { + role: message::ChatCompletionRole::Assistant, + content: Some(string.to_string()), + tool_calls: Vec::new(), + function_call: None, }, - }); - - if reply_channel.send(data).is_err() { + finish_reason: message::FinishReason::stop, + logprobs: None, + }], + usage: Usage { + prompt_tokens, + completion_tokens: string.len() as u64, + total_tokens: prompt_tokens + string.len() as u64, + }, + }; + + if reply_channel.send(Ok(data)).is_err() { tracing::warn!("failed to send chat completion reply because channel closed early"); } } @@ -168,8 +151,11 @@ async fn main() -> eyre::Result<()> { Event::Stop(_) => { break; } + Event::InputClosed { id, .. } => { + info!("Input channel closed for id: {}", id); + } event => { - println!("Event: {event:#?}") + eyre::bail!("unexpected event: {:#?}", event) } }, } diff --git a/node-hub/openai-proxy-server/src/message.rs b/node-hub/openai-proxy-server/src/message.rs index dff7e101..4c9eb99f 100644 --- a/node-hub/openai-proxy-server/src/message.rs +++ b/node-hub/openai-proxy-server/src/message.rs @@ -230,6 +230,15 @@ impl<'de> Deserialize<'de> for ChatCompletionRequest { } } +impl ChatCompletionRequest { + pub fn to_texts(&self) -> Vec { + self.messages + .iter() + .flat_map(|message| message.to_texts()) + .collect() + } +} + /// Message for comprising the conversation. #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] #[serde(tag = "role", rename_all = "lowercase")] @@ -308,6 +317,22 @@ impl ChatCompletionRequestMessage { ChatCompletionRequestMessage::Tool(_) => None, } } + + /// The contents of the message. + pub fn to_texts(&self) -> Vec { + match self { + ChatCompletionRequestMessage::System(message) => { + vec![String::from("<|system|>\n") + &message.content] + } + ChatCompletionRequestMessage::User(message) => message.content.to_texts(), + ChatCompletionRequestMessage::Assistant(message) => { + vec![String::from("<|assistant|>\n") + &message.content.clone().unwrap_or_default()] + } + ChatCompletionRequestMessage::Tool(message) => { + vec![String::from("<|tool|>\n") + &message.content.clone()] + } + } + } } /// Sampling methods used for chat completion requests. @@ -587,6 +612,25 @@ impl ChatCompletionUserMessageContent { ChatCompletionUserMessageContent::Parts(_) => "parts", } } + + pub fn to_texts(&self) -> Vec { + match self { + ChatCompletionUserMessageContent::Text(text) => { + vec![String::from("user: ") + &text.clone()] + } + ChatCompletionUserMessageContent::Parts(parts) => parts + .iter() + .map(|part| match part { + ContentPart::Text(text_part) => { + String::from("<|user|>\n<|im_start|>\n") + &text_part.text.clone() + } + ContentPart::Image(image) => { + String::from("<|user|>\n<|vision_start|>\n") + &image.image().url.clone() + } + }) + .collect(), + } + } } /// Define the content part of a user message. From 2cef9eb6261d862dfe02a07f78f64451e71ca43c Mon Sep 17 00:00:00 2001 From: haixuantao Date: Wed, 25 Jun 2025 15:08:16 +0200 Subject: [PATCH 7/7] Adding monochrome encoding and examples --- Cargo.lock | 4 +- examples/vggt/depth-to-avif.yaml | 54 +++++++++++++++++++++++++++ examples/vggt/depth.dora-session.yaml | 8 ---- examples/vggt/image_saver.py | 34 +++++++++++++++++ node-hub/dora-rav1e/Cargo.toml | 2 +- node-hub/dora-rav1e/src/lib.rs | 25 ++++++++++++- node-hub/dora-vggt/dora_vggt/main.py | 30 +++++++++------ 7 files changed, 133 insertions(+), 24 deletions(-) create mode 100644 examples/vggt/depth-to-avif.yaml delete mode 100644 examples/vggt/depth.dora-session.yaml create mode 100644 examples/vggt/image_saver.py diff --git a/Cargo.lock b/Cargo.lock index e3bf55bf..5cd2486e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1165,9 +1165,9 @@ dependencies = [ [[package]] name = "avif-serialize" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98922d6a4cfbcb08820c69d8eeccc05bb1f29bfa06b4f5b1dbfe9a868bd7608e" +checksum = "19135c0c7a60bfee564dbe44ab5ce0557c6bf3884e5291a50be76a15640c4fbd" dependencies = [ "arrayvec", ] diff --git a/examples/vggt/depth-to-avif.yaml b/examples/vggt/depth-to-avif.yaml new file mode 100644 index 00000000..6db92ac3 --- /dev/null +++ b/examples/vggt/depth-to-avif.yaml @@ -0,0 +1,54 @@ +nodes: + - id: camera + build: pip install opencv-video-capture + path: opencv-video-capture + inputs: + tick: dora/timer/millis/100 + outputs: + - image + env: + CAPTURE_PATH: 1 + + - id: dora-vggt + build: pip install -e ../../node-hub/dora-vggt + path: dora-vggt + inputs: + image: camera/image + outputs: + - depth + - image + env: + DEPTH_ENCODING: mono16 + + - id: rav1e-depth + path: dora-rav1e + build: cargo build -p dora-rav1e --release + inputs: + depth: dora-vggt/depth + outputs: + - depth + env: + ENCODING: avif + + - id: rav1e-image + path: dora-rav1e + build: cargo build -p dora-rav1e --release + inputs: + image: dora-vggt/image + outputs: + - image + env: + ENCODING: avif + + - id: bench + path: image_saver.py + inputs: + camera_depth: rav1e-image/image + vggt_depth: rav1e-depth/depth + + - id: plot + build: pip install dora-rerun + path: dora-rerun + inputs: + camera/image: dora-vggt/image + camera/depth: dora-vggt/depth diff --git a/examples/vggt/depth.dora-session.yaml b/examples/vggt/depth.dora-session.yaml deleted file mode 100644 index 13428f1b..00000000 --- a/examples/vggt/depth.dora-session.yaml +++ /dev/null @@ -1,8 +0,0 @@ -build_id: 2b402c1e-e52e-45e9-86e5-236b33a77369 -session_id: 275de19c-e605-4865-bc5f-2f15916bade9 -git_sources: {} -local_build: - node_working_dirs: - camera: /Users/xaviertao/Documents/work/dora/examples/vggt - dora-vggt: /Users/xaviertao/Documents/work/dora/examples/vggt - plot: /Users/xaviertao/Documents/work/dora/examples/vggt diff --git a/examples/vggt/image_saver.py b/examples/vggt/image_saver.py new file mode 100644 index 00000000..5552d3ba --- /dev/null +++ b/examples/vggt/image_saver.py @@ -0,0 +1,34 @@ +from dora import Node + +node = Node() + +index_dict = {} +i = 0 + +LEAD_TOPIC = "vggt_depth" + +for event in node: + if event["type"] == "INPUT": + if LEAD_TOPIC in event["id"]: + storage = event["value"] + metadata = event["metadata"] + encoding = metadata["encoding"] + width = metadata["width"] + height = metadata["height"] + + # Save to file + filename = f"out/{event['id']}_{i}.{encoding}" + with open(filename, "wb") as f: + f.write(storage.to_numpy()) + for key, value in index_dict.items(): + filename = f"out/{key}_{i}.{value['metadata']['encoding']}" + with open(filename, "wb") as f: + f.write(value["value"]) + i += 1 + else: + # Store the event in the index dictionary + index_dict[event["id"]] = { + "type": event["type"], + "value": event["value"].to_numpy(), + "metadata": event["metadata"], + } diff --git a/node-hub/dora-rav1e/Cargo.toml b/node-hub/dora-rav1e/Cargo.toml index c2e35cd2..5dcb6b73 100644 --- a/node-hub/dora-rav1e/Cargo.toml +++ b/node-hub/dora-rav1e/Cargo.toml @@ -25,7 +25,7 @@ pyo3 = { workspace = true, features = [ "eyre", "generate-import-lib", ], optional = true } -avif-serialize = "0.8.3" +avif-serialize = "0.8.4" [lib] diff --git a/node-hub/dora-rav1e/src/lib.rs b/node-hub/dora-rav1e/src/lib.rs index 22e43180..68280155 100644 --- a/node-hub/dora-rav1e/src/lib.rs +++ b/node-hub/dora-rav1e/src/lib.rs @@ -336,7 +336,7 @@ pub fn lib_main() -> Result<()> { if let Some(buffer) = data.as_primitive_opt::() { let mut buffer = buffer.values().to_vec(); if std::env::var("FILL_ZEROS") - .map(|s| s != "false") + .map(|s| s.to_lowercase() != "false") .unwrap_or(true) { fill_zeros_toward_center_y_plane_in_place(&mut buffer, width, height); @@ -370,7 +370,28 @@ pub fn lib_main() -> Result<()> { let data = pkt.data; match output_encoding.as_str() { "avif" => { - warn!("avif encoding not supported for mono16"); + metadata.parameters.insert( + "encoding".to_string(), + Parameter::String("avif".to_string()), + ); + + let data = avif_serialize::Aviffy::new() + .full_color_range(false) + .set_seq_profile(0) + .set_monochrome(true) + .to_vec( + &data, + None, + enc.width as u32, + enc.height as u32, + enc.bit_depth as u8, + ); + + let arrow = data.into_arrow(); + + node.send_output(id, metadata.parameters.clone(), arrow) + .context("could not send output") + .unwrap(); } _ => { metadata.parameters.insert( diff --git a/node-hub/dora-vggt/dora_vggt/main.py b/node-hub/dora-vggt/dora_vggt/main.py index 7c0e24c7..500e665d 100644 --- a/node-hub/dora-vggt/dora_vggt/main.py +++ b/node-hub/dora-vggt/dora_vggt/main.py @@ -1,6 +1,7 @@ """TODO: Add docstring.""" import io +import os from collections import deque as Deque import cv2 @@ -17,11 +18,15 @@ from vggt.utils.pose_enc import pose_encoding_to_extri_intri dtype = torch.bfloat16 +# Check if cuda is available and set the device accordingly +device = "cuda" if torch.cuda.is_available() else "cpu" + # Initialize the model and load the pretrained weights. # This will automatically download the model weights the first time it's run, which may take a while. -model = VGGT.from_pretrained("facebook/VGGT-1B").to("cuda") +model = VGGT.from_pretrained("facebook/VGGT-1B").to(device) model.eval() +DEPTH_ENCODING = os.environ.get("DEPTH_ENCODING", "float64") # Import vecdeque @@ -32,7 +37,6 @@ def main(): for event in node: if event["type"] == "INPUT": - if "image" in event["id"]: storage = event["value"] metadata = event["metadata"] @@ -80,7 +84,7 @@ def main(): raw_images.append(buffer) with torch.no_grad(): - images = load_and_preprocess_images(raw_images).to("cuda") + images = load_and_preprocess_images(raw_images).to(device) images = images[None] # add batch dimension aggregated_tokens_list, ps_idx = model.aggregator(images) @@ -107,20 +111,24 @@ def main(): depth_map = depth_map[-1][-1].cpu().numpy() # Warning: Make sure to add my_output_id and my_input_id within the dataflow. + if DEPTH_ENCODING == "mono16": + depth_map = (depth_map * 1000).astype(np.uint16) + node.send_output( output_id="depth", data=pa.array(depth_map.ravel()), metadata={ "width": depth_map.shape[1], "height": depth_map.shape[0], - "focal": [ - int(f_0), - int(f_1), - ], - "resolution": [ - int(r_0), - int(r_1), - ], + "encoding": DEPTH_ENCODING, + "focal": [ + int(f_0), + int(f_1), + ], + "resolution": [ + int(r_0), + int(r_1), + ], }, )