| @@ -0,0 +1,22 @@ | |||
| # Dora LMDeploy Node | |||
| Experimental Dora node for efficient inference using LMDeploy. | |||
| ## YAML Specification | |||
| Use this node as follows: | |||
| id: dora-lmdeploy | |||
| build: pip install dora-lmdeploy | |||
| path: dora-lmdeploy | |||
| inputs: | |||
| image: | |||
| source: camera/image | |||
| queue_size: 1 | |||
| text: dora-distil-whisper/text | |||
outputs:
  - text
| env: | |||
| MODEL_NAME: "<your-lmdeploy-model-name>" | |||
| DEFAULT_PROMPT: "Describe the image briefly." | |||
| @@ -0,0 +1,11 @@ | |||
import os

# The README ships one directory above this package; resolve it relative
# to this file so the lookup works regardless of the install location.
readme_path = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    "README.md",
)

# Expose the README as the package docstring; fall back to a short
# placeholder when the file is absent (e.g. in a stripped install).
try:
    with open(readme_path, encoding="utf-8") as readme_file:
        __doc__ = readme_file.read()
except FileNotFoundError:
    __doc__ = "README file not found."
| @@ -0,0 +1,3 @@ | |||
| { | |||
| "chat_template": "{% set image_count = namespace(value=0) %}{% set video_count = namespace(value=0) %}{% for message in messages %}{% if loop.first and message['role'] != 'system' %}<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n{% endif %}<|im_start|>{{ message['role'] }}\n{% if message['content'] is string %}{{ message['content'] }}<|im_end|>\n{% else %}{% for content in message['content'] %}{% if content['type'] == 'image' or 'image' in content or 'image_url' in content %}{% set image_count.value = image_count.value + 1 %}{% if add_vision_id %}Picture {{ image_count.value }}: {% endif %}<|vision_start|><|image_pad|><|vision_end|>{% elif content['type'] == 'video' or 'video' in content %}{% set video_count.value = video_count.value + 1 %}{% if add_vision_id %}Video {{ video_count.value }}: {% endif %}<|vision_start|><|video_pad|><|vision_end|>{% elif 'text' in content %}{{ content['text'] }}{% endif %}{% endfor %}<|im_end|>\n{% endif %}{% endfor %}{% if add_generation_prompt %}<|im_start|>assistant\n{% endif %}" | |||
| } | |||
| @@ -0,0 +1,114 @@ | |||
| """Dora Node for LMDeploy-based inference.""" | |||
| import os | |||
| import cv2 | |||
| import numpy as np | |||
| import pyarrow as pa | |||
| from dora import Node | |||
| from PIL import Image | |||
| from lmdeploy import pipeline | |||
| # Environment variables | |||
| MODEL_NAME = os.getenv("MODEL_NAME", "internlm/internlm-xcomposer2-vl-7b") | |||
| DEFAULT_PROMPT = os.getenv("DEFAULT_PROMPT", "Describe this image.") | |||
| IMAGE_RESIZE_RATIO = float(os.getenv("IMAGE_RESIZE_RATIO", "1.0")) | |||
| ACTIVATION_WORDS = os.getenv("ACTIVATION_WORDS", "").split() | |||
| # Initialize LMDeploy pipeline | |||
| pipe = pipeline(model_name=MODEL_NAME, backend_config={"backend": "tensorrt"}) | |||
def generate(frames: dict, question: str, image_id=None):
    """Answer *question* with the LMDeploy pipeline over cached frames.

    When *image_id* is given, only that cached frame is sent to the model;
    otherwise every cached frame is included. Returns the stripped response
    text.
    """
    if image_id is None:
        selected = list(frames.values())
    else:
        selected = [frames[image_id]]
    # LMDeploy vision-language pipelines accept a (prompt, images) tuple.
    response = pipe((question, selected))
    return response.text.strip()
