Compare commits

...

2 Commits

Author SHA1 Message Date
  haixuantao bb89c139e7 Add configurable port 6 months ago
  haixuantao 1448f27d95 initial commit for openai vggt 6 months ago
8 changed files with 376 additions and 37 deletions
Split View
  1. +29
    -0
      examples/openai-server/image-server.yaml
  2. +88
    -0
      examples/openai-server/image_client.py
  3. +33
    -0
      examples/openai-server/parse_text.py
  4. +95
    -0
      examples/vggt/format_response.py
  5. +54
    -0
      examples/vggt/openai-server.yaml
  6. +33
    -0
      examples/vggt/parse_text.py
  7. +2
    -0
      node-hub/dora-vggt/dora_vggt/main.py
  8. +42
    -37
      node-hub/openai-proxy-server/src/main.rs

+ 29
- 0
examples/openai-server/image-server.yaml View File

@@ -0,0 +1,29 @@
nodes:
- id: dora-openai-server
build: cargo build -p dora-openai-proxy-server --release
path: ../../target/release/dora-openai-proxy-server
outputs:
- text
inputs:
text: dora-echo/echo

- id: dora-echo
build: pip install -e ../../node-hub/dora-echo
path: dora-echo
inputs:
echo: dora-openai-server/text
outputs:
- echo

- id: parse_text
path: parse_text.py
inputs:
text: dora-openai-server/text
outputs:
- image

- id: plot
build: pip install dora-rerun
path: dora-rerun
inputs:
camera/image: parse_text/image

+ 88
- 0
examples/openai-server/image_client.py View File

@@ -0,0 +1,88 @@
"""TODO: Add docstring."""

from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy_api_key")


def test_list_models():
"""TODO: Add docstring."""
try:
models = client.models.list()
print("Available models:")
for model in models.data:
print(f"- {model.id}")
except Exception as e:
print(f"Error listing models: {e}")


def test_chat_completion(user_input):
"""TODO: Add docstring."""
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": user_input},
],
)
print("Chat Completion Response:")
print(response.choices[0].message.content)
except Exception as e:
print(f"Error in chat completion: {e}")


def test_chat_completion_image_url(user_input):
"""TODO: Add docstring."""
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "What is in this image?"},
{
"type": "image_url",
"image_url": {
"url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
},
},
],
}
],
)
print("Chat Completion Response:")
print(response.choices[0].message.content)
except Exception as e:
print(f"Error in chat completion: {e}")


def test_chat_completion_image_base64(user_input):
"""TODO: Add docstring."""
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "What is in this image?"},
{
"type": "image_url",
"image_url": {
"url": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAABgAAAAYCAYAAADgdz34AAAABHNCSVQICAgIfAhkiAAAAAlwSFlzAAAApgAAAKYB3X3/OAAAABl0RVh0U29mdHdhcmUAd3d3Lmlua3NjYXBlLm9yZ5vuPBoAAANCSURBVEiJtZZPbBtFFMZ/M7ubXdtdb1xSFyeilBapySVU8h8OoFaooFSqiihIVIpQBKci6KEg9Q6H9kovIHoCIVQJJCKE1ENFjnAgcaSGC6rEnxBwA04Tx43t2FnvDAfjkNibxgHxnWb2e/u992bee7tCa00YFsffekFY+nUzFtjW0LrvjRXrCDIAaPLlW0nHL0SsZtVoaF98mLrx3pdhOqLtYPHChahZcYYO7KvPFxvRl5XPp1sN3adWiD1ZAqD6XYK1b/dvE5IWryTt2udLFedwc1+9kLp+vbbpoDh+6TklxBeAi9TL0taeWpdmZzQDry0AcO+jQ12RyohqqoYoo8RDwJrU+qXkjWtfi8Xxt58BdQuwQs9qC/afLwCw8tnQbqYAPsgxE1S6F3EAIXux2oQFKm0ihMsOF71dHYx+f3NND68ghCu1YIoePPQN1pGRABkJ6Bus96CutRZMydTl+TvuiRW1m3n0eDl0vRPcEysqdXn+jsQPsrHMquGeXEaY4Yk4wxWcY5V/9scqOMOVUFthatyTy8QyqwZ+kDURKoMWxNKr2EeqVKcTNOajqKoBgOE28U4tdQl5p5bwCw7BWquaZSzAPlwjlithJtp3pTImSqQRrb2Z8PHGigD4RZuNX6JYj6wj7O4TFLbCO/Mn/m8R+h6rYSUb3ekokRY6f/YukArN979jcW+V/S8g0eT/N3VN3kTqWbQ428m9/8k0P/1aIhF36PccEl6EhOcAUCrXKZXXWS3XKd2vc/TRBG9O5ELC17MmWubD2nKhUKZa26Ba2+D3P+4/MNCFwg59oWVeYhkzgN/JDR8deKBoD7Y+ljEjGZ0sosXVTvbc6RHirr2reNy1OXd6pJsQ+gqjk8VWFYmHrwBzW/n+uMPFiRwHB2I7ih8ciHFxIkd/3Omk5tCDV1t+2nNu5sxxpDFNx+huNhVT3/zMDz8usXC3ddaHBj1GHj/As08fwTS7Kt1HBTmyN29vdwAw+/wbwLVOJ3uAD1wi/dUH7Qei66PfyuRj4Ik9is+hglfbkbfR3cnZm7chlUWLdwmprtCohX4HUtlOcQjLYCu+fzGJH2QRKvP3UNz8bWk1qMxjGTOMThZ3kvgLI5AzFfo379UAAAAASUVORK5CYII="
},
},
],
}
],
)
print("Chat Completion Response:")
print(response.choices[0].message.content)
except Exception as e:
print(f"Error in chat completion: {e}")


