Browse Source

Adding LLM example

tags/v0.3.3-rc1^2
haixuanTao 1 year ago
parent
commit
7628b61d22
14 changed files with 924 additions and 175 deletions
  1. +0
    -3
      examples/python-operator-dataflow/dataflow.yml
  2. +69
    -0
      examples/python-operator-dataflow/dataflow_llm.yml
  3. +87
    -0
      examples/python-operator-dataflow/dataflow_record.yml
  4. +44
    -0
      examples/python-operator-dataflow/file_saver_op.py
  5. +94
    -0
      examples/python-operator-dataflow/keyboard_op.py
  6. +319
    -0
      examples/python-operator-dataflow/llm_op.py
  7. +12
    -0
      examples/python-operator-dataflow/merge.py
  8. +36
    -0
      examples/python-operator-dataflow/microphone_op.py
  9. +16
    -37
      examples/python-operator-dataflow/object_detection.py
  10. +81
    -118
      examples/python-operator-dataflow/plot.py
  11. +100
    -0
      examples/python-operator-dataflow/sentence_transformers_op.py
  12. +24
    -1
      examples/python-operator-dataflow/utils.py
  13. +17
    -16
      examples/python-operator-dataflow/webcam.py
  14. +25
    -0
      examples/python-operator-dataflow/whisper_op.py

+ 0
- 3
examples/python-operator-dataflow/dataflow.yml View File

@@ -10,12 +10,10 @@ nodes:
- id: object_detection
operator:
python: object_detection.py
send_stdout_as: stdout
inputs:
image: webcam/image
outputs:
- bbox
- stdout

- id: plot
operator:
@@ -23,4 +21,3 @@ nodes:
inputs:
image: webcam/image
bbox: object_detection/bbox
object_detection_stdout: object_detection/stdout

+ 69
- 0
examples/python-operator-dataflow/dataflow_llm.yml View File

@@ -0,0 +1,69 @@
nodes:
- id: webcam
operator:
python: webcam.py
inputs:
tick: dora/timer/millis/50
outputs:
- image

- id: object_detection
operator:
python: object_detection.py
inputs:
image: webcam/image
outputs:
- bbox

- id: plot
operator:
python: plot.py
inputs:
image: webcam/image
bbox: object_detection/bbox
line: llm/line
keyboard_buffer: keyboard/buffer
user_message: keyboard/submitted
assistant_message: llm/assistant_message

## Keyboard input
- id: keyboard
custom:
source: keyboard_op.py
outputs:
- buffer
- submitted
- record
- ask
- send
- change

## Code Modifier
- id: vectordb
operator:
python: sentence_transformers_op.py
inputs:
query: keyboard/change
saved_file: file_saver/saved_file
outputs:
- raw_file

- id: llm
operator:
python: llm_op.py
inputs:
code_modifier: vectordb/raw_file
assistant: keyboard/ask
message_sender: keyboard/send
outputs:
- modified_file
- line
- assistant_message

- id: file_saver
operator:
python: file_saver_op.py
inputs:
file: llm/modified_file
outputs:
- saved_file

+ 87
- 0
examples/python-operator-dataflow/dataflow_record.yml View File

@@ -0,0 +1,87 @@
nodes:
- id: webcam
operator:
python: webcam.py
inputs:
tick: dora/timer/millis/50
outputs:
- image

- id: object_detection
operator:
python: object_detection.py
inputs:
image: webcam/image
outputs:
- bbox

- id: plot
operator:
python: plot.py
inputs:
image: webcam/image
bbox: object_detection/bbox
line: llm/line
keyboard_buffer: keyboard/buffer
user_message: keyboard/submitted
assistant_message: llm/assistant_message

## Speech to text
- id: keyboard
custom:
source: keyboard_op.py
outputs:
- buffer
- submitted
- record
- ask
- send
- change
inputs:
recording: whisper/text

- id: microphone
operator:
python: microphone_op.py
inputs:
record: keyboard/record
outputs:
- audio

- id: whisper
operator:
python: whisper_op.py
inputs:
audio: microphone/audio
outputs:
- text

## Code Modifier
- id: vectordb
operator:
python: sentence_transformers_op.py
inputs:
query: keyboard/change
saved_file: file_saver/saved_file
outputs:
- raw_file

- id: llm
operator:
python: llm_op.py
inputs:
code_modifier: vectordb/raw_file
assistant: keyboard/ask
message_sender: keyboard/send
outputs:
- modified_file
- line
- assistant_message

- id: file_saver
operator:
python: file_saver_op.py
inputs:
file: llm/modified_file
outputs:
- saved_file

+ 44
- 0
examples/python-operator-dataflow/file_saver_op.py View File

@@ -0,0 +1,44 @@
import pyarrow as pa

