From a476b50992c0095218b60e803c123f659ae6b2d8 Mon Sep 17 00:00:00 2001 From: haixuantao Date: Wed, 18 Jun 2025 17:47:44 +0200 Subject: [PATCH] Fix openai server --- .../dora_openai_server/main.py | 172 +++++++++++++----- node-hub/dora-openai-server/pyproject.toml | 20 +- 2 files changed, 131 insertions(+), 61 deletions(-) diff --git a/node-hub/dora-openai-server/dora_openai_server/main.py b/node-hub/dora-openai-server/dora_openai_server/main.py index 980ef5fc..e1713392 100644 --- a/node-hub/dora-openai-server/dora_openai_server/main.py +++ b/node-hub/dora-openai-server/dora_openai_server/main.py @@ -1,13 +1,12 @@ -""" -FastAPI server with OpenAI compatibility and DORA integration, +"""FastAPI server with OpenAI compatibility and DORA integration, sending text and image data on separate DORA topics. """ import asyncio import base64 -import uuid # For generating unique request IDs -import time # For timestamps -from typing import List, Optional, Union, Dict, Any, Literal +import time # For timestamps +import uuid # For generating unique request IDs +from typing import Any, List, Literal, Optional, Union import pyarrow as pa import uvicorn @@ -19,53 +18,63 @@ from pydantic import BaseModel DORA_RESPONSE_TIMEOUT_SECONDS = 20 DORA_TEXT_OUTPUT_TOPIC = "user_text_input" DORA_IMAGE_OUTPUT_TOPIC = "user_image_input" -DORA_RESPONSE_INPUT_TOPIC = "chat_completion_result" # Topic FastAPI listens on +DORA_RESPONSE_INPUT_TOPIC = "chat_completion_result" # Topic FastAPI listens on app = FastAPI( title="DORA OpenAI-Compatible Demo Server (Separate Topics)", description="Sends text and image data on different DORA topics and awaits a consolidated response.", ) + # --- Pydantic Models --- class ImageUrl(BaseModel): url: str detail: Optional[str] = "auto" + class ContentPartText(BaseModel): type: Literal["text"] text: str + class ContentPartImage(BaseModel): type: Literal["image_url"] image_url: ImageUrl + ContentPart = Union[ContentPartText, ContentPartImage] + class ChatCompletionMessage(BaseModel): role: str content: Union[str, List[ContentPart]] + class ChatCompletionRequest(BaseModel): model: str messages: List[ChatCompletionMessage] temperature: Optional[float] = 1.0 max_tokens: Optional[int] = 100 + class ChatCompletionChoiceMessage(BaseModel): role: str content: str + class ChatCompletionChoice(BaseModel): index: int message: ChatCompletionChoiceMessage finish_reason: str logprobs: Optional[Any] = None + class Usage(BaseModel): prompt_tokens: int completion_tokens: int total_tokens: int + class ChatCompletionResponse(BaseModel): id: str object: str = "chat.completion" @@ -75,6 +84,7 @@ class ChatCompletionResponse(BaseModel): usage: Usage system_fingerprint: Optional[str] = None + # --- DORA Node Initialization --- # This dictionary will hold unmatched responses if we implement more robust concurrent handling. # For now, it's a placeholder for future improvement. @@ -84,9 +94,12 @@ try: node = Node() print("FastAPI Server: DORA Node initialized.") except Exception as e: - print(f"FastAPI Server: Failed to initialize DORA Node. Running in standalone API mode. Error: {e}") + print( + f"FastAPI Server: Failed to initialize DORA Node. Running in standalone API mode. Error: {e}" + ) node = None + @app.post("/v1/chat/completions", response_model=ChatCompletionResponse) async def create_chat_completion(request: ChatCompletionRequest): internal_request_id = str(uuid.uuid4()) @@ -109,21 +122,34 @@ async def create_chat_completion(request: ChatCompletionRequest): if part.type == "text": user_text_parts.append(part.text) elif part.type == "image_url": - if user_image_bytes: # Use only the first image - print(f"FastAPI Server (Req {internal_request_id}): Warning - Multiple images found, using the first one.") + if user_image_bytes: # Use only the first image + print( + f"FastAPI Server (Req {internal_request_id}): Warning - Multiple images found, using the first one." + ) continue image_url_data = part.image_url.url if image_url_data.startswith("data:image"): try: header, encoded_data = image_url_data.split(",", 1) - user_image_content_type = header.split(":")[1].split(";")[0] + user_image_content_type = header.split(":")[1].split( + ";" + )[0] user_image_bytes = base64.b64decode(encoded_data) - print(f"FastAPI Server (Req {internal_request_id}): Decoded image {user_image_content_type}, size: {len(user_image_bytes)} bytes") + print( + f"FastAPI Server (Req {internal_request_id}): Decoded image {user_image_content_type}, size: {len(user_image_bytes)} bytes" + ) except Exception as e: - print(f"FastAPI Server (Req {internal_request_id}): Error decoding base64 image: {e}") - raise HTTPException(status_code=400, detail=f"Invalid base64 image data: {e}") + print( + f"FastAPI Server (Req {internal_request_id}): Error decoding base64 image: {e}" + ) + raise HTTPException( + status_code=400, + detail=f"Invalid base64 image data: {e}", + ) else: - print(f"FastAPI Server (Req {internal_request_id}): Warning - Remote image URL '{image_url_data}' ignored. Only data URIs supported.") + print( + f"FastAPI Server (Req {internal_request_id}): Warning - Remote image URL '{image_url_data}' ignored. Only data URIs supported." + ) # Consider if you want to break after the first user message or aggregate all # break @@ -135,19 +161,24 @@ async def create_chat_completion(request: ChatCompletionRequest): text_payload = {"request_id": internal_request_id, "text": final_user_text} arrow_text_data = pa.array([text_payload]) node.send_output(DORA_TEXT_OUTPUT_TOPIC, arrow_text_data) - print(f"FastAPI Server (Req {internal_request_id}): Sent text to DORA topic '{DORA_TEXT_OUTPUT_TOPIC}'.") + print( + f"FastAPI Server (Req {internal_request_id}): Sent text to DORA topic '{DORA_TEXT_OUTPUT_TOPIC}'." + ) data_sent_to_dora = True if user_image_bytes: image_payload = { "request_id": internal_request_id, "image_bytes": user_image_bytes, - "image_content_type": user_image_content_type or "application/octet-stream" + "image_content_type": user_image_content_type + or "application/octet-stream", } arrow_image_data = pa.array([image_payload]) node.send_output(DORA_IMAGE_OUTPUT_TOPIC, arrow_image_data) - print(f"FastAPI Server (Req {internal_request_id}): Sent image to DORA topic '{DORA_IMAGE_OUTPUT_TOPIC}'.") - prompt_tokens += len(user_image_bytes) # Crude image token approximation + print( + f"FastAPI Server (Req {internal_request_id}): Sent image to DORA topic '{DORA_IMAGE_OUTPUT_TOPIC}'." + ) + prompt_tokens += len(user_image_bytes) # Crude image token approximation data_sent_to_dora = True response_str: str @@ -158,50 +189,68 @@ async def create_chat_completion(request: ChatCompletionRequest): response_str = "No user text or image found to send to DORA." print(f"FastAPI Server (Req {internal_request_id}): {response_str}") else: - print(f"FastAPI Server (Req {internal_request_id}): Waiting for response from DORA on topic '{DORA_RESPONSE_INPUT_TOPIC}'...") + print( + f"FastAPI Server (Req {internal_request_id}): Waiting for response from DORA on topic '{DORA_RESPONSE_INPUT_TOPIC}'..." + ) response_str = f"Timeout: No response from DORA for request_id {internal_request_id} within {DORA_RESPONSE_TIMEOUT_SECONDS}s." - + # WARNING: This blocking `node.next()` loop is not ideal for highly concurrent requests # in a single FastAPI worker process, as one request might block others or consume # a response meant for another if `request_id` matching isn't perfect or fast enough. # A more robust solution would involve a dedicated listener task and async Futures/Queues. start_wait_time = time.monotonic() while time.monotonic() - start_wait_time < DORA_RESPONSE_TIMEOUT_SECONDS: - remaining_timeout = DORA_RESPONSE_TIMEOUT_SECONDS - (time.monotonic() - start_wait_time) - if remaining_timeout <= 0: break + remaining_timeout = DORA_RESPONSE_TIMEOUT_SECONDS - ( + time.monotonic() - start_wait_time + ) + if remaining_timeout <= 0: + break + + event = node.next( + timeout=min(1.0, remaining_timeout) + ) # Poll with a smaller timeout - event = node.next(timeout=min(1.0, remaining_timeout)) # Poll with a smaller timeout - - if event is None: # Timeout for this poll iteration + if event is None: # Timeout for this poll iteration continue if event["type"] == "INPUT" and event["id"] == DORA_RESPONSE_INPUT_TOPIC: response_value_arrow = event["value"] if response_value_arrow and len(response_value_arrow) > 0: - dora_response_data = response_value_arrow[0].as_py() # Expecting a dict + dora_response_data = response_value_arrow[ + 0 + ].as_py() # Expecting a dict if isinstance(dora_response_data, dict): resp_request_id = dora_response_data.get("request_id") if resp_request_id == internal_request_id: - response_str = dora_response_data.get("response_text", f"DORA response for {internal_request_id} missing 'response_text'.") - print(f"FastAPI Server (Req {internal_request_id}): Received correlated DORA response.") - break # Correct response received - else: - # This response is for another request. Ideally, store it. - print(f"FastAPI Server (Req {internal_request_id}): Received DORA response for different request_id '{resp_request_id}'. Discarding and waiting. THIS IS A CONCURRENCY ISSUE.") - # unmatched_dora_responses[resp_request_id] = dora_response_data # Example of storing + response_str = dora_response_data.get( + "response_text", + f"DORA response for {internal_request_id} missing 'response_text'.", + ) + print( + f"FastAPI Server (Req {internal_request_id}): Received correlated DORA response." + ) + break # Correct response received + # This response is for another request. Ideally, store it. + print( + f"FastAPI Server (Req {internal_request_id}): Received DORA response for different request_id '{resp_request_id}'. Discarding and waiting. THIS IS A CONCURRENCY ISSUE." + ) + # unmatched_dora_responses[resp_request_id] = dora_response_data # Example of storing else: response_str = f"Unrecognized DORA response format for {internal_request_id}: {str(dora_response_data)[:100]}" break else: - response_str = f"Empty response payload from DORA for {internal_request_id}." + response_str = ( + f"Empty response payload from DORA for {internal_request_id}." + ) break elif event["type"] == "ERROR": response_str = f"Error event from DORA for {internal_request_id}: {event.get('value', event.get('error', 'Unknown DORA Error'))}" print(response_str) break - else: # Outer while loop timed out - print(f"FastAPI Server (Req {internal_request_id}): Overall timeout waiting for DORA response.") - + else: # Outer while loop timed out + print( + f"FastAPI Server (Req {internal_request_id}): Overall timeout waiting for DORA response." + ) completion_tokens = len(response_str) total_tokens = prompt_tokens + completion_tokens @@ -213,7 +262,9 @@ async def create_chat_completion(request: ChatCompletionRequest): choices=[ ChatCompletionChoice( index=0, - message=ChatCompletionChoiceMessage(role="assistant", content=response_str), + message=ChatCompletionChoiceMessage( + role="assistant", content=response_str + ), finish_reason="stop", ) ], @@ -224,6 +275,7 @@ async def create_chat_completion(request: ChatCompletionRequest): ), ) + @app.get("/v1/models") async def list_models(): return { @@ -231,12 +283,17 @@ async def list_models(): "data": [ { "id": "dora-multi-stream-vision", - "object": "model", "created": int(time.time()), "owned_by": "dora-ai", - "permission": [], "root": "dora-multi-stream-vision", "parent": None, + "object": "model", + "created": int(time.time()), + "owned_by": "dora-ai", + "permission": [], + "root": "dora-multi-stream-vision", + "parent": None, }, ], } + async def run_fastapi_server_task(): config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info") server = uvicorn.Server(config) @@ -244,6 +301,7 @@ async def run_fastapi_server_task(): await server.serve() print("FastAPI Server: Uvicorn server stopped.") + async def run_dora_main_loop_task(): if not node: print("FastAPI Server: DORA node not initialized, DORA main loop skipped.") @@ -253,21 +311,31 @@ async def run_dora_main_loop_task(): while True: # This loop is primarily for the main "STOP" event for the FastAPI node itself. # Individual request/response cycles are handled within the endpoint. - event = node.next(timeout=1.0) # Check for STOP periodically + event = node.next(timeout=1.0) # Check for STOP periodically if event is None: - await asyncio.sleep(0.01) # Yield control if no event + await asyncio.sleep(0.01) # Yield control if no event continue if event["type"] == "STOP": - print("FastAPI Server: DORA STOP event received. Requesting server shutdown.") + print( + "FastAPI Server: DORA STOP event received. Requesting server shutdown." + ) # Attempt to gracefully shut down Uvicorn # This is tricky; uvicorn's server.shutdown() or server.should_exit might be better # For simplicity, we cancel the server task. for task in asyncio.all_tasks(): # Identify the server task more reliably if possible - if task.get_coro().__name__ == 'serve' and hasattr(task.get_coro(), 'cr_frame') and \ - isinstance(task.get_coro().cr_frame.f_locals.get('self'), uvicorn.Server): + if ( + task.get_coro().__name__ == "serve" + and hasattr(task.get_coro(), "cr_frame") + and isinstance( + task.get_coro().cr_frame.f_locals.get("self"), + uvicorn.Server, + ) + ): task.cancel() - print("FastAPI Server: Uvicorn server task cancellation requested.") + print( + "FastAPI Server: Uvicorn server task cancellation requested." + ) break # Handle other unexpected general inputs/errors for the FastAPI node if necessary # elif event["type"] == "INPUT": @@ -280,9 +348,10 @@ async def run_dora_main_loop_task(): finally: print("FastAPI Server: DORA main loop listener finished.") + async def main_async_runner(): server_task = asyncio.create_task(run_fastapi_server_task()) - + # Only run the DORA main loop if the node was initialized. # This loop is mainly for the STOP event. dora_listener_task = None @@ -293,17 +362,19 @@ async def main_async_runner(): tasks_to_wait_for = [server_task] done, pending = await asyncio.wait( - tasks_to_wait_for, return_when=asyncio.FIRST_COMPLETED, + tasks_to_wait_for, + return_when=asyncio.FIRST_COMPLETED, ) for task in pending: print(f"FastAPI Server: Cancelling pending task: {task.get_name()}") task.cancel() - + if pending: await asyncio.gather(*pending, return_exceptions=True) print("FastAPI Server: Application shutdown complete.") + def main(): print("FastAPI Server: Starting application...") try: @@ -313,5 +384,6 @@ def main(): finally: print("FastAPI Server: Exited main function.") + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/node-hub/dora-openai-server/pyproject.toml b/node-hub/dora-openai-server/pyproject.toml index 8b29cec9..6ec73daf 100644 --- a/node-hub/dora-openai-server/pyproject.toml +++ b/node-hub/dora-openai-server/pyproject.toml @@ -2,8 +2,8 @@ name = "dora-openai-server" version = "0.3.11" authors = [ - { name = "Haixuan Xavier Tao", email = "tao.xavier@outlook.com" }, - { name = "Enzo Le Van", email = "dev@enzo-le-van.fr" }, + { name = "Haixuan Xavier Tao", email = "tao.xavier@outlook.com" }, + { name = "Enzo Le Van", email = "dev@enzo-le-van.fr" }, ] description = "Dora OpenAI API Server" license = { text = "MIT" } @@ -11,14 +11,13 @@ readme = "README.md" requires-python = ">=3.8" dependencies = [ - "dora-rs >= 0.3.9", - "numpy < 2.0.0", - "pyarrow >= 5.0.0", - - "fastapi >= 0.115", - "asyncio >= 3.4", - "uvicorn >= 0.31", - "pydantic >= 2.9", + "dora-rs >= 0.3.9", + "numpy < 2.0.0", + "pyarrow >= 5.0.0", + "fastapi >= 0.115", + "asyncio >= 3.4", + "uvicorn >= 0.31", + "pydantic >= 2.9", ] [dependency-groups] @@ -29,7 +28,6 @@ dora-openai-server = "dora_openai_server.main:main" [tool.ruff.lint] extend-select = [ - "D", # pydocstyle "UP", # Ruff's UP rule "PERF", # Ruff's PERF rule "RET", # Ruff's RET rule