if __name__ == "__main__":
test_chat_completion_image_base64("abc")

+ 33
- 0
examples/openai-server/parse_text.py View File

@@ -0,0 +1,33 @@
"""TODO: Add docstring."""

from PIL import Image
from io import BytesIO
import base64
from dora import Node
import pyarrow as pa
import numpy as np
node = Node()


for event in node:
if event["type"] == "INPUT":
texts = event["value"].to_numpy(zero_copy_only=False)

for text in texts:
if text.startswith("<|user|>\n<|vision_start|>\n"):
# Handle the case where the text starts with <|user|>\n<|vision_start|>
image = text.replace("<|user|>\n<|vision_start|>\n", "")
if "base64" in image:
image = image.split(",", 1)[1]
print("image", image)
image = Image.open(BytesIO(base64.b64decode(image)))
node.send_output(
"image",
pa.array(np.array(image).ravel()),
metadata={
"encoding": "rgb8",
"width": image.width,
"height": image.height,
},
)
print(f"Processed {len(texts)} texts.")

+ 95
- 0
examples/vggt/format_response.py View File

@@ -0,0 +1,95 @@
"""TODO: Add docstring."""

import base64
from dora import Node
import pyarrow as pa
import numpy as np
import cv2
node = Node()


cache = {}

def derive_image_as_base64(event):
storage = event["value"]
metadata = event["metadata"]
encoding = metadata["encoding"]
width = metadata["width"]
height = metadata["height"]

if (
encoding == "mono16" or encoding == "z16"
):
channels = 1
storage_type = np.uint16
else:
channels = 3
storage_type = np.uint8

if encoding == "bgr8":
frame = (
storage.to_numpy()
.astype(storage_type)
.reshape((height, width, channels))
)
frame = cv2.imencode('.jpg', frame)[1].tobytes()
frame = base64.b64encode(frame).decode('utf-8')
image = f"data:image/jpeg;base64,{frame}"
elif encoding == "rgb8":
frame = (
storage.to_numpy()
.astype(storage_type)
.reshape((height, width, channels))
)
frame = frame[:, :, ::-1] # OpenCV image (RGB to BGR)
frame = cv2.imencode('.jpg', frame)[1].tobytes()
frame = base64.b64encode(frame).decode('utf-8')
image = f"data:image/jpeg;base64,{frame}"
elif encoding in ["jpeg", "jpg", "avif", "webp", "png", "bmp"]:
storage = storage.to_numpy()
image = base64.b64encode(storage).decode('utf-8')
image = f"data:image/{encoding};base64,{image}"
elif encoding in ["mono16", "z16"]:
frame = (
storage.to_numpy()
.astype(storage_type)
.reshape((height, width, 1))
)
storage = storage.to_numpy()
image = base64.b64encode(storage).decode('utf-8')
image = f"data:image/jpeg;base64,{frame}"

else:
raise RuntimeError(f"Unsupported image encoding: {encoding}")

return image

for event in node:
if event["type"] == "INPUT":
event_id = event["id"]
print(f"Received event: {event_id}")
match event_id:
case s if "image" in s:
message_id = event["metadata"].get("id", 0)
if cache.get(message_id) is None:
cache[message_id] = {}
cache[message_id]["image"] = event
case s if "depth" in s:
message_id = event["metadata"].get("id", 0)
if cache.get(message_id) is None:
cache[message_id] = {}
cache[message_id]["depth"] = event
if len(cache[message_id]) == 2:
image = cache[message_id]["image"]
image_str = derive_image_as_base64(image)
depth = cache[message_id]["depth"]
depth_str = derive_image_as_base64(depth)

node.send_output(
"text",
pa.array(np.array(image_str + "\n" + depth_str).ravel()),
metadata={
"message_id": message_id,
},
)

+ 54
- 0
examples/vggt/openai-server.yaml View File

@@ -0,0 +1,54 @@
nodes:
- id: dora-openai-server
build: cargo build -p dora-openai-proxy-server --release
path: ../../target/release/dora-openai-proxy-server
outputs:
- text
inputs:
text: format_response/text

- id: parse_text
path: parse_text.py
inputs:
text: dora-openai-server/text
outputs:
- image

- id: dora-vggt
build: pip install -e ../../node-hub/dora-vggt
path: dora-vggt
inputs:
image: parse_text/image
outputs:
- depth
- image
env:
DEPTH_ENCODING: mono16