from dora import DoraStatus


class Operator:
    """
    Save a modified file to disk and announce it on the `saved_file` output.

    Before overwriting, the previous content, the path and the event metadata
    are kept on the instance (`last_file`, `last_path`, `last_metadata`).
    """

    def __init__(self):
        self.last_file = ""
        self.last_path = ""
        # Was misspelled `last_netadata`, which left `last_metadata` undefined
        # until the first "file" input arrived.
        self.last_metadata = None

    def on_event(
        self,
        dora_event,
        send_output,
    ) -> DoraStatus:
        """
        Handle a dora event.

        For an INPUT event with id "file", the first element of the value is
        read as a dict with "path" and "raw" keys: the current content of
        `path` is remembered, `raw` is written to `path`, and a `saved_file`
        message carrying raw/path/origin is sent with the incoming metadata.

        Always returns DoraStatus.CONTINUE.
        """
        if dora_event["type"] == "INPUT" and dora_event["id"] == "file":
            file_event = dora_event["value"][0].as_py()
            path = file_event["path"]

            # Explicit utf8 to match how llm_op.py reads the very same file.
            with open(path, "r", encoding="utf8") as file:
                self.last_file = file.read()
            self.last_path = path
            self.last_metadata = dora_event["metadata"]
            with open(path, "w", encoding="utf8") as file:
                file.write(file_event["raw"])

            send_output(
                "saved_file",
                pa.array(
                    [
                        {
                            "raw": file_event["raw"],
                            "path": path,
                            "origin": dora_event["id"],
                        }
                    ]
                ),
                dora_event["metadata"],
            )
        return DoraStatus.CONTINUE

+ 94
- 0
examples/python-operator-dataflow/keyboard_op.py View File

@@ -0,0 +1,94 @@
from pynput import keyboard
from pynput.keyboard import Key, Events
import pyarrow as pa
from dora import Node
from tkinter import Tk
import tkinter as tk


# Keyboard node: mirrors what the user types into a text buffer, publishes the
# buffer on every change ("buffer"), and on Enter publishes the line on
# "submitted" plus, when its first word is one of NODE_TOPIC, on that topic.
node = Node()
buffer_text = ""
ctrl = False
submitted_text = []
# Negative index into submitted_text while browsing history; 0 = not browsing.
cursor = 0

NODE_TOPIC = ["record", "send", "ask", "change"]


def _read_clipboard():
    """Return the clipboard text, or "" when Tk cannot provide it."""
    root = Tk()
    root.update()
    try:
        selection = root.clipboard_get()
        root.withdraw()
        root.update()
    except tk.TclError:
        selection = ""
    root.destroy()
    return selection


def _write_clipboard(text):
    """Replace the clipboard content with `text`."""
    root = Tk()
    root.clipboard_clear()
    root.clipboard_append(text)
    root.update()
    root.destroy()


def _publish_buffer():
    """Send the current buffer on the "buffer" output."""
    node.send_output("buffer", pa.array([buffer_text]))


with keyboard.Events() as events:
    while True:
        dora_event = node.next(0.01)
        if (
            dora_event is not None
            and dora_event["type"] == "INPUT"
            and dora_event["id"] == "recording"
        ):
            # Text received on the "recording" input is appended to the buffer.
            buffer_text += dora_event["value"][0].as_py()
            _publish_buffer()
            continue

        event = events.get(1.0)
        if event is not None and isinstance(event, Events.Press):
            if hasattr(event.key, "char"):
                cursor = 0
                char = event.key.char
                if char is None:
                    # Key codes without a printable character carry char=None;
                    # appending it to the buffer would raise TypeError.
                    continue
                if ctrl and char == "v":
                    buffer_text += _read_clipboard()
                    _publish_buffer()
                elif ctrl and char == "c":
                    _write_clipboard(buffer_text)
                elif ctrl and char == "x":
                    _write_clipboard(buffer_text)
                    buffer_text = ""
                    _publish_buffer()
                else:
                    buffer_text += char
                    _publish_buffer()
            else:
                if event.key == Key.backspace:
                    buffer_text = buffer_text[:-1]
                    _publish_buffer()
                elif event.key == Key.esc:
                    buffer_text = ""
                    _publish_buffer()
                elif event.key == Key.enter:
                    node.send_output("submitted", pa.array([buffer_text]))
                    first_word = buffer_text.split(" ")[0]
                    if first_word in NODE_TOPIC:
                        node.send_output(first_word, pa.array([buffer_text]))
                    submitted_text.append(buffer_text)
                    buffer_text = ""
                    _publish_buffer()
                elif event.key == Key.ctrl:
                    ctrl = True
                elif event.key == Key.space:
                    buffer_text += " "
                    _publish_buffer()
                elif event.key == Key.up:
                    if len(submitted_text) > 0:
                        cursor = max(cursor - 1, -len(submitted_text))
                        buffer_text = submitted_text[cursor]
                        _publish_buffer()
                elif event.key == Key.down:
                    if len(submitted_text) > 0:
                        cursor = min(cursor + 1, 0)
                        # cursor == 0 means "past the newest entry": show an
                        # empty line rather than submitted_text[0], which is
                        # the OLDEST entry.
                        buffer_text = submitted_text[cursor] if cursor < 0 else ""
                        _publish_buffer()
        elif event is not None and isinstance(event, Events.Release):
            if event.key == Key.ctrl:
                ctrl = False

