| Author | SHA1 | Message | Date |
|---|---|---|---|
|
|
bb89c139e7 | Add configurable port | 6 months ago |
|
|
1448f27d95 | initial commit for openai vggt | 6 months ago |
| @@ -0,0 +1,29 @@ | |||
| nodes: | |||
| - id: dora-openai-server | |||
| build: cargo build -p dora-openai-proxy-server --release | |||
| path: ../../target/release/dora-openai-proxy-server | |||
| outputs: | |||
| - text | |||
| inputs: | |||
| text: dora-echo/echo | |||
| - id: dora-echo | |||
| build: pip install -e ../../node-hub/dora-echo | |||
| path: dora-echo | |||
| inputs: | |||
| echo: dora-openai-server/text | |||
| outputs: | |||
| - echo | |||
| - id: parse_text | |||
| path: parse_text.py | |||
| inputs: | |||
| text: dora-openai-server/text | |||
| outputs: | |||
| - image | |||
| - id: plot | |||
| build: pip install dora-rerun | |||
| path: dora-rerun | |||
| inputs: | |||
| camera/image: parse_text/image | |||
| @@ -0,0 +1,88 @@ | |||
| """TODO: Add docstring.""" | |||
| from openai import OpenAI | |||
| client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy_api_key") | |||
| def test_list_models(): | |||
| """TODO: Add docstring.""" | |||
| try: | |||
| models = client.models.list() | |||
| print("Available models:") | |||
| for model in models.data: | |||
| print(f"- {model.id}") | |||
| except Exception as e: | |||
| print(f"Error listing models: {e}") | |||
| def test_chat_completion(user_input): | |||
| """TODO: Add docstring.""" | |||
| try: | |||
| response = client.chat.completions.create( | |||
| model="gpt-3.5-turbo", | |||
| messages=[ | |||
| {"role": "system", "content": "You are a helpful assistant."}, | |||
| {"role": "user", "content": user_input}, | |||
| ], | |||
| ) | |||
| print("Chat Completion Response:") | |||
| print(response.choices[0].message.content) | |||
| except Exception as e: | |||
| print(f"Error in chat completion: {e}") | |||
| def test_chat_completion_image_url(user_input): | |||
| """TODO: Add docstring.""" | |||
| try: | |||
| response = client.chat.completions.create( | |||
| model="gpt-3.5-turbo", | |||
| messages=[ | |||
| { | |||
| "role": "user", | |||
| "content": [ | |||
| {"type": "text", "text": "What is in this image?"}, | |||
| { | |||
| "type": "image_url", | |||
| "image_url": { | |||
| "url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg" | |||
| }, | |||
| }, | |||
| ], | |||
| } | |||
| ], | |||
| ) | |||
| print("Chat Completion Response:") | |||
| print(response.choices[0].message.content) | |||
| except Exception as e: | |||
| print(f"Error in chat completion: {e}") | |||
| def test_chat_completion_image_base64(user_input): | |||
| """TODO: Add docstring.""" | |||
| try: | |||
| response = client.chat.completions.create( | |||
| model="gpt-3.5-turbo", | |||
| messages=[ | |||
| { | |||
| "role": "user", | |||
| "content": [ | |||
| {"type": "text", "text": "What is in this image?"}, | |||
| { | |||
| "type": "image_url", | |||
| "image_url": { | |||
| "url": "" | |||
| }, | |||
| }, | |||
| ], | |||
| } | |||
| ], | |||
| ) | |||
| print("Chat Completion Response:") | |||
| print(response.choices[0].message.content) | |||
| except Exception as e: | |||
| print(f"Error in chat completion: {e}") | |||
| if __name__ == "__main__": | |||
| test_chat_completion_image_base64("abc") | |||
| @@ -0,0 +1,33 @@ | |||
| """TODO: Add docstring.""" | |||
| from PIL import Image | |||
| from io import BytesIO | |||
| import base64 | |||
| from dora import Node | |||
| import pyarrow as pa | |||
| import numpy as np | |||
| node = Node() | |||
| for event in node: | |||
| if event["type"] == "INPUT": | |||
| texts = event["value"].to_numpy(zero_copy_only=False) | |||
| for text in texts: | |||
| if text.startswith("<|user|>\n<|vision_start|>\n"): | |||
| # Handle the case where the text starts with <|user|>\n<|vision_start|> | |||
| image = text.replace("<|user|>\n<|vision_start|>\n", "") | |||
| if "base64" in image: | |||
| image = image.split(",", 1)[1] | |||
| print("image", image) | |||
| image = Image.open(BytesIO(base64.b64decode(image))) | |||
| node.send_output( | |||
| "image", | |||
| pa.array(np.array(image).ravel()), | |||
| metadata={ | |||
| "encoding": "rgb8", | |||
| "width": image.width, | |||
| "height": image.height, | |||
| }, | |||
| ) | |||
| print(f"Processed {len(texts)} texts.") | |||
| @@ -0,0 +1,95 @@ | |||
| """TODO: Add docstring.""" | |||
| import base64 | |||
| from dora import Node | |||
| import pyarrow as pa | |||
| import numpy as np | |||
| import cv2 | |||
| node = Node() | |||
| cache = {} | |||
| def derive_image_as_base64(event): | |||
| storage = event["value"] | |||
| metadata = event["metadata"] | |||
| encoding = metadata["encoding"] | |||
| width = metadata["width"] | |||
| height = metadata["height"] | |||
| if ( | |||
| encoding == "mono16" or encoding == "z16" | |||
| ): | |||
| channels = 1 | |||
| storage_type = np.uint16 | |||
| else: | |||
| channels = 3 | |||
| storage_type = np.uint8 | |||
| if encoding == "bgr8": | |||
| frame = ( | |||
| storage.to_numpy() | |||
| .astype(storage_type) | |||
| .reshape((height, width, channels)) | |||
| ) | |||
| frame = cv2.imencode('.jpg', frame)[1].tobytes() | |||
| frame = base64.b64encode(frame).decode('utf-8') | |||
| image = f"data:image/jpeg;base64,{frame}" | |||
| elif encoding == "rgb8": | |||
| frame = ( | |||
| storage.to_numpy() | |||
| .astype(storage_type) | |||
| .reshape((height, width, channels)) | |||
| ) | |||
| frame = frame[:, :, ::-1] # OpenCV image (RGB to BGR) | |||
| frame = cv2.imencode('.jpg', frame)[1].tobytes() | |||
| frame = base64.b64encode(frame).decode('utf-8') | |||
| image = f"data:image/jpeg;base64,{frame}" | |||
| elif encoding in ["jpeg", "jpg", "avif", "webp", "png", "bmp"]: | |||
| storage = storage.to_numpy() | |||
| image = base64.b64encode(storage).decode('utf-8') | |||
| image = f"data:image/{encoding};base64,{image}" | |||
| elif encoding in ["mono16", "z16"]: | |||
| frame = ( | |||
| storage.to_numpy() | |||
| .astype(storage_type) | |||
| .reshape((height, width, 1)) | |||
| ) | |||
| storage = storage.to_numpy() | |||
| image = base64.b64encode(storage).decode('utf-8') | |||
| image = f"data:image/jpeg;base64,{frame}" | |||
| else: | |||
| raise RuntimeError(f"Unsupported image encoding: {encoding}") | |||
| return image | |||
| for event in node: | |||
| if event["type"] == "INPUT": | |||
| event_id = event["id"] | |||
| print(f"Received event: {event_id}") | |||
| match event_id: | |||
| case s if "image" in s: | |||
| message_id = event["metadata"].get("id", 0) | |||
| if cache.get(message_id) is None: | |||
| cache[message_id] = {} | |||
| cache[message_id]["image"] = event | |||
| case s if "depth" in s: | |||
| message_id = event["metadata"].get("id", 0) | |||
| if cache.get(message_id) is None: | |||
| cache[message_id] = {} | |||
| cache[message_id]["depth"] = event | |||
| if len(cache[message_id]) == 2: | |||
| image = cache[message_id]["image"] | |||
| image_str = derive_image_as_base64(image) | |||
| depth = cache[message_id]["depth"] | |||
| depth_str = derive_image_as_base64(depth) | |||
| node.send_output( | |||
| "text", | |||
| pa.array(np.array(image_str + "\n" + depth_str).ravel()), | |||
| metadata={ | |||
| "message_id": message_id, | |||
| }, | |||
| ) | |||
| @@ -0,0 +1,54 @@ | |||
| nodes: | |||
| - id: dora-openai-server | |||
| build: cargo build -p dora-openai-proxy-server --release | |||
| path: ../../target/release/dora-openai-proxy-server | |||
| outputs: | |||
| - text | |||
| inputs: | |||
| text: format_response/text | |||
| - id: parse_text | |||
| path: parse_text.py | |||
| inputs: | |||
| text: dora-openai-server/text | |||
| outputs: | |||
| - image | |||
| - id: dora-vggt | |||
| build: pip install -e ../../node-hub/dora-vggt | |||
| path: dora-vggt | |||
| inputs: | |||
| image: parse_text/image | |||
| outputs: | |||
| - depth | |||
| - image | |||
| env: | |||
| DEPTH_ENCODING: mono16 | |||
| - id: rav1e-image | |||
| path: dora-rav1e | |||
| build: pip install -e ../../node-hub/dora-rav1e | |||
| inputs: | |||
| image: dora-vggt/image | |||
| outputs: | |||
| - image | |||
| env: | |||
| ENCODING: avif | |||
| - id: rav1e-depth | |||
| path: dora-rav1e | |||
| build: pip install -e ../../node-hub/dora-rav1e | |||
| inputs: | |||
| depth: dora-vggt/depth | |||
| outputs: | |||
| - depth | |||
| env: | |||
| ENCODING: avif | |||
| - id: format_response | |||
| path: format_response.py | |||
| inputs: | |||
| depth: rav1e-depth/depth | |||
| image: rav1e-image/image | |||
| outputs: | |||
| - text | |||
| @@ -0,0 +1,33 @@ | |||
| """TODO: Add docstring.""" | |||
| from PIL import Image | |||
| from io import BytesIO | |||
| import base64 | |||
| from dora import Node | |||
| import pyarrow as pa | |||
| import numpy as np | |||
| node = Node() | |||
| for event in node: | |||
| if event["type"] == "INPUT": | |||
| texts = event["value"].to_numpy(zero_copy_only=False) | |||
| for text in texts: | |||
| if text.startswith("<|user|>\n<|vision_start|>\n"): | |||
| # Handle the case where the text starts with <|user|>\n<|vision_start|> | |||
| image = text.replace("<|user|>\n<|vision_start|>\n", "") | |||
| if "base64" in image: | |||
| image = image.split(",", 1)[1] | |||
| image = Image.open(BytesIO(base64.b64decode(image))) | |||
| node.send_output( | |||
| "image", | |||
| pa.array(np.array(image).ravel()), | |||
| metadata={ | |||
| "encoding": "rgb8", | |||
| "width": image.width, | |||
| "height": image.height, | |||
| }, | |||
| ) | |||
| print(f"Processed {len(texts)} texts.") | |||
| @@ -130,6 +130,7 @@ def main(): | |||
| int(r_0), | |||
| int(r_1), | |||
| ], | |||
| "message_id": event["metadata"].get("message_id", 0), | |||
| }, | |||
| ) | |||
| @@ -154,6 +155,7 @@ def main(): | |||
| int(r_0), | |||
| int(r_1), | |||
| ], | |||
| "message_id": event["metadata"].get("message_id", 0), | |||
| }, | |||
| ) | |||
| @@ -6,7 +6,7 @@ use dora_node_api::{ | |||
| DoraNode, Event, | |||
| }; | |||
| use eyre::{Context, ContextCompat}; | |||
| use eyre::Context; | |||
| use futures::{ | |||
| channel::oneshot::{self, Canceled}, | |||
| TryStreamExt, | |||
| @@ -36,7 +36,10 @@ pub mod message; | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| let web_ui = Path::new("chatbot-ui"); | |||
| let port = 8000; | |||
| let port = std::env::var("PORT") | |||
| .unwrap_or_else(|_| "8080".to_string()) | |||
| .parse::<u16>() | |||
| .unwrap_or(8080); | |||
| let addr = SocketAddr::from(([0, 0, 0, 0], port)); | |||
| let (server_events_tx, server_events_rx) = mpsc::channel(3); | |||
| @@ -107,42 +110,44 @@ async fn main() -> eyre::Result<()> { | |||
| } => { | |||
| match id.as_str() { | |||
| "text" => { | |||
| let (reply_channel, prompt_tokens, model) = | |||
| reply_channels.pop_front().context("no reply channel")?; | |||
| let data = data.as_string::<i32>(); | |||
| let string = data.iter().fold("".to_string(), |mut acc, s| { | |||
| if let Some(s) = s { | |||
| acc.push('\n'); | |||
| acc.push_str(s); | |||
| } | |||
| acc | |||
| }); | |||
| let data = ChatCompletionObject { | |||
| id: format!("completion-{}", uuid::Uuid::new_v4()), | |||
| object: "chat.completion".to_string(), | |||
| created: chrono::Utc::now().timestamp() as u64, | |||
| model: model.unwrap_or_default(), | |||
| choices: vec![ChatCompletionObjectChoice { | |||
| index: 0, | |||
| message: ChatCompletionObjectMessage { | |||
| role: message::ChatCompletionRole::Assistant, | |||
| content: Some(string.to_string()), | |||
| tool_calls: Vec::new(), | |||
| function_call: None, | |||
| if let Some((reply_channel, prompt_tokens, model)) = | |||
| reply_channels.pop_front() | |||
| { | |||
| let data = data.as_string::<i32>(); | |||
| let string = data.iter().fold("".to_string(), |mut acc, s| { | |||
| if let Some(s) = s { | |||
| acc.push('\n'); | |||
| acc.push_str(s); | |||
| } | |||
| acc | |||
| }); | |||
| let data = ChatCompletionObject { | |||
| id: format!("completion-{}", uuid::Uuid::new_v4()), | |||
| object: "chat.completion".to_string(), | |||
| created: chrono::Utc::now().timestamp() as u64, | |||
| model: model.unwrap_or_default(), | |||
| choices: vec![ChatCompletionObjectChoice { | |||
| index: 0, | |||
| message: ChatCompletionObjectMessage { | |||
| role: message::ChatCompletionRole::Assistant, | |||
| content: Some(string.to_string()), | |||
| tool_calls: Vec::new(), | |||
| function_call: None, | |||
| }, | |||
| finish_reason: message::FinishReason::stop, | |||
| logprobs: None, | |||
| }], | |||
| usage: Usage { | |||
| prompt_tokens, | |||
| completion_tokens: string.len() as u64, | |||
| total_tokens: prompt_tokens + string.len() as u64, | |||
| }, | |||
| finish_reason: message::FinishReason::stop, | |||
| logprobs: None, | |||
| }], | |||
| usage: Usage { | |||
| prompt_tokens, | |||
| completion_tokens: string.len() as u64, | |||
| total_tokens: prompt_tokens + string.len() as u64, | |||
| }, | |||
| }; | |||
| if reply_channel.send(Ok(data)).is_err() { | |||
| tracing::warn!("failed to send chat completion reply because channel closed early"); | |||
| }; | |||
| if reply_channel.send(Ok(data)).is_err() { | |||
| tracing::warn!("failed to send chat completion reply because channel closed early"); | |||
| } | |||
| } | |||
| } | |||
| _ => eyre::bail!("unexpected input id: {}", id), | |||