diff --git a/examples/openai-server/image-server.yaml b/examples/openai-server/image-server.yaml
new file mode 100644
index 00000000..51655ca0
--- /dev/null
+++ b/examples/openai-server/image-server.yaml
@@ -0,0 +1,29 @@
+nodes:
+  - id: dora-openai-server
+    build: cargo build -p dora-openai-proxy-server --release
+    path: ../../target/release/dora-openai-proxy-server
+    outputs:
+      - text
+    inputs:
+      text: dora-echo/echo
+
+  - id: dora-echo
+    build: pip install -e ../../node-hub/dora-echo
+    path: dora-echo
+    inputs:
+      echo: dora-openai-server/text
+    outputs:
+      - echo
+
+  - id: parse_text
+    path: parse_text.py
+    inputs:
+      text: dora-openai-server/text
+    outputs:
+      - image
+
+  - id: plot
+    build: pip install dora-rerun
+    path: dora-rerun
+    inputs:
+      camera/image: parse_text/image
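
Note: to try this dataflow, launch it with the dora CLI and then run the client below. Assuming the usual dora workflow (the exact commands are not part of this patch and may differ by version): `dora up`, then `dora build image-server.yaml` and `dora start image-server.yaml` from this directory, then `python image_client.py` in a second terminal.
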
"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAABgAAAAYCAYAAADgdz34AAAABHNCSVQICAgIfAhkiAAAAAlwSFlzAAAApgAAAKYB3X3/OAAAABl0RVh0U29mdHdhcmUAd3d3Lmlua3NjYXBlLm9yZ5vuPBoAAANCSURBVEiJtZZPbBtFFMZ/M7ubXdtdb1xSFyeilBapySVU8h8OoFaooFSqiihIVIpQBKci6KEg9Q6H9kovIHoCIVQJJCKE1ENFjnAgcaSGC6rEnxBwA04Tx43t2FnvDAfjkNibxgHxnWb2e/u992bee7tCa00YFsffekFY+nUzFtjW0LrvjRXrCDIAaPLlW0nHL0SsZtVoaF98mLrx3pdhOqLtYPHChahZcYYO7KvPFxvRl5XPp1sN3adWiD1ZAqD6XYK1b/dvE5IWryTt2udLFedwc1+9kLp+vbbpoDh+6TklxBeAi9TL0taeWpdmZzQDry0AcO+jQ12RyohqqoYoo8RDwJrU+qXkjWtfi8Xxt58BdQuwQs9qC/afLwCw8tnQbqYAPsgxE1S6F3EAIXux2oQFKm0ihMsOF71dHYx+f3NND68ghCu1YIoePPQN1pGRABkJ6Bus96CutRZMydTl+TvuiRW1m3n0eDl0vRPcEysqdXn+jsQPsrHMquGeXEaY4Yk4wxWcY5V/9scqOMOVUFthatyTy8QyqwZ+kDURKoMWxNKr2EeqVKcTNOajqKoBgOE28U4tdQl5p5bwCw7BWquaZSzAPlwjlithJtp3pTImSqQRrb2Z8PHGigD4RZuNX6JYj6wj7O4TFLbCO/Mn/m8R+h6rYSUb3ekokRY6f/YukArN979jcW+V/S8g0eT/N3VN3kTqWbQ428m9/8k0P/1aIhF36PccEl6EhOcAUCrXKZXXWS3XKd2vc/TRBG9O5ELC17MmWubD2nKhUKZa26Ba2+D3P+4/MNCFwg59oWVeYhkzgN/JDR8deKBoD7Y+ljEjGZ0sosXVTvbc6RHirr2reNy1OXd6pJsQ+gqjk8VWFYmHrwBzW/n+uMPFiRwHB2I7ih8ciHFxIkd/3Omk5tCDV1t+2nNu5sxxpDFNx+huNhVT3/zMDz8usXC3ddaHBj1GHj/As08fwTS7Kt1HBTmyN29vdwAw+/wbwLVOJ3uAD1wi/dUH7Qei66PfyuRj4Ik9is+hglfbkbfR3cnZm7chlUWLdwmprtCohX4HUtlOcQjLYCu+fzGJH2QRKvP3UNz8bWk1qMxjGTOMThZ3kvgLI5AzFfo379UAAAAASUVORK5CYII=" + }, + }, + ], + } + ], + ) + print("Chat Completion Response:") + print(response.choices[0].message.content) + except Exception as e: + print(f"Error in chat completion: {e}") + + +if __name__ == "__main__": + test_chat_completion_image_base64("abc") diff --git a/examples/openai-server/parse_text.py b/examples/openai-server/parse_text.py new file mode 100644 index 00000000..883d641a --- /dev/null +++ b/examples/openai-server/parse_text.py @@ -0,0 +1,33 @@ +"""TODO: Add docstring.""" + +from PIL import Image +from io import BytesIO +import base64 +from dora import Node +import pyarrow as pa +import numpy as np +node = Node() + + +for event in node: + if event["type"] == "INPUT": + texts = event["value"].to_numpy(zero_copy_only=False) + + for text in texts: + if text.startswith("<|user|>\n<|vision_start|>\n"): + # Handle the case where the text starts with <|user|>\n<|vision_start|> + image = text.replace("<|user|>\n<|vision_start|>\n", "") + if "base64" in image: + image = image.split(",", 1)[1] + print("image", image) + image = Image.open(BytesIO(base64.b64decode(image))) + node.send_output( + "image", + pa.array(np.array(image).ravel()), + metadata={ + "encoding": "rgb8", + "width": image.width, + "height": image.height, + }, + ) + print(f"Processed {len(texts)} texts.") diff --git a/examples/vggt/format_response.py b/examples/vggt/format_response.py new file mode 100644 index 00000000..cabe5ffe --- /dev/null +++ b/examples/vggt/format_response.py @@ -0,0 +1,95 @@ +"""TODO: Add docstring.""" + +import base64 +from dora import Node +import pyarrow as pa +import numpy as np +import cv2 +node = Node() + + +cache = {} + +def derive_image_as_base64(event): + storage = event["value"] + metadata = event["metadata"] + encoding = metadata["encoding"] + width = metadata["width"] + height = metadata["height"] + + if ( + encoding == "mono16" or encoding == "z16" + ): + channels = 1 + storage_type = np.uint16 + else: + channels = 3 + storage_type = np.uint8 + + if encoding == "bgr8": + frame = ( + storage.to_numpy() + .astype(storage_type) + .reshape((height, width, channels)) + ) + frame = cv2.imencode('.jpg', frame)[1].tobytes() + frame = base64.b64encode(frame).decode('utf-8') + image = 
f"data:image/jpeg;base64,{frame}" + elif encoding == "rgb8": + frame = ( + storage.to_numpy() + .astype(storage_type) + .reshape((height, width, channels)) + ) + frame = frame[:, :, ::-1] # OpenCV image (RGB to BGR) + frame = cv2.imencode('.jpg', frame)[1].tobytes() + frame = base64.b64encode(frame).decode('utf-8') + image = f"data:image/jpeg;base64,{frame}" + elif encoding in ["jpeg", "jpg", "avif", "webp", "png", "bmp"]: + storage = storage.to_numpy() + image = base64.b64encode(storage).decode('utf-8') + image = f"data:image/{encoding};base64,{image}" + elif encoding in ["mono16", "z16"]: + frame = ( + storage.to_numpy() + .astype(storage_type) + .reshape((height, width, 1)) + ) + storage = storage.to_numpy() + image = base64.b64encode(storage).decode('utf-8') + image = f"data:image/jpeg;base64,{frame}" + + else: + raise RuntimeError(f"Unsupported image encoding: {encoding}") + + return image + +for event in node: + if event["type"] == "INPUT": + event_id = event["id"] + print(f"Received event: {event_id}") + match event_id: + case s if "image" in s: + + message_id = event["metadata"].get("id", 0) + if cache.get(message_id) is None: + cache[message_id] = {} + cache[message_id]["image"] = event + case s if "depth" in s: + message_id = event["metadata"].get("id", 0) + if cache.get(message_id) is None: + cache[message_id] = {} + cache[message_id]["depth"] = event + if len(cache[message_id]) == 2: + image = cache[message_id]["image"] + image_str = derive_image_as_base64(image) + depth = cache[message_id]["depth"] + depth_str = derive_image_as_base64(depth) + + node.send_output( + "text", + pa.array(np.array(image_str + "\n" + depth_str).ravel()), + metadata={ + "message_id": message_id, + }, + ) diff --git a/examples/vggt/openai-server.yaml b/examples/vggt/openai-server.yaml new file mode 100644 index 00000000..284199e8 --- /dev/null +++ b/examples/vggt/openai-server.yaml @@ -0,0 +1,54 @@ +nodes: + - id: dora-openai-server + build: cargo build -p dora-openai-proxy-server --release + path: ../../target/release/dora-openai-proxy-server + outputs: + - text + inputs: + text: format_response/text + + - id: parse_text + path: parse_text.py + inputs: + text: dora-openai-server/text + outputs: + - image + + - id: dora-vggt + build: pip install -e ../../node-hub/dora-vggt + path: dora-vggt + inputs: + image: parse_text/image + outputs: + - depth + - image + env: + DEPTH_ENCODING: mono16 + + - id: rav1e-image + path: dora-rav1e + build: pip install -e ../../node-hub/dora-rav1e + inputs: + image: dora-vggt/image + outputs: + - image + env: + ENCODING: avif + + - id: rav1e-depth + path: dora-rav1e + build: pip install -e ../../node-hub/dora-rav1e + inputs: + depth: dora-vggt/depth + outputs: + - depth + env: + ENCODING: avif + + - id: format_response + path: format_response.py + inputs: + depth: rav1e-depth/depth + image: rav1e-image/image + outputs: + - text diff --git a/examples/vggt/parse_text.py b/examples/vggt/parse_text.py new file mode 100644 index 00000000..61bc8864 --- /dev/null +++ b/examples/vggt/parse_text.py @@ -0,0 +1,33 @@ +"""TODO: Add docstring.""" + +from PIL import Image +from io import BytesIO +import base64 +from dora import Node +import pyarrow as pa +import numpy as np +node = Node() + + +for event in node: + if event["type"] == "INPUT": + texts = event["value"].to_numpy(zero_copy_only=False) + + for text in texts: + if text.startswith("<|user|>\n<|vision_start|>\n"): + # Handle the case where the text starts with <|user|>\n<|vision_start|> + image = 
text.replace("<|user|>\n<|vision_start|>\n", "") + if "base64" in image: + image = image.split(",", 1)[1] + + image = Image.open(BytesIO(base64.b64decode(image))) + node.send_output( + "image", + pa.array(np.array(image).ravel()), + metadata={ + "encoding": "rgb8", + "width": image.width, + "height": image.height, + }, + ) + print(f"Processed {len(texts)} texts.") \ No newline at end of file diff --git a/node-hub/dora-vggt/dora_vggt/main.py b/node-hub/dora-vggt/dora_vggt/main.py index e494c8e3..0e156ed5 100644 --- a/node-hub/dora-vggt/dora_vggt/main.py +++ b/node-hub/dora-vggt/dora_vggt/main.py @@ -130,6 +130,7 @@ def main(): int(r_0), int(r_1), ], + "message_id": event["metadata"].get("message_id", 0), }, ) @@ -154,6 +155,7 @@ def main(): int(r_0), int(r_1), ], + "message_id": event["metadata"].get("message_id", 0), }, ) diff --git a/node-hub/openai-proxy-server/src/main.rs b/node-hub/openai-proxy-server/src/main.rs index 4cee442f..7633352f 100644 --- a/node-hub/openai-proxy-server/src/main.rs +++ b/node-hub/openai-proxy-server/src/main.rs @@ -107,42 +107,44 @@ async fn main() -> eyre::Result<()> { } => { match id.as_str() { "text" => { - let (reply_channel, prompt_tokens, model) = - reply_channels.pop_front().context("no reply channel")?; - let data = data.as_string::(); - let string = data.iter().fold("".to_string(), |mut acc, s| { - if let Some(s) = s { - acc.push('\n'); - acc.push_str(s); - } - acc - }); - - let data = ChatCompletionObject { - id: format!("completion-{}", uuid::Uuid::new_v4()), - object: "chat.completion".to_string(), - created: chrono::Utc::now().timestamp() as u64, - model: model.unwrap_or_default(), - choices: vec![ChatCompletionObjectChoice { - index: 0, - message: ChatCompletionObjectMessage { - role: message::ChatCompletionRole::Assistant, - content: Some(string.to_string()), - tool_calls: Vec::new(), - function_call: None, + if let Some((reply_channel, prompt_tokens, model)) = + reply_channels.pop_front() + { + let data = data.as_string::(); + let string = data.iter().fold("".to_string(), |mut acc, s| { + if let Some(s) = s { + acc.push('\n'); + acc.push_str(s); + } + acc + }); + + let data = ChatCompletionObject { + id: format!("completion-{}", uuid::Uuid::new_v4()), + object: "chat.completion".to_string(), + created: chrono::Utc::now().timestamp() as u64, + model: model.unwrap_or_default(), + choices: vec![ChatCompletionObjectChoice { + index: 0, + message: ChatCompletionObjectMessage { + role: message::ChatCompletionRole::Assistant, + content: Some(string.to_string()), + tool_calls: Vec::new(), + function_call: None, + }, + finish_reason: message::FinishReason::stop, + logprobs: None, + }], + usage: Usage { + prompt_tokens, + completion_tokens: string.len() as u64, + total_tokens: prompt_tokens + string.len() as u64, }, - finish_reason: message::FinishReason::stop, - logprobs: None, - }], - usage: Usage { - prompt_tokens, - completion_tokens: string.len() as u64, - total_tokens: prompt_tokens + string.len() as u64, - }, - }; - - if reply_channel.send(Ok(data)).is_err() { - tracing::warn!("failed to send chat completion reply because channel closed early"); + }; + + if reply_channel.send(Ok(data)).is_err() { + tracing::warn!("failed to send chat completion reply because channel closed early"); + } } } _ => eyre::bail!("unexpected input id: {}", id),