+ 319
- 0
examples/python-operator-dataflow/llm_op.py View File

@@ -0,0 +1,319 @@
from dora import DoraStatus
import pylcs
import os
import pyarrow as pa
from transformers import AutoModelForCausalLM, AutoTokenizer
import json

import re
import time

# Model identifier passed to `from_pretrained` for both the model and the tokenizer below.
MODEL_NAME_OR_PATH = "TheBloke/deepseek-coder-6.7B-instruct-GPTQ"
# MODEL_NAME_OR_PATH = "hanspeterlyngsoeraaschoujensen/deepseek-math-7b-instruct-GPTQ"

# Prompt used for the "code_modifier" input; formatted with `code` and `user_message`.
CODE_MODIFIER_TEMPLATE = """
### Instruction
Respond with the small modified code only. No explaination.

```python
{code}
```

{user_message}

### Response:
"""


# Prompt used for the "message_sender" input; formatted with `user_message`.
# Doubled braces are literal braces after `str.format`.
MESSAGE_SENDER_TEMPLATE = """
### Instruction
You're a json expert. Format your response as a json with a topic and a data field in a ```json block. No explaination needed. No code needed.
The schema for those json are:
- line: Int[4]

The response should look like this:
```json

[
{{ "topic": "line", "data": [10, 10, 90, 10] }},
]
```

{user_message}

### Response:
"""

# Prompt used for the "assistant" input; formatted with `user_message`.
ASSISTANT_TEMPLATE = """
### Instruction
You're a helpuf assistant named dora.
Reply with a short message. No code needed.

User {user_message}

### Response:
"""


# The model is loaded once at import time and shared by every Operator call.
# NOTE(review): trust_remote_code=True lets the model repository run its own code — confirm this is intended.
model = AutoModelForCausalLM.from_pretrained(
MODEL_NAME_OR_PATH,
device_map="auto",
trust_remote_code=True,
revision="main",
)


# Tokenizer matching the model above, also loaded once at import time.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME_OR_PATH, use_fast=True)


def extract_python_code_blocks(text):
    """
    Return the contents of every ```python fenced block found in `text`.

    A strict pattern (closing fence required) is tried first; if it finds
    nothing, a lenient pattern that also accepts a block running to the end of
    the text is tried. When neither matches, the whole text is returned as the
    single element of the list.

    Parameters:
    - text: A string that may contain one or more Python code blocks.

    Returns:
    - A list of strings, one per extracted block (or `[text]` as a fallback).
    """
    fence_patterns = (
        r"```python\n(.*?)\n```",
        r"```python\n(.*?)(?:\n```|$)",
    )
    for fence_pattern in fence_patterns:
        blocks = re.findall(fence_pattern, text, re.DOTALL)
        if blocks:
            return blocks
    return [text]


def extract_json_code_blocks(text):
    """
    Return the contents of every ```json fenced block found in `text`.

    A strict pattern (closing fence required) is tried first; if it finds
    nothing, a lenient pattern that also accepts a block running to the end of
    the text is tried. When neither matches, the whole text is returned as the
    single element of the list.

    Parameters:
    - text: A string that may contain one or more json code blocks.

    Returns:
    - A list of strings, one per extracted block (or `[text]` as a fallback).
    """
    fence_patterns = (
        r"```json\n(.*?)\n```",
        r"```json\n(.*?)(?:\n```|$)",
    )
    for fence_pattern in fence_patterns:
        blocks = re.findall(fence_pattern, text, re.DOTALL)
        if blocks:
            return blocks
    return [text]


def remove_last_line(python_code):
    """
    Drop everything after the final newline of `python_code`.

    Parameters:
    - python_code: A string representing Python source code.

    Returns:
    - The text without its last line; a string with no newline becomes "".
    """
    # str.split always yields at least one element, so slicing off the last
    # one is the same as popping it.
    kept_lines = python_code.split("\n")[:-1]
    return "\n".join(kept_lines)


