Browse Source

removed threading from policy inference, added optional saving for avif frames

pull/1041/head
ShashwatPatil 6 months ago
parent
commit
6780f55f38
6 changed files with 250 additions and 101 deletions
  1. +5
    -1
      examples/lerobot-dataset/dataset_record.yml
  2. +116
    -0
      examples/lerobot-dataset/dataset_record_avif.yml
  3. +5
    -3
      examples/lerobot-dataset/policy_inference.yml
  4. +18
    -17
      node-hub/dora-dataset-record/README.md
  5. +55
    -5
      node-hub/dora-dataset-record/dora_dataset_record/main.py
  6. +51
    -75
      node-hub/dora-policy-inference/dora_policy_inference/main.py

+ 5
- 1
examples/lerobot-dataset/dataset_record.yml View File

@@ -85,4 +85,8 @@ nodes:
inputs:
image_laptop: laptop_cam/image
image_front: front_cam/image
text: dataset_recorder/text
jointstate_so101_new_calib: so101/pose
env:
so101_new_calib_urdf: "so_arm101_description"
so101_new_calib_transform: "0. 0. 0. 1. 0. 0. 0."

+ 116
- 0
examples/lerobot-dataset/dataset_record_avif.yml View File

@@ -0,0 +1,116 @@
nodes:
  # Laptop webcam capture: RGB frames at ~30 fps (1080p).
  - id: laptop_cam
    build: pip install opencv-video-capture
    path: opencv-video-capture
    inputs:
      tick: dora/timer/millis/33
    outputs:
      - image
    env:
      CAPTURE_PATH: "0"
      ENCODING: "rgb8"
      IMAGE_WIDTH: "1920"
      IMAGE_HEIGHT: "1080"

  # Front camera capture: RGB frames at ~30 fps (480p).
  - id: front_cam
    build: pip install opencv-video-capture
    path: opencv-video-capture
    inputs:
      tick: dora/timer/millis/33
    outputs:
      - image
    env:
      CAPTURE_PATH: "1"
      ENCODING: "rgb8"
      IMAGE_WIDTH: "640"
      IMAGE_HEIGHT: "480"

  # Follower arm: receives the leader's pose as a command and publishes its
  # own joint state at 100 Hz.
  - id: so101
    build: pip install -e ../../node-hub/dora-rustypot
    path: dora-rustypot
    inputs:
      tick: dora/timer/millis/10
      pose: leader_interface/pose
    outputs:
      - pose
    env:
      PORT: "/dev/ttyACM1"
      IDS: "1,2,3,4,5,6"

  # Leader arm: publishes poses at 100 Hz for teleoperation.
  - id: leader_interface
    path: dora-rustypot
    build: pip install -e ../../node-hub/dora-rustypot
    inputs:
      tick: dora/timer/millis/10
    outputs:
      - pose
    env:
      PORT: "/dev/ttyACM0"
      IDS: "1,2,3,4,5,6"

  # Records camera frames plus robot state/action into a LeRobot dataset.
  # Also receives the AVIF-encoded streams from the rav1e nodes below so they
  # can be saved alongside the dataset when SAVE_AVIF_FRAMES is enabled.
  - id: dataset_recorder
    build: pip install -e ../../node-hub/dora-dataset-record
    path: dora-dataset-record
    inputs:
      laptop: laptop_cam/image
      front: front_cam/image
      robot_state: so101/pose
      robot_action: leader_interface/pose
      rav1e_laptop: rav1e-laptop/image
      rav1e_front: rav1e-front/image
    outputs:
      - text
    env:
      REPO_ID: "HF_username/Name_your_dataset"
      SINGLE_TASK: "Pick up the cube and place it in the box"
      ROBOT_TYPE: "so101_follower" # e.g., "koch", "franka", "ur5e" etc

      FPS: "30"
      TOTAL_EPISODES: "3"
      EPISODE_DURATION_S: "100"
      RESET_DURATION_S: "5"

      # Resolutions are "height,width,channels" and must match the cameras above.
      CAMERA_NAMES: "laptop, front"
      CAMERA_LAPTOP_RESOLUTION: "1080,1920,3"
      CAMERA_FRONT_RESOLUTION: "480,640,3"
      ROBOT_JOINTS: "shoulder_pan.pos,shoulder_lift.pos,elbow_flex.pos,wrist_flex.pos,wrist_roll.pos,gripper.pos"

      # ROOT_PATH: "path where you want to save the dataset" # if not set, will save to ~/.cache/huggingface/lerobot
      USE_VIDEOS: "true"
      SAVE_AVIF_FRAMES: "true" # additionally save the AVIF frames received on the rav1e_* inputs
      PUSH_TO_HUB: "false"
      PRIVATE: "false"
      TAGS: "robotics, manipulation"

  # Live visualization: camera images, recorder status text, and follower joint state.
  - id: plot
    build: pip install dora-rerun
    path: dora-rerun
    inputs:
      image_laptop: laptop_cam/image
      image_front: front_cam/image
      text: dataset_recorder/text
      jointstate_so101_new_calib: so101/pose
    env:
      so101_new_calib_urdf: "so_arm101_description"
      so101_new_calib_transform: "0. 0. 0. 1. 0. 0. 0."

  # These nodes AVIF-encode the camera streams using dora-rav1e.
  - id: rav1e-laptop
    path: dora-rav1e
    build: cargo build -p dora-rav1e --release
    inputs:
      image: laptop_cam/image
    outputs:
      - image
    env:
      ENCODING: avif

  - id: rav1e-front
    path: dora-rav1e
    build: cargo build -p dora-rav1e --release
    inputs:
      image: front_cam/image
    outputs:
      - image
    env:
      ENCODING: avif

