@@ -1,13 +1,12 @@
"""
FastAPI server with OpenAI compatibility and DORA integration,
"""FastAPI server with OpenAI compatibility and DORA integration,
sending text and image data on separate DORA topics.
sending text and image data on separate DORA topics.
"""
"""
import asyncio
import asyncio
import base64
import base64
import uuid # For generating unique request ID s
import time # For timestamp s
from typing import List, Optional, Union, Dict, Any, Literal
import time # For timestamp s
import uuid # For generating unique request ID s
from typing import Any, List, Literal, Optional, Union
import pyarrow as pa
import pyarrow as pa
import uvicorn
import uvicorn
@@ -19,53 +18,63 @@ from pydantic import BaseModel
DORA_RESPONSE_TIMEOUT_SECONDS = 20
DORA_RESPONSE_TIMEOUT_SECONDS = 20
DORA_TEXT_OUTPUT_TOPIC = "user_text_input"
DORA_TEXT_OUTPUT_TOPIC = "user_text_input"
DORA_IMAGE_OUTPUT_TOPIC = "user_image_input"
DORA_IMAGE_OUTPUT_TOPIC = "user_image_input"
DORA_RESPONSE_INPUT_TOPIC = "chat_completion_result" # Topic FastAPI listens on
DORA_RESPONSE_INPUT_TOPIC = "chat_completion_result" # Topic FastAPI listens on
app = FastAPI(
app = FastAPI(
title="DORA OpenAI-Compatible Demo Server (Separate Topics)",
title="DORA OpenAI-Compatible Demo Server (Separate Topics)",
description="Sends text and image data on different DORA topics and awaits a consolidated response.",
description="Sends text and image data on different DORA topics and awaits a consolidated response.",
)
)
# --- Pydantic Models ---
# --- Pydantic Models ---
class ImageUrl(BaseModel):
class ImageUrl(BaseModel):
url: str
url: str
detail: Optional[str] = "auto"
detail: Optional[str] = "auto"
class ContentPartText(BaseModel):
class ContentPartText(BaseModel):
type: Literal["text"]
type: Literal["text"]
text: str
text: str
class ContentPartImage(BaseModel):
class ContentPartImage(BaseModel):
type: Literal["image_url"]
type: Literal["image_url"]
image_url: ImageUrl
image_url: ImageUrl
ContentPart = Union[ContentPartText, ContentPartImage]
ContentPart = Union[ContentPartText, ContentPartImage]
class ChatCompletionMessage(BaseModel):
class ChatCompletionMessage(BaseModel):
role: str
role: str
content: Union[str, List[ContentPart]]
content: Union[str, List[ContentPart]]
class ChatCompletionRequest(BaseModel):
class ChatCompletionRequest(BaseModel):
model: str
model: str
messages: List[ChatCompletionMessage]
messages: List[ChatCompletionMessage]
temperature: Optional[float] = 1.0
temperature: Optional[float] = 1.0
max_tokens: Optional[int] = 100
max_tokens: Optional[int] = 100
class ChatCompletionChoiceMessage(BaseModel):
class ChatCompletionChoiceMessage(BaseModel):
role: str
role: str
content: str
content: str
class ChatCompletionChoice(BaseModel):
class ChatCompletionChoice(BaseModel):
index: int
index: int
message: ChatCompletionChoiceMessage
message: ChatCompletionChoiceMessage
finish_reason: str
finish_reason: str
logprobs: Optional[Any] = None
logprobs: Optional[Any] = None
class Usage(BaseModel):
class Usage(BaseModel):
prompt_tokens: int
prompt_tokens: int
completion_tokens: int
completion_tokens: int
total_tokens: int
total_tokens: int
class ChatCompletionResponse(BaseModel):
class ChatCompletionResponse(BaseModel):
id: str
id: str
object: str = "chat.completion"
object: str = "chat.completion"
@@ -75,6 +84,7 @@ class ChatCompletionResponse(BaseModel):
usage: Usage
usage: Usage
system_fingerprint: Optional[str] = None
system_fingerprint: Optional[str] = None
# --- DORA Node Initialization ---
# --- DORA Node Initialization ---
# This dictionary will hold unmatched responses if we implement more robust concurrent handling.
# This dictionary will hold unmatched responses if we implement more robust concurrent handling.
# For now, it's a placeholder for future improvement.
# For now, it's a placeholder for future improvement.
@@ -84,9 +94,12 @@ try:
node = Node()
node = Node()
print("FastAPI Server: DORA Node initialized.")
print("FastAPI Server: DORA Node initialized.")
except Exception as e:
except Exception as e:
print(f"FastAPI Server: Failed to initialize DORA Node. Running in standalone API mode. Error: {e}")
print(
f"FastAPI Server: Failed to initialize DORA Node. Running in standalone API mode. Error: {e}"
)
node = None
node = None
@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def create_chat_completion(request: ChatCompletionRequest):
async def create_chat_completion(request: ChatCompletionRequest):
internal_request_id = str(uuid.uuid4())
internal_request_id = str(uuid.uuid4())
@@ -109,21 +122,34 @@ async def create_chat_completion(request: ChatCompletionRequest):
if part.type == "text":
if part.type == "text":
user_text_parts.append(part.text)
user_text_parts.append(part.text)
elif part.type == "image_url":
elif part.type == "image_url":
if user_image_bytes: # Use only the first image
print(f"FastAPI Server (Req {internal_request_id}): Warning - Multiple images found, using the first one.")
if user_image_bytes: # Use only the first image
print(
f"FastAPI Server (Req {internal_request_id}): Warning - Multiple images found, using the first one."
)
continue
continue
image_url_data = part.image_url.url
image_url_data = part.image_url.url
if image_url_data.startswith("data:image"):
if image_url_data.startswith("data:image"):
try:
try:
header, encoded_data = image_url_data.split(",", 1)
header, encoded_data = image_url_data.split(",", 1)
user_image_content_type = header.split(":")[1].split(";")[0]
user_image_content_type = header.split(":")[1].split(
";"
)[0]
user_image_bytes = base64.b64decode(encoded_data)
user_image_bytes = base64.b64decode(encoded_data)
print(f"FastAPI Server (Req {internal_request_id}): Decoded image {user_image_content_type}, size: {len(user_image_bytes)} bytes")
print(
f"FastAPI Server (Req {internal_request_id}): Decoded image {user_image_content_type}, size: {len(user_image_bytes)} bytes"
)
except Exception as e:
except Exception as e:
print(f"FastAPI Server (Req {internal_request_id}): Error decoding base64 image: {e}")
raise HTTPException(status_code=400, detail=f"Invalid base64 image data: {e}")
print(
f"FastAPI Server (Req {internal_request_id}): Error decoding base64 image: {e}"
)
raise HTTPException(
status_code=400,
detail=f"Invalid base64 image data: {e}",
)
else:
else:
print(f"FastAPI Server (Req {internal_request_id}): Warning - Remote image URL '{image_url_data}' ignored. Only data URIs supported.")
print(
f"FastAPI Server (Req {internal_request_id}): Warning - Remote image URL '{image_url_data}' ignored. Only data URIs supported."
)
# Consider if you want to break after the first user message or aggregate all
# Consider if you want to break after the first user message or aggregate all
# break
# break
@@ -135,19 +161,24 @@ async def create_chat_completion(request: ChatCompletionRequest):
text_payload = {"request_id": internal_request_id, "text": final_user_text}
text_payload = {"request_id": internal_request_id, "text": final_user_text}
arrow_text_data = pa.array([text_payload])
arrow_text_data = pa.array([text_payload])
node.send_output(DORA_TEXT_OUTPUT_TOPIC, arrow_text_data)
node.send_output(DORA_TEXT_OUTPUT_TOPIC, arrow_text_data)
print(f"FastAPI Server (Req {internal_request_id}): Sent text to DORA topic '{DORA_TEXT_OUTPUT_TOPIC}'.")
print(
f"FastAPI Server (Req {internal_request_id}): Sent text to DORA topic '{DORA_TEXT_OUTPUT_TOPIC}'."
)
data_sent_to_dora = True
data_sent_to_dora = True
if user_image_bytes:
if user_image_bytes:
image_payload = {
image_payload = {
"request_id": internal_request_id,
"request_id": internal_request_id,
"image_bytes": user_image_bytes,
"image_bytes": user_image_bytes,
"image_content_type": user_image_content_type or "application/octet-stream"
"image_content_type": user_image_content_type
or "application/octet-stream",
}
}
arrow_image_data = pa.array([image_payload])
arrow_image_data = pa.array([image_payload])
node.send_output(DORA_IMAGE_OUTPUT_TOPIC, arrow_image_data)
node.send_output(DORA_IMAGE_OUTPUT_TOPIC, arrow_image_data)
print(f"FastAPI Server (Req {internal_request_id}): Sent image to DORA topic '{DORA_IMAGE_OUTPUT_TOPIC}'.")
prompt_tokens += len(user_image_bytes) # Crude image token approximation
print(
f"FastAPI Server (Req {internal_request_id}): Sent image to DORA topic '{DORA_IMAGE_OUTPUT_TOPIC}'."
)
prompt_tokens += len(user_image_bytes) # Crude image token approximation
data_sent_to_dora = True
data_sent_to_dora = True
response_str: str
response_str: str
@@ -158,50 +189,68 @@ async def create_chat_completion(request: ChatCompletionRequest):
response_str = "No user text or image found to send to DORA."
response_str = "No user text or image found to send to DORA."
print(f"FastAPI Server (Req {internal_request_id}): {response_str}")
print(f"FastAPI Server (Req {internal_request_id}): {response_str}")
else:
else:
print(f"FastAPI Server (Req {internal_request_id}): Waiting for response from DORA on topic '{DORA_RESPONSE_INPUT_TOPIC}'...")
print(
f"FastAPI Server (Req {internal_request_id}): Waiting for response from DORA on topic '{DORA_RESPONSE_INPUT_TOPIC}'..."
)
response_str = f"Timeout: No response from DORA for request_id {internal_request_id} within {DORA_RESPONSE_TIMEOUT_SECONDS}s."
response_str = f"Timeout: No response from DORA for request_id {internal_request_id} within {DORA_RESPONSE_TIMEOUT_SECONDS}s."
# WARNING: This blocking `node.next()` loop is not ideal for highly concurrent requests
# WARNING: This blocking `node.next()` loop is not ideal for highly concurrent requests
# in a single FastAPI worker process, as one request might block others or consume
# in a single FastAPI worker process, as one request might block others or consume
# a response meant for another if `request_id` matching isn't perfect or fast enough.
# a response meant for another if `request_id` matching isn't perfect or fast enough.
# A more robust solution would involve a dedicated listener task and async Futures/Queues.
# A more robust solution would involve a dedicated listener task and async Futures/Queues.
start_wait_time = time.monotonic()
start_wait_time = time.monotonic()
while time.monotonic() - start_wait_time < DORA_RESPONSE_TIMEOUT_SECONDS:
while time.monotonic() - start_wait_time < DORA_RESPONSE_TIMEOUT_SECONDS:
remaining_timeout = DORA_RESPONSE_TIMEOUT_SECONDS - (time.monotonic() - start_wait_time)
if remaining_timeout <= 0: break
remaining_timeout = DORA_RESPONSE_TIMEOUT_SECONDS - (
time.monotonic() - start_wait_time
)
if remaining_timeout <= 0:
break
event = node.next(
timeout=min(1.0, remaining_timeout)
) # Poll with a smaller timeout
event = node.next(timeout=min(1.0, remaining_timeout)) # Poll with a smaller timeout
if event is None: # Timeout for this poll iteration
if event is None: # Timeout for this poll iteration
continue
continue
if event["type"] == "INPUT" and event["id"] == DORA_RESPONSE_INPUT_TOPIC:
if event["type"] == "INPUT" and event["id"] == DORA_RESPONSE_INPUT_TOPIC:
response_value_arrow = event["value"]
response_value_arrow = event["value"]
if response_value_arrow and len(response_value_arrow) > 0:
if response_value_arrow and len(response_value_arrow) > 0:
dora_response_data = response_value_arrow[0].as_py() # Expecting a dict
dora_response_data = response_value_arrow[
0
].as_py() # Expecting a dict
if isinstance(dora_response_data, dict):
if isinstance(dora_response_data, dict):
resp_request_id = dora_response_data.get("request_id")
resp_request_id = dora_response_data.get("request_id")
if resp_request_id == internal_request_id:
if resp_request_id == internal_request_id:
response_str = dora_response_data.get("response_text", f"DORA response for {internal_request_id} missing 'response_text'.")
print(f"FastAPI Server (Req {internal_request_id}): Received correlated DORA response.")
break # Correct response received
else:
# This response is for another request. Ideally, store it.
print(f"FastAPI Server (Req {internal_request_id}): Received DORA response for different request_id '{resp_request_id}'. Discarding and waiting. THIS IS A CONCURRENCY ISSUE.")
# unmatched_dora_responses[resp_request_id] = dora_response_data # Example of storing
response_str = dora_response_data.get(
"response_text",
f"DORA response for {internal_request_id} missing 'response_text'.",
)
print(
f"FastAPI Server (Req {internal_request_id}): Received correlated DORA response."
)
break # Correct response received
# This response is for another request. Ideally, store it.
print(
f"FastAPI Server (Req {internal_request_id}): Received DORA response for different request_id '{resp_request_id}'. Discarding and waiting. THIS IS A CONCURRENCY ISSUE."
)
# unmatched_dora_responses[resp_request_id] = dora_response_data # Example of storing
else:
else:
response_str = f"Unrecognized DORA response format for {internal_request_id}: {str(dora_response_data)[:100]}"
response_str = f"Unrecognized DORA response format for {internal_request_id}: {str(dora_response_data)[:100]}"
break
break
else:
else:
response_str = f"Empty response payload from DORA for {internal_request_id}."
response_str = (
f"Empty response payload from DORA for {internal_request_id}."
)
break
break
elif event["type"] == "ERROR":
elif event["type"] == "ERROR":
response_str = f"Error event from DORA for {internal_request_id}: {event.get('value', event.get('error', 'Unknown DORA Error'))}"
response_str = f"Error event from DORA for {internal_request_id}: {event.get('value', event.get('error', 'Unknown DORA Error'))}"
print(response_str)
print(response_str)
break
break
else: # Outer while loop timed out
print(f"FastAPI Server (Req {internal_request_id}): Overall timeout waiting for DORA response.")
else: # Outer while loop timed out
print(
f"FastAPI Server (Req {internal_request_id}): Overall timeout waiting for DORA response."
)
completion_tokens = len(response_str)
completion_tokens = len(response_str)
total_tokens = prompt_tokens + completion_tokens
total_tokens = prompt_tokens + completion_tokens
@@ -213,7 +262,9 @@ async def create_chat_completion(request: ChatCompletionRequest):
choices=[
choices=[
ChatCompletionChoice(
ChatCompletionChoice(
index=0,
index=0,
message=ChatCompletionChoiceMessage(role="assistant", content=response_str),
message=ChatCompletionChoiceMessage(
role="assistant", content=response_str
),
finish_reason="stop",
finish_reason="stop",
)
)
],
],
@@ -224,6 +275,7 @@ async def create_chat_completion(request: ChatCompletionRequest):
),
),
)
)
@app.get("/v1/models")
@app.get("/v1/models")
async def list_models():
async def list_models():
return {
return {
@@ -231,12 +283,17 @@ async def list_models():
"data": [
"data": [
{
{
"id": "dora-multi-stream-vision",
"id": "dora-multi-stream-vision",
"object": "model", "created": int(time.time()), "owned_by": "dora-ai",
"permission": [], "root": "dora-multi-stream-vision", "parent": None,
"object": "model",
"created": int(time.time()),
"owned_by": "dora-ai",
"permission": [],
"root": "dora-multi-stream-vision",
"parent": None,
},
},
],
],
}
}
async def run_fastapi_server_task():
async def run_fastapi_server_task():
config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info")
config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info")
server = uvicorn.Server(config)
server = uvicorn.Server(config)
@@ -244,6 +301,7 @@ async def run_fastapi_server_task():
await server.serve()
await server.serve()
print("FastAPI Server: Uvicorn server stopped.")
print("FastAPI Server: Uvicorn server stopped.")
async def run_dora_main_loop_task():
async def run_dora_main_loop_task():
if not node:
if not node:
print("FastAPI Server: DORA node not initialized, DORA main loop skipped.")
print("FastAPI Server: DORA node not initialized, DORA main loop skipped.")
@@ -253,21 +311,31 @@ async def run_dora_main_loop_task():
while True:
while True:
# This loop is primarily for the main "STOP" event for the FastAPI node itself.
# This loop is primarily for the main "STOP" event for the FastAPI node itself.
# Individual request/response cycles are handled within the endpoint.
# Individual request/response cycles are handled within the endpoint.
event = node.next(timeout=1.0) # Check for STOP periodically
event = node.next(timeout=1.0) # Check for STOP periodically
if event is None:
if event is None:
await asyncio.sleep(0.01) # Yield control if no event
await asyncio.sleep(0.01) # Yield control if no event
continue
continue
if event["type"] == "STOP":
if event["type"] == "STOP":
print("FastAPI Server: DORA STOP event received. Requesting server shutdown.")
print(
"FastAPI Server: DORA STOP event received. Requesting server shutdown."
)
# Attempt to gracefully shut down Uvicorn
# Attempt to gracefully shut down Uvicorn
# This is tricky; uvicorn's server.shutdown() or server.should_exit might be better
# This is tricky; uvicorn's server.shutdown() or server.should_exit might be better
# For simplicity, we cancel the server task.
# For simplicity, we cancel the server task.
for task in asyncio.all_tasks():
for task in asyncio.all_tasks():
# Identify the server task more reliably if possible
# Identify the server task more reliably if possible
if task.get_coro().__name__ == 'serve' and hasattr(task.get_coro(), 'cr_frame') and \
isinstance(task.get_coro().cr_frame.f_locals.get('self'), uvicorn.Server):
if (
task.get_coro().__name__ == "serve"
and hasattr(task.get_coro(), "cr_frame")
and isinstance(
task.get_coro().cr_frame.f_locals.get("self"),
uvicorn.Server,
)
):
task.cancel()
task.cancel()
print("FastAPI Server: Uvicorn server task cancellation requested.")
print(
"FastAPI Server: Uvicorn server task cancellation requested."
)
break
break
# Handle other unexpected general inputs/errors for the FastAPI node if necessary
# Handle other unexpected general inputs/errors for the FastAPI node if necessary
# elif event["type"] == "INPUT":
# elif event["type"] == "INPUT":
@@ -280,9 +348,10 @@ async def run_dora_main_loop_task():
finally:
finally:
print("FastAPI Server: DORA main loop listener finished.")
print("FastAPI Server: DORA main loop listener finished.")
async def main_async_runner():
async def main_async_runner():
server_task = asyncio.create_task(run_fastapi_server_task())
server_task = asyncio.create_task(run_fastapi_server_task())
# Only run the DORA main loop if the node was initialized.
# Only run the DORA main loop if the node was initialized.
# This loop is mainly for the STOP event.
# This loop is mainly for the STOP event.
dora_listener_task = None
dora_listener_task = None
@@ -293,17 +362,19 @@ async def main_async_runner():
tasks_to_wait_for = [server_task]
tasks_to_wait_for = [server_task]
done, pending = await asyncio.wait(
done, pending = await asyncio.wait(
tasks_to_wait_for, return_when=asyncio.FIRST_COMPLETED,
tasks_to_wait_for,
return_when=asyncio.FIRST_COMPLETED,
)
)
for task in pending:
for task in pending:
print(f"FastAPI Server: Cancelling pending task: {task.get_name()}")
print(f"FastAPI Server: Cancelling pending task: {task.get_name()}")
task.cancel()
task.cancel()
if pending:
if pending:
await asyncio.gather(*pending, return_exceptions=True)
await asyncio.gather(*pending, return_exceptions=True)
print("FastAPI Server: Application shutdown complete.")
print("FastAPI Server: Application shutdown complete.")
def main():
def main():
print("FastAPI Server: Starting application...")
print("FastAPI Server: Starting application...")
try:
try:
@@ -313,5 +384,6 @@ def main():
finally:
finally:
print("FastAPI Server: Exited main function.")
print("FastAPI Server: Exited main function.")
if __name__ == "__main__":
if __name__ == "__main__":
main()
main()