def calculate_similarity(source, target):
    """
    Calculate a similarity score between the source and target strings.

    The score is `1 - edit_distance / max(len(source), len(target))`, i.e. the
    edit distance normalized by the length of the longer string.

    Two empty strings are treated as identical and score 1.0; previously this
    case raised ZeroDivisionError.
    """
    max_length = max(len(source), len(target))
    if max_length == 0:
        # Nothing to compare on either side: identical by definition.
        return 1.0
    edit_distance = pylcs.edit_distance(source, target)
    # Normalize the score by the maximum possible edit distance (the length of the longer string)
    return 1 - (edit_distance / max_length)


def find_best_match_location(source_code, target_block):
    """
    Find the best match for the target_block within the source_code by searching line by line,
    considering blocks of varying lengths.

    Every window of source lines that is at least as long as the target (in
    lines) is scored with `calculate_similarity`; the window with the highest
    score strictly greater than 0 wins.

    Returns:
    - `(char_start_index, char_end_index)`: character offsets of the winning
      window inside `source_code`.
    - `(-1, -1)` when no window qualifies (the target has more lines than the
      source, or no window scores above 0). Previously the internal `-1`
      sentinel was used as a slice bound, which produced a bogus span covering
      everything but the last source line.
    """
    source_lines = source_code.split("\n")
    target_lines = target_block.split("\n")

    best_similarity = 0
    best_start_index = 0
    best_end_index = -1  # sentinel: no window selected yet

    # Iterate over the source lines to find the best matching range for all lines in target_block
    for start_index in range(len(source_lines) - len(target_lines) + 1):
        for end_index in range(start_index + len(target_lines), len(source_lines) + 1):
            current_window = "\n".join(source_lines[start_index:end_index])
            current_similarity = calculate_similarity(current_window, target_block)
            if current_similarity > best_similarity:
                best_similarity = current_similarity
                best_start_index = start_index
                best_end_index = end_index

    if best_end_index == -1:
        # Report "no match" explicitly so callers can leave the source untouched.
        return -1, -1

    # Convert line indices back to character indices for replacement
    # (+1 skips the newline that precedes the first matched line).
    char_start_index = len("\n".join(source_lines[:best_start_index])) + (
        1 if best_start_index > 0 else 0
    )
    char_end_index = len("\n".join(source_lines[:best_end_index]))

    return char_start_index, char_end_index


def replace_code_in_source(source_code, replacement_block: str):
    """
    Splice the code found in `replacement_block` over the part of `source_code` it matches best.

    The first ```python block is extracted from `replacement_block`, its last
    line is dropped, and the resulting snippet replaces the span reported by
    `find_best_match_location`. If either reported index is -1 the source is
    returned unchanged.
    """
    snippet = remove_last_line(extract_python_code_blocks(replacement_block)[0])
    begin, end = find_best_match_location(source_code, snippet)
    if begin == -1 or end == -1:
        return source_code
    # Keep everything before and after the matched span, swap the middle.
    return source_code[:begin] + snippet + source_code[end:]