+ 5
- 3
examples/lerobot-dataset/policy_inference.yml View File

@@ -52,9 +52,7 @@ nodes:
MODEL_PATH: "./outputs/train/act_so101_test/checkpoints/last/pretrained_model/" # Path to your trained model
TASK_DESCRIPTION: "Your task"
ROBOT_TYPE: "so101_follower"
CAMERA_NAMES: "laptop, front"
CAMERA_FRONT_RESOLUTION: "480,640,3"
CAMERA_LAPTOP_RESOLUTION: "480,640,3"
CAMERA_NAMES: "front, laptop"
INFERENCE_FPS: "30"

# Visualization in rerun (optional)
@@ -65,3 +63,7 @@ nodes:
image_laptop: laptop_cam/image
image_front: front_cam/image
status: policy_inference/status
jointstate_so101_new_calib: so101/pose
env:
so101_new_calib_urdf: "so_arm101_description"
so101_new_calib_transform: "0. 0. 0. 1. 0. 0. 0."

+ 18
- 17
node-hub/dora-dataset-record/README.md View File

@@ -56,6 +56,7 @@ nodes:
# Optional settings
USE_VIDEOS: "true"
SAVE_AVIF_FRAMES: "true" # additionally save the AVIF-encoded frames to disk alongside the dataset
PUSH_TO_HUB: "false"
PRIVATE: "false"
TAGS: "robotics,manipulation,imitation_learning"
@@ -81,26 +82,26 @@ The node will send instructions on dora-rerun, about episode starting, reset tim

### Required Environment Variables

| Variable | Description | Example |
|----------|-------------|---------|
| `REPO_ID` | HuggingFace dataset repo | `"username/dataset_name"` |
| `SINGLE_TASK` | Task description | `"Pick and place objects"` |
| `CAMERA_NAMES` | Comma-separated camera names | `"laptop,front,top"` |
| `CAMERA_*_RESOLUTION` | Resolution for each camera | `"480,640,3"` |
| `ROBOT_JOINTS` | Comma-separated joint names | `"joint1,joint2,gripper"` |
| Variable | Description | Example |
| --------------------- | ---------------------------- | -------------------------- |
| `REPO_ID` | HuggingFace dataset repo | `"username/dataset_name"` |
| `SINGLE_TASK` | Task description | `"Pick and place objects"` |
| `CAMERA_NAMES` | Comma-separated camera names | `"laptop,front,top"` |
| `CAMERA_*_RESOLUTION` | Resolution for each camera | `"480,640,3"` |
| `ROBOT_JOINTS` | Comma-separated joint names | `"joint1,joint2,gripper"` |

### Optional Settings

| Variable | Default | Description |
|----------|---------|-------------|
| `FPS` | `30` | Recording frame rate (match camera fps) |
| `TOTAL_EPISODES` | `10` | Number of episodes to record |
| `EPISODE_DURATION_S` | `60` | Episode length in seconds |
| `RESET_DURATION_S` | `15` | Break between episodes to reset the environment |
| `USE_VIDEOS` | `true` | Encode as MP4 videos, else saves images |
| `PUSH_TO_HUB` | `false` | Upload to HuggingFace Hub |
| `PRIVATE` | `false` | Make dataset private |
| `ROOT_PATH` | `~/.cache/huggingface/lerobot/your_repo_id` | Local storage path where you want to save the dataset |
| Variable | Default | Description |
| -------------------- | ------------------------------------------- | ----------------------------------------------------- |
| `FPS` | `30` | Recording frame rate (match camera fps) |
| `TOTAL_EPISODES` | `10` | Number of episodes to record |
| `EPISODE_DURATION_S` | `60` | Episode length in seconds |
| `RESET_DURATION_S` | `15` | Break between episodes to reset the environment |
| `USE_VIDEOS` | `true` | Encode as MP4 videos, else saves images |
| `PUSH_TO_HUB` | `false` | Upload to HuggingFace Hub |
| `PRIVATE` | `false` | Make dataset private |
| `ROOT_PATH` | `~/.cache/huggingface/lerobot/your_repo_id` | Local storage path where you want to save the dataset |

