| Author | SHA1 | Message | Date |
|---|---|---|---|
| | bb89c139e7 | Add configurable port | 6 months ago |
| | 1448f27d95 | initial commit for openai vggt | 6 months ago |
@@ -0,0 +1,29 @@
nodes:
  - id: dora-openai-server
    build: cargo build -p dora-openai-proxy-server --release
    path: ../../target/release/dora-openai-proxy-server
    outputs:
      - text
    inputs:
      text: dora-echo/echo

  - id: dora-echo
    build: pip install -e ../../node-hub/dora-echo
    path: dora-echo
    inputs:
      echo: dora-openai-server/text
    outputs:
      - echo

  - id: parse_text
    path: parse_text.py
    inputs:
      text: dora-openai-server/text
    outputs:
      - image

  - id: plot
    build: pip install dora-rerun
    path: dora-rerun
    inputs:
      camera/image: parse_text/image
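For orientation, dora-echo simply feeds the proxy's text output back to it, closing the loop in the graph above. A minimal sketch of what such an echo node could look like with the dora Python API used by the other nodes in this diff (the actual node-hub/dora-echo implementation may differ):

```python
"""Minimal echo-node sketch (assumed behavior; the real node-hub/dora-echo may differ)."""
from dora import Node

node = Node()

for event in node:
    if event["type"] == "INPUT" and event["id"] == "echo":
        # Forward the incoming Arrow payload unchanged on the `echo` output.
        node.send_output("echo", event["value"], metadata=event["metadata"])
```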
@@ -0,0 +1,88 @@
"""TODO: Add docstring."""

from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy_api_key")


def test_list_models():
    """TODO: Add docstring."""
    try:
        models = client.models.list()
        print("Available models:")
        for model in models.data:
            print(f"- {model.id}")
    except Exception as e:
        print(f"Error listing models: {e}")


def test_chat_completion(user_input):
    """TODO: Add docstring."""
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": user_input},
            ],
        )
        print("Chat Completion Response:")
        print(response.choices[0].message.content)
    except Exception as e:
        print(f"Error in chat completion: {e}")


def test_chat_completion_image_url(user_input):
    """TODO: Add docstring."""
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "What is in this image?"},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
                            },
                        },
                    ],
                }
            ],
        )
        print("Chat Completion Response:")
        print(response.choices[0].message.content)
    except Exception as e:
        print(f"Error in chat completion: {e}")


def test_chat_completion_image_base64(user_input):
    """TODO: Add docstring."""
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "What is in this image?"},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAABgAAAAYCAYAAADgdz34AAAABHNCSVQICAgIfAhkiAAAAAlwSFlzAAAApgAAAKYB3X3/OAAAABl0RVh0U29mdHdhcmUAd3d3Lmlua3NjYXBlLm9yZ5vuPBoAAANCSURBVEiJtZZPbBtFFMZ/M7ubXdtdb1xSFyeilBapySVU8h8OoFaooFSqiihIVIpQBKci6KEg9Q6H9kovIHoCIVQJJCKE1ENFjnAgcaSGC6rEnxBwA04Tx43t2FnvDAfjkNibxgHxnWb2e/u992bee7tCa00YFsffekFY+nUzFtjW0LrvjRXrCDIAaPLlW0nHL0SsZtVoaF98mLrx3pdhOqLtYPHChahZcYYO7KvPFxvRl5XPp1sN3adWiD1ZAqD6XYK1b/dvE5IWryTt2udLFedwc1+9kLp+vbbpoDh+6TklxBeAi9TL0taeWpdmZzQDry0AcO+jQ12RyohqqoYoo8RDwJrU+qXkjWtfi8Xxt58BdQuwQs9qC/afLwCw8tnQbqYAPsgxE1S6F3EAIXux2oQFKm0ihMsOF71dHYx+f3NND68ghCu1YIoePPQN1pGRABkJ6Bus96CutRZMydTl+TvuiRW1m3n0eDl0vRPcEysqdXn+jsQPsrHMquGeXEaY4Yk4wxWcY5V/9scqOMOVUFthatyTy8QyqwZ+kDURKoMWxNKr2EeqVKcTNOajqKoBgOE28U4tdQl5p5bwCw7BWquaZSzAPlwjlithJtp3pTImSqQRrb2Z8PHGigD4RZuNX6JYj6wj7O4TFLbCO/Mn/m8R+h6rYSUb3ekokRY6f/YukArN979jcW+V/S8g0eT/N3VN3kTqWbQ428m9/8k0P/1aIhF36PccEl6EhOcAUCrXKZXXWS3XKd2vc/TRBG9O5ELC17MmWubD2nKhUKZa26Ba2+D3P+4/MNCFwg59oWVeYhkzgN/JDR8deKBoD7Y+ljEjGZ0sosXVTvbc6RHirr2reNy1OXd6pJsQ+gqjk8VWFYmHrwBzW/n+uMPFiRwHB2I7ih8ciHFxIkd/3Omk5tCDV1t+2nNu5sxxpDFNx+huNhVT3/zMDz8usXC3ddaHBj1GHj/As08fwTS7Kt1HBTmyN29vdwAw+/wbwLVOJ3uAD1wi/dUH7Qei66PfyuRj4Ik9is+hglfbkbfR3cnZm7chlUWLdwmprtCohX4HUtlOcQjLYCu+fzGJH2QRKvP3UNz8bWk1qMxjGTOMThZ3kvgLI5AzFfo379UAAAAASUVORK5CYII="
                            },
                        },
                    ],
                }
            ],
        )
        print("Chat Completion Response:")
        print(response.choices[0].message.content)
    except Exception as e:
        print(f"Error in chat completion: {e}")


if __name__ == "__main__":
    test_chat_completion_image_base64("abc")
@@ -0,0 +1,33 @@
"""TODO: Add docstring."""

from PIL import Image
from io import BytesIO
import base64

from dora import Node
import pyarrow as pa
import numpy as np

node = Node()

for event in node:
    if event["type"] == "INPUT":
        texts = event["value"].to_numpy(zero_copy_only=False)
        for text in texts:
            if text.startswith("<|user|>\n<|vision_start|>\n"):
                # Handle the case where the text starts with <|user|>\n<|vision_start|>
                image = text.replace("<|user|>\n<|vision_start|>\n", "")
                if "base64" in image:
                    image = image.split(",", 1)[1]
                print("image", image)
                image = Image.open(BytesIO(base64.b64decode(image)))
                node.send_output(
                    "image",
                    pa.array(np.array(image).ravel()),
                    metadata={
                        "encoding": "rgb8",
                        "width": image.width,
                        "height": image.height,
                    },
                )
        print(f"Processed {len(texts)} texts.")
@@ -0,0 +1,95 @@
"""TODO: Add docstring."""

import base64

from dora import Node
import pyarrow as pa
import numpy as np
import cv2

node = Node()
cache = {}


def derive_image_as_base64(event):
    storage = event["value"]
    metadata = event["metadata"]
    encoding = metadata["encoding"]
    width = metadata["width"]
    height = metadata["height"]

    if encoding == "mono16" or encoding == "z16":
        channels = 1
        storage_type = np.uint16
    else:
        channels = 3
        storage_type = np.uint8

    if encoding == "bgr8":
        frame = (
            storage.to_numpy()
            .astype(storage_type)
            .reshape((height, width, channels))
        )
        frame = cv2.imencode('.jpg', frame)[1].tobytes()
        frame = base64.b64encode(frame).decode('utf-8')
        image = f"data:image/jpeg;base64,{frame}"
    elif encoding == "rgb8":
        frame = (
            storage.to_numpy()
            .astype(storage_type)
            .reshape((height, width, channels))
        )
        frame = frame[:, :, ::-1]  # OpenCV expects BGR, so convert RGB to BGR
        frame = cv2.imencode('.jpg', frame)[1].tobytes()
        frame = base64.b64encode(frame).decode('utf-8')
        image = f"data:image/jpeg;base64,{frame}"
    elif encoding in ["jpeg", "jpg", "avif", "webp", "png", "bmp"]:
        storage = storage.to_numpy()
        image = base64.b64encode(storage).decode('utf-8')
        image = f"data:image/{encoding};base64,{image}"
    elif encoding in ["mono16", "z16"]:
        frame = (
            storage.to_numpy()
            .astype(storage_type)
            .reshape((height, width, 1))
        )
        # 16-bit depth cannot be stored in JPEG, so encode the frame as PNG
        # before building the data URL.
        frame = cv2.imencode('.png', frame)[1].tobytes()
        frame = base64.b64encode(frame).decode('utf-8')
        image = f"data:image/png;base64,{frame}"
    else:
        raise RuntimeError(f"Unsupported image encoding: {encoding}")
    return image


for event in node:
    if event["type"] == "INPUT":
        event_id = event["id"]
        print(f"Received event: {event_id}")
        match event_id:
            case s if "image" in s:
                message_id = event["metadata"].get("id", 0)
                if cache.get(message_id) is None:
                    cache[message_id] = {}
                cache[message_id]["image"] = event
            case s if "depth" in s:
                message_id = event["metadata"].get("id", 0)
                if cache.get(message_id) is None:
                    cache[message_id] = {}
                cache[message_id]["depth"] = event
        if len(cache[message_id]) == 2:
            image = cache[message_id]["image"]
            image_str = derive_image_as_base64(image)
            depth = cache[message_id]["depth"]
            depth_str = derive_image_as_base64(depth)
            node.send_output(
                "text",
                pa.array(np.array(image_str + "\n" + depth_str).ravel()),
                metadata={
                    "message_id": message_id,
                },
            )
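format_response.py packs the image and depth data URLs into a single newline-separated string. A consumer-side sketch of how to split and decode that payload (it assumes Pillow can open the embedded format; AVIF typically needs an extra plugin such as pillow-avif-plugin):

```python
"""Consumer-side sketch: split and decode the newline-separated data URLs emitted by format_response.py."""
import base64
from io import BytesIO

from PIL import Image  # decoding AVIF payloads may also require pillow-avif-plugin


def decode_payload(payload: str):
    """Return the two PIL images packed into one newline-separated payload."""
    decoded = []
    for url in payload.split("\n", 1):
        # Drop the `data:image/<fmt>;base64,` header and decode the body.
        body = url.split(",", 1)[1]
        decoded.append(Image.open(BytesIO(base64.b64decode(body))))
    return decoded


def _to_data_url(img: Image.Image) -> str:
    # Helper used only to build a self-contained demo payload below.
    buf = BytesIO()
    img.save(buf, format="PNG")
    return "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode("utf-8")


payload = _to_data_url(Image.new("RGB", (2, 2))) + "\n" + _to_data_url(Image.new("I;16", (2, 2)))
image, depth = decode_payload(payload)
```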
@@ -0,0 +1,54 @@
nodes:
  - id: dora-openai-server
    build: cargo build -p dora-openai-proxy-server --release
    path: ../../target/release/dora-openai-proxy-server
    outputs:
      - text
    inputs:
      text: format_response/text

  - id: parse_text
    path: parse_text.py
    inputs:
      text: dora-openai-server/text
    outputs:
      - image

  - id: dora-vggt
    build: pip install -e ../../node-hub/dora-vggt
    path: dora-vggt
    inputs:
      image: parse_text/image
    outputs:
      - depth
      - image
    env:
      DEPTH_ENCODING: mono16

  - id: rav1e-image
    path: dora-rav1e
    build: pip install -e ../../node-hub/dora-rav1e
    inputs:
      image: dora-vggt/image
    outputs:
      - image
    env:
      ENCODING: avif

  - id: rav1e-depth
    path: dora-rav1e
    build: pip install -e ../../node-hub/dora-rav1e
    inputs:
      depth: dora-vggt/depth
    outputs:
      - depth
    env:
      ENCODING: avif

  - id: format_response
    path: format_response.py
    inputs:
      depth: rav1e-depth/depth
      image: rav1e-image/image
    outputs:
      - text
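End to end, a client sends one user message whose content is an image data URL (as in the base64 test above) and receives the newline-separated image and depth data URLs from format_response.py in `choices[0].message.content`. A minimal request sketch; the port and input file are assumptions, and the model name appears to be passed through rather than used for routing:

```python
"""End-to-end request sketch for the VGGT dataflow (port, file path, and model handling are assumptions)."""
import base64

from openai import OpenAI

client = OpenAI(base_url="http://localhost:8080/v1", api_key="dummy_api_key")

with open("photo.png", "rb") as f:  # placeholder input image
    data_url = "data:image/png;base64," + base64.b64encode(f.read()).decode("utf-8")

response = client.chat.completions.create(
    model="gpt-3.5-turbo",  # the proxy appears to echo the model name back rather than route on it
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "What is in this image?"},
                {"type": "image_url", "image_url": {"url": data_url}},
            ],
        }
    ],
)

# Expected reply shape: "<image data URL>\n<depth data URL>" from format_response.py.
print(response.choices[0].message.content)
```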
@@ -0,0 +1,33 @@
"""TODO: Add docstring."""

from PIL import Image
from io import BytesIO
import base64

from dora import Node
import pyarrow as pa
import numpy as np

node = Node()

for event in node:
    if event["type"] == "INPUT":
        texts = event["value"].to_numpy(zero_copy_only=False)
        for text in texts:
            if text.startswith("<|user|>\n<|vision_start|>\n"):
                # Handle the case where the text starts with <|user|>\n<|vision_start|>
                image = text.replace("<|user|>\n<|vision_start|>\n", "")
                if "base64" in image:
                    image = image.split(",", 1)[1]
                image = Image.open(BytesIO(base64.b64decode(image)))
                node.send_output(
                    "image",
                    pa.array(np.array(image).ravel()),
                    metadata={
                        "encoding": "rgb8",
                        "width": image.width,
                        "height": image.height,
                    },
                )
        print(f"Processed {len(texts)} texts.")
@@ -130,6 +130,7 @@ def main():
                     int(r_0),
                     int(r_1),
                 ],
+                "message_id": event["metadata"].get("message_id", 0),
             },
         )
@@ -154,6 +155,7 @@ def main():
                     int(r_0),
                     int(r_1),
                 ],
+                "message_id": event["metadata"].get("message_id", 0),
             },
         )
@@ -6,7 +6,7 @@ use dora_node_api::{
     DoraNode, Event,
 };
-use eyre::{Context, ContextCompat};
+use eyre::Context;
 use futures::{
     channel::oneshot::{self, Canceled},
     TryStreamExt,
@@ -36,7 +36,10 @@ pub mod message;
 #[tokio::main]
 async fn main() -> eyre::Result<()> {
     let web_ui = Path::new("chatbot-ui");
-    let port = 8000;
+    let port = std::env::var("PORT")
+        .unwrap_or_else(|_| "8080".to_string())
+        .parse::<u16>()
+        .unwrap_or(8080);
     let addr = SocketAddr::from(([0, 0, 0, 0], port));

     let (server_events_tx, server_events_rx) = mpsc::channel(3);
@@ -107,42 +110,44 @@ async fn main() -> eyre::Result<()> {
             } => {
                 match id.as_str() {
                     "text" => {
-                        let (reply_channel, prompt_tokens, model) =
-                            reply_channels.pop_front().context("no reply channel")?;
-                        let data = data.as_string::<i32>();
-                        let string = data.iter().fold("".to_string(), |mut acc, s| {
-                            if let Some(s) = s {
-                                acc.push('\n');
-                                acc.push_str(s);
-                            }
-                            acc
-                        });
-                        let data = ChatCompletionObject {
-                            id: format!("completion-{}", uuid::Uuid::new_v4()),
-                            object: "chat.completion".to_string(),
-                            created: chrono::Utc::now().timestamp() as u64,
-                            model: model.unwrap_or_default(),
-                            choices: vec![ChatCompletionObjectChoice {
-                                index: 0,
-                                message: ChatCompletionObjectMessage {
-                                    role: message::ChatCompletionRole::Assistant,
-                                    content: Some(string.to_string()),
-                                    tool_calls: Vec::new(),
-                                    function_call: None,
-                                },
-                                finish_reason: message::FinishReason::stop,
-                                logprobs: None,
-                            }],
-                            usage: Usage {
-                                prompt_tokens,
-                                completion_tokens: string.len() as u64,
-                                total_tokens: prompt_tokens + string.len() as u64,
-                            },
-                        };
-                        if reply_channel.send(Ok(data)).is_err() {
-                            tracing::warn!("failed to send chat completion reply because channel closed early");
-                        };
+                        if let Some((reply_channel, prompt_tokens, model)) =
+                            reply_channels.pop_front()
+                        {
+                            let data = data.as_string::<i32>();
+                            let string = data.iter().fold("".to_string(), |mut acc, s| {
+                                if let Some(s) = s {
+                                    acc.push('\n');
+                                    acc.push_str(s);
+                                }
+                                acc
+                            });
+                            let data = ChatCompletionObject {
+                                id: format!("completion-{}", uuid::Uuid::new_v4()),
+                                object: "chat.completion".to_string(),
+                                created: chrono::Utc::now().timestamp() as u64,
+                                model: model.unwrap_or_default(),
+                                choices: vec![ChatCompletionObjectChoice {
+                                    index: 0,
+                                    message: ChatCompletionObjectMessage {
+                                        role: message::ChatCompletionRole::Assistant,
+                                        content: Some(string.to_string()),
+                                        tool_calls: Vec::new(),
+                                        function_call: None,
+                                    },
+                                    finish_reason: message::FinishReason::stop,
+                                    logprobs: None,
+                                }],
+                                usage: Usage {
+                                    prompt_tokens,
+                                    completion_tokens: string.len() as u64,
+                                    total_tokens: prompt_tokens + string.len() as u64,
+                                },
+                            };
+                            if reply_channel.send(Ok(data)).is_err() {
+                                tracing::warn!("failed to send chat completion reply because channel closed early");
+                            }
+                        }
                     }
                     _ => eyre::bail!("unexpected input id: {}", id),
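With the port now read from `PORT` (falling back to 8080 in this diff) while the earlier test script still targets 8000, the client's `base_url` has to match whatever the server was started with. A small sketch that mirrors the same `PORT` convention on the client side (an assumption, not part of the diff):

```python
"""Match the client's base_url to the server's configurable port (assumes the PORT convention is reused client-side)."""
import os

from openai import OpenAI

# The server now falls back to 8080, while the test script above still uses 8000;
# exporting PORT on both sides keeps them consistent.
port = os.environ.get("PORT", "8080")
client = OpenAI(base_url=f"http://localhost:{port}/v1", api_key="dummy_api_key")
```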