class Operator:
    """
    LLM operator answering three kinds of inputs:

    - "code_modifier": rewrite part of a source file according to a user message
      and send the result on `modified_file` (plus the raw answer on
      `assistant_message`).
    - "message_sender": ask the model for a json list of `{topic, data}` items
      and forward the supported topics as outputs.
    - "assistant": send the model's answer on `assistant_message`.
    """

    def on_event(
        self,
        dora_event,
        send_output,
    ) -> DoraStatus:
        """Dispatch an INPUT event by its id; always returns DoraStatus.CONTINUE."""
        if dora_event["type"] == "INPUT" and dora_event["id"] == "code_modifier":
            request = dora_event["value"][0].as_py()

            with open(request["path"], "r", encoding="utf8") as f:
                code = f.read()

            user_message = request["user_message"]
            start_llm = time.time()
            output = self.ask_llm(
                CODE_MODIFIER_TEMPLATE.format(code=code, user_message=user_message)
            )

            source_code = replace_code_in_source(code, output)
            print("response time:", time.time() - start_llm, flush=True)
            send_output(
                "modified_file",
                pa.array(
                    [
                        {
                            "raw": source_code,
                            "path": request["path"],
                            "response": output,
                            "prompt": request["user_message"],
                        }
                    ]
                ),
                dora_event["metadata"],
            )
            print("response: ", output, flush=True)
            send_output(
                "assistant_message",
                pa.array([output]),
                dora_event["metadata"],
            )
        elif dora_event["type"] == "INPUT" and dora_event["id"] == "message_sender":
            user_message = dora_event["value"][0].as_py()
            output = self.ask_llm(
                MESSAGE_SENDER_TEMPLATE.format(user_message=user_message)
            )
            json_block = extract_json_code_blocks(output)[0]
            try:
                messages = json.loads(json_block)
                if not isinstance(messages, list):
                    messages = [messages]
                for message in messages:
                    # if data is not a list, put data in a list
                    if not isinstance(message["data"], list):
                        message["data"] = [message["data"]]

                    if message["topic"] in [
                        "line",
                    ]:
                        send_output(
                            message["topic"],
                            pa.array(message["data"]),
                            dora_event["metadata"],
                        )
                    else:
                        print("Could not find the topic: {}".format(message["topic"]))
            except Exception as err:
                # Best effort: a malformed model answer must not stop the
                # operator. `Exception` (instead of a bare except) lets
                # KeyboardInterrupt/SystemExit through and the cause is shown.
                print("Could not parse json:", err)
        elif dora_event["type"] == "INPUT" and dora_event["id"] == "assistant":
            user_message = dora_event["value"][0].as_py()
            output = self.ask_llm(ASSISTANT_TEMPLATE.format(user_message=user_message))
            send_output(
                "assistant_message",
                pa.array([output]),
                dora_event["metadata"],
            )
        return DoraStatus.CONTINUE

    def ask_llm(self, prompt):
        """
        Run `prompt` through the module-level model and return the generated text.

        The decoded output is sliced by `len(prompt)` to drop the echoed prompt.
        NOTE(review): this assumes the decoded text starts with the prompt
        verbatim — confirm for the tokenizer in use.
        """
        encoded = tokenizer(prompt, return_tensors="pt")
        input_ids = encoded.input_ids.cuda()

        # Keep the attention mask on the same device as the input ids.
        attention_mask = encoded["attention_mask"].to(input_ids.device)

        output = model.generate(
            inputs=input_ids,
            temperature=0.7,
            do_sample=True,
            top_p=0.95,
            top_k=40,
            max_new_tokens=512,
            attention_mask=attention_mask,
            eos_token_id=tokenizer.eos_token_id,
        )

        # Decode the first sequence and keep only what follows the prompt.
        return tokenizer.decode(output[0], skip_special_tokens=True)[len(prompt) :]


if __name__ == "__main__":
    # Manual smoke test: feed one "message_sender" event to the operator and
    # print whatever it would send.
    op = Operator()

    # Path to the current file
    current_file_path = __file__

    # Directory of the current file
    current_directory = os.path.dirname(current_file_path)

    # os.path.join inserts the separator that plain `+` concatenation omitted
    # (which produced e.g. ".../python-operator-dataflowobject_detection.py").
    path = os.path.join(current_directory, "object_detection.py")
    with open(path, "r", encoding="utf8") as f:
        raw = f.read()

    op.on_event(
        {
            "type": "INPUT",
            "id": "message_sender",
            "value": pa.array(
                [
                    {
                        "path": path,
                        "user_message": "send a star ",
                    },
                ]
            ),
            "metadata": [],
        },
        print,
    )

+ 12
- 0
examples/python-operator-dataflow/merge.py View File

@@ -0,0 +1,12 @@
import pyarrow as pa

def _read_arrow_table(file_name):
    """Memory-map an Arrow IPC file and read all of its record batches into one table."""
    with pa.memory_map(file_name, "r") as source:
        return pa.ipc.open_file(source).read_all()


# Load both recordings first, convert them to pandas afterwards.
df_i = _read_arrow_table("image.arrow")
df_b = _read_arrow_table("bbox.arrow")

df_i = df_i.to_pandas()
df_b = df_b.to_pandas()

# Pair every image row with the bbox rows sharing the same trace_id.
df = df_i.merge(df_b, on="trace_id")

+ 36
- 0
examples/python-operator-dataflow/microphone_op.py View File

@@ -0,0 +1,36 @@
import numpy as np
import pyarrow as pa
import sounddevice as sd

from dora import DoraStatus

# Set the parameters for recording
# SAMPLE_RATE is passed to sd.rec as `samplerate`; SAMPLE_RATE * MAX_DURATION
# is the number of frames recorded per trigger, so MAX_DURATION is in seconds.
SAMPLE_RATE = 16000
MAX_DURATION = 5


class Operator:
    """
    Microphone operator that records the audio
    """

    def on_event(
        self,
        dora_event,
        send_output,
    ) -> DoraStatus:
        """
        On any INPUT event, record MAX_DURATION seconds from the microphone
        (blocking) and send the samples on the `audio` output.

        The int16 samples are flattened and scaled by 1/32768 into float32.
        Always returns DoraStatus.CONTINUE.
        """
        if dora_event["type"] == "INPUT":
            audio_data = sd.rec(
                int(SAMPLE_RATE * MAX_DURATION),
                samplerate=SAMPLE_RATE,
                channels=1,
                dtype=np.int16,
                blocking=True,
            )

            audio_data = audio_data.ravel().astype(np.float32) / 32768.0
            if len(audio_data) > 0:
                send_output("audio", pa.array(audio_data), dora_event["metadata"])
        # The former `elif dora_event["type"] == "INPUT"` branch repeated the
        # condition above and could never run, so it was removed.
        return DoraStatus.CONTINUE