## License


+ 55
- 5
node-hub/dora-dataset-record/dora_dataset_record/main.py View File

@@ -4,6 +4,7 @@ import os
import queue
import threading
import time
from pathlib import Path
from typing import Any

import cv2
@@ -50,6 +51,10 @@ class DoraLeRobotRecorder:
self.last_frame_time = None
self.shutdown = False

# AVIF frame saving
self.save_avif_frames = os.getenv("SAVE_AVIF_FRAMES", "false").lower() == "true"
self.avif_data_buffer = {}

self._setup_dataset()
self._start_frame_timer()

@@ -143,6 +148,40 @@ class DoraLeRobotRecorder:
* len(self.cameras),
)

# Setup AVIF frame save directories
if self.save_avif_frames:
self._setup_avif_directories()

def _setup_avif_directories(self):
"""Set up directories for AVIF frame saving."""
self.dataset_root = Path(self.dataset.root)
self.avif_base_path = self.dataset_root / "avif_frames" / "chunk-000"

for camera_name in self.cameras:
camera_path = self.avif_base_path / f"observation.images.{camera_name}"
camera_path.mkdir(parents=True, exist_ok=True)

def _get_avif_save_path(self, camera_name: str, frame_index: int) -> Path:
"""Get the save path for AVIF frame."""
episode_dir = (
self.avif_base_path
/ f"observation.images.{camera_name}"
/ f"episode_{self.episode_index:06d}"
)
episode_dir.mkdir(parents=True, exist_ok=True)
return episode_dir / f"frame_{frame_index:06d}.avif"

def _save_avif_frames(self):
"""Save AVIF encoded frames when adding frame to dataset."""
for camera_name in self.cameras:
avif_input_id = f"rav1e_{camera_name}"
if avif_input_id in self.avif_data_buffer:
avif_data = self.avif_data_buffer[avif_input_id]["data"]
save_path = self._get_avif_save_path(camera_name, self.frame_count)

with open(save_path, "wb") as f:
f.write(avif_data.to_numpy())

def _check_episode_timing(self):
"""Check if we need to start/end Episodes."""
current_time = time.time()
@@ -237,11 +276,19 @@ class DoraLeRobotRecorder:
# Only store data if not in reset phase
if not self.in_reset_phase:
with self.buffer_lock:
self.data_buffer[input_id] = {
"data": data,
"timestamp": time.time(),
"metadata": metadata,
}
# Store AVIF data separately for saving
if self.save_avif_frames and input_id.startswith("rav1e_"):
self.avif_data_buffer[input_id] = {
"data": data,
"timestamp": time.time(),
"metadata": metadata,
}
else:
self.data_buffer[input_id] = {
"data": data,
"timestamp": time.time(),
"metadata": metadata,
}

should_stop = self._check_episode_timing()
if should_stop:
@@ -316,6 +363,9 @@ class DoraLeRobotRecorder:
print(f"Missing required data in frame: {missing_keys}")
return

if self.save_avif_frames:
self._save_avif_frames()

