You will find in this pull request a quick implementation of an OpenAI-compatible server: it makes it possible to send requests from an OpenAI endpoint client and receive the responses within the dataflow. Note that I have only implemented chat completions, and this is still experimental.

Get started with:

```bash
cd examples/openai-server
dora build dataflow.yml
dora start dataflow.yml

# In a separate terminal
python openai_api_client.py
```
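The endpoint can also be exercised without the OpenAI client library. A minimal sketch with curl, assuming the dataflow is running and the server is listening on its default `localhost:8000` (see `main.py` below):

```bash
curl http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": "hello dora"}]
      }'
```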
@@ -0,0 +1 @@
*.pt
@@ -0,0 +1,17 @@ examples/openai-server/README.md
# Dora OpenAI echo example

This is a quick example to showcase how to use the `dora-openai-server` to receive data and send it back. The assistant reply is simply your own message echoed back through the dataflow.

Dora OpenAI Server is still experimental and may change in the future.

Make sure to have dora, pip, and cargo installed.

```bash
dora up
dora build dataflow.yml
dora start dataflow.yml

# In a separate terminal
python openai_api_client.py

dora stop
```
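For reference, a client session roughly looks like this (output sketched from the client and server code in this PR, not captured from a run):

```
Testing API endpoints...
Available models:
- gpt-3.5-turbo

==================================================

Enter a message for chat completion: Hello dora!
Chat Completion Response:
Hello dora!
```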
| @@ -0,0 +1,16 @@ | |||||
| nodes: | |||||
| - id: dora-openai-server | |||||
| build: pip install -e ../../node-hub/dora-openai-server | |||||
| path: dora-openai-server | |||||
| outputs: | |||||
| - v1/chat/completions | |||||
| inputs: | |||||
| v1/chat/completions: dora-echo/echo | |||||
| - id: dora-echo | |||||
| build: pip install -e ../../node-hub/dora-echo | |||||
| path: dora-echo | |||||
| inputs: | |||||
| echo: dora-openai-server/v1/chat/completions | |||||
| outputs: | |||||
| - echo | |||||
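The `dora-echo` node just bounces the server's output straight back, which is what makes this a pure echo example. To serve real completions, a node with the same wiring could replace it. A minimal sketch, where `generate_reply` is a hypothetical stand-in for an actual model call:

```python
from dora import Node

import pyarrow as pa


def generate_reply(prompt: str) -> str:
    # Hypothetical stand-in for a real model call.
    return prompt.upper()


node = Node()

for event in node:
    if event["type"] == "INPUT" and event["id"] == "echo":
        prompt = event["value"][0].as_py()
        node.send_output("echo", pa.array([generate_reply(prompt)]))
```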
@@ -0,0 +1,38 @@ examples/openai-server/openai_api_client.py
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy_api_key")


def test_list_models():
    try:
        models = client.models.list()
        print("Available models:")
        for model in models.data:
            print(f"- {model.id}")
    except Exception as e:
        print(f"Error listing models: {e}")


def test_chat_completion(user_input):
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": user_input},
            ],
        )
        print("Chat Completion Response:")
        print(response.choices[0].message.content)
    except Exception as e:
        print(f"Error in chat completion: {e}")


if __name__ == "__main__":
    print("Testing API endpoints...")
    test_list_models()
    print("\n" + "=" * 50 + "\n")
    chat_input = input("Enter a message for chat completion: ")
    test_chat_completion(chat_input)
    print("\n" + "=" * 50 + "\n")
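For reference, the response body the server constructs (sketched from the `ChatCompletionResponse` fields in `main.py` below; the echoed content and usage lengths depend on your input):

```json
{
  "id": "chatcmpl-1234",
  "object": "chat.completion",
  "created": 1234567890,
  "model": "gpt-3.5-turbo",
  "choices": [
    {
      "index": 0,
      "message": {"role": "assistant", "content": "Hello dora!"},
      "finish_reason": "stop"
    }
  ],
  "usage": {"prompt_tokens": 1, "completion_tokens": 11, "total_tokens": 12}
}
```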
@@ -0,0 +1,5 @@ node-hub/dora-openai-server/README.md
# Dora OpenAI Server

This is an experimental node to expose an OpenAI-compatible server endpoint with dora.

Check the example at [examples/openai-server](../../examples/openai-server/README.md).
@@ -0,0 +1,11 @@ node-hub/dora-openai-server/dora_openai_server/__init__.py
import os

# Define the path to the README file relative to the package directory
readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.md")

# Read the content of the README file
try:
    with open(readme_path, "r", encoding="utf-8") as f:
        __doc__ = f.read()
except FileNotFoundError:
    __doc__ = "README file not found."
| @@ -0,0 +1,135 @@ | |||||
| from fastapi import FastAPI | |||||
| from pydantic import BaseModel | |||||
| from typing import List, Optional | |||||
| import uvicorn | |||||
| from dora import Node | |||||
| import asyncio | |||||
| import pyarrow as pa | |||||
| import ast | |||||
| DORA_RESPONSE_TIMEOUT = 10 | |||||
| app = FastAPI() | |||||
class ChatCompletionMessage(BaseModel):
    role: str
    content: str


class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[ChatCompletionMessage]
    temperature: Optional[float] = 1.0
    max_tokens: Optional[int] = 100


class ChatCompletionResponse(BaseModel):
    id: str
    object: str
    created: int
    model: str
    choices: List[dict]
    usage: dict

node = Node()  # pass a node id argument instead to connect as a dynamic node

@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    data = next(
        (msg.content for msg in request.messages if msg.role == "user"),
        "No user message found.",
    )

    # Try to interpret the message as a Python literal (list, dict, number, ...);
    # otherwise pass it through as a plain string.
    try:
        data = ast.literal_eval(data)
    except (ValueError, SyntaxError):
        print("Passing input as string")

    # Wrap scalars and dicts in a one-element list so pyarrow gets a sequence.
    if isinstance(data, (str, int, float, dict)):
        data = pa.array([data])
    else:
        data = pa.array(data)

    node.send_output("v1/chat/completions", data)

    # Wait for the response from dora-echo
    while True:
        event = node.next(timeout=DORA_RESPONSE_TIMEOUT)
        if event["type"] == "ERROR":
            response_str = "No response received. Err: " + event["value"][0].as_py()
            break
        elif event["type"] == "INPUT" and event["id"] == "v1/chat/completions":
            response = event["value"]
            response_str = response[0].as_py() if response else "No response received"
            break

    return ChatCompletionResponse(
        id="chatcmpl-1234",
        object="chat.completion",
        created=1234567890,
        model=request.model,
        choices=[
            {
                "index": 0,
                "message": {"role": "assistant", "content": response_str},
                "finish_reason": "stop",
            }
        ],
        # Placeholder lengths, not real token counts.
        usage={
            "prompt_tokens": len(data),
            "completion_tokens": len(response_str),
            "total_tokens": len(data) + len(response_str),
        },
    )

@app.get("/v1/models")
async def list_models():
    return {
        "object": "list",
        "data": [
            {
                "id": "gpt-3.5-turbo",
                "object": "model",
                "created": 1677610602,
                "owned_by": "openai",
            }
        ],
    }

async def run_fastapi():
    config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info")
    server = uvicorn.Server(config)

    # Run the HTTP server as a background task while polling dora for STOP.
    server_task = asyncio.create_task(server.serve())

    while True:
        await asyncio.sleep(1)
        event = node.next(timeout=0.001)
        if event is not None and event["type"] == "STOP":
            break

    server.should_exit = True
    await server_task


def main():
    asyncio.run(run_fastapi())


if __name__ == "__main__":
    main()
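On the `node = Node()` line: when the server is wired as a regular node (as in `dataflow.yml` above), the default constructor is enough. A node id argument would only be needed if the node were declared dynamic. A minimal sketch of that variant, assuming dora's dynamic-node mechanism (`path: dynamic` in the dataflow):

```python
from dora import Node

# Connect to an already-running dataflow as the node with id "dora-openai-server".
node = Node("dora-openai-server")
```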
@@ -0,0 +1,30 @@ node-hub/dora-openai-server/pyproject.toml
[tool.poetry]
name = "dora-openai-server"
version = "0.3.6"
authors = [
    "Haixuan Xavier Tao <tao.xavier@outlook.com>",
    "Enzo Le Van <dev@enzo-le-van.fr>",
]
description = "Dora OpenAI API Server"
license = "MIT"
homepage = "https://github.com/dora-rs/dora.git"
documentation = "https://github.com/dora-rs/dora/blob/main/node-hub/dora-openai-server/README.md"
readme = "README.md"
packages = [{ include = "dora_openai_server" }]

[tool.poetry.dependencies]
dora-rs = "^0.3.6"
numpy = "< 2.0.0"
pyarrow = ">= 5.0.0"
python = "^3.8"
fastapi = "^0.115"
uvicorn = "^0.31"
pydantic = "^2.9"

[tool.poetry.scripts]
dora-openai-server = "dora_openai_server.main:main"

[build-system]
requires = ["poetry-core>=1.8.0"]
build-backend = "poetry.core.masonry.api"
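The `[tool.poetry.scripts]` entry is what makes `path: dora-openai-server` in `dataflow.yml` resolvable: installing the package puts a `dora-openai-server` executable on the PATH, which dora spawns when the dataflow starts. For instance (paths assume the repository root):

```bash
# Install the node; the `build` line in dataflow.yml does the equivalent.
pip install -e node-hub/dora-openai-server

# The console script dora will spawn for `path: dora-openai-server`:
which dora-openai-server
```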
@@ -0,0 +1,2 @@
def test_import_main():
    # Importing dora_openai_server.main would instantiate a dora Node, which
    # needs a running dataflow, so the test is left as an import-free stub.
    pass