+ 16
- 37
examples/python-operator-dataflow/object_detection.py View File

@@ -1,61 +1,40 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import numpy as np
import pyarrow as pa

from dora import DoraStatus
from ultralytics import YOLO

pa.array([])

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480


model = YOLO("yolov8n.pt")


class Operator:
"""
Infering object from images
"""

def __init__(self):
self.model = YOLO("yolov8n.pt")

def on_event(
self,
dora_event,
send_output,
) -> DoraStatus:
if dora_event["type"] == "INPUT":
return self.on_input(dora_event, send_output)
return DoraStatus.CONTINUE
frame = (
dora_event["value"].to_numpy().reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
)
frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB)
results = model(frame, verbose=False) # includes NMS
# Process results
boxes = np.array(results[0].boxes.xyxy.cpu())
conf = np.array(results[0].boxes.conf.cpu())
label = np.array(results[0].boxes.cls.cpu())
# concatenate them together
arrays = np.concatenate((boxes, conf[:, None], label[:, None]), axis=1)

send_output("bbox", pa.array(arrays.ravel()), dora_event["metadata"])

def on_input(
self,
dora_input,
send_output,
) -> DoraStatus:
"""Handle image
Args:
dora_input (dict) containing the "id", value, and "metadata"
send_output Callable[[str, bytes | pa.Array, Optional[dict]], None]:
Function for sending output to the dataflow:
- First argument is the `output_id`
- Second argument is the data as either bytes or `pa.Array`
- Third argument is dora metadata dict
e.g.: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
"""

frame = dora_input["value"].to_numpy().reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB)
results = self.model(frame) # includes NMS
# Process results
boxes = np.array(results[0].boxes.xyxy.cpu())
conf = np.array(results[0].boxes.conf.cpu())
label = np.array(results[0].boxes.cls.cpu())
# concatenate them together
arrays = np.concatenate((boxes, conf[:, None], label[:, None]), axis=1)

send_output("bbox", pa.array(arrays.ravel()), dora_input["metadata"])
return DoraStatus.CONTINUE

+ 81
- 118
examples/python-operator-dataflow/plot.py View File

@@ -1,22 +1,8 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os

import cv2
import numpy as np
import pyarrow as pa

from dora import DoraStatus
from utils import LABELS

pa.array([])

CI = os.environ.get("CI")
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480

font = cv2.FONT_HERSHEY_SIMPLEX
from dora import DoraStatus
from utils import LABELS, put_text, CAMERA_HEIGHT, CAMERA_WIDTH, FONT, CI


class Operator:
@@ -25,114 +11,91 @@ class Operator:
"""

def __init__(self):
self.image = []
self.bboxs = []
self.bounding_box_messages = 0
self.image_messages = 0
self.object_detection_stdout = []
self.buffer = ""
self.submitted = []
self.lines = []

def on_event(
self,
dora_event,
send_output,
) -> DoraStatus:
):
if dora_event["type"] == "INPUT":
return self.on_input(dora_event, send_output)
return DoraStatus.CONTINUE

def on_input(
self,
dora_input,
send_output,
) -> DoraStatus:
"""
Put image and bounding box on cv2 window.

