From 656d27ce149af7178ece4ef2bacd578ffd0ec92e Mon Sep 17 00:00:00 2001
From: 7SOMAY
Date: Fri, 14 Mar 2025 04:09:50 +0530
Subject: [PATCH 1/8] Added dora-phi4 inside node-hub

---
 node-hub/dora-phi4/README.md | 40 ++++++
 node-hub/dora-phi4/dora_phi4/__init__.py | 11 ++
 node-hub/dora-phi4/dora_phi4/__main__.py | 5 +
 node-hub/dora-phi4/dora_phi4/main.py | 148 +++++++++++++++++++++
 node-hub/dora-phi4/pyproject.toml | 32 +++++
 node-hub/dora-phi4/tests/test_dora_phi4.py | 9 ++
 6 files changed, 245 insertions(+)
 create mode 100644 node-hub/dora-phi4/README.md
 create mode 100644 node-hub/dora-phi4/dora_phi4/__init__.py
 create mode 100644 node-hub/dora-phi4/dora_phi4/__main__.py
 create mode 100644 node-hub/dora-phi4/dora_phi4/main.py
 create mode 100644 node-hub/dora-phi4/pyproject.toml
 create mode 100644 node-hub/dora-phi4/tests/test_dora_phi4.py

diff --git a/node-hub/dora-phi4/README.md b/node-hub/dora-phi4/README.md
new file mode 100644
index 00000000..3f71e018
--- /dev/null
+++ b/node-hub/dora-phi4/README.md
@@ -0,0 +1,40 @@
+# dora-phi4
+
+## Getting started
+
+- Install it with uv:
+
+```bash
+uv venv -p 3.11 --seed
+uv pip install -e .
+```
+
+## Contribution Guide
+
+- Format with [ruff](https://docs.astral.sh/ruff/):
+
+```bash
+uv pip install ruff
+uv run ruff check . --fix
+```
+
+- Lint with ruff:
+
+```bash
+uv run ruff check .
+```
+
+- Test with [pytest](https://github.com/pytest-dev/pytest)
+
+```bash
+uv pip install pytest
+uv run pytest . # Test
+```
+
+## YAML Specification
+
+## Examples
+
+## License
+
+dora-phi4's code is released under the MIT License
diff --git a/node-hub/dora-phi4/dora_phi4/__init__.py b/node-hub/dora-phi4/dora_phi4/__init__.py
new file mode 100644
index 00000000..ac3cbef9
--- /dev/null
+++ b/node-hub/dora-phi4/dora_phi4/__init__.py
@@ -0,0 +1,11 @@
+import os
+
+# Define the path to the README file relative to the package directory
+readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.md")
+
+# Read the content of the README file
+try:
+    with open(readme_path, "r", encoding="utf-8") as f:
+        __doc__ = f.read()
+except FileNotFoundError:
+    __doc__ = "README file not found."
diff --git a/node-hub/dora-phi4/dora_phi4/__main__.py b/node-hub/dora-phi4/dora_phi4/__main__.py
new file mode 100644
index 00000000..bcbfde6d
--- /dev/null
+++ b/node-hub/dora-phi4/dora_phi4/__main__.py
@@ -0,0 +1,5 @@
+from .main import main
+
+
+if __name__ == "__main__":
+    main()
diff --git a/node-hub/dora-phi4/dora_phi4/main.py b/node-hub/dora-phi4/dora_phi4/main.py
new file mode 100644
index 00000000..6cfc5067
--- /dev/null
+++ b/node-hub/dora-phi4/dora_phi4/main.py
@@ -0,0 +1,148 @@
+import io
+from urllib.request import urlopen
+
+import pyarrow as pa
+import requests
+import soundfile as sf
+import torch
+from accelerate import infer_auto_device_map
+from dora import Node
+from PIL import Image
+from transformers import (
+    AutoModelForCausalLM,
+    AutoProcessor,
+    GenerationConfig,
+)
+
+# 🔍 Detect the best available device
+if torch.cuda.is_available():
+    device = "cuda"
+    torch_dtype = torch.float16  # Use float16 for efficiency
+elif torch.backends.mps.is_available():
+    device = "mps"
+    torch_dtype = torch.float16  # Reduce memory usage for MPS
+else:
+    device = "cpu"
+    torch_dtype = torch.float32  # CPU uses float32
+
+
+# Load the model and processor
+MODEL_PATH = "microsoft/Phi-4-multimodal-instruct"
+
+processor = AutoProcessor.from_pretrained(MODEL_PATH, trust_remote_code=True)
+# bnb_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16)
+
+model = AutoModelForCausalLM.from_pretrained(
+    MODEL_PATH,
+    # quantization_config=bnb_config,
+    torch_dtype=torch.float16
+    if device == "cuda"
+    else torch.bfloat16,  # Use bfloat16 for CPU
+    trust_remote_code=True,
+    _attn_implementation="flash_attention_2"
+    if device == "cuda" and torch.cuda.get_device_properties(0).total_memory > 16e9
+    else "eager",
+    low_cpu_mem_usage=True,
+)
+
+# Infer and apply the device map before moving model
+device_map = infer_auto_device_map(model)
+
+model = AutoModelForCausalLM.from_pretrained(
+    MODEL_PATH,
+    # quantization_config=bnb_config,
+    torch_dtype=torch.float16
+    if device == "cuda"
+    else torch.bfloat16,  # Use bfloat16 for CPU
+    trust_remote_code=True,
+    _attn_implementation="flash_attention_2"
+    if device == "cuda" and torch.cuda.get_device_properties(0).total_memory > 16e9
+    else "eager",
+    low_cpu_mem_usage=True,
+    device_map=device_map,
+)
+
+generation_config = GenerationConfig.from_pretrained(MODEL_PATH)
+
+# Define prompt structure
+USER_PROMPT = "<|user|>"
+ASSISTANT_PROMPT = "<|assistant|>"
+PROMPT_SUFFIX = "<|end|>"
+
+
+def process_image(image_url):
+    """Processes an image through the model and returns the response."""
+    prompt = f"{USER_PROMPT}<|image_1|>What is shown in this image?{PROMPT_SUFFIX}{ASSISTANT_PROMPT}"
+
+    # Download and open image
+    image = Image.open(requests.get(image_url, stream=True).raw)
+
+    # Process input
+    inputs = processor(text=prompt, images=image, return_tensors="pt").to(model.device)
+
+    # Generate response
+    with torch.no_grad():
+        generate_ids = model.generate(
+            **inputs, max_new_tokens=512, generation_config=generation_config
+        )
+        generate_ids = generate_ids[:, inputs["input_ids"].shape[1] :]
+
+    response = processor.batch_decode(
+        generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
+    )[0]
+    return response
+
+
+def process_audio(audio_url):
+    """Processes an audio file through the model and returns the transcript + translation."""
+    speech_prompt = "Transcribe the audio to text, and then translate the audio to French. Use as a separator."
+    prompt = f"{USER_PROMPT}<|audio_1|>{speech_prompt}{PROMPT_SUFFIX}{ASSISTANT_PROMPT}"
+
+    # Download and read audio file
+    audio, samplerate = sf.read(io.BytesIO(urlopen(audio_url).read()))
+
+    # Process input
+    inputs = processor(
+        text=prompt, audios=[(audio, samplerate)], return_tensors="pt"
+    ).to(model.device)
+
+    # Generate response
+    with torch.no_grad():
+        generate_ids = model.generate(
+            **inputs, max_new_tokens=512, generation_config=generation_config
+        )
+        generate_ids = generate_ids[:, inputs["input_ids"].shape[1] :]
+
+    response = processor.batch_decode(
+        generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
+    )[0]
+    return response
+
+
+def main():
+    node = Node()
+
+    for event in node:
+        if event["type"] == "INPUT":
+            input_id = event["id"]
+            value = event["value"]
+
+            print(f"Received event: {input_id}, value: {value}")
+
+            # Check if it's an image URL
+            if input_id == "image_input":
+                image_response = process_image(value.as_py())  # Convert from PyArrow
+                node.send_output(
+                    output_id="image_output", data=pa.array([image_response])
+                )
+
+            # Check if it's an audio URL
+            elif input_id == "audio_input":
+                audio_response = process_audio(value.as_py())  # Convert from PyArrow
+                node.send_output(
+                    output_id="audio_output", data=pa.array([audio_response])
+                )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/node-hub/dora-phi4/pyproject.toml b/node-hub/dora-phi4/pyproject.toml
new file mode 100644
index 00000000..b1c05cfb
--- /dev/null
+++ b/node-hub/dora-phi4/pyproject.toml
@@ -0,0 +1,32 @@
+[project]
+name = "dora-phi4"
+version = "0.0.0"
+authors = [{ name = "Somay", email = "ssomay2002@gmail.com" }]
+description = "DORA node for Phi-4 multimodal model"
+license = { text = "MIT" }
+readme = "README.md"
+requires-python = ">=3.10"
+
+dependencies = [
+    "dora-rs>=0.3.9",
+    "torch==2.6.0",
+    "torchvision==0.21.0",
+    "transformers==4.48.2",
+    "accelerate==1.3.0",
+    "soundfile==0.13.1",
+    "pillow==11.1.0",
+    "scipy==1.15.2",
+    "backoff==2.2.1",
+    "peft==0.13.2",
+    "bitsandbytes>=0.42.0",
+    "requests"
+]
+
+[tool.setuptools]
+packages = ["dora_phi4"]
+
+[dependency-groups]
+dev = ["pytest >=8.1.1", "ruff >=0.9.1"]
+
+[project.scripts]
+dora-phi4 = "dora_phi4.main:main"
diff --git a/node-hub/dora-phi4/tests/test_dora_phi4.py b/node-hub/dora-phi4/tests/test_dora_phi4.py
new file mode 100644
index 00000000..fc0a3883
--- /dev/null
+++ b/node-hub/dora-phi4/tests/test_dora_phi4.py
@@ -0,0 +1,9 @@
+import pytest
+
+
+def test_import_main():
+    from dora_phi4.main import main
+
+    # Check that everything is working, and catch dora Runtime Exception as we're not running in a dora dataflow.
+    with pytest.raises(RuntimeError):
+        main()
From 4efd88c36f94451eb82e728612a38d918dc6bd98 Mon Sep 17 00:00:00 2001
From: 7SOMAY
Date: Fri, 14 Mar 2025 21:29:24 +0530
Subject: [PATCH 2/8] Resolve the redundancy & inconsistency from main.py: dora-phi4

---
 node-hub/dora-phi4/dora_phi4/__init__.py | 2 +-
 node-hub/dora-phi4/dora_phi4/__main__.py | 1 -
 node-hub/dora-phi4/dora_phi4/main.py | 55 +++++++++---------------
 3 files changed, 21 insertions(+), 37 deletions(-)

diff --git a/node-hub/dora-phi4/dora_phi4/__init__.py b/node-hub/dora-phi4/dora_phi4/__init__.py
index ac3cbef9..cde7a377 100644
--- a/node-hub/dora-phi4/dora_phi4/__init__.py
+++ b/node-hub/dora-phi4/dora_phi4/__init__.py
@@ -5,7 +5,7 @@ readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.m
 
 # Read the content of the README file
 try:
-    with open(readme_path, "r", encoding="utf-8") as f:
+    with open(readme_path, encoding="utf-8") as f:
         __doc__ = f.read()
 except FileNotFoundError:
     __doc__ = "README file not found."
diff --git a/node-hub/dora-phi4/dora_phi4/__main__.py b/node-hub/dora-phi4/dora_phi4/__main__.py
index bcbfde6d..40e2b013 100644
--- a/node-hub/dora-phi4/dora_phi4/__main__.py
+++ b/node-hub/dora-phi4/dora_phi4/__main__.py
@@ -1,5 +1,4 @@
 from .main import main
-
 
 if __name__ == "__main__":
     main()
diff --git a/node-hub/dora-phi4/dora_phi4/main.py b/node-hub/dora-phi4/dora_phi4/main.py
index 6cfc5067..830c0a4c 100644
--- a/node-hub/dora-phi4/dora_phi4/main.py
+++ b/node-hub/dora-phi4/dora_phi4/main.py
@@ -30,37 +30,22 @@ else:
 MODEL_PATH = "microsoft/Phi-4-multimodal-instruct"
 
 processor = AutoProcessor.from_pretrained(MODEL_PATH, trust_remote_code=True)
-# bnb_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16)
-
-model = AutoModelForCausalLM.from_pretrained(
-    MODEL_PATH,
-    # quantization_config=bnb_config,
-    torch_dtype=torch.float16
-    if device == "cuda"
-    else torch.bfloat16,  # Use bfloat16 for CPU
-    trust_remote_code=True,
-    _attn_implementation="flash_attention_2"
-    if device == "cuda" and torch.cuda.get_device_properties(0).total_memory > 16e9
-    else "eager",
-    low_cpu_mem_usage=True,
-)
 
-# Infer and apply the device map before moving model
-device_map = infer_auto_device_map(model)
-
-model = AutoModelForCausalLM.from_pretrained(
-    MODEL_PATH,
-    # quantization_config=bnb_config,
-    torch_dtype=torch.float16
-    if device == "cuda"
-    else torch.bfloat16,  # Use bfloat16 for CPU
-    trust_remote_code=True,
-    _attn_implementation="flash_attention_2"
+# Define model config
+MODEL_CONFIG = {
+    "torch_dtype": torch_dtype,
+    "trust_remote_code": True,
+    "_attn_implementation": "flash_attention_2"
     if device == "cuda" and torch.cuda.get_device_properties(0).total_memory > 16e9
     else "eager",
-    low_cpu_mem_usage=True,
-    device_map=device_map,
-)
+    "low_cpu_mem_usage": True,
+}
+
+# Infer device map without full initialization
+device_map = infer_auto_device_map(AutoModelForCausalLM.from_pretrained(MODEL_PATH, **MODEL_CONFIG))
+
+# Load the model directly with the inferred device map
+model = AutoModelForCausalLM.from_pretrained(MODEL_PATH, **MODEL_CONFIG, device_map=device_map)
 
 generation_config = GenerationConfig.from_pretrained(MODEL_PATH)
 
@@ -83,12 +68,12 @@ def process_image(image_url):
 
     # Generate response
     with torch.no_grad():
         generate_ids = model.generate(
-            **inputs, max_new_tokens=512, generation_config=generation_config
+            **inputs, max_new_tokens=512, generation_config=generation_config,
         )
         generate_ids = generate_ids[:, inputs["input_ids"].shape[1] :]
 
     response = processor.batch_decode(
-        generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
+        generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False,
     )[0]
     return response
 
@@ -103,18 +88,18 @@ def process_audio(audio_url):
 
     # Process input
     inputs = processor(
-        text=prompt, audios=[(audio, samplerate)], return_tensors="pt"
+        text=prompt, audios=[(audio, samplerate)], return_tensors="pt",
     ).to(model.device)
 
     # Generate response
     with torch.no_grad():
         generate_ids = model.generate(
-            **inputs, max_new_tokens=512, generation_config=generation_config
+            **inputs, max_new_tokens=512, generation_config=generation_config,
         )
         generate_ids = generate_ids[:, inputs["input_ids"].shape[1] :]
 
     response = processor.batch_decode(
-        generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
+        generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False,
     )[0]
     return response
 
@@ -133,14 +118,14 @@ def main():
             if input_id == "image_input":
                 image_response = process_image(value.as_py())  # Convert from PyArrow
                 node.send_output(
-                    output_id="image_output", data=pa.array([image_response])
+                    output_id="image_output", data=pa.array([image_response]),
                 )
 
             # Check if it's an audio URL
             elif input_id == "audio_input":
                 audio_response = process_audio(value.as_py())  # Convert from PyArrow
                 node.send_output(
-                    output_id="audio_output", data=pa.array([audio_response])
+                    output_id="audio_output", data=pa.array([audio_response]),
                 )
From c6f1583a18b3f4678a55cf118254a52b20f8acd9 Mon Sep 17 00:00:00 2001
From: haixuantao
Date: Thu, 13 Mar 2025 18:26:49 +0100
Subject: [PATCH 3/8] Make docker container multi platform

---
 .github/workflows/docker-image.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml
index 9276bf4f..283bb9ec 100644
--- a/.github/workflows/docker-image.yml
+++ b/.github/workflows/docker-image.yml
@@ -27,5 +27,5 @@ jobs:
 
       - name: Build the Docker image
         run: |
-          docker build docker/slim --tag ghcr.io/dora-rs/dora-slim:latest
+          docker build docker/slim --platform linux/amd64,linux/arm64 -t multi-platform --tag ghcr.io/dora-rs/dora-slim:latest
           docker push ghcr.io/dora-rs/dora-slim:latest
From 96570ed5523f15be895fce7804576f1899fa15f0 Mon Sep 17 00:00:00 2001
From: haixuantao
Date: Thu, 13 Mar 2025 18:50:48 +0100
Subject: [PATCH 4/8] add arm32 into docker build process

---
 .github/workflows/docker-image.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml
index 283bb9ec..01417178 100644
--- a/.github/workflows/docker-image.yml
+++ b/.github/workflows/docker-image.yml
@@ -27,5 +27,5 @@ jobs:
 
       - name: Build the Docker image
         run: |
-          docker build docker/slim --platform linux/amd64,linux/arm64 -t multi-platform --tag ghcr.io/dora-rs/dora-slim:latest
+          docker build docker/slim --platform linux/amd64,linux/arm64,linux/arm32v6,linux/arm32v7 -t multi-platform --tag ghcr.io/dora-rs/dora-slim:latest
           docker push ghcr.io/dora-rs/dora-slim:latest
From 6b0ccc5d6bdb8578e0b24627d79a344ad8193a24 Mon Sep 17 00:00:00 2001
From: Munish Mummadi
Date: Thu, 13 Mar 2025 19:54:09 -0500
Subject: [PATCH 5/8] feat: added a pain point I faced

---
 node-hub/README.md | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/node-hub/README.md b/node-hub/README.md
index 9dd0d5c6..e1301030 100644
--- a/node-hub/README.md
+++ b/node-hub/README.md
@@ -98,6 +98,28 @@ dev = ["pytest >=8.1.1", "ruff >=0.9.1"]
 opencv-plot = "opencv_plot.main:main"
 ```
 
+## Adding git dependency
+- If a git repository is added as a submodule, the proper path should be added in `pyproject.toml` in order to make sure that linting and testing are exempted for that dependency.
+- A good example of how this can be done is shown below.
+
+Correct approach:
+```toml
+[tool.ruff]
+exclude = ["dora_magma/Magma"]
+
+[tool.black]
+extend.exclude = "dora_magma/Magma"
+```
+Incorrect approach:
+```toml
+[tool.ruff]
+exclude = ["dora-magma/dora_magma/Magma"]
+
+[tool.black]
+extend.exclude = "dora_magma/Magma"
+```
+##### Note:
+- `dora-magma` is the root folder of the node.
 
 ## License
 
From 9dfaac509aae539f8540c501dd4b1437b273366d Mon Sep 17 00:00:00 2001
From: 7SOMAY
Date: Fri, 14 Mar 2025 22:01:19 +0530
Subject: [PATCH 6/8] Changed the torch_dtype from torch.float32 to torch.bfloat16 in case of CPU for efficiency

---
 node-hub/dora-phi4/dora_phi4/main.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/node-hub/dora-phi4/dora_phi4/main.py b/node-hub/dora-phi4/dora_phi4/main.py
index 830c0a4c..7c15160c 100644
--- a/node-hub/dora-phi4/dora_phi4/main.py
+++ b/node-hub/dora-phi4/dora_phi4/main.py
@@ -23,7 +23,7 @@ elif torch.backends.mps.is_available():
     torch_dtype = torch.float16  # Reduce memory usage for MPS
 else:
     device = "cpu"
-    torch_dtype = torch.float32  # CPU uses float32
+    torch_dtype = torch.bfloat16  # CPU uses bfloat16 for efficiency
 
 
 # Load the model and processor
From d0b304c397f8645c347913c18f9d12c59e5eeb2f Mon Sep 17 00:00:00 2001
From: haixuantao
Date: Sat, 15 Mar 2025 21:45:52 +0100
Subject: [PATCH 7/8] Minor fix and adding test cases

---
 node-hub/dora-phi4/dora_phi4/main.py | 122 ++++++------------
 .../pyarrow-assert/pyarrow_assert/main.py | 11 +-
 .../pyarrow-sender/pyarrow_sender/main.py | 7 +-
 tests/llm/README.md | 10 ++
 tests/llm/phi4.yaml | 24 ++++
 tests/llm/qwen2.5.yaml | 24 ++++
 6 files changed, 105 insertions(+), 93 deletions(-)
 create mode 100644 tests/llm/README.md
 create mode 100644 tests/llm/phi4.yaml
 create mode 100644 tests/llm/qwen2.5.yaml

diff --git a/node-hub/dora-phi4/dora_phi4/main.py b/node-hub/dora-phi4/dora_phi4/main.py
index 7c15160c..5e9944a8 100644
--- a/node-hub/dora-phi4/dora_phi4/main.py
+++ b/node-hub/dora-phi4/dora_phi4/main.py
@@ -1,13 +1,7 @@
-import io
-from urllib.request import urlopen
-
 import pyarrow as pa
-import requests
-import soundfile as sf
 import torch
 from accelerate import infer_auto_device_map
 from dora import Node
-from PIL import Image
 from transformers import (
     AutoModelForCausalLM,
     AutoProcessor,
     GenerationConfig,
 )
 
@@ -18,9 +12,10 @@ from transformers import (
 if torch.cuda.is_available():
     device = "cuda"
     torch_dtype = torch.float16  # Use float16 for efficiency
-elif torch.backends.mps.is_available():
-    device = "mps"
-    torch_dtype = torch.float16  # Reduce memory usage for MPS
+# TODO: Uncomment this once phi4 support mps backend.
+# elif torch.backends.mps.is_available():
+#     device = "mps"
+#     torch_dtype = torch.float16  # Reduce memory usage for MPS
 else:
     device = "cpu"
     torch_dtype = torch.bfloat16  # CPU uses bfloat16 for efficiency
@@ -29,7 +24,9 @@ else:
 # Load the model and processor
 MODEL_PATH = "microsoft/Phi-4-multimodal-instruct"
 
-processor = AutoProcessor.from_pretrained(MODEL_PATH, trust_remote_code=True)
+processor = AutoProcessor.from_pretrained(
+    MODEL_PATH, trust_remote_code=True, use_fast=True
+)
 
 # Define model config
 MODEL_CONFIG = {
@@ -42,66 +39,20 @@ MODEL_CONFIG = {
 }
 
 # Infer device map without full initialization
-device_map = infer_auto_device_map(AutoModelForCausalLM.from_pretrained(MODEL_PATH, **MODEL_CONFIG))
+device_map = infer_auto_device_map(
+    AutoModelForCausalLM.from_pretrained(MODEL_PATH, **MODEL_CONFIG)
+)
 
 # Load the model directly with the inferred device map
-model = AutoModelForCausalLM.from_pretrained(MODEL_PATH, **MODEL_CONFIG, device_map=device_map)
+model = AutoModelForCausalLM.from_pretrained(
+    MODEL_PATH, **MODEL_CONFIG, device_map=device_map
+)
 
 generation_config = GenerationConfig.from_pretrained(MODEL_PATH)
 
-# Define prompt structure
-USER_PROMPT = "<|user|>"
-ASSISTANT_PROMPT = "<|assistant|>"
-PROMPT_SUFFIX = "<|end|>"
-
-
-def process_image(image_url):
-    """Processes an image through the model and returns the response."""
-    prompt = f"{USER_PROMPT}<|image_1|>What is shown in this image?{PROMPT_SUFFIX}{ASSISTANT_PROMPT}"
-
-    # Download and open image
-    image = Image.open(requests.get(image_url, stream=True).raw)
-
-    # Process input
-    inputs = processor(text=prompt, images=image, return_tensors="pt").to(model.device)
-
-    # Generate response
-    with torch.no_grad():
-        generate_ids = model.generate(
-            **inputs, max_new_tokens=512, generation_config=generation_config,
-        )
-        generate_ids = generate_ids[:, inputs["input_ids"].shape[1] :]
-
-    response = processor.batch_decode(
-        generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False,
-    )[0]
-    return response
-
-
-def process_audio(audio_url):
-    """Processes an audio file through the model and returns the transcript + translation."""
-    speech_prompt = "Transcribe the audio to text, and then translate the audio to French. Use as a separator."
-    prompt = f"{USER_PROMPT}<|audio_1|>{speech_prompt}{PROMPT_SUFFIX}{ASSISTANT_PROMPT}"
-
-    # Download and read audio file
-    audio, samplerate = sf.read(io.BytesIO(urlopen(audio_url).read()))
-
-    # Process input
-    inputs = processor(
-        text=prompt, audios=[(audio, samplerate)], return_tensors="pt",
-    ).to(model.device)
-
-    # Generate response
-    with torch.no_grad():
-        generate_ids = model.generate(
-            **inputs, max_new_tokens=512, generation_config=generation_config,
-        )
-        generate_ids = generate_ids[:, inputs["input_ids"].shape[1] :]
-
-    response = processor.batch_decode(
-        generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False,
-    )[0]
-    return response
+user_prompt = "<|user|>"
+assistant_prompt = "<|assistant|>"
+prompt_suffix = "<|end|>"
 
 
 def main():
@@ -110,23 +61,30 @@ def main():
     for event in node:
         if event["type"] == "INPUT":
             input_id = event["id"]
-            value = event["value"]
-
-            print(f"Received event: {input_id}, value: {value}")
-
-            # Check if it's an image URL
-            if input_id == "image_input":
-                image_response = process_image(value.as_py())  # Convert from PyArrow
-                node.send_output(
-                    output_id="image_output", data=pa.array([image_response]),
-                )
-
-            # Check if it's an audio URL
-            elif input_id == "audio_input":
-                audio_response = process_audio(value.as_py())  # Convert from PyArrow
-                node.send_output(
-                    output_id="audio_output", data=pa.array([audio_response]),
-                )
+            if input_id == "text":
+                text = event["value"][0].as_py()
+                prompt = f"{user_prompt}{text}{prompt_suffix}{assistant_prompt}"
+
+                # Process input
+                inputs = processor(
+                    text=prompt,
+                    return_tensors="pt",
+                ).to(model.device)
+                # Generate response
+                with torch.no_grad():
+                    generate_ids = model.generate(
+                        **inputs,
+                        max_new_tokens=512,
+                        generation_config=generation_config,
+                    )
+                    generate_ids = generate_ids[:, inputs["input_ids"].shape[1] :]
+
+                response = processor.batch_decode(
+                    generate_ids,
+                    skip_special_tokens=True,
+                    clean_up_tokenization_spaces=False,
+                )[0]
+                node.send_output("text", pa.array([response]))
 
 
 if __name__ == "__main__":
diff --git a/node-hub/pyarrow-assert/pyarrow_assert/main.py b/node-hub/pyarrow-assert/pyarrow_assert/main.py
index 1dd047fc..f862256d 100644
--- a/node-hub/pyarrow-assert/pyarrow_assert/main.py
+++ b/node-hub/pyarrow-assert/pyarrow_assert/main.py
@@ -7,8 +7,6 @@ import os
 import pyarrow as pa
 from dora import Node
 
-RUNNER_CI = True if os.getenv("CI") == "true" else False
-
 
 def main():
     # Handle dynamic nodes, ask for the name of the node in the dataflow, and the same values as the ENV variables.
@@ -38,11 +36,12 @@ def main():
         args.name,
     )  # provide the name to connect to the dataflow if dynamic node
 
-    data = ast.literal_eval(data)
+    try:
+        data = ast.literal_eval(data)
+    except ValueError:
+        print("Passing input as string")
 
-    if isinstance(data, list):
-        data = pa.array(data)  # initialize pyarrow array
-    elif isinstance(data, str) or isinstance(data, int) or isinstance(data, float):
+    if isinstance(data, (str, int, float)):
         data = pa.array([data])
     else:
         data = pa.array(data)  # initialize pyarrow array
diff --git a/node-hub/pyarrow-sender/pyarrow_sender/main.py b/node-hub/pyarrow-sender/pyarrow_sender/main.py
index 68ce3a92..a8f3bfd4 100644
--- a/node-hub/pyarrow-sender/pyarrow_sender/main.py
+++ b/node-hub/pyarrow-sender/pyarrow_sender/main.py
@@ -7,8 +7,6 @@ import os
 import pyarrow as pa
 from dora import Node
 
-RUNNER_CI = True if os.getenv("CI") == "true" else False
-
 
 def main():
     # Handle dynamic nodes, ask for the name of the node in the dataflow, and the same values as the ENV variables.
@@ -46,9 +44,8 @@ def main():
         )
     try:
         data = ast.literal_eval(data)
     except ValueError:
         print("Passing input as string")
-    if isinstance(data, list):
-        data = pa.array(data)  # initialize pyarrow array
-    elif isinstance(data, str) or isinstance(data, int) or isinstance(data, float):
+
+    if isinstance(data, (str, int, float)):
         data = pa.array([data])
     else:
         data = pa.array(data)  # initialize pyarrow array
diff --git a/tests/llm/README.md b/tests/llm/README.md
new file mode 100644
index 00000000..0eb47e13
--- /dev/null
+++ b/tests/llm/README.md
@@ -0,0 +1,10 @@
+# Test an LLM on a simple prompt
+
+To run:
+
+```bash
+cd tests/llm
+uv venv --seed -p 3.11
+dora build phi4.yaml --uv
+dora run phi4.yaml --uv
+```
diff --git a/tests/llm/phi4.yaml b/tests/llm/phi4.yaml
new file mode 100644
index 00000000..3187e61d
--- /dev/null
+++ b/tests/llm/phi4.yaml
@@ -0,0 +1,24 @@
+nodes:
+  - id: pyarrow-sender
+    build: pip install -e ../../node-hub/pyarrow-sender
+    path: pyarrow-sender
+    outputs:
+      - data
+    env:
+      DATA: "'Please only generate the following output: This is a test'"
+
+  - id: dora-phi4
+    build: pip install -e ../../node-hub/dora-phi4
+    path: dora-phi4
+    inputs:
+      text: pyarrow-sender/data
+    outputs:
+      - text
+
+  - id: pyarrow-assert
+    build: pip install -e ../../node-hub/pyarrow-assert
+    path: pyarrow-assert
+    inputs:
+      data: dora-phi4/text
+    env:
+      DATA: "'This is a test'"
diff --git a/tests/llm/qwen2.5.yaml b/tests/llm/qwen2.5.yaml
new file mode 100644
index 00000000..9cff8060
--- /dev/null
+++ b/tests/llm/qwen2.5.yaml
@@ -0,0 +1,24 @@
+nodes:
+  - id: pyarrow-sender
+    build: pip install -e ../../node-hub/pyarrow-sender
+    path: pyarrow-sender
+    outputs:
+      - data
+    env:
+      DATA: "'Please only output: This is a test'"
+
+  - id: dora-qwen2.5
+    build: pip install -e ../../node-hub/dora-qwen2.5
+    path: dora-qwen2-5
+    inputs:
+      text: pyarrow-sender/data
+    outputs:
+      - text
+
+  - id: pyarrow-assert
+    build: pip install -e ../../node-hub/pyarrow-assert
+    path: pyarrow-assert
+    inputs:
+      data: dora-phi4/text
+    env:
+      DATA: "'This is a test'"
From 8f3061e8731547974e833fc5ebf9b344c6d08990 Mon Sep 17 00:00:00 2001
From: haixuantao
Date: Sat, 15 Mar 2025 22:00:10 +0100
Subject: [PATCH 8/8] Making exception wider for testing purposes

---
 node-hub/pyarrow-assert/pyarrow_assert/main.py | 2 +-
 node-hub/pyarrow-sender/pyarrow_sender/main.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/node-hub/pyarrow-assert/pyarrow_assert/main.py b/node-hub/pyarrow-assert/pyarrow_assert/main.py
index f862256d..dfa0414b 100644
--- a/node-hub/pyarrow-assert/pyarrow_assert/main.py
+++ b/node-hub/pyarrow-assert/pyarrow_assert/main.py
@@ -38,7 +38,7 @@ def main():
 
     try:
         data = ast.literal_eval(data)
-    except ValueError:
+    except Exception:  # noqa
         print("Passing input as string")
 
     if isinstance(data, (str, int, float)):
diff --git a/node-hub/pyarrow-sender/pyarrow_sender/main.py b/node-hub/pyarrow-sender/pyarrow_sender/main.py
index a8f3bfd4..f584380c 100644
--- a/node-hub/pyarrow-sender/pyarrow_sender/main.py
+++ b/node-hub/pyarrow-sender/pyarrow_sender/main.py
@@ -42,7 +42,7 @@ def main():
         )
     try:
         data = ast.literal_eval(data)
-    except ValueError:
+    except Exception:  # noqa
         print("Passing input as string")
 
     if isinstance(data, (str, int, float)):
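
For reference, the test dataflows added in PATCH 7 wire `pyarrow-sender` → `dora-phi4` → `pyarrow-assert`. Below is a minimal sketch of a custom downstream node that would consume the `text` output of `dora-phi4`, using only the dora Python API calls already shown in these patches (`Node()`, event iteration, `send_output`). The node name, its place in the dataflow, and the trimming step are illustrative assumptions, not part of the patch series.

```python
import pyarrow as pa
from dora import Node


def main():
    # Connects to whichever dataflow declares this node (e.g. a hypothetical
    # `text-consumer` entry with an input wired to `dora-phi4/text`).
    node = Node()

    for event in node:
        if event["type"] == "INPUT" and event["id"] == "text":
            # dora-phi4 sends its response as a single-element pyarrow array.
            response = event["value"][0].as_py()
            print(f"phi4 replied: {response}")
            # Forward the trimmed text so a later node (like pyarrow-assert)
            # can check it.
            node.send_output("text", pa.array([response.strip()]))


if __name__ == "__main__":
    main()
```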