diff --git a/examples/lerobot-dataset/dataset_record.yml b/examples/lerobot-dataset/dataset_record.yml
index c734c2fc..0a19ebdb 100644
--- a/examples/lerobot-dataset/dataset_record.yml
+++ b/examples/lerobot-dataset/dataset_record.yml
@@ -85,4 +85,8 @@ nodes:
     inputs:
       image_laptop: laptop_cam/image
       image_front: front_cam/image
-      text: dataset_recorder/text
\ No newline at end of file
+      text: dataset_recorder/text
+      jointstate_so101_new_calib: so101/pose
+    env:
+      so101_new_calib_urdf: "so_arm101_description"
+      so101_new_calib_transform: "0. 0. 0. 1. 0. 0. 0."
\ No newline at end of file
diff --git a/examples/lerobot-dataset/dataset_record_avif.yml b/examples/lerobot-dataset/dataset_record_avif.yml
new file mode 100644
index 00000000..76937029
--- /dev/null
+++ b/examples/lerobot-dataset/dataset_record_avif.yml
@@ -0,0 +1,119 @@
+nodes:
+  - id: laptop_cam
+    build: pip install opencv-video-capture
+    path: opencv-video-capture
+    inputs:
+      tick: dora/timer/millis/33
+    outputs:
+      - image
+    env:
+      CAPTURE_PATH: "0"
+      ENCODING: "rgb8"
+      IMAGE_WIDTH: "1920"
+      IMAGE_HEIGHT: "1080"
+
+  - id: front_cam
+    build: pip install opencv-video-capture
+    path: opencv-video-capture
+    inputs:
+      tick: dora/timer/millis/33
+    outputs:
+      - image
+    env:
+      CAPTURE_PATH: "1"
+      ENCODING: "rgb8"
+      IMAGE_WIDTH: "640"
+      IMAGE_HEIGHT: "480"
+
+  - id: so101
+    build: pip install -e ../../node-hub/dora-rustypot
+    path: dora-rustypot
+    inputs:
+      tick: dora/timer/millis/10
+      pose: leader_interface/pose
+    outputs:
+      - pose
+    env:
+      PORT: "/dev/ttyACM1"
+      IDS: "1,2,3,4,5,6"
+
+  - id: leader_interface
+    path: dora-rustypot
+    build: pip install -e ../../node-hub/dora-rustypot
+    inputs:
+      tick: dora/timer/millis/10
+    outputs:
+      - pose
+    env:
+      PORT: "/dev/ttyACM0"
+      IDS: "1,2,3,4,5,6"
+
+  - id: dataset_recorder
+    build: pip install -e ../../node-hub/dora-dataset-record
+    path: dora-dataset-record
+    inputs:
+      laptop: laptop_cam/image
+      front: front_cam/image
+      robot_state: so101/pose
+      robot_action: leader_interface/pose
+      rav1e_laptop: rav1e-laptop/image
+      rav1e_front: rav1e-front/image
+    outputs:
+      - text
+    env:
+      REPO_ID: "HF_username/Name_your_dataset"
+      SINGLE_TASK: "Pick up the cube and place it in the box"
+      ROBOT_TYPE: "so101_follower" # e.g., "koch", "franka", "ur5e", etc.
+
+      FPS: "30"
+      TOTAL_EPISODES: "3"
+      EPISODE_DURATION_S: "100"
+      RESET_DURATION_S: "5"
+
+      CAMERA_NAMES: "laptop, front"
+      CAMERA_LAPTOP_RESOLUTION: "1080,1920,3"
+      CAMERA_FRONT_RESOLUTION: "480,640,3"
+      ROBOT_JOINTS: "shoulder_pan.pos,shoulder_lift.pos,elbow_flex.pos,wrist_flex.pos,wrist_roll.pos,gripper.pos"
+
+      # ROOT_PATH: "path where you want to save the dataset" # if not set, will save to ~/.cache/huggingface/lerobot
+      USE_VIDEOS: "true"
+      SAVE_AVIF_FRAMES: "true"
+      PUSH_TO_HUB: "false"
+      PRIVATE: "false"
+      TAGS: "robotics, manipulation"
+
+  - id: plot
+    build: pip install dora-rerun
+    path: dora-rerun
+    inputs:
+      image_laptop: laptop_cam/image
+      image_front: front_cam/image
+      text: dataset_recorder/text
+      jointstate_so101_new_calib: so101/pose
+    env:
+      so101_new_calib_urdf: "so_arm101_description"
+      so101_new_calib_transform: "0. 0. 0. 1. 0. 0. 0."
+
+# These nodes are for AVIF encoding using dora-rav1e
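+# (the dataset_recorder consumes their output through its rav1e_laptop and
+# rav1e_front inputs; with SAVE_AVIF_FRAMES set to "true" each encoded frame
+# is also written to disk as an .avif file alongside the dataset)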
+  - id: rav1e-laptop
+    path: dora-rav1e
+    build: cargo build -p dora-rav1e --release
+    inputs:
+      image: laptop_cam/image
+    outputs:
+      - image
+    env:
+      ENCODING: avif
+
+  - id: rav1e-front
+    path: dora-rav1e
+    build: cargo build -p dora-rav1e --release
+    inputs:
+      image: front_cam/image
+    outputs:
+      - image
+    env:
+      ENCODING: avif
\ No newline at end of file
diff --git a/examples/lerobot-dataset/policy_inference.yml b/examples/lerobot-dataset/policy_inference.yml
index a5d6396b..1d5d8f4e 100644
--- a/examples/lerobot-dataset/policy_inference.yml
+++ b/examples/lerobot-dataset/policy_inference.yml
@@ -52,9 +52,7 @@ nodes:
       MODEL_PATH: "./outputs/train/act_so101_test/checkpoints/last/pretrained_model/" # Path to your trained model
       TASK_DESCRIPTION: "Your task"
      ROBOT_TYPE: "so101_follower"
-      CAMERA_NAMES: "laptop, front"
-      CAMERA_FRONT_RESOLUTION: "480,640,3"
-      CAMERA_LAPTOP_RESOLUTION: "480,640,3"
+      CAMERA_NAMES: "front, laptop" # the first camera listed drives inference
       INFERENCE_FPS: "30"
 
   # Visualization in rerun (optional)
@@ -65,3 +63,7 @@ nodes:
       image_laptop: laptop_cam/image
       image_front: front_cam/image
       status: policy_inference/status
+      jointstate_so101_new_calib: so101/pose
+    env:
+      so101_new_calib_urdf: "so_arm101_description"
+      so101_new_calib_transform: "0. 0. 0. 1. 0. 0. 0."
diff --git a/node-hub/dora-dataset-record/README.md b/node-hub/dora-dataset-record/README.md
index 2655f2ec..2c6cfbcf 100644
--- a/node-hub/dora-dataset-record/README.md
+++ b/node-hub/dora-dataset-record/README.md
@@ -56,6 +56,7 @@ nodes:
 
       # Optional settings
       USE_VIDEOS: "true"
+      SAVE_AVIF_FRAMES: "true" # Additionally save AVIF-encoded frames (from the rav1e inputs)
       PUSH_TO_HUB: "false"
       PRIVATE: "false"
       TAGS: "robotics,manipulation,imitation_learning"
@@ -81,26 +82,27 @@ The node will send instructions on dora-rerun, about episode starting, reset time
 
 ### Required Environment Variables
 
-| Variable | Description | Example |
-|----------|-------------|---------|
-| `REPO_ID` | HuggingFace dataset repo | `"username/dataset_name"` |
-| `SINGLE_TASK` | Task description | `"Pick and place objects"` |
-| `CAMERA_NAMES` | Comma-separated camera names | `"laptop,front,top"` |
-| `CAMERA_*_RESOLUTION` | Resolution for each camera | `"480,640,3"` |
-| `ROBOT_JOINTS` | Comma-separated joint names | `"joint1,joint2,gripper"` |
+| Variable              | Description                  | Example                    |
+| --------------------- | ---------------------------- | -------------------------- |
+| `REPO_ID`             | HuggingFace dataset repo     | `"username/dataset_name"`  |
+| `SINGLE_TASK`         | Task description             | `"Pick and place objects"` |
+| `CAMERA_NAMES`        | Comma-separated camera names | `"laptop,front,top"`       |
+| `CAMERA_*_RESOLUTION` | Resolution for each camera   | `"480,640,3"`              |
+| `ROBOT_JOINTS`        | Comma-separated joint names  | `"joint1,joint2,gripper"`  |
 
 ### Optional Settings
 
-| Variable | Default | Description |
-|----------|---------|-------------|
-| `FPS` | `30` | Recording frame rate (match camera fps) |
-| `TOTAL_EPISODES` | `10` | Number of episodes to record |
-| `EPISODE_DURATION_S` | `60` | Episode length in seconds |
-| `RESET_DURATION_S` | `15` | Break between episodes to reset the environment |
-| `USE_VIDEOS` | `true` | Encode as MP4 videos, else saves images |
-| `PUSH_TO_HUB` | `false` | Upload to HuggingFace Hub |
-| `PRIVATE` | `false` | Make dataset private |
-| `ROOT_PATH` | `~/.cache/huggingface/lerobot/your_repo_id` | Local storage path where you want to save the dataset |
+| Variable             | Default                                     | Description                                            |
+| -------------------- | ------------------------------------------- | ------------------------------------------------------ |
+| `FPS`                | `30`                                        | Recording frame rate (match camera fps)                 |
+| `TOTAL_EPISODES`     | `10`                                        | Number of episodes to record                            |
+| `EPISODE_DURATION_S` | `60`                                        | Episode length in seconds                               |
+| `RESET_DURATION_S`   | `15`                                        | Break between episodes to reset the environment         |
+| `USE_VIDEOS`         | `true`                                      | Encode as MP4 videos, else save individual images       |
+| `SAVE_AVIF_FRAMES`   | `false`                                     | Additionally save each frame as an AVIF file            |
+| `PUSH_TO_HUB`        | `false`                                     | Upload to HuggingFace Hub                               |
+| `PRIVATE`            | `false`                                     | Make dataset private                                    |
+| `ROOT_PATH`          | `~/.cache/huggingface/lerobot/your_repo_id` | Local storage path where you want to save the dataset   |
 
 ## License
diff --git a/node-hub/dora-dataset-record/dora_dataset_record/main.py b/node-hub/dora-dataset-record/dora_dataset_record/main.py
index 9f41ce03..844e24ac 100644
--- a/node-hub/dora-dataset-record/dora_dataset_record/main.py
+++ b/node-hub/dora-dataset-record/dora_dataset_record/main.py
@@ -4,6 +4,7 @@ import os
 import queue
 import threading
 import time
+from pathlib import Path
 from typing import Any
 
 import cv2
@@ -50,6 +51,10 @@ class DoraLeRobotRecorder:
         self.last_frame_time = None
         self.shutdown = False
 
+        # AVIF frame saving
+        self.save_avif_frames = os.getenv("SAVE_AVIF_FRAMES", "false").lower() == "true"
+        self.avif_data_buffer = {}
+
         self._setup_dataset()
         self._start_frame_timer()
 
@@ -143,6 +148,40 @@ class DoraLeRobotRecorder:
             * len(self.cameras),
         )
 
+        # Set up AVIF frame save directories
+        if self.save_avif_frames:
+            self._setup_avif_directories()
+
+    def _setup_avif_directories(self):
+        """Set up directories for AVIF frame saving."""
+        self.dataset_root = Path(self.dataset.root)
+        self.avif_base_path = self.dataset_root / "avif_frames" / "chunk-000"
+
+        for camera_name in self.cameras:
+            camera_path = self.avif_base_path / f"observation.images.{camera_name}"
+            camera_path.mkdir(parents=True, exist_ok=True)
+
+    def _get_avif_save_path(self, camera_name: str, frame_index: int) -> Path:
+        """Get the save path for an AVIF frame."""
+        episode_dir = (
+            self.avif_base_path
+            / f"observation.images.{camera_name}"
+            / f"episode_{self.episode_index:06d}"
+        )
+        episode_dir.mkdir(parents=True, exist_ok=True)
+        return episode_dir / f"frame_{frame_index:06d}.avif"
+
+    def _save_avif_frames(self):
+        """Save AVIF-encoded frames when adding a frame to the dataset."""
+        for camera_name in self.cameras:
+            avif_input_id = f"rav1e_{camera_name}"
+            if avif_input_id in self.avif_data_buffer:
+                avif_data = self.avif_data_buffer[avif_input_id]["data"]
+                save_path = self._get_avif_save_path(camera_name, self.frame_count)
+
+                with open(save_path, "wb") as f:
+                    f.write(avif_data.to_numpy())
+
     def _check_episode_timing(self):
         """Check if we need to start/end Episodes."""
         current_time = time.time()
@@ -237,11 +276,19 @@ class DoraLeRobotRecorder:
         # Only store data if not in reset phase
         if not self.in_reset_phase:
             with self.buffer_lock:
-                self.data_buffer[input_id] = {
-                    "data": data,
-                    "timestamp": time.time(),
-                    "metadata": metadata,
-                }
+                # Store AVIF data separately for saving
+                if self.save_avif_frames and input_id.startswith("rav1e_"):
+                    self.avif_data_buffer[input_id] = {
+                        "data": data,
+                        "timestamp": time.time(),
+                        "metadata": metadata,
+                    }
+                else:
+                    self.data_buffer[input_id] = {
+                        "data": data,
+                        "timestamp": time.time(),
+                        "metadata": metadata,
+                    }
 
             should_stop = self._check_episode_timing()
             if should_stop:
@@ -316,6 +363,11 @@ class DoraLeRobotRecorder:
             print(f"Missing required data in frame: {missing_keys}")
             return
 
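+        # Persist the AVIF bytes buffered from the rav1e inputs before the
+        # decoded images are committed to the LeRobot dataset below.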
+        if self.save_avif_frames:
+            self._save_avif_frames()
+
         self.dataset.add_frame(
             frame=frame_data,
             task=os.getenv("SINGLE_TASK", "Your task"),
diff --git a/node-hub/dora-policy-inference/dora_policy_inference/main.py b/node-hub/dora-policy-inference/dora_policy_inference/main.py
index 00445c0c..5956209b 100644
--- a/node-hub/dora-policy-inference/dora_policy_inference/main.py
+++ b/node-hub/dora-policy-inference/dora_policy_inference/main.py
@@ -2,7 +2,6 @@
 
 import os
 import queue
-import threading
 import time
 from pathlib import Path
 from typing import Any
@@ -25,33 +24,23 @@ class DoraPolicyInference:
         self.message_queue = queue.Queue()
 
         self.data_buffer = {}
-        self.buffer_lock = threading.Lock()
         self.cameras = self._parse_cameras()
         self.task_description = os.getenv("TASK_DESCRIPTION", "")
         self.policy = None
         self.inference_fps = int(os.getenv("INFERENCE_FPS", "30"))
-        self.last_inference_time = None
-        self.inference_interval = 1.0 / self.inference_fps
+        self.min_inference_interval = 1.0 / self.inference_fps
+        self.last_inference_time = 0
 
         self.shutdown = False
 
         self._load_policy()
-        self._start_timer()
-    def _parse_cameras(self) -> dict:
+
+    def _parse_cameras(self) -> list:
         """Parse camera configuration from environment variables."""
         camera_names_str = os.getenv("CAMERA_NAMES", "laptop,front")
         camera_names = [name.strip() for name in camera_names_str.split(",")]
-
-        cameras = {}
-        for camera_name in camera_names:
-            resolution = os.getenv(
-                f"CAMERA_{camera_name.upper()}_RESOLUTION", "480,640,3"
-            )
-            dims = [int(d.strip()) for d in resolution.split(",")]
-            cameras[camera_name] = dims
-
-        return cameras
+        return camera_names
 
     def _load_policy(self):
         """Load the trained LeRobot policy."""
@@ -75,64 +64,58 @@ class DoraPolicyInference:
         self._output(f"Policy loaded successfully on device: {self.device}")
         self._output(f"Policy type: {config.type}")
 
-    def _start_timer(self):
-        """Start the inference timing thread."""
-        self.stop_timer = False
-        self.inference_thread = threading.Thread(
-            target=self._inference_loop, daemon=True
-        )
-        self.inference_thread.start()
+    def _check_inference_trigger(self, input_id: str) -> bool:
+        """Determine if inference should be triggered based on the input received."""
+        current_time = time.time()
+        # The first camera in CAMERA_NAMES acts as the trigger for inference
+        if input_id != self.cameras[0]:
+            return False
 
-    def _inference_loop(self):
-        """Inference loop."""
-        while not self.stop_timer and not self.shutdown:
-            current_time = time.time()
-            if (
-                self.last_inference_time is None
-                or current_time - self.last_inference_time >= self.inference_interval
-            ):
-                self._run_inference()
-                self.last_inference_time = current_time
+        # Don't run inference too frequently, in case the camera fps is too high
+        if current_time - self.last_inference_time < self.min_inference_interval:
+            return False
 
-            time.sleep(0.001)  # Small sleep to prevent busy waiting
+        return True
 
     def _run_inference(self):
         """Run policy inference on current observations."""
-        with self.buffer_lock:
-            required_inputs = set(self.cameras.keys()) | {"robot_state"}
-            available_inputs = set(self.data_buffer.keys())
-
-            if not required_inputs.issubset(available_inputs):
-                return
-
-            observation = {}
-            for camera_name in self.cameras:
-                if camera_name in self.data_buffer:
-                    image = self._convert_camera_data(
-                        self.data_buffer[camera_name]["data"],
-                        self.data_buffer[camera_name]["metadata"],
-                    )
-                    observation[f"observation.images.{camera_name}"] = image
-
-            state = self._convert_robot_data(self.data_buffer["robot_state"]["data"])
-            observation["observation.state"] = state
-
-            action = (
-                predict_action(
-                    observation=observation,
-                    policy=self.policy,
-                    device=self.device,
-                    use_amp=self.policy.config.use_amp,
-                    task=self.task_description,
-                )
-                .cpu()
-                .numpy()
-            )
-
-            # Convert from degrees to radians
-            action = np.deg2rad(action)
-
-            self._send_action(action)
+        current_time = time.time()
+
+        # Wait until every camera and the robot state have been received at
+        # least once; otherwise the robot_state lookup below would KeyError.
+        required_inputs = set(self.cameras) | {"robot_state"}
+        if not required_inputs.issubset(self.data_buffer):
+            return
+
+        observation = {}
+        for camera_name in self.cameras:
+            if camera_name in self.data_buffer:
+                image = self._convert_camera_data(
+                    self.data_buffer[camera_name]["data"],
+                    self.data_buffer[camera_name]["metadata"],
+                )
+                observation[f"observation.images.{camera_name}"] = image
+
+        state = self._convert_robot_data(self.data_buffer["robot_state"]["data"])
+        observation["observation.state"] = state
+
+        action = (
+            predict_action(
+                observation=observation,
+                policy=self.policy,
+                device=self.device,
+                use_amp=self.policy.config.use_amp,
+                task=self.task_description,
+            )
+            .cpu()
+            .numpy()
+        )
+
+        # Convert from degrees to radians
+        action = np.deg2rad(action)
+        self._send_action(action)
+
+        self.last_inference_time = current_time
 
     def _convert_camera_data(self, dora_data, metadata) -> np.ndarray:
         """Convert camera data from Dora format to numpy."""
@@ -160,13 +137,17 @@
         self.message_queue.put(("action", action_message))
 
     def handle_input(self, input_id: str, data: Any, metadata: Any):
-        """Handle incoming data."""
-        with self.buffer_lock:
-            self.data_buffer[input_id] = {
-                "data": data,
-                "timestamp": time.time(),
-                "metadata": metadata,
-            }
+        """Handle incoming data and trigger inference if conditions are met."""
+        # Update data buffer
+        self.data_buffer[input_id] = {
+            "data": data,
+            "timestamp": time.time(),
+            "metadata": metadata,
+        }
+
+        # Check if inference should be triggered
+        if self._check_inference_trigger(input_id):
+            self._run_inference()
 
     def get_pending_messages(self):
         """Get all pending messages from the queue."""
@@ -185,11 +166,6 @@
         """Shutdown the inference node."""
         self._output("Shutting down policy inference...")
         self.shutdown = True
-        self.stop_timer = True
-
-        if self.inference_thread.is_alive():
-            self.inference_thread.join(timeout=5.0)
-
         self._output("Policy inference shutdown complete")
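With the inference thread gone, the node relies on its event loop to call `handle_input` for every incoming event; `_check_inference_trigger` then decides whether the policy actually runs. A minimal sketch of such a loop against the Dora Python API — the node's real `main()` is not part of this diff, so the wiring below is illustrative only:

```python
from dora import Node

node = Node()
inference = DoraPolicyInference()  # the class modified above

for event in node:
    if event["type"] == "INPUT":
        # Every input updates the buffer; only the first camera listed in
        # CAMERA_NAMES (rate-limited by INFERENCE_FPS) triggers inference.
        inference.handle_input(event["id"], event["value"], event["metadata"])
        for kind, payload in inference.get_pending_messages():
            ...  # forward queued ("action", ...) messages via node.send_output
    elif event["type"] == "STOP":
        break
```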
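To spot-check the sidecar frames written by `SAVE_AVIF_FRAMES`, a small read-back sketch — it assumes Pillow plus the `pillow-avif-plugin` package (neither is a dependency of this patch) and the default storage root:

```python
from pathlib import Path

from PIL import Image
import pillow_avif  # noqa: F401  -- registers Pillow's AVIF decoder on import

# Default LeRobot location; adjust if ROOT_PATH is set in the dataflow YAML.
root = Path.home() / ".cache/huggingface/lerobot/HF_username/Name_your_dataset"
episode = root / "avif_frames/chunk-000/observation.images.laptop/episode_000000"

for frame_path in sorted(episode.glob("frame_*.avif")):
    with Image.open(frame_path) as image:
        print(frame_path.name, image.size)
```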