Args:
dora_input["id"] (str): Id of the dora_input declared in the yaml configuration
dora_input["value"] (arrow array): message of the dora_input
send_output Callable[[str, bytes | pa.Array, Optional[dict]], None]:
Function for sending output to the dataflow:
- First argument is the `output_id`
- Second argument is the data as either bytes or `pa.Array`
- Third argument is dora metadata dict
e.g.: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
"""
if dora_input["id"] == "image":
frame = (
dora_input["value"]
.to_numpy()
.reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
.copy() # copy the image because we want to modify it below
)
self.image = frame

self.image_messages += 1
print("received " + str(self.image_messages) + " images")

elif dora_input["id"] == "object_detection_stdout":
stdout = dora_input["value"][0].as_py()
self.object_detection_stdout += [stdout]
## Only keep last 10 stdout
self.object_detection_stdout = self.object_detection_stdout[-10:]
return DoraStatus.CONTINUE

elif dora_input["id"] == "bbox" and len(self.image) != 0:
bboxs = dora_input["value"].to_numpy()
self.bboxs = np.reshape(bboxs, (-1, 6))

self.bounding_box_messages += 1
print("received " + str(self.bounding_box_messages) + " bounding boxes")
return DoraStatus.CONTINUE
else:
return DoraStatus.CONTINUE

for bbox in self.bboxs:
[
min_x,
min_y,
max_x,
max_y,
confidence,
label,
] = bbox
cv2.rectangle(
self.image,
(int(min_x), int(min_y)),
(int(max_x), int(max_y)),
(0, 255, 0),
2,
)

cv2.putText(
self.image,
LABELS[int(label)] + f", {confidence:0.2f}",
(int(max_x), int(max_y)),
font,
0.75,
(0, 255, 0),
2,
1,
)

for i, log in enumerate(self.object_detection_stdout):
cv2.putText(
self.image,
log,
(10, 10 + 20 * i),
font,
0.5,
(0, 255, 0),
2,
1,
)

if CI != "true":
cv2.imshow("frame", self.image)
if cv2.waitKey(1) & 0xFF == ord("q"):
return DoraStatus.STOP
id = dora_event["id"]
value = dora_event["value"]
if id == "image":

image = (
value.to_numpy().reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3)).copy()
)

for bbox in self.bboxs:
[
min_x,
min_y,
max_x,
max_y,
confidence,
label,
] = bbox
cv2.rectangle(
image,
(int(min_x), int(min_y)),
(int(max_x), int(max_y)),
(0, 255, 0),
)
cv2.putText(
image,
f"{LABELS[int(label)]}, {confidence:0.2f}",
(int(max_x), int(max_y)),
FONT,
0.45,
(0, 255, 0),
2,
1,
)

put_text(
image,
self.buffer,
(20, 12 * 25),
(190, 250, 0),
)

for i, text in enumerate(self.submitted[::-1]):
put_text(
image,
text["content"],
(20, 25 + (10 - i) * 25),
(0, 255, 190),
)

for line in self.lines:
cv2.line(
image,
(int(line[0]), int(line[1])),
(int(line[2]), int(line[3])),
(0, 0, 255),
2,
)

if CI != "true":
cv2.imshow("frame", image)
if cv2.waitKey(1) & 0xFF == ord("q"):
return DoraStatus.STOP
elif id == "bbox":
self.bboxs = value.to_numpy().reshape((-1, 6))
elif id == "keyboard_buffer":
self.buffer = value[0].as_py()
elif id == "line":
self.lines += [value.to_pylist()]
elif "message" in id:
self.submitted += [
{
"role": id,
"content": value[0].as_py(),
}
]

return DoraStatus.CONTINUE

def __del__(self):
cv2.destroyAllWindows()

+ 100
- 0
examples/python-operator-dataflow/sentence_transformers_op.py View File

@@ -0,0 +1,100 @@
from sentence_transformers import SentenceTransformer
from sentence_transformers import util

from dora import DoraStatus
import os
import sys
import inspect
import torch
import pyarrow as pa

# File names meant to be excluded from indexing.
# NOTE(review): not referenced by the code visible here — get_all_functions
# only consults SHOULD_BE_INCLUDED; confirm whether this list is still needed.
SHOULD_NOT_BE_INCLUDED = [
"utils.py",
"sentence_transformers_op.py",
"chatgpt_op.py",
"llm_op.py",
]

# Allow-list: get_all_functions only reads `.py` files whose name is in this list.
SHOULD_BE_INCLUDED = [
"webcam.py",
"object_detection.py",
"plot.py",
]


def get_all_functions(path):
    """
    Read every allow-listed Python file found under `path`.

    Walks `path` recursively and, for each `.py` file whose name is in
    SHOULD_BE_INCLUDED, reads it as utf8. The containing folder is also
    appended to `sys.path`.

    Returns:
    - `(raw, paths)`: the file contents and the matching file paths, in the
      same order.
    """
    raw = []
    paths = []
    for root, _dirs, files in os.walk(path):
        for file in files:
            if not file.endswith(".py") or file not in SHOULD_BE_INCLUDED:
                continue
            file_path = os.path.join(root, file)
            with open(file_path, "r", encoding="utf8") as f:
                ## add file folder to system path
                sys.path.append(root)
                raw.append(f.read())
                paths.append(file_path)

    return raw, paths


def search(query_embedding, corpus_embeddings, paths, raw, k=5, file_extension=None):
    """
    Rank the corpus against the query by cosine similarity.

    Returns a flat list `[raw, path, score, raw, path, score, ...]` holding the
    best `min(k, corpus size)` entries, best first. `file_extension` is
    accepted but not used.
    """
    cos_scores = util.cos_sim(query_embedding, corpus_embeddings)[0]
    top_k = min(k, len(cos_scores))
    scores, indices = torch.topk(cos_scores, k=top_k, sorted=True)
    out = []
    for score, idx in zip(scores, indices):
        out += [raw[idx], paths[idx], score]
    return out


class Operator:
    """
    Embedding index over the example's Python files.

    A "query" input is answered with the best matching file on `raw_file`;
    any other input is treated as a saved file whose cached content and
    embedding are refreshed.
    """

    def __init__(self):
        ## TODO: Add a initialisation step
        self.model = SentenceTransformer("BAAI/bge-large-en-v1.5")
        # Index the files living next to this operator.
        operator_dir = os.path.dirname(os.path.abspath(__file__))
        self.raw, self.path = get_all_functions(operator_dir)
        # Encode all files
        self.encoding = self.model.encode(self.raw)

    def on_event(
        self,
        dora_event,
        send_output,
    ) -> DoraStatus:
        """Handle one dora event; always returns DoraStatus.CONTINUE."""
        if dora_event["type"] != "INPUT":
            return DoraStatus.CONTINUE

        if dora_event["id"] == "query":
            values = dora_event["value"].to_pylist()

            query_embeddings = self.model.encode(values)
            ranked = search(
                query_embeddings,
                self.encoding,
                self.path,
                self.raw,
            )
            # `search` returns a flat list; the first triple is the best hit.
            raw, path, _score = ranked[0:3]
            send_output(
                "raw_file",
                pa.array([{"raw": raw, "path": path, "user_message": values[0]}]),
                dora_event["metadata"],
            )
        else:
            saved = dora_event["value"][0].as_py()
            index = self.path.index(saved["path"])
            self.raw[index] = saved["raw"]
            self.encoding[index] = self.model.encode([saved["raw"]])[0]

        return DoraStatus.CONTINUE


# Running the file directly only constructs the operator, which loads the
# model and encodes the indexed files.
if __name__ == "__main__":
    operator = Operator()

+ 24
- 1
examples/python-operator-dataflow/utils.py View File

@@ -1,5 +1,28 @@
import os
import cv2


def put_text(image, text, position, color):
    """Draw `text` on `image` at `position` in `color`, using the fixed font settings below."""
    font_face = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.45
    thickness = 2
    line_type = 1
    cv2.putText(
        image, text, position, font_face, font_scale, color, thickness, line_type
    )


CI = os.environ.get("CI")

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480

FONT = cv2.FONT_HERSHEY_SIMPLEX
LABELS = [
"ABC",
"person",
"bicycle",
"car",
"motorcycle",


+ 17
- 16
examples/python-operator-dataflow/webcam.py View File

@@ -1,6 +1,3 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import time

@@ -13,6 +10,7 @@ from dora import DoraStatus
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))
CI = os.environ.get("CI")

font = cv2.FONT_HERSHEY_SIMPLEX

@@ -27,6 +25,7 @@ class Operator:
self.start_time = time.time()
self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
self.failure_count = 0

def on_event(
self,
@@ -38,20 +37,22 @@ class Operator:
ret, frame = self.video_capture.read()
if ret:
frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT))
self.failure_count = 0
## Push an error image in case the camera is not available.
else:
frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
cv2.putText(
frame,
"No Webcam was found at index %d" % (CAMERA_INDEX),
(int(30), int(30)),
font,
0.75,
(255, 255, 255),
2,
1,
)
if self.failure_count > 10:
frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
cv2.putText(
frame,
"No Webcam was found at index %d" % (CAMERA_INDEX),
(int(30), int(30)),
font,
0.75,
(255, 255, 255),
2,
1,
)
self.failure_count += 1

send_output(
"image",
@@ -63,7 +64,7 @@ class Operator:
else:
print("received unexpected event:", event_type)

if time.time() - self.start_time < 20:
if time.time() - self.start_time < 200 or CI != "true":
return DoraStatus.CONTINUE
else:
return DoraStatus.STOP


+ 25
- 0
examples/python-operator-dataflow/whisper_op.py View File

@@ -0,0 +1,25 @@
import pyarrow as pa
import whisper

from dora import DoraStatus


# Whisper "base" model, loaded once at import time and shared by the operator.
model = whisper.load_model("base")


class Operator:
    """
    Speech-to-text operator built on the OpenAI Whisper model.
    """

    def on_event(
        self,
        dora_event,
        send_output,
    ) -> DoraStatus:
        """
        Transcribe the audio carried by an INPUT event and send the text on `text`.

        Always returns DoraStatus.CONTINUE.
        """
        if dora_event["type"] != "INPUT":
            return DoraStatus.CONTINUE

        samples = whisper.pad_or_trim(dora_event["value"].to_numpy())
        transcription = model.transcribe(samples, language="en")
        send_output(
            "text", pa.array([transcription["text"]]), dora_event["metadata"]
        )
        return DoraStatus.CONTINUE

Loading…
Cancel
Save