| def main(): | |||
| """Main function to handle events in the Dora node.""" | |||
| pa.array([]) # Initialize PyArrow array | |||
| node = Node() | |||
| frames = {} | |||
| cached_text = DEFAULT_PROMPT | |||
| for event in node: | |||
| event_type = event["type"] | |||
| if event_type == "INPUT": | |||
| event_id = event["id"] | |||
| if "image" in event_id: | |||
| # Process incoming image data | |||
| storage = event["value"] | |||
| metadata = event["metadata"] | |||
| encoding = metadata["encoding"] | |||
| width = metadata["width"] | |||
| height = metadata["height"] | |||
| if encoding in ["bgr8", "rgb8", "jpeg", "jpg", "jpe", "bmp", "webp", "png"]: | |||
| channels = 3 | |||
| storage_type = np.uint8 | |||
| else: | |||
| raise RuntimeError(f"Unsupported image encoding: {encoding}") | |||
| if encoding == "bgr8": | |||
| frame = ( | |||
| storage.to_numpy() | |||
| .astype(storage_type) | |||
| .reshape((height, width, channels)) | |||
| ) | |||
| frame = frame[:, :, ::-1] # Convert BGR to RGB | |||
| elif encoding == "rgb8": | |||
| frame = ( | |||
| storage.to_numpy() | |||
| .astype(storage_type) | |||
| .reshape((height, width, channels)) | |||
| ) | |||
| elif encoding in ["jpeg", "jpg", "jpe", "bmp", "webp", "png"]: | |||
| storage = storage.to_numpy() | |||
| frame = cv2.imdecode(storage, cv2.IMREAD_COLOR) | |||
| frame = frame[:, :, ::-1] # Convert BGR to RGB | |||
| else: | |||
| raise RuntimeError(f"Unsupported image encoding: {encoding}") | |||
| image = Image.fromarray(frame) | |||
| frames[event_id] = image | |||
| elif "text" in event_id: | |||
| # Process incoming text data | |||
| if len(event["value"]) > 0: | |||
| text = event["value"][0].as_py() | |||
| image_id = event["metadata"].get("image_id", None) | |||
| else: | |||
| text = cached_text | |||
| words = text.split() | |||
| if len(ACTIVATION_WORDS) > 0 and all(word not in ACTIVATION_WORDS for word in words): | |||
| continue | |||
| cached_text = text | |||
| if len(frames.keys()) == 0: | |||
| continue | |||
| # Generate response using LMDeploy pipeline | |||
| response = generate(frames, text, image_id) | |||
| node.send_output( | |||
| "text", | |||
| pa.array([response]), | |||
| {"image_id": image_id if image_id is not None else "all"}, | |||
| ) | |||
| elif event_type == "ERROR": | |||
| print("Event Error:" + event["error"]) | |||
| if __name__ == "__main__": | |||
| main() | |||
| @@ -0,0 +1,36 @@ | |||
| [project] | |||
| name = "dora-lmdeploy" | |||
| version = "0.1.0" | |||
| authors = [ | |||
| { name = "Haixuan Xavier Tao", email = "tao.xavier@outlook.com" }, | |||
| { name = "Enzo Le Van", email = "dev@enzo-le-van.fr" }, | |||
| ] | |||
| description = "Dora Node for inference using LMDeploy" | |||
| license = { text = "MIT" } | |||
| readme = "README.md" | |||
| requires-python = ">=3.9" | |||
| dependencies = [ | |||
| "dora-rs >= 0.3.9", | |||
| "numpy < 2.0.0", | |||
| "torch == 2.4.0", | |||
| "torchvision >= 0.19", | |||
| "torchaudio >= 2.1.0", | |||
| "lmdeploy >= 0.2.0", | |||
| "opencv-python >= 4.1.1", | |||
| ] | |||
| [dependency-groups] | |||
| dev = ["pytest >=8.1.1", "ruff >=0.9.1"] | |||
| [project.scripts] | |||
| dora-lmdeploy = "dora_lmdeploy.main:main" | |||
| [build-system] | |||
| requires = ["setuptools", "setuptools-scm"] | |||
| build-backend = "setuptools.build_meta" | |||
| [tool.ruff.lint] | |||
| extend-select = [ | |||
| "D", # pydocstyle | |||
| ] | |||
| @@ -0,0 +1,19 @@ | |||
| """Module for testing Dora LMDeploy functionality. | |||
| This module contains tests for verifying the functionality of the Dora LMDeploy component. | |||
| """ | |||
| import pytest | |||
| def test_import_main(): | |||
| """Test that the main function in Dora LMDeploy can be imported and executed. | |||
| This test verifies that the `main` function can be imported and runs without errors, | |||
| catching any expected RuntimeError when executed outside a valid Dora dataflow context. | |||
| """ | |||
| from dora_lmdeploy.main import main | |||
| with pytest.raises(RuntimeError, match=".*Node.*"): | |||
| main() | |||