
Minor fixes and added test cases

haixuantao committed 10 months ago · parent commit d0b304c397 · tags/v0.3.11-rc1
6 changed files with 105 additions and 93 deletions

1. node-hub/dora-phi4/dora_phi4/main.py (+40 -82)
2. node-hub/pyarrow-assert/pyarrow_assert/main.py (+5 -6)
3. node-hub/pyarrow-sender/pyarrow_sender/main.py (+2 -5)
4. tests/llm/README.md (+10 -0)
5. tests/llm/phi4.yaml (+24 -0)
6. tests/llm/qwen2.5.yaml (+24 -0)

node-hub/dora-phi4/dora_phi4/main.py (+40 -82)

@@ -1,13 +1,7 @@
-import io
-from urllib.request import urlopen
-
 import pyarrow as pa
-import requests
-import soundfile as sf
 import torch
 from accelerate import infer_auto_device_map
 from dora import Node
-from PIL import Image
 from transformers import (
     AutoModelForCausalLM,
     AutoProcessor,
@@ -18,9 +12,10 @@ from transformers import (
 if torch.cuda.is_available():
     device = "cuda"
     torch_dtype = torch.float16  # Use float16 for efficiency
-elif torch.backends.mps.is_available():
-    device = "mps"
-    torch_dtype = torch.float16  # Reduce memory usage for MPS
+# TODO: Uncomment this once phi4 supports the mps backend.
+# elif torch.backends.mps.is_available():
+#     device = "mps"
+#     torch_dtype = torch.float16  # Reduce memory usage for MPS
 else:
     device = "cpu"
     torch_dtype = torch.bfloat16  # CPU uses bfloat16 for efficiency
@@ -29,7 +24,9 @@ else:
 # Load the model and processor
 MODEL_PATH = "microsoft/Phi-4-multimodal-instruct"
 
-processor = AutoProcessor.from_pretrained(MODEL_PATH, trust_remote_code=True)
+processor = AutoProcessor.from_pretrained(
+    MODEL_PATH, trust_remote_code=True, use_fast=True
+)
 
 # Define model config
 MODEL_CONFIG = {
@@ -42,66 +39,20 @@ MODEL_CONFIG = {
 }
 
 # Infer device map without full initialization
-device_map = infer_auto_device_map(AutoModelForCausalLM.from_pretrained(MODEL_PATH, **MODEL_CONFIG))
+device_map = infer_auto_device_map(
+    AutoModelForCausalLM.from_pretrained(MODEL_PATH, **MODEL_CONFIG)
+)
 
 # Load the model directly with the inferred device map
-model = AutoModelForCausalLM.from_pretrained(MODEL_PATH, **MODEL_CONFIG, device_map=device_map)
+model = AutoModelForCausalLM.from_pretrained(
+    MODEL_PATH, **MODEL_CONFIG, device_map=device_map
+)
 
 generation_config = GenerationConfig.from_pretrained(MODEL_PATH)
 
 # Define prompt structure
-USER_PROMPT = "<|user|>"
-ASSISTANT_PROMPT = "<|assistant|>"
-PROMPT_SUFFIX = "<|end|>"
-
-
-def process_image(image_url):
-    """Processes an image through the model and returns the response."""
-    prompt = f"{USER_PROMPT}<|image_1|>What is shown in this image?{PROMPT_SUFFIX}{ASSISTANT_PROMPT}"
-
-    # Download and open image
-    image = Image.open(requests.get(image_url, stream=True).raw)
-
-    # Process input
-    inputs = processor(text=prompt, images=image, return_tensors="pt").to(model.device)
-
-    # Generate response
-    with torch.no_grad():
-        generate_ids = model.generate(
-            **inputs, max_new_tokens=512, generation_config=generation_config,
-        )
-    generate_ids = generate_ids[:, inputs["input_ids"].shape[1] :]
-
-    response = processor.batch_decode(
-        generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False,
-    )[0]
-    return response
-
-
-def process_audio(audio_url):
-    """Processes an audio file through the model and returns the transcript + translation."""
-    speech_prompt = "Transcribe the audio to text, and then translate the audio to French. Use <sep> as a separator."
-    prompt = f"{USER_PROMPT}<|audio_1|>{speech_prompt}{PROMPT_SUFFIX}{ASSISTANT_PROMPT}"
-
-    # Download and read audio file
-    audio, samplerate = sf.read(io.BytesIO(urlopen(audio_url).read()))
-
-    # Process input
-    inputs = processor(
-        text=prompt, audios=[(audio, samplerate)], return_tensors="pt",
-    ).to(model.device)
-
-    # Generate response
-    with torch.no_grad():
-        generate_ids = model.generate(
-            **inputs, max_new_tokens=512, generation_config=generation_config,
-        )
-    generate_ids = generate_ids[:, inputs["input_ids"].shape[1] :]
-
-    response = processor.batch_decode(
-        generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False,
-    )[0]
-    return response
+user_prompt = "<|user|>"
+assistant_prompt = "<|assistant|>"
+prompt_suffix = "<|end|>"
 
 
 def main():
@@ -110,23 +61,30 @@ def main():
     for event in node:
         if event["type"] == "INPUT":
             input_id = event["id"]
-            value = event["value"]
-
-            print(f"Received event: {input_id}, value: {value}")
-
-            # Check if it's an image URL
-            if input_id == "image_input":
-                image_response = process_image(value.as_py())  # Convert from PyArrow
-                node.send_output(
-                    output_id="image_output", data=pa.array([image_response]),
-                )
-
-            # Check if it's an audio URL
-            elif input_id == "audio_input":
-                audio_response = process_audio(value.as_py())  # Convert from PyArrow
-                node.send_output(
-                    output_id="audio_output", data=pa.array([audio_response]),
-                )
+            if input_id == "text":
+                text = event["value"][0].as_py()
+                prompt = f"{user_prompt}{text}{prompt_suffix}{assistant_prompt}"
+
+                # Process input
+                inputs = processor(
+                    text=prompt,
+                    return_tensors="pt",
+                ).to(model.device)
+                # Generate response
+                with torch.no_grad():
+                    generate_ids = model.generate(
+                        **inputs,
+                        max_new_tokens=512,
+                        generation_config=generation_config,
+                    )
+                generate_ids = generate_ids[:, inputs["input_ids"].shape[1] :]
+
+                response = processor.batch_decode(
+                    generate_ids,
+                    skip_special_tokens=True,
+                    clean_up_tokenization_spaces=False,
+                )[0]
+                node.send_output("text", pa.array([response]))
 
 
 if __name__ == "__main__":
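After this change the node's interface is a single `text` input and `text` output, each carrying a one-element string array. As a wiring reference, here is a minimal sketch of a downstream consumer; the node name is an assumption, and only dora API calls already visible in this diff are used:

```python
import pyarrow as pa
from dora import Node

node = Node()  # or Node("my-node") to attach as a dynamic node
for event in node:
    if event["type"] == "INPUT" and event["id"] == "text":
        reply = event["value"][0].as_py()  # unwrap the one-element string array
        print(f"phi4 replied: {reply}")
```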


node-hub/pyarrow-assert/pyarrow_assert/main.py (+5 -6)

@@ -7,8 +7,6 @@ import os
 import pyarrow as pa
 from dora import Node
 
-RUNNER_CI = True if os.getenv("CI") == "true" else False
-
 
 def main():
     # Handle dynamic nodes, ask for the name of the node in the dataflow, and the same values as the ENV variables.
@@ -38,11 +36,12 @@ def main():
         args.name,
     )  # provide the name to connect to the dataflow if dynamic node
 
-    data = ast.literal_eval(data)
+    try:
+        data = ast.literal_eval(data)
+    except ValueError:
+        print("Passing input as string")
 
-    if isinstance(data, list):
-        data = pa.array(data)  # initialize pyarrow array
-    elif isinstance(data, str) or isinstance(data, int) or isinstance(data, float):
+    if isinstance(data, (str, int, float)):
         data = pa.array([data])
     else:
         data = pa.array(data)  # initialize pyarrow array
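One caveat with the new fallback: `ast.literal_eval` raises `ValueError` for malformed nodes, but an unquoted multi-word string such as `This is a test` fails with `SyntaxError`, which an `except ValueError` alone does not catch. A standalone sketch of the intended parse-or-passthrough behavior (the helper name is hypothetical, and it deliberately catches both exceptions):

```python
import ast

def parse_or_passthrough(raw: str):
    """Hypothetical helper mirroring the node's fallback, catching both errors."""
    try:
        return ast.literal_eval(raw)  # parses quoted strings, lists, numbers, ...
    except (ValueError, SyntaxError):
        print("Passing input as string")
        return raw  # anything unparseable falls through as a plain string

print(parse_or_passthrough("'This is a test'"))  # quoted -> str
print(parse_or_passthrough("[1, 2, 3]"))         # list literal -> list
print(parse_or_passthrough("This is a test"))    # unquoted -> passed through
```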


node-hub/pyarrow-sender/pyarrow_sender/main.py (+2 -5)

@@ -7,8 +7,6 @@ import os
 import pyarrow as pa
 from dora import Node
 
-RUNNER_CI = True if os.getenv("CI") == "true" else False
-
 
 def main():
     # Handle dynamic nodes, ask for the name of the node in the dataflow, and the same values as the ENV variables.
@@ -46,9 +44,8 @@ def main():
         data = ast.literal_eval(data)
     except ValueError:
         print("Passing input as string")
-    if isinstance(data, list):
-        data = pa.array(data)  # initialize pyarrow array
-    elif isinstance(data, str) or isinstance(data, int) or isinstance(data, float):
+
+    if isinstance(data, (str, int, float)):
         data = pa.array([data])
     else:
         data = pa.array(data)  # initialize pyarrow array
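For context, this is the path the test dataflows below exercise; the DATA string is copied from tests/llm/phi4.yaml, and the rest is a sketch of the two steps above:

```python
import ast
import pyarrow as pa

# DATA exactly as set in tests/llm/phi4.yaml (note the nested quotes)
raw = "'Please only generate the following output: This is a test'"
data = ast.literal_eval(raw)  # the outer quotes parse away, leaving a plain str
arr = pa.array([data])        # scalar branch: wrap into a one-element array
print(arr[0].as_py())
```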


tests/llm/README.md (+10 -0)

@@ -0,0 +1,10 @@
+# Test an LLM on a simple prompt
+
+To run:
+
+```bash
+cd tests/llm
+uv venv --seed -p 3.11
+dora build phi4.yaml --uv
+dora run phi4.yaml --uv
+```
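The qwen2.5.yaml dataflow added below runs the same way; substitute the file name in the `dora build` and `dora run` commands.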

tests/llm/phi4.yaml (+24 -0)

@@ -0,0 +1,24 @@
nodes:
- id: pyarrow-sender
build: pip install -e ../../node-hub/pyarrow-sender
path: pyarrow-sender
outputs:
- data
env:
DATA: "'Please only generate the following output: This is a test'"

- id: dora-phi4
build: pip install -e ../../node-hub/dora-phi4
path: dora-phi4
inputs:
text: pyarrow-sender/data
outputs:
- text

- id: pyarrow-assert
build: pip install -e ../../node-hub/pyarrow-assert
path: pyarrow-assert
inputs:
data: dora-phi4/text
env:
DATA: "'This is a test'"
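This dataflow feeds a fixed prompt through dora-phi4 and asserts on the reply. The end-to-end check amounts to the comparison sketched below; the received value is a stand-in for whatever dora-phi4 actually emits, not pyarrow-assert's real code:

```python
import pyarrow as pa

expected = pa.array(["This is a test"])  # DATA given to pyarrow-assert
received = pa.array(["This is a test"])  # stand-in for the dora-phi4/text output
assert received.equals(expected), "model did not follow the instruction"
```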

tests/llm/qwen2.5.yaml (+24 -0)

@@ -0,0 +1,24 @@
nodes:
- id: pyarrow-sender
build: pip install -e ../../node-hub/pyarrow-sender
path: pyarrow-sender
outputs:
- data
env:
DATA: "'Please only output: This is a test'"

- id: dora-qwen2.5
build: pip install -e ../../node-hub/dora-qwen2.5
path: dora-qwen2-5
inputs:
text: pyarrow-sender/data
outputs:
- text

- id: pyarrow-assert
build: pip install -e ../../node-hub/pyarrow-assert
path: pyarrow-assert
inputs:
data: dora-phi4/text
env:
DATA: "'This is a test'"
