| @@ -5,7 +5,7 @@ readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.m | |||
| # Read the content of the README file | |||
| try: | |||
| with open(readme_path, "r", encoding="utf-8") as f: | |||
| with open(readme_path, encoding="utf-8") as f: | |||
| __doc__ = f.read() | |||
| except FileNotFoundError: | |||
| __doc__ = "README file not found." | |||
| @@ -1,78 +1,78 @@ | |||
| from dora import Node | |||
| """MuJoCo Husky simulation node for Dora. | |||
| This module provides a Dora node that simulates a Husky robot using the MuJoCo physics engine. | |||
| It handles velocity commands and publishes position and velocity feedback. | |||
| """ | |||
| import os | |||
| import mujoco | |||
| import mujoco.viewer | |||
| import pyarrow as pa | |||
| import os | |||
| from dora import Node | |||
| from .mesh_downloader import ensure_meshes | |||
| def clamp(value, min_val, max_val): | |||
| """Clamp value between min_val and max_val""" | |||
| """Clamp value between min_val and max_val.""" | |||
| return max(min(value, max_val), min_val) | |||
| def main(): | |||
| try: | |||
| # Initialize node | |||
| node = Node("mujoco_husky") | |||
| print("Initializing Mujoco Husky simulation...") | |||
| ensure_meshes() | |||
| # Load Husky model | |||
| model_path = os.path.join(os.path.dirname(__file__), "husky/husky.xml") | |||
| model = mujoco.MjModel.from_xml_path(model_path) | |||
| data = mujoco.MjData(model) | |||
| # Define velocity limits | |||
| MAX_LINEAR_VELOCITY = 2.0 | |||
| MAX_ANGULAR_VELOCITY = 3.0 | |||
| max_linear_velocity = 2.0 | |||
| max_angular_velocity = 3.0 | |||
| # Initialize viewer | |||
| with mujoco.viewer.launch_passive(model, data) as viewer: | |||
| print("Simulation initialized successfully") | |||
| for event in node: | |||
| if event["type"] == "INPUT" and event["id"] == "cmd_vel": | |||
| try: | |||
| cmd_vel = event["value"].to_pylist() | |||
| linear_x = clamp(float(cmd_vel[0]), -MAX_LINEAR_VELOCITY, MAX_LINEAR_VELOCITY) | |||
| angular_z = clamp(float(cmd_vel[5]), -MAX_ANGULAR_VELOCITY, MAX_ANGULAR_VELOCITY) | |||
| print("husky cmd_vel: ", linear_x, angular_z) # Debug | |||
| cmd_vel = event["value"].to_pylist() | |||
| linear_x = clamp(float(cmd_vel[0]), -max_linear_velocity, max_linear_velocity) | |||
| angular_z = clamp(float(cmd_vel[5]), -max_angular_velocity, max_angular_velocity) | |||
| wheel_radius = 0.17775 # Actual Husky wheel radius | |||
| wheel_separation = 0.59 # Actual Husky track width | |||
| left_velocity = ((linear_x - angular_z * wheel_separation / 2.0) / wheel_radius) / 3.0 # added these to limit the huskys speed | |||
| right_velocity = ((linear_x + angular_z * wheel_separation / 2.0) / wheel_radius) / 3.0 | |||
| # Apply wheel velocities | |||
| data.ctrl[0] = left_velocity | |||
| data.ctrl[1] = right_velocity | |||
| data.ctrl[2] = left_velocity | |||
| data.ctrl[3] = right_velocity | |||
| mujoco.mj_step(model, data) | |||
| viewer.sync() | |||
| # Send position and velocity data or any other form of feedback from mujoco | |||
| position = data.qpos[:3].tolist() | |||
| velocity = data.qvel[:3].tolist() | |||
| node.send_output("position", | |||
| data=pa.array(position, type=pa.float64()), | |||
| metadata={"type": "position"} | |||
| ) | |||
| node.send_output("velocity", | |||
| data=pa.array(velocity, type=pa.float64()), | |||
| metadata={"type": "velocity"} | |||
| ) | |||
| except Exception as e: | |||
| print(f"Error processing cmd_vel: {e}") | |||
| raise e | |||
| wheel_radius = 0.17775 # Actual Husky wheel radius | |||
| wheel_separation = 0.59 # Actual Husky track width | |||
| left_velocity = ((linear_x - angular_z * wheel_separation / 2.0) / wheel_radius) | |||
| right_velocity = ((linear_x + angular_z * wheel_separation / 2.0) / wheel_radius) | |||
| # Apply wheel velocities | |||
| data.ctrl[0] = left_velocity | |||
| data.ctrl[1] = right_velocity | |||
| data.ctrl[2] = left_velocity | |||
| data.ctrl[3] = right_velocity | |||
| mujoco.mj_step(model, data) | |||
| viewer.sync() | |||
| # Send position and velocity data or any other form of feedback from mujoco | |||
| position = data.qpos[:3].tolist() | |||
| velocity = data.qvel[:3].tolist() | |||
| node.send_output("position", | |||
| data=pa.array(position, type=pa.float64()), | |||
| metadata={"type": "position"} | |||
| ) | |||
| node.send_output("velocity", | |||
| data=pa.array(velocity, type=pa.float64()), | |||
| metadata={"type": "velocity"} | |||
| ) | |||
| except KeyboardInterrupt: | |||
| print("\nExiting simulation...") | |||
| @@ -1,4 +1,7 @@ | |||
| """Mesh file downloader for MuJoCo Husky simulation.""" | |||
| from pathlib import Path | |||
| from huggingface_hub import hf_hub_download | |||
| MESH_FILES = [ | |||
| @@ -9,24 +12,27 @@ MESH_FILES = [ | |||
| "bumper.stl" | |||
| ] | |||
| # Replace with your Hugging Face username | |||
| REPO_ID = "SGPatil/mujoco-husky-meshes" | |||
| REPO_TYPE = "dataset" | |||
| def get_cache_dir(): | |||
| """Get or create cache directory for mesh files""" | |||
| cache_dir = Path.home() / ".cache" / "dora-mujoco-husky" / "meshes" | |||
| cache_dir.mkdir(parents=True, exist_ok=True) | |||
| return cache_dir | |||
| def ensure_meshes(): | |||
| """Download mesh files from Hugging Face Hub if necessary""" | |||
| """Download and install required mesh files for the Husky robot simulation if they are not already present in the local cache. | |||
| Raises: | |||
| Exception: If there is an error downloading any of the mesh files. | |||
| """ | |||
| mesh_dir = Path(__file__).parent / "husky" / "meshes" | |||
| mesh_dir.mkdir(parents=True, exist_ok=True) | |||
| print("Checking mesh files...") | |||
| for filename in MESH_FILES: | |||
| try: | |||
| try: | |||
| for filename in MESH_FILES: | |||
| # Download file from Hugging Face Hub | |||
| downloaded_path = hf_hub_download( | |||
| repo_id=REPO_ID, | |||
| @@ -35,15 +41,14 @@ def ensure_meshes(): | |||
| cache_dir=get_cache_dir() | |||
| ) | |||
| # Copy to mesh directory if needed | |||
| mesh_file = mesh_dir / filename | |||
| if not mesh_file.exists(): | |||
| print(f"Installing {filename} to {mesh_file}") | |||
| mesh_file.write_bytes(Path(downloaded_path).read_bytes()) | |||
| except Exception as e: | |||
| print(f"Error downloading {filename}: {e}") | |||
| raise | |||
| except Exception as e: | |||
| print(f"Error downloading mesh files: {e}") | |||
| raise e | |||
| print("All mesh files are ready") | |||
| @@ -19,3 +19,13 @@ dev = ["pytest >=8.1.1", "ruff >=0.9.1"] | |||
| [project.scripts] | |||
| dora-mujoco-husky = "dora_mujoco_husky.main:main" | |||
| [tool.ruff.lint] | |||
| extend-select = [ | |||
| "UP", # Ruff's UP rule | |||
| "PERF", # Ruff's PERF rule | |||
| "RET", # Ruff's RET rule | |||
| "RSE", # Ruff's RSE rule | |||
| "NPY", # Ruff's NPY rule | |||
| "N", # Ruff's N rule | |||
| ] | |||
| @@ -5,7 +5,7 @@ readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.m | |||
| # Read the content of the README file | |||
| try: | |||
| with open(readme_path, "r", encoding="utf-8") as f: | |||
| with open(readme_path, encoding="utf-8") as f: | |||
| __doc__ = f.read() | |||
| except FileNotFoundError: | |||
| __doc__ = "README file not found." | |||
| @@ -1,55 +1,58 @@ | |||
| """Gamepad controller node for Dora. | |||
| This module provides a Dora node that reads input from a controller and publishes velocity commands for robot control. | |||
| It handles controller mapping, deadzone filtering, and velocity scaling. | |||
| """ | |||
| from dora import Node | |||
| import pyarrow as pa | |||
| import pygame | |||
| import pyarrow as pa | |||
| class LogitechF710: | |||
| """Logitech F710 controller mapping""" | |||
| fullName = 'Logitech Wireless Gamepad F710' | |||
| class Controller: | |||
| """controller mapping.""" | |||
| def __init__(self, joystickNumber=0): | |||
| self.joystickNumber = joystickNumber | |||
| def __init__(self): | |||
| """Change this according to your controller mapping. Currently Logitech F710.""" | |||
| self.axisNames = { | |||
| 0: 'LEFT-X', # Left stick X axis | |||
| 1: 'LEFT-Y', # Left stick Y axis | |||
| 2: 'LT', # Left trigger | |||
| 3: 'RIGHT-X', # Right stick X axis | |||
| 4: 'RIGHT-Y', # Right stick Y axis | |||
| 5: 'RT', # Right trigger | |||
| 6: 'DPAD-X', # D-pad X axis | |||
| 7: 'DPAD-Y' # D-pad Y axis | |||
| 'LEFT-X': 0, | |||
| 'LEFT-Y': 1, | |||
| 'LT': 2, | |||
| 'RIGHT-X': 3, | |||
| 'RIGHT-Y': 4, | |||
| 'RT': 5, | |||
| 'DPAD-X': 6, | |||
| 'DPAD-Y': 7 | |||
| } | |||
| self.buttonNames = { | |||
| 0: 'A', | |||
| 1: 'B', | |||
| 2: 'X', | |||
| 3: 'Y', | |||
| 4: 'LB', | |||
| 5: 'RB', | |||
| 6: 'BACK', | |||
| 7: 'START', | |||
| 8: 'LOGITECH', | |||
| 9: 'LEFT-STICK', | |||
| 10: 'RIGHT-STICK' | |||
| 'A': 0, | |||
| 'B': 1, | |||
| 'X': 2, | |||
| 'Y': 3, | |||
| 'LB': 4, | |||
| 'RB': 5, | |||
| 'BACK': 6, | |||
| 'START': 7, | |||
| 'LOGITECH': 8, | |||
| 'LEFT-STICK': 9, | |||
| 'RIGHT-STICK': 10 | |||
| } | |||
| def main(): | |||
| node = Node("gamepad") | |||
| # Initialize pygame for joystick handling | |||
| pygame.init() | |||
| pygame.joystick.init() | |||
| # Check for connected joysticks | |||
| if pygame.joystick.get_count() == 0: | |||
| print("No gamepad found! Please connect your Logitech F710 controller.") | |||
| print("No gamepad found! Please connect your controller.") | |||
| return | |||
| # Initialize the first joystick | |||
| joystick = pygame.joystick.Joystick(0) | |||
| joystick.init() | |||
| controller = Controller() | |||
| # Control parameters | |||
| max_linear_speed = 2.0 # Maximum speed in m/s | |||
| max_linear_speed = 1.0 # Maximum speed in m/s | |||
| max_angular_speed = 1.5 # Maximum angular speed in rad/s | |||
| print("Gamepad Controls:") | |||
| @@ -60,59 +63,46 @@ def main(): | |||
| try: | |||
| for event in node: | |||
| # Process pygame events | |||
| pygame.event.pump() | |||
| # print("#############################################") | |||
| try: | |||
| # Get axis values (-1 to 1) | |||
| forward = -joystick.get_axis(1) # Left stick Y (inverted) | |||
| turn = -joystick.get_axis(0) # Left stick X | |||
| # Apply deadzone | |||
| deadzone = 0.05 | |||
| forward = 0.0 if abs(forward) < deadzone else forward | |||
| turn = 0.0 if abs(turn) < deadzone else turn | |||
| # Calculate velocities | |||
| forward_speed = forward * max_linear_speed | |||
| turn_speed = turn * max_angular_speed | |||
| # Create cmd_vel array | |||
| cmd_vel = [ | |||
| forward_speed, # Linear X | |||
| 0.0, # Linear Y | |||
| 0.0, # Linear Z | |||
| 0.0, # Angular X | |||
| 0.0, # Angular Y | |||
| turn_speed # Angular Z | |||
| ] | |||
| # Send command | |||
| node.send_output( | |||
| output_id="cmd_vel", | |||
| data=pa.array(cmd_vel, type=pa.float64()), | |||
| metadata={"type": "cmd_vel"} | |||
| ) | |||
| # print("cmd vel : ", cmd_vel) | |||
| except Exception as e: | |||
| print(f"Controller error: {e}") | |||
| forward = -joystick.get_axis(controller.axisNames['LEFT-Y']) | |||
| turn = -joystick.get_axis(controller.axisNames['LEFT-X']) | |||
| deadzone = 0.05 | |||
| forward = 0.0 if abs(forward) < deadzone else forward | |||
| turn = 0.0 if abs(turn) < deadzone else turn | |||
| forward_speed = forward * max_linear_speed | |||
| turn_speed = turn * max_angular_speed | |||
| cmd_vel = [ | |||
| forward_speed, | |||
| 0.0, | |||
| 0.0, | |||
| 0.0, | |||
| 0.0, | |||
| turn_speed | |||
| ] | |||
| node.send_output( | |||
| output_id="cmd_vel", | |||
| data=pa.array(cmd_vel, type=pa.float64()), | |||
| metadata={"type": "cmd_vel"} | |||
| ) | |||
| except KeyboardInterrupt: | |||
| print("\nExiting...") | |||
| finally: | |||
| # Cleanup | |||
| pygame.quit() | |||
| # Send zero velocity | |||
| # Send zero velocity at cleanup | |||
| zero_cmd = [0.0] * 6 | |||
| try: | |||
| node.send_output( | |||
| output_id="cmd_vel", | |||
| data=pa.array(zero_cmd, type=pa.float64()), | |||
| metadata={"type": "cmd_vel"} | |||
| ) | |||
| except Exception as e: | |||
| print(f"Failed to send zero velocity: {e}") | |||
| node.send_output( | |||
| output_id="cmd_vel", | |||
| data=pa.array(zero_cmd, type=pa.float64()), | |||
| metadata={"type": "cmd_vel"} | |||
| ) | |||
| if __name__ == "__main__": | |||
| main() | |||
| @@ -18,3 +18,13 @@ dev = ["pytest >=8.1.1", "ruff >=0.9.1"] | |||
| [project.scripts] | |||
| gamepad = "gamepad.main:main" | |||
| [tool.ruff.lint] | |||
| extend-select = [ | |||
| "UP", # Ruff's UP rule | |||
| "PERF", # Ruff's PERF rule | |||
| "RET", # Ruff's RET rule | |||
| "RSE", # Ruff's RSE rule | |||
| "NPY", # Ruff's NPY rule | |||
| "N", # Ruff's N rule | |||
| ] | |||