self.dataset.add_frame(
frame=frame_data,
task=os.getenv("SINGLE_TASK", "Your task"),


+ 51
- 75
node-hub/dora-policy-inference/dora_policy_inference/main.py View File

@@ -2,7 +2,6 @@

import os
import queue
import threading
import time
from pathlib import Path
from typing import Any
@@ -25,33 +24,23 @@ class DoraPolicyInference:
self.message_queue = queue.Queue()

self.data_buffer = {}
self.buffer_lock = threading.Lock()
self.cameras = self._parse_cameras()
self.task_description = os.getenv("TASK_DESCRIPTION", "")

self.policy = None
self.inference_fps = int(os.getenv("INFERENCE_FPS", "30"))
self.last_inference_time = None
self.inference_interval = 1.0 / self.inference_fps
self.min_inference_interval = 1.0 / self.inference_fps
self.last_inference_time = 0

self.shutdown = False
self._load_policy()
self._start_timer()

def _parse_cameras(self) -> dict:
"""Parse camera configuration from environment variables."""
camera_names_str = os.getenv("CAMERA_NAMES", "laptop,front")
camera_names = [name.strip() for name in camera_names_str.split(",")]

cameras = {}
for camera_name in camera_names:
resolution = os.getenv(
f"CAMERA_{camera_name.upper()}_RESOLUTION", "480,640,3"
)
dims = [int(d.strip()) for d in resolution.split(",")]
cameras[camera_name] = dims

return cameras
return camera_names

def _load_policy(self):
"""Load the trained LeRobot policy."""
@@ -75,64 +64,52 @@ class DoraPolicyInference:
self._output(f"Policy loaded successfully on device: {self.device}")
self._output(f"Policy type: {config.type}")

def _start_timer(self):
"""Start the inference timing thread."""
self.stop_timer = False
self.inference_thread = threading.Thread(
target=self._inference_loop, daemon=True
)
self.inference_thread.start()
def _check_inference_trigger(self, input_id: str) -> bool:
"""Determine if inference should be triggered based on the input received."""
current_time = time.time()
# Assign lead camera as trigger for inference
if input_id != self.cameras[0]:
return False

def _inference_loop(self):
"""Inference loop."""
while not self.stop_timer and not self.shutdown:
current_time = time.time()
if (
self.last_inference_time is None
or current_time - self.last_inference_time >= self.inference_interval
):
self._run_inference()
self.last_inference_time = current_time
# don't run inference too frequently, in case camera fps is too high
if current_time - self.last_inference_time < self.min_inference_interval:
return False

time.sleep(0.001) # Small sleep to prevent busy waiting
return True

def _run_inference(self):
"""Run policy inference on current observations."""
with self.buffer_lock:
required_inputs = set(self.cameras.keys()) | {"robot_state"}
available_inputs = set(self.data_buffer.keys())

if not required_inputs.issubset(available_inputs):
return

observation = {}
for camera_name in self.cameras:
if camera_name in self.data_buffer:
image = self._convert_camera_data(
self.data_buffer[camera_name]["data"],
self.data_buffer[camera_name]["metadata"],
)
observation[f"observation.images.{camera_name}"] = image

state = self._convert_robot_data(self.data_buffer["robot_state"]["data"])
observation["observation.state"] = state

action = (
predict_action(
observation=observation,
policy=self.policy,
device=self.device,
use_amp=self.policy.config.use_amp,
task=self.task_description,
current_time = time.time()

observation = {}
for camera_name in self.cameras:
if camera_name in self.data_buffer:
image = self._convert_camera_data(
self.data_buffer[camera_name]["data"],
self.data_buffer[camera_name]["metadata"],
)
.cpu()
.numpy()
observation[f"observation.images.{camera_name}"] = image

state = self._convert_robot_data(self.data_buffer["robot_state"]["data"])
observation["observation.state"] = state

action = (
predict_action(
observation=observation,
policy=self.policy,
device=self.device,
use_amp=self.policy.config.use_amp,
task=self.task_description,
)
.cpu()
.numpy()
)

# Convert from degrees to radians
action = np.deg2rad(action)
# Convert from degrees to radians
action = np.deg2rad(action)
self._send_action(action)

self._send_action(action)
self.last_inference_time = current_time

def _convert_camera_data(self, dora_data, metadata) -> np.ndarray:
"""Convert camera data from Dora format to numpy."""
@@ -160,13 +137,17 @@ class DoraPolicyInference:
self.message_queue.put(("action", action_message))

def handle_input(self, input_id: str, data: Any, metadata: Any):
"""Handle incoming data."""
with self.buffer_lock:
self.data_buffer[input_id] = {
"data": data,
"timestamp": time.time(),
"metadata": metadata,
}
"""Handle incoming data and trigger inference if conditions are met."""
# Update data buffer
self.data_buffer[input_id] = {
"data": data,
"timestamp": time.time(),
"metadata": metadata,
}

# Check if inference should be triggered
if self._check_inference_trigger(input_id):
self._run_inference()

def get_pending_messages(self):
"""Get all pending messages from the queue."""
@@ -185,11 +166,6 @@ class DoraPolicyInference:
"""Shutdown the inference node."""
self._output("Shutting down policy inference...")
self.shutdown = True
self.stop_timer = True

if self.inference_thread.is_alive():
self.inference_thread.join(timeout=5.0)

self._output("Policy inference shutdown complete")




Loading…
Cancel
Save