- id: rav1e-image
path: dora-rav1e
build: pip install -e ../../node-hub/dora-rav1e
inputs:
image: dora-vggt/image
outputs:
- image
env:
ENCODING: avif

- id: rav1e-depth
path: dora-rav1e
build: pip install -e ../../node-hub/dora-rav1e
inputs:
depth: dora-vggt/depth
outputs:
- depth
env:
ENCODING: avif

- id: format_response
path: format_response.py
inputs:
depth: rav1e-depth/depth
image: rav1e-image/image
outputs:
- text

+ 33
- 0
examples/vggt/parse_text.py View File

@@ -0,0 +1,33 @@
"""TODO: Add docstring."""

from PIL import Image
from io import BytesIO
import base64
from dora import Node
import pyarrow as pa
import numpy as np
node = Node()


for event in node:
if event["type"] == "INPUT":
texts = event["value"].to_numpy(zero_copy_only=False)

for text in texts:
if text.startswith("<|user|>\n<|vision_start|>\n"):
# Handle the case where the text starts with <|user|>\n<|vision_start|>
image = text.replace("<|user|>\n<|vision_start|>\n", "")
if "base64" in image:
image = image.split(",", 1)[1]

image = Image.open(BytesIO(base64.b64decode(image)))
node.send_output(
"image",
pa.array(np.array(image).ravel()),
metadata={
"encoding": "rgb8",
"width": image.width,
"height": image.height,
},
)
print(f"Processed {len(texts)} texts.")

+ 2
- 0
node-hub/dora-vggt/dora_vggt/main.py View File

@@ -130,6 +130,7 @@ def main():
int(r_0),
int(r_1),
],
"message_id": event["metadata"].get("message_id", 0),
},
)

@@ -154,6 +155,7 @@ def main():
int(r_0),
int(r_1),
],
"message_id": event["metadata"].get("message_id", 0),
},
)



+ 42
- 37
node-hub/openai-proxy-server/src/main.rs View File

@@ -6,7 +6,7 @@ use dora_node_api::{
DoraNode, Event,
};

use eyre::{Context, ContextCompat};
use eyre::Context;
use futures::{
channel::oneshot::{self, Canceled},
TryStreamExt,
@@ -36,7 +36,10 @@ pub mod message;
#[tokio::main]
async fn main() -> eyre::Result<()> {
let web_ui = Path::new("chatbot-ui");
let port = 8000;
let port = std::env::var("PORT")
.unwrap_or_else(|_| "8080".to_string())
.parse::<u16>()
.unwrap_or(8080);
let addr = SocketAddr::from(([0, 0, 0, 0], port));

let (server_events_tx, server_events_rx) = mpsc::channel(3);
@@ -107,42 +110,44 @@ async fn main() -> eyre::Result<()> {
} => {
match id.as_str() {
"text" => {
let (reply_channel, prompt_tokens, model) =
reply_channels.pop_front().context("no reply channel")?;
let data = data.as_string::<i32>();
let string = data.iter().fold("".to_string(), |mut acc, s| {
if let Some(s) = s {
acc.push('\n');
acc.push_str(s);
}
acc
});

let data = ChatCompletionObject {
id: format!("completion-{}", uuid::Uuid::new_v4()),
object: "chat.completion".to_string(),
created: chrono::Utc::now().timestamp() as u64,
model: model.unwrap_or_default(),
choices: vec![ChatCompletionObjectChoice {
index: 0,
message: ChatCompletionObjectMessage {
role: message::ChatCompletionRole::Assistant,
content: Some(string.to_string()),
tool_calls: Vec::new(),
function_call: None,
if let Some((reply_channel, prompt_tokens, model)) =
reply_channels.pop_front()
{
let data = data.as_string::<i32>();
let string = data.iter().fold("".to_string(), |mut acc, s| {
if let Some(s) = s {
acc.push('\n');
acc.push_str(s);
}
acc
});

let data = ChatCompletionObject {
id: format!("completion-{}", uuid::Uuid::new_v4()),
object: "chat.completion".to_string(),
created: chrono::Utc::now().timestamp() as u64,
model: model.unwrap_or_default(),
choices: vec![ChatCompletionObjectChoice {
index: 0,
message: ChatCompletionObjectMessage {
role: message::ChatCompletionRole::Assistant,
content: Some(string.to_string()),
tool_calls: Vec::new(),
function_call: None,
},
finish_reason: message::FinishReason::stop,
logprobs: None,
}],
usage: Usage {
prompt_tokens,
completion_tokens: string.len() as u64,
total_tokens: prompt_tokens + string.len() as u64,
},
finish_reason: message::FinishReason::stop,
logprobs: None,
}],
usage: Usage {
prompt_tokens,
completion_tokens: string.len() as u64,
total_tokens: prompt_tokens + string.len() as u64,
},
};

if reply_channel.send(Ok(data)).is_err() {
tracing::warn!("failed to send chat completion reply because channel closed early");
};

if reply_channel.send(Ok(data)).is_err() {
tracing::warn!("failed to send chat completion reply because channel closed early");
}
}
}
_ => eyre::bail!("unexpected input id: {}", id),


Loading…
Cancel
Save