|
- """TODO: Add docstring."""
-
- import os
- import time
- import re
-
- import numpy as np
- import pyarrow as pa
- from dora import Node
-
-
# Handle to the Dora runtime: used to send outputs and to iterate over events.
node = Node()

# Scale factor read from the environment (defaults to 1.0).
# NOTE(review): not referenced anywhere in the visible code - confirm it is still needed.
IMAGE_RESIZE_RATIO = float(os.environ.get("IMAGE_RESIZE_RATIO", "1.0"))

# Pending sub-prompts of a multi-step command, consumed front to back.
queue = []

# NOTE(review): not referenced anywhere in the visible code - confirm it is still needed.
last_prompt = ""
-
-
# Phrases removed from a command before it is spoken back and forwarded.
_FILLER_PHRASES = ("can you", "please", "reachy")

# Fixed step list used for the "make a hot dog" command.
_HOT_DOG_RECIPE = "pick the sausage, place it on the black grill, wait, flip the sausage, wait, pick the sausage, place it on the bread, speak it's ready!"

# Base motions keyed by trigger phrase; phrases are tested in insertion order.
# NOTE(review): the 6 values are presumably [x, y, z, roll, pitch, yaw] -
# confirm against the consumer of the "action" output.
_BASE_ACTIONS = {
    "turn left": [0.0, 0, 0, 0, 0, np.deg2rad(30)],
    "turn right": [0.0, 0, 0, 0, 0, -np.deg2rad(30)],
    "turn around": [0.0, 0, 0, 0, 0, -np.deg2rad(180)],
    "move left": [0.0, 0.2, 0, 0, 0, 0],
    "move right": [0.0, -0.2, 0, 0, 0, 0],
    "move forward": [0.2, 0, 0, 0, 0, 0],
    "move backward": [-0.2, 0, 0, 0, 0, 0],
}


def _strip_filler(text: str) -> str:
    """Return *text* with every phrase in ``_FILLER_PHRASES`` removed."""
    for phrase in _FILLER_PHRASES:
        text = text.replace(phrase, "")
    return text


def _expand_recipe(text: str) -> str:
    """Expand a "make a hot dog" / "cook <w1> <w2>" command into its step list.

    Any other text (including "cook" not followed by two words) is returned
    unchanged.
    """
    if "make a hot dog" in text:
        return _HOT_DOG_RECIPE
    if "cook" in text:
        # Match 'cook' followed by two words; those two words name the item.
        match = re.search(r"\bcook\b\s+(\w+)\s+(\w+)", text)
        if match:
            grilled_item = match.group(1) + " " + match.group(2)
            return f"pick {grilled_item}, place it on the black grill, wait, flip {grilled_item}, wait, pick {grilled_item} again, place it on the white plate, speak {grilled_item} is ready!"
    return text


def _run_next_prompt() -> None:
    """Pop the first queued sub-prompt, if any, and dispatch it."""
    global queue
    if queue:
        next_prompt, queue = queue[0], queue[1:]
        handle_event(next_prompt)


def _request_boxes(text: str, target: str, action: str) -> None:
    """Announce a manipulation step and send its bounding-box prompt.

    Sends the cleaned command on "speech", then a prompt on "text" tagged
    with the depth image id and the given *action*.
    """
    command = _strip_filler(text)
    node.send_output("speech", pa.array(["I'm going to " + command]))
    prompt = f"Given the prompt: {command}. Output the bounding boxes for {target}"
    node.send_output(
        "text",
        pa.array([prompt]),
        {"image_id": "image_depth", "action": action},
    )


def handle_event(text: str):
    """Dispatch one text command to the matching node outputs.

    The command is matched by substring, first match wins, in this order:
    stop, follow, raise your arms, compound commands ("pick" and "place"
    together, "make a hot dog", "cook"), pick, place, wait, speak, flip,
    release left/right, then the base motions in ``_BASE_ACTIONS``.
    Text matching nothing is ignored.

    A compound command is split on "," and "." into sub-prompts: the first
    is dispatched immediately and the rest are stored in the module-level
    ``queue``. "wait" and "speak" steps advance the queue themselves.

    Args:
        text: The command text; callers in this file pass it lower-cased.
    """
    global queue
    if "stop" in text:
        node.send_output("points", pa.array([], type=pa.float64()))
    elif "follow" in text:
        text = f"Given the prompt: {text}. Output the bounding boxes for the given followed object"
        node.send_output("look_ahead", pa.array([1.0]))
        time.sleep(0.5)  # Sync image
        node.send_output("text", pa.array([text]), {"image_id": "image_left"})
        node.send_output("follow_pose", pa.array([1.0]))
    elif "raise your arms" in text:
        node.send_output("raise_arm_pose", pa.array([1.0]))
    elif (
        ("pick" in text and "place" in text)
        or ("make a hot dog" in text)
        or ("cook" in text)
    ):
        text = _expand_recipe(text)
        # Only split when there is a separator. Without one the single
        # "sub-prompt" would be this same text, re-enter this branch and
        # recurse without end, so such a command is deliberately a no-op.
        if "," in text or "." in text:
            queue = re.split(r"[,.]", text)
            _run_next_prompt()
    elif "pick " in text:
        _request_boxes(text, "the given picked object", "pick")
    elif "place " in text:
        _request_boxes(text, "the place to place the object", "release")
    elif " wait" in text:
        node.send_output("speech", pa.array(["I'm going to wait for 5 seconds."]))
        time.sleep(5)
        _run_next_prompt()
    elif " speak" in text:
        node.send_output("speech", pa.array([text.replace("speak ", "")]))
        _run_next_prompt()
    elif " flip" in text:
        _request_boxes(text, "the flipped object", "flip")
    elif "release left" in text:
        node.send_output("action_release_left", pa.array([1.0]))
    elif "release right" in text:
        node.send_output("action_release_right", pa.array([1.0]))
    else:
        for phrase, motion in _BASE_ACTIONS.items():
            if phrase in text:
                node.send_output("action", pa.array(motion))
                # An empty "points" output accompanies every base motion.
                node.send_output("points", pa.array([]))
                break
-
-
# Main loop: react to every input event delivered by the Dora runtime.
for event in node:
    if event["type"] != "INPUT":
        continue

    if event["id"] == "text":
        # New command: take the first element of the value and lower-case it.
        handle_event(event["value"][0].as_py().lower())
    elif event["id"] == "success" and queue:
        # A "success" input advances to the next queued sub-prompt
        # after a short pause.
        time.sleep(0.3)
        next_prompt, queue = queue[0], queue[1:]
        handle_event(next_prompt)
|