@ShashwatPatil I have made some overall improvement. Could you check the commits and see if its ok for you to merge?tags/v0.3.12-rc0
| @@ -0,0 +1,21 @@ | |||
| # MuJoCo Sim Tutorial | |||
| This comprehensive tutorial demonstrates how to build a robot control system using Dora with the `dora-mujoco` simulation node and control logic. | |||
| ## Tutorial Structure | |||
| ### [01. Basic Simulation](basic_simulation/) | |||
| Load a robot in simulation using the `dora-mujoco` node. | |||
| - Learn the fundamentals of MuJoCo simulation in Dora | |||
| - Understand the simulation node architecture | |||
| - See how robot descriptions are loaded automatically | |||
| ### [02. Target Pose Control](target_pose_control/) | |||
| Add control logic with pose commands as target. | |||
| - Implement Cartesian space control. | |||
| - Create generic controller node that is able to control any robotic arm by using `dora-pytorch-kinematics` | |||
| ### [03. Gamepad Control](gamepad_control/) | |||
| Connect a gamepad for real-time interactive control. | |||
| - Integrate with dora's `gamepad` node | |||
| - Demonstrate real-time teleoperation | |||
| @@ -0,0 +1,40 @@ | |||
| # 01. Basic Simulation | |||
| This example demonstrates the simplest possible setup: loading and running a robot simulation using the `dora-mujoco` node. | |||
| - Understand how the `dora-mujoco` node works | |||
| - See how robot models are loaded from `robot-descriptions` | |||
| - Learn the basic dataflow for physics simulation | |||
| The simulation runs at 500Hz and outputs: | |||
| - Joint positions for all robot joints | |||
| - Joint velocities | |||
| - Sensor data (if available) | |||
| - Current simulation time | |||
| ### Running the Example | |||
| ```bash | |||
| cd basic_simulation | |||
| dora build basic.yml | |||
| dora run basic.yml | |||
| ``` | |||
| You should see: | |||
| 1. MuJoCo viewer window opens with GO2 robot | |||
| 2. Robot is effected by gravity (enabled by default) | |||
| ### What's Happening | |||
| 1. **Model Loading**: The `dora-mujoco` node loads the RoboDog (go2) model using `load_robot_description("go2_mj_description")` | |||
| 2. **Physics Loop**: Timer triggers simulation steps at 500Hz (This is default step time for Mujoco) | |||
| 3. **Data Output**: Joint states are published | |||
| 4. **Visualization**: MuJoCo viewer shows real-time simulation | |||
| ### Configuration Details | |||
| The `basic.yml` configures: | |||
| - Model name: `"go2"` you change this to other robots name | |||
| - Update rate: 2ms (500Hz) | |||
| - Outputs: Joint positions, velocities, and sensor data | |||
| @@ -0,0 +1,12 @@ | |||
| nodes: | |||
| - id: mujoco_sim | |||
| build: pip install -e ../../../node-hub/dora-mujoco | |||
| path: dora-mujoco | |||
| inputs: | |||
| tick: dora/timer/millis/2 # 500 Hz simulation | |||
| outputs: | |||
| - joint_positions | |||
| - joint_velocities | |||
| - sensor_data | |||
| env: | |||
| MODEL_NAME: "go2_mj_description" # Load GO2 | |||
| @@ -0,0 +1,134 @@ | |||
| # 03. Gamepad Control | |||
| This example demonstrates real-time interactive control by connecting a gamepad to the controller. It builds upon the target pose control example by adding gamepad input processing for teleoperation of the robot arm. | |||
| ## Controller Types | |||
| ### 1. Basic Gamepad Control (`gamepad_control_basic.yml`) | |||
| - **Direct IK approach**: Uses simple IK solver for immediate position updates | |||
| - The movement fells jumpy | |||
| ### 2. Advanced Gamepad Control (`gamepad_control_advanced.yml`) | |||
| - **Differential IK approach**: Smooth velocity-based control with Jacobian | |||
| - **Smooth**: Continuous motion interpolation | |||
| - **Use case**: Precise manipulation, smooth trajectories | |||
| ## Gamepad Controls | |||
| - **D-pad Vertical**: Move along X-axis (forward/backward) | |||
| - **D-pad Horizontal**: Move along Y-axis (left/right) | |||
| - **Right Stick Y**: Move along Z-axis (up/down) | |||
| - **LB/RB**: Decrease/Increase movement speed (0.1-1.0x scale) | |||
| - **START**: Reset to home position [0.4, 0.0, 0.3] | |||
| ## Running the Examples | |||
| 1. **Connect a gamepad** (Xbox/PlayStation controller via USB or Bluetooth) | |||
| ### Basic Gamepad Control | |||
| ```bash | |||
| cd gamepad_control | |||
| dora build gamepad_control_basic.yml | |||
| dora run gamepad_control_basic.yml | |||
| ``` | |||
| ### Advanced Gamepad Control | |||
| ```bash | |||
| cd gamepad_control | |||
| dora build gamepad_control_advanced.yml | |||
| dora run gamepad_control_advanced.yml | |||
| ``` | |||
| You should see: | |||
| 1. Robot responds to gamepad input in real-time | |||
| 2. **Basic**: Immediate position jumps based on gamepad input | |||
| 3. **Advanced**: Smooth incremental movement with velocity control | |||
| 4. Speed scaling with bumper buttons | |||
| 5. Reset functionality with START button | |||
| ### **Gamepad Node** (`gamepad`) | |||
| Built-in Dora node that interfaces with system gamepad drivers using Pygame. | |||
| ```yaml | |||
| - id: gamepad | |||
| build: pip install -e ../../../node-hub/gamepad | |||
| path: gamepad | |||
| outputs: | |||
| - cmd_vel # 6DOF velocity commands | |||
| - raw_control # Raw button/stick states | |||
| inputs: | |||
| tick: dora/timer/millis/10 # 100Hz polling | |||
| ``` | |||
| - **`cmd_vel`**: 6DOF velocity array `[linear_x, linear_y, linear_z, angular_x, angular_y, angular_z]` | |||
| - Generated from D-pad and analog stick positions | |||
| - Continuous updates while controls are held | |||
| - **`raw_control`**: JSON format gamepad state | |||
| ### **Gamepad Controller Scripts** | |||
| **Basic Controller (`gamepad_controller_ik.py`)**: | |||
| ``` | |||
| Gamepad Input → Target Position Update → IK Request → Joint Commands | |||
| ``` | |||
| - Updates target position incrementally based on gamepad | |||
| - Immediate position jumps | |||
| **Advanced Controller (`gamepad_controller_differential_ik.py`)**: | |||
| ``` | |||
| Gamepad Input → Target Position → Pose Error → Velocity → Joint Commands | |||
| ``` | |||
| - Continuous pose error calculation and velocity control | |||
| - Smooth interpolation between current and target poses | |||
| - Real-time Jacobian-based control | |||
| ## Gamepad Mapping | |||
| The controller expects standard gamepad layout: (The mapping may change based on controller) | |||
| | Control | Function | | |||
| |---------|----------| | |||
| | D-pad Up/Down | X-axis movement | | |||
| | D-pad Left/Right | Y-axis movement | | |||
| | Right Stick Y | Z-axis movement | | |||
| | Left Bumper | Decrease speed | | |||
| | Right Bumper | Increase speed | | |||
| | START button | Reset position | | |||
| ## Key Features | |||
| **Real-time Teleoperation:** | |||
| - Incremental position updates based on continuous input | |||
| - Immediate feedback through robot motion | |||
| **Speed Control:** | |||
| - Dynamic speed scaling (0.1x to 1.0x) | |||
| - Allows both coarse and fine manipulation | |||
| **Features:** | |||
| - Home position reset capability | |||
| - Bounded movement through incremental updates | |||
| - Working on collision avoidance | |||
| ## Troubleshooting | |||
| **"Gamepad not responding"** | |||
| ```bash | |||
| # Check if gamepad is connected | |||
| ls /dev/input/js* | |||
| # Test gamepad input | |||
| jstest /dev/input/js0 | |||
| # Grant permissions | |||
| sudo chmod 666 /dev/input/js0 | |||
| ``` | |||
| **"Robot doesn't move with D-pad"** | |||
| - Check `cmd_vel` output: should show non-zero values when D-pad pressed | |||
| - Verify correct gamepad mapping for your controller model | |||
| **"Movement too fast/slow"** | |||
| - Use LB/RB buttons to adjust speed scale | |||
| - Modify `delta = cmd_vel[:3] * 0.03` scaling factor in code if required | |||
| @@ -0,0 +1,51 @@ | |||
| nodes: | |||
| - id: gamepad | |||
| build: pip install -e ../../../node-hub/gamepad | |||
| path: gamepad | |||
| outputs: | |||
| - cmd_vel | |||
| - raw_control | |||
| inputs: | |||
| tick: dora/timer/millis/10 | |||
| - id: mujoco_sim | |||
| build: pip install -e ../../../node-hub/dora-mujoco | |||
| path: dora-mujoco | |||
| inputs: | |||
| tick: dora/timer/millis/2 # 500 Hz simulation | |||
| control_input: gamepad_controller/joint_commands | |||
| outputs: | |||
| - joint_positions | |||
| - joint_velocities | |||
| - actuator_controls | |||
| - sensor_data | |||
| env: | |||
| MODEL_NAME: "iiwa14_mj_description" | |||
| - id: gamepad_controller | |||
| path: nodes/gamepad_controller_differential_ik.py | |||
| inputs: | |||
| joint_positions: mujoco_sim/joint_positions | |||
| joint_velocities: mujoco_sim/joint_velocities | |||
| raw_control: gamepad/raw_control | |||
| cmd_vel: gamepad/cmd_vel | |||
| fk_result: pytorch_kinematics/fk_request | |||
| jacobian_result: pytorch_kinematics/jacobian_request | |||
| outputs: | |||
| - joint_commands | |||
| - fk_request | |||
| - jacobian_request | |||
| - id: pytorch_kinematics | |||
| build: pip install -e ../../../node-hub/dora-pytorch-kinematics | |||
| path: dora-pytorch-kinematics | |||
| inputs: | |||
| fk_request: gamepad_controller/fk_request | |||
| jacobian_request: gamepad_controller/jacobian_request | |||
| outputs: | |||
| - fk_request | |||
| - jacobian_request | |||
| env: | |||
| MODEL_NAME: "iiwa14_description" | |||
| END_EFFECTOR_LINK: "iiwa_link_7" | |||
| TRANSFORM: "0. 0. 0. 1. 0. 0. 0." | |||
| @@ -0,0 +1,37 @@ | |||
| nodes: | |||
| - id: gamepad | |||
| build: pip install -e ../../../node-hub/gamepad | |||
| path: gamepad | |||
| outputs: | |||
| - cmd_vel | |||
| - raw_control | |||
| inputs: | |||
| tick: dora/timer/millis/10 | |||
| - id: mujoco_sim | |||
| build: pip install -e ../../../node-hub/dora-mujoco | |||
| path: dora-mujoco | |||
| inputs: | |||
| tick: dora/timer/millis/2 # 500 Hz simulation | |||
| control_input: pytorch_kinematics/cmd_vel | |||
| outputs: | |||
| - joint_positions | |||
| - joint_velocities | |||
| - actuator_controls | |||
| - sensor_data | |||
| env: | |||
| MODEL_NAME: "iiwa14_mj_description" | |||
| - id: pytorch_kinematics | |||
| build: pip install -e ../../../node-hub/dora-pytorch-kinematics | |||
| path: dora-pytorch-kinematics | |||
| inputs: | |||
| cmd_vel: gamepad/cmd_vel | |||
| pose: mujoco_sim/joint_positions | |||
| outputs: | |||
| - cmd_vel | |||
| - pose | |||
| env: | |||
| MODEL_NAME: "iiwa14_description" | |||
| END_EFFECTOR_LINK: "iiwa_link_7" | |||
| TRANSFORM: "0. 0. 0. 1. 0. 0. 0." | |||
| @@ -0,0 +1,188 @@ | |||
| import json | |||
| import time | |||
| import numpy as np | |||
| import pyarrow as pa | |||
| from dora import Node | |||
| from scipy.spatial.transform import Rotation | |||
| class GamepadController: | |||
| def __init__(self): | |||
| # Control parameters | |||
| self.integration_dt = 0.1 | |||
| self.damping = 1e-4 | |||
| self.Kpos = 0.95 # Position gain | |||
| self.Kori = 0.95 # Orientation gain | |||
| # Gamepad control parameters | |||
| self.speed_scale = 0.5 | |||
| self.max_angvel = 3.14 * self.speed_scale # Max joint velocity (rad/s) | |||
| # Robot state variables | |||
| self.dof = None | |||
| self.current_joint_pos = None # Full robot state | |||
| self.home_pos = None # Home position for arm joints only | |||
| # Target pose (independent of DOF) | |||
| self.target_pos = np.array([0.4, 0.0, 0.3]) # Conservative default target | |||
| self.target_rpy = [180.0, 0.0, 90.0] # Downward orientation | |||
| # Cache for external computations | |||
| self.current_ee_pose = None # End-effector pose | |||
| self.current_jacobian = None # Jacobian matrix | |||
| print("Gamepad Controller initialized") | |||
| def _initialize_robot(self, positions, jacobian_dof=None): | |||
| self.full_joint_count = len(positions) | |||
| self.dof = jacobian_dof if jacobian_dof is not None else self.full_joint_count | |||
| self.current_joint_pos = positions.copy() | |||
| self.home_pos = np.zeros(self.dof) | |||
| def process_cmd_vel(self, cmd_vel): | |||
| # print(f"Processing cmd_vel: {cmd_vel}") | |||
| delta = cmd_vel[:3] * 0.03 | |||
| dx, dy, dz = delta | |||
| # Update target position incrementally | |||
| if abs(dx) > 0 or abs(dy) > 0 or abs(dz) > 0: | |||
| self.target_pos += np.array([dx, -dy, dz]) | |||
| def process_gamepad_input(self, raw_control): | |||
| buttons = raw_control["buttons"] | |||
| # Reset position with START button | |||
| if len(buttons) > 9 and buttons[9]: | |||
| self.target_pos = np.array([0.4, 0.0, 0.3]) | |||
| print("Reset target to home position") | |||
| # Speed scaling with bumpers (LB/RB) | |||
| if len(buttons) > 4 and buttons[4]: # LB | |||
| self.speed_scale = max(0.1, self.speed_scale - 0.1) | |||
| print(f"Speed: {self.speed_scale:.1f}") | |||
| if len(buttons) > 5 and buttons[5]: # RB | |||
| self.speed_scale = min(1.0, self.speed_scale + 0.1) | |||
| print(f"Speed: {self.speed_scale:.1f}") | |||
| def get_target_rotation_matrix(self): | |||
| roll_rad, pitch_rad, yaw_rad = np.radians(self.target_rpy) | |||
| desired_rot = Rotation.from_euler('ZYX', [yaw_rad, pitch_rad, roll_rad]) | |||
| return desired_rot.as_matrix() | |||
| def update_jacobian(self, jacobian_flat, shape): | |||
| jacobian_dof = shape[1] | |||
| self.current_jacobian = jacobian_flat.reshape(shape) | |||
| if self.dof is None: | |||
| if self.current_joint_pos is not None: | |||
| self._initialize_robot(self.current_joint_pos, jacobian_dof) | |||
| else: | |||
| self.dof = jacobian_dof | |||
| elif self.dof != jacobian_dof: | |||
| self.dof = jacobian_dof | |||
| self.home_pos = np.zeros(self.dof) | |||
| def apply_differential_ik_control(self): | |||
| if self.current_ee_pose is None or self.current_jacobian is None: | |||
| return self.current_joint_pos | |||
| current_ee_pos = self.current_ee_pose['position'] | |||
| current_ee_rpy = self.current_ee_pose['rpy'] | |||
| pos_error = self.target_pos - current_ee_pos | |||
| twist = np.zeros(6) | |||
| twist[:3] = self.Kpos * pos_error / self.integration_dt | |||
| # Convert current RPY to rotation matrix | |||
| current_rot = Rotation.from_euler('XYZ', current_ee_rpy) | |||
| desired_rot = Rotation.from_matrix(self.get_target_rotation_matrix()) | |||
| rot_error = (desired_rot * current_rot.inv()).as_rotvec() | |||
| twist[3:] = self.Kori * rot_error / self.integration_dt | |||
| jac = self.current_jacobian | |||
| # Damped least squares inverse kinematics | |||
| diag = self.damping * np.eye(6) | |||
| dq = jac.T @ np.linalg.solve(jac @ jac.T + diag, twist) | |||
| # # Apply nullspace control | |||
| # current_arm = self.current_joint_pos[:self.dof] | |||
| # jac_pinv = np.linalg.pinv(jac) | |||
| # N = np.eye(self.dof) - jac_pinv @ jac | |||
| # k_null = np.ones(self.dof) * 5.0 | |||
| # dq_null = k_null * (self.home_pos - current_arm) | |||
| # dq += N @ dq_null | |||
| # Limit joint velocities | |||
| dq_abs_max = np.abs(dq).max() | |||
| if dq_abs_max > self.max_angvel: | |||
| dq *= self.max_angvel / dq_abs_max | |||
| # Create full joint command | |||
| new_joints = self.current_joint_pos.copy() | |||
| new_joints[:self.dof] += dq * self.integration_dt | |||
| return new_joints | |||
| def main(): | |||
| node = Node() | |||
| controller = GamepadController() | |||
| for event in node: | |||
| if event["type"] == "INPUT": | |||
| if event["id"] == "joint_positions": | |||
| joint_pos = event["value"].to_numpy() | |||
| if controller.current_joint_pos is None: | |||
| controller._initialize_robot(joint_pos) | |||
| else: | |||
| controller.current_joint_pos = joint_pos | |||
| # Request FK computation | |||
| node.send_output( | |||
| "fk_request", | |||
| pa.array(controller.current_joint_pos, type=pa.float32()), | |||
| metadata={"encoding": "jointstate", "timestamp": time.time()} | |||
| ) | |||
| # Request Jacobian computation | |||
| node.send_output( | |||
| "jacobian_request", | |||
| pa.array(controller.current_joint_pos, type=pa.float32()), | |||
| metadata={"encoding": "jacobian", "timestamp": time.time()} | |||
| ) | |||
| # Apply differential IK control | |||
| joint_commands = controller.apply_differential_ik_control() | |||
| # Send control commands to the robot | |||
| node.send_output( | |||
| "joint_commands", | |||
| pa.array(joint_commands, type=pa.float32()), | |||
| metadata={"timestamp": time.time()} | |||
| ) | |||
| elif event["id"] == "raw_control": | |||
| raw_control = json.loads(event["value"].to_pylist()[0]) | |||
| controller.process_gamepad_input(raw_control) | |||
| elif event["id"] == "cmd_vel": | |||
| cmd_vel = event["value"].to_numpy() | |||
| controller.process_cmd_vel(cmd_vel) | |||
| # Handle FK results | |||
| if event["id"] == "fk_result": | |||
| if event["metadata"].get("encoding") == "xyzrpy": | |||
| ee_pose = event["value"].to_numpy() | |||
| controller.current_ee_pose = {'position': ee_pose[:3], 'rpy': ee_pose[3:6]} | |||
| # Handle Jacobian results | |||
| if event["id"] == "jacobian_result": | |||
| if event["metadata"].get("encoding") == "jacobian_result": | |||
| jacobian_flat = event["value"].to_numpy() | |||
| jacobian_shape = event["metadata"]["jacobian_shape"] | |||
| controller.update_jacobian(jacobian_flat, jacobian_shape) | |||
| if __name__ == "__main__": | |||
| main() | |||
| @@ -0,0 +1,106 @@ | |||
| import json | |||
| import time | |||
| import numpy as np | |||
| import pyarrow as pa | |||
| from dora import Node | |||
| class GamepadController: | |||
| def __init__(self): | |||
| """ | |||
| Initialize the simple gamepad controller | |||
| """ | |||
| # Robot state variables | |||
| self.dof = None | |||
| self.current_joint_pos = None | |||
| # Target pose (independent of DOF) | |||
| self.target_pos = np.array([0.4, 0.0, 0.3]) # Conservative default target | |||
| self.target_rpy = [180.0, 0.0, 90.0] # Downward orientation | |||
| self.ik_request_sent = True | |||
| print("Simple Gamepad Controller initialized") | |||
| def _initialize_robot(self, positions): | |||
| """Initialize controller state with appropriate dimensions""" | |||
| self.full_joint_count = len(positions) | |||
| self.current_joint_pos = positions.copy() | |||
| if self.dof is None: | |||
| self.dof = self.full_joint_count | |||
| def process_cmd_vel(self, cmd_vel): | |||
| """Process gamepad velocity commands to update target position""" | |||
| delta = cmd_vel[:3] * 0.03 | |||
| dx, dy, dz = delta | |||
| # Update target position incrementally | |||
| if abs(dx) > 0 or abs(dy) > 0 or abs(dz) > 0: | |||
| self.target_pos += np.array([dx, -dy, dz]) | |||
| self.ik_request_sent = False | |||
| def process_gamepad_input(self, raw_control): | |||
| """Process gamepad button inputs""" | |||
| buttons = raw_control["buttons"] | |||
| # Reset position with START button | |||
| if len(buttons) > 9 and buttons[9]: | |||
| self.target_pos = np.array([0.4, 0.0, 0.3]) | |||
| print("Reset target to home position") | |||
| self.ik_request_sent = False | |||
| def get_target_pose_array(self): | |||
| """Get target pose as 6D array [x, y, z, roll, pitch, yaw]""" | |||
| return np.concatenate([self.target_pos, self.target_rpy]) | |||
| def main(): | |||
| node = Node() | |||
| controller = GamepadController() | |||
| for event in node: | |||
| if event["type"] == "INPUT": | |||
| if event["id"] == "joint_positions": | |||
| joint_pos = event["value"].to_numpy() | |||
| if controller.current_joint_pos is None: | |||
| controller._initialize_robot(joint_pos) | |||
| else: | |||
| controller.current_joint_pos = joint_pos | |||
| # Request IK solution directly | |||
| target_pose = controller.get_target_pose_array() | |||
| if not controller.ik_request_sent: | |||
| node.send_output( | |||
| "ik_request", | |||
| pa.array(target_pose, type=pa.float32()), | |||
| metadata={"encoding": "xyzrpy", "timestamp": time.time()} | |||
| ) | |||
| controller.ik_request_sent = True | |||
| node.send_output( | |||
| "joint_state", | |||
| pa.array(joint_pos, type=pa.float32()), | |||
| metadata={"encoding": "jointstate", "timestamp": time.time()} | |||
| ) | |||
| elif event["id"] == "raw_control": | |||
| raw_control = json.loads(event["value"].to_pylist()[0]) | |||
| controller.process_gamepad_input(raw_control) | |||
| elif event["id"] == "cmd_vel": | |||
| cmd_vel = event["value"].to_numpy() | |||
| controller.process_cmd_vel(cmd_vel) | |||
| # Handle IK results and send directly as joint commands | |||
| elif event["id"] == "ik_request": | |||
| if event["metadata"]["encoding"] == "jointstate": | |||
| ik_solution = event["value"].to_numpy() | |||
| # Send IK solution directly as joint commands | |||
| node.send_output( | |||
| "joint_commands", | |||
| pa.array(ik_solution, type=pa.float32()), | |||
| metadata={"timestamp": time.time()} | |||
| ) | |||
| if __name__ == "__main__": | |||
| main() | |||
| @@ -0,0 +1,167 @@ | |||
| # 02. Target Pose Control | |||
| This example demonstrates Cartesian space control using two different approaches: **Direct Inverse Kinematics(IK)**(Basic) and **Differential IK**(advance). Both create a Controller node that processes target pose commands and outputs joint commands using **dora-pytorch-kinematics**. | |||
| ## Controller Types | |||
| ### 1. Direct IK Controller (`control.yml`) | |||
| - **Simple approach**: Directly passes target pose to IK solver | |||
| - **Fast**: Single IK computation per target pose | |||
| ### 2. Differential IK Controller (`control_advanced.yml`) | |||
| - **Smooth approach**: Uses pose error feedback and Jacobian-based control | |||
| - **Continuous**: Interpolates smoothly between current and target poses | |||
| - **Use case**: Smooth trajectories | |||
| ## Running the Examples | |||
| ### Direct IK Control | |||
| ```bash | |||
| cd target_pose_control | |||
| dora build control.yml | |||
| dora run control.yml | |||
| ``` | |||
| ### Differential IK Control | |||
| ```bash | |||
| cd target_pose_control | |||
| dora build control_advanced.yml | |||
| dora run control_advanced.yml | |||
| ``` | |||
| You should see: | |||
| 1. Robot moves to predefined target poses automatically | |||
| 2. **Direct IK**: Immediate jumps to target poses | |||
| 3. **Differential IK**: Smooth Cartesian space motion with continuous interpolation | |||
| 4. End-effector following target positions accurately | |||
| ### Nodes | |||
| #### 1. **Pose Publisher Script** (`pose_publisher.py`) | |||
| ```python | |||
| class PosePublisher: | |||
| def __init__(self): | |||
| # Predefined sequence of target poses [x, y, z, roll, pitch, yaw] | |||
| self.target_poses = [ | |||
| [0.5, 0.5, 0.3, 180.0, 0.0, 90.0], # Position + RPY orientation | |||
| [0.6, 0.2, 0.5, 180.0, 0.0, 45.0], # Different orientation | |||
| # ... more poses | |||
| ] | |||
| ``` | |||
| - Sends target poses every 5 seconds | |||
| - Cycles through predefined positions and orientations | |||
| - Can be replaced with path planning, user input, or any pose generation logic | |||
| - Outputs `target_pose` array `[x, y, z, roll, pitch, yaw]` | |||
| #### 2. **Controller Scripts** | |||
| ##### Direct IK Controller (`controller_ik.py`) | |||
| **How it works:** | |||
| 1. **Target Input**: Receives new target pose `[x, y, z, roll, pitch, yaw]` | |||
| 2. **IK Request**: Sends target pose directly to `dora-pytorch-kinematics` | |||
| 3. **Joint Solution**: Receives complete joint configuration for target pose | |||
| 4. **Direct Application**: Passes IK solution directly as joint commands to robot *(sometimes for certain target pose there is no IK solution)* | |||
| **Advantages:** | |||
| - Simple and fast0 | |||
| - Minimal computation | |||
| - Direct pose-to-joint mapping | |||
| **Disadvantages:** | |||
| - Sudden jumps between poses | |||
| - No trajectory smoothing | |||
| - May cause joint velocity spikes | |||
| ##### Differential IK Controller (`controller_differential_ik.py`) | |||
| **How it works:** | |||
| 1. **Pose Error Calculation**: Computes difference between target and current end-effector pose | |||
| 2. **Velocity Command**: Converts pose error to desired end-effector velocity using PD control: | |||
| ```python | |||
| pos_error = target_pos - current_ee_pos | |||
| twist[:3] = Kpos * pos_error / integration_dt # Linear velocity | |||
| twist[3:] = Kori * rot_error / integration_dt # Angular velocity | |||
| ``` | |||
| 3. **Jacobian Inverse**: Uses robot Jacobian to map end-effector velocity to joint velocities: | |||
| ```python | |||
| # Damped least squares to avoid singularities | |||
| dq = J^T @ (J @ J^T + λI)^(-1) @ twist | |||
| ``` | |||
| 4. **Interpolation**: Integrates joint velocities to get next joint positions: | |||
| ```python | |||
| new_joints = current_joints + dq * dt | |||
| ``` | |||
| 5. **Nullspace Control** (optional): Projects secondary objectives (like joint limits avoidance) into the nullspace | |||
| **Advantages:** | |||
| - Smooth, continuous motion | |||
| - Velocity-controlled approach | |||
| - Handles robot singularities | |||
| - Real-time reactive control | |||
| <!-- **Jacobian Role:** | |||
| - **Forward Kinematics**: Maps joint space to Cartesian space | |||
| - **Jacobian Matrix**: Linear mapping between joint velocities and end-effector velocities | |||
| - **Inverse Mapping**: Converts desired end-effector motion to required joint motions | |||
| - **Singularity Handling**: Damped least squares prevents numerical instability near singularities --> | |||
| ##### 3. **PyTorch Kinematics Node** (`dora-pytorch-kinematics`) | |||
| A dedicated kinematics computation node that provides three core robotic functions: | |||
| ```yaml | |||
| - id: pytorch_kinematics | |||
| build: pip install -e ../../../node-hub/dora-pytorch-kinematics | |||
| path: dora-pytorch-kinematics | |||
| inputs: | |||
| ik_request: controller/ik_request # For inverse kinematics | |||
| fk_request: controller/fk_request # For forward kinematics | |||
| jacobian_request: controller/jacobian_request # For Jacobian computation | |||
| outputs: | |||
| - ik_request # Joint solution for target pose | |||
| - fk_request # End-effector pose for joint configuration | |||
| - jacobian_request # Jacobian matrix for velocity mapping | |||
| env: | |||
| URDF_PATH: "../URDF/franka_panda/panda.urdf" | |||
| END_EFFECTOR_LINK: "panda_hand" | |||
| TRANSFORM: "0. 0. 0. 1. 0. 0. 0." | |||
| ``` | |||
| 1. **Inverse Kinematics (IK)** | |||
| - **Input**: Target pose `[x, y, z, roll, pitch, yaw]` or `[x, y, z, qw, qx, qy, qz]` + current joint state | |||
| - **Output**: Complete joint configuration to achieve target pose | |||
| - **Use case**: Convert Cartesian target to joint angles | |||
| 2. **Forward Kinematics (FK)** | |||
| - **Input**: Joint positions array | |||
| - **Output**: Current end-effector pose `[x, y, z, qw, qx, qy, qz]` | |||
| - **Use case**: Determine end-effector position from joint angles | |||
| 3. **Jacobian Computation** | |||
| - **Input**: Current joint positions | |||
| - **Output**: 6×N Jacobian matrix (N = number of joints) | |||
| - **Use case**: Map joint velocities to end-effector velocities | |||
| **Configuration:** | |||
| - **URDF_PATH**: Robot model definition file | |||
| - **END_EFFECTOR_LINK**: Target link for pose calculations | |||
| - **TRANSFORM**: Optional transform offset (position + quaternion wxyz format) | |||
| **Usage in Controllers:** | |||
| - **Direct IK**: Uses only `ik_request` → `ik_result` | |||
| - **Differential IK**: Uses `fk_request` → `fk_result` and `jacobian_request` → `jacobian_result` | |||
| #### 4. **MuJoCo Simulation Node** (`dora-mujoco`) | |||
| - **Process**: Physics simulation, dynamics integration, rendering | |||
| - **Output**: Joint positions, velocities, sensor data | |||
| ## References | |||
| This controller design draws inspiration from the kinematic control strategies presented in [mjctrl](https://github.com/kevinzakka/mjctrl), specifically the [differential ik control example](https://github.com/kevinzakka/mjctrl/blob/main/diffik.py). | |||
| The URDF model for the robotic arms was sourced from the [PyBullet GitHub repository](https://github.com/bulletphysics/bullet3/tree/master/examples/pybullet/gym/pybullet_data). Or you could google search the robot and get its urdf. | |||
| @@ -0,0 +1,46 @@ | |||
| nodes: | |||
| - id: mujoco_sim | |||
| build: pip install -e ../../../node-hub/dora-mujoco | |||
| path: dora-mujoco | |||
| inputs: | |||
| tick: dora/timer/millis/2 | |||
| control_input: controller/joint_commands | |||
| outputs: | |||
| - joint_positions | |||
| - joint_velocities | |||
| - actuator_controls | |||
| - sensor_data | |||
| env: | |||
| MODEL_NAME: "panda_mj_description" | |||
| - id: controller | |||
| path: nodes/controller_ik.py | |||
| inputs: | |||
| joint_positions: mujoco_sim/joint_positions | |||
| target_pose: pose_publisher/target_pose | |||
| ik_request: pytorch_kinematics/ik_request | |||
| outputs: | |||
| - joint_commands | |||
| - ik_request | |||
| - joint_state | |||
| - id: pytorch_kinematics | |||
| build: pip install -e ../../../node-hub/dora-pytorch-kinematics | |||
| path: dora-pytorch-kinematics | |||
| inputs: | |||
| ik_request: controller/ik_request | |||
| joint_state: controller/joint_state | |||
| outputs: | |||
| - ik_request | |||
| - joint_state | |||
| env: | |||
| MODEL_NAME: "panda_description" | |||
| END_EFFECTOR_LINK: "panda_hand" | |||
| TRANSFORM: "0. 0. 0. 1. 0. 0. 0." # Pytorch kinematics uses wxyz format for quaternion | |||
| - id: pose_publisher | |||
| path: nodes/pose_publisher.py | |||
| inputs: | |||
| tick: dora/timer/millis/5000 | |||
| outputs: | |||
| - target_pose | |||
| @@ -0,0 +1,48 @@ | |||
| nodes: | |||
| - id: mujoco_sim | |||
| build: pip install -e ../../../node-hub/dora-mujoco | |||
| path: dora-mujoco | |||
| inputs: | |||
| tick: dora/timer/millis/2 | |||
| control_input: controller/joint_commands | |||
| outputs: | |||
| - joint_positions | |||
| - joint_velocities | |||
| - actuator_controls | |||
| - sensor_data | |||
| env: | |||
| MODEL_NAME: "panda_mj_description" | |||
| - id: controller | |||
| path: nodes/controller_differential_ik.py | |||
| inputs: | |||
| joint_positions: mujoco_sim/joint_positions | |||
| joint_velocities: mujoco_sim/joint_velocities | |||
| target_pose: pose_publisher/target_pose | |||
| fk_result: pytorch_kinematics/fk_request | |||
| jacobian_result: pytorch_kinematics/jacobian_request | |||
| outputs: | |||
| - joint_commands | |||
| - fk_request | |||
| - jacobian_request | |||
| - id: pytorch_kinematics | |||
| build: pip install -e ../../../node-hub/dora-pytorch-kinematics | |||
| path: dora-pytorch-kinematics | |||
| inputs: | |||
| fk_request: controller/fk_request | |||
| jacobian_request: controller/jacobian_request | |||
| outputs: | |||
| - fk_request | |||
| - jacobian_request | |||
| env: | |||
| MODEL_NAME: "panda_description" | |||
| END_EFFECTOR_LINK: "panda_hand" | |||
| TRANSFORM: "0. 0. 0. 1. 0. 0. 0." # Pytorch kinematics uses wxyz format for quaternion | |||
| - id: pose_publisher | |||
| path: nodes/pose_publisher.py | |||
| inputs: | |||
| tick: dora/timer/millis/5000 | |||
| outputs: | |||
| - target_pose | |||
| @@ -0,0 +1,171 @@ | |||
| import numpy as np | |||
| import time | |||
| import pyarrow as pa | |||
| from dora import Node | |||
| from scipy.spatial.transform import Rotation | |||
| class Controller: | |||
| def __init__(self): | |||
| """ | |||
| Initialize the controller | |||
| """ | |||
| # Control parameters | |||
| self.integration_dt = 0.1 | |||
| self.damping = 1e-4 | |||
| self.Kpos = 0.95 # Position gain | |||
| self.Kori = 0.95 # Orientation gain | |||
| self.max_angvel = 3.14 # Max joint velocity (rad/s) | |||
| # State variables | |||
| self.dof = None | |||
| self.current_joint_pos = None # Full robot state | |||
| self.home_pos = None # Home position for arm joints only | |||
| self.target_pos = np.array([0.4, 0.0, 0.3]) # Conservative default target | |||
| self.target_rpy = [180.0, 0.0, 90.0] # Downward orientation | |||
| self.current_ee_pose = None # End-effector pose | |||
| self.current_jacobian = None # Jacobian matrix | |||
| print("Controller initialized") | |||
| def _initialize_robot(self, positions, jacobian_dof=None): | |||
| """Initialize controller state with appropriate dimensions""" | |||
| self.full_joint_count = len(positions) | |||
| # Set DOF from Jacobian if available | |||
| self.dof = jacobian_dof if jacobian_dof is not None else self.full_joint_count | |||
| self.current_joint_pos = positions.copy() | |||
| self.home_pos = np.zeros(self.dof) | |||
| def get_target_rotation_matrix(self): | |||
| """Convert RPY to rotation matrix.""" | |||
| roll_rad, pitch_rad, yaw_rad = np.radians(self.target_rpy) | |||
| desired_rot = Rotation.from_euler('ZYX', [yaw_rad, pitch_rad, roll_rad]) | |||
| return desired_rot.as_matrix() | |||
| def set_target_pose(self, pose_array): | |||
| """Set target pose from input array.""" | |||
| self.target_pos = np.array(pose_array[:3]) | |||
| if len(pose_array) == 6: | |||
| self.target_rpy = list(pose_array[3:6]) | |||
| else: | |||
| self.target_rpy = [180.0, 0.0, 90.0] # Default orientation if not provided | |||
| def update_jacobian(self, jacobian_flat, shape): | |||
| """Update current jacobian and initialize DOF if needed.""" | |||
| jacobian_dof = shape[1] | |||
| self.current_jacobian = jacobian_flat.reshape(shape) | |||
| if self.dof is None: | |||
| if self.current_joint_pos is not None: | |||
| self._initialize_robot(self.current_joint_pos, jacobian_dof) | |||
| else: | |||
| self.dof = jacobian_dof | |||
| elif self.dof != jacobian_dof: | |||
| self.dof = jacobian_dof | |||
| self.home_pos = np.zeros(self.dof) | |||
| def apply_differential_ik_control(self): | |||
| """Apply differential IK control with nullspace projection.""" | |||
| if self.current_ee_pose is None or self.current_jacobian is None: | |||
| return self.current_joint_pos | |||
| current_ee_pos = self.current_ee_pose['position'] | |||
| current_ee_rpy = self.current_ee_pose['rpy'] | |||
| # Calculate position and orientation error | |||
| pos_error = self.target_pos - current_ee_pos | |||
| twist = np.zeros(6) | |||
| twist[:3] = self.Kpos * pos_error / self.integration_dt | |||
| # Convert current RPY to rotation matrix | |||
| current_rot = Rotation.from_euler('XYZ', current_ee_rpy) | |||
| desired_rot = Rotation.from_matrix(self.get_target_rotation_matrix()) | |||
| rot_error = (desired_rot * current_rot.inv()).as_rotvec() | |||
| twist[3:] = self.Kori * rot_error / self.integration_dt | |||
| jac = self.current_jacobian | |||
| # Damped least squares inverse kinematics | |||
| diag = self.damping * np.eye(6) | |||
| dq = jac.T @ np.linalg.solve(jac @ jac.T + diag, twist) | |||
| # # Apply nullspace control | |||
| # # Calculate nullspace projection # uncomment to enable nullspace control | |||
| # current_arm = self.current_joint_pos[:self.dof] | |||
| # jac_pinv = np.linalg.pinv(jac) | |||
| # N = np.eye(self.dof) - jac_pinv @ jac | |||
| # # Apply gains to pull towards home position | |||
| # k_null = np.ones(self.dof) * 5.0 | |||
| # dq_null = k_null * (self.home_pos - current_arm) # Nullspace velocity | |||
| # dq += N @ dq_null # Nullspace movement | |||
| # Limit joint velocities | |||
| dq_abs_max = np.abs(dq).max() | |||
| if dq_abs_max > self.max_angvel: | |||
| dq *= self.max_angvel / dq_abs_max | |||
| # Create full joint command (apply IK result to arm joints, keep other joints unchanged) | |||
| new_joints = self.current_joint_pos.copy() | |||
| new_joints[:self.dof] += dq * self.integration_dt | |||
| return new_joints | |||
| def main(): | |||
| node = Node() | |||
| controller = Controller() | |||
| for event in node: | |||
| if event["type"] == "INPUT": | |||
| if event["id"] == "joint_positions": | |||
| joint_pos = event["value"].to_numpy() | |||
| if controller.current_joint_pos is None: | |||
| controller._initialize_robot(joint_pos) | |||
| else: | |||
| controller.current_joint_pos = joint_pos | |||
| # Request FK computation | |||
| node.send_output( | |||
| "fk_request", | |||
| pa.array(controller.current_joint_pos, type=pa.float32()), | |||
| metadata={"encoding": "jointstate", "timestamp": time.time()} | |||
| ) | |||
| # Request Jacobian computation | |||
| node.send_output( | |||
| "jacobian_request", | |||
| pa.array(controller.current_joint_pos, type=pa.float32()), | |||
| metadata={"encoding": "jacobian", "timestamp": time.time()} | |||
| ) | |||
| joint_commands = controller.apply_differential_ik_control() | |||
| # Send control commands to the robot | |||
| node.send_output( | |||
| "joint_commands", | |||
| pa.array(joint_commands, type=pa.float32()), | |||
| metadata={"timestamp": time.time()} | |||
| ) | |||
| # Handle target pose updates | |||
| if event["id"] == "target_pose": | |||
| target_pose = event["value"].to_numpy() | |||
| controller.set_target_pose(target_pose) | |||
| # Handle FK results | |||
| if event["id"] == "fk_result": | |||
| if event["metadata"]["encoding"] == "xyzrpy": | |||
| ee_pose = event["value"].to_numpy() | |||
| controller.current_ee_pose = {'position': ee_pose[:3],'rpy': ee_pose[3:6]} | |||
| # Handle Jacobian results | |||
| if event["id"] == "jacobian_result": | |||
| if event["metadata"]["encoding"] == "jacobian_result": | |||
| jacobian_flat = event["value"].to_numpy() | |||
| jacobian_shape = event["metadata"]["jacobian_shape"] | |||
| controller.update_jacobian(jacobian_flat, jacobian_shape) | |||
| if __name__ == "__main__": | |||
| main() | |||
| @@ -0,0 +1,89 @@ | |||
| import numpy as np | |||
| import time | |||
| import pyarrow as pa | |||
| from dora import Node | |||
| class Controller: | |||
| def __init__(self): | |||
| """ | |||
| Initialize the simple controller | |||
| """ | |||
| # State variables | |||
| self.dof = None | |||
| self.current_joint_pos = None | |||
| self.target_pos = np.array([0.4, 0.0, 0.3]) # Conservative default target | |||
| self.target_rpy = [180.0, 0.0, 90.0] # Downward orientation | |||
| self.ik_request_sent = True | |||
| print("Simple Controller initialized") | |||
| def _initialize_robot(self, positions): | |||
| """Initialize controller state with appropriate dimensions""" | |||
| self.full_joint_count = len(positions) | |||
| self.current_joint_pos = positions.copy() | |||
| if self.dof is None: | |||
| self.dof = self.full_joint_count | |||
| def set_target_pose(self, pose_array): | |||
| """Set target pose from input array.""" | |||
| self.target_pos = np.array(pose_array[:3]) | |||
| if len(pose_array) == 6: | |||
| self.target_rpy = list(pose_array[3:6]) | |||
| else: | |||
| self.target_rpy = [180.0, 0.0, 90.0] # Default orientation if not provided | |||
| self.ik_request_sent = False | |||
| def get_target_pose_array(self): | |||
| """Get target pose as 6D array [x, y, z, roll, pitch, yaw]""" | |||
| return np.concatenate([self.target_pos, self.target_rpy]) | |||
| def main(): | |||
| node = Node() | |||
| controller = Controller() | |||
| for event in node: | |||
| if event["type"] == "INPUT": | |||
| if event["id"] == "joint_positions": | |||
| joint_pos = event["value"].to_numpy() | |||
| if controller.current_joint_pos is None: | |||
| controller._initialize_robot(joint_pos) | |||
| else: | |||
| controller.current_joint_pos = joint_pos | |||
| # Request IK solution directly | |||
| target_pose = controller.get_target_pose_array() | |||
| if not controller.ik_request_sent: | |||
| node.send_output( | |||
| "ik_request", | |||
| pa.array(target_pose, type=pa.float32()), | |||
| metadata={"encoding": "xyzrpy", "timestamp": time.time()} | |||
| ) | |||
| controller.ik_request_sent = True | |||
| node.send_output( | |||
| "joint_state", | |||
| pa.array(joint_pos, type=pa.float32()), | |||
| metadata={"encoding": "jointstate", "timestamp": time.time()} | |||
| ) | |||
| # Handle target pose updates | |||
| if event["id"] == "target_pose": | |||
| target_pose = event["value"].to_numpy() | |||
| controller.set_target_pose(target_pose) | |||
| # Handle IK results and send directly as joint commands | |||
| if event["id"] == "ik_request": | |||
| if event["metadata"]["encoding"] == "jointstate": | |||
| ik_solution = event["value"].to_numpy() | |||
| # print("ik_solution", ik_solution) | |||
| # Send IK solution directly as joint commands | |||
| node.send_output( | |||
| "joint_commands", | |||
| pa.array(ik_solution, type=pa.float32()), | |||
| metadata={"timestamp": time.time()} | |||
| ) | |||
| if __name__ == "__main__": | |||
| main() | |||
| @@ -0,0 +1,44 @@ | |||
| import time | |||
| import pyarrow as pa | |||
| from dora import Node | |||
| class PosePublisher: | |||
| """Publishes target poses in sequence.""" | |||
| def __init__(self): | |||
| """Initialize pose publisher with predefined target poses sequence.""" | |||
| # target poses [x, y, z, roll, pitch, yaw] | |||
| self.target_poses = [ | |||
| [0.5, 0.5, 0.3, 180.0, 0.0, 90.0], | |||
| [0.6, 0.2, 0.5, 180.0, 0.0, 45.0], | |||
| [0.7, 0.1, 0.4, 90.0, 90.0, 90.0], | |||
| [0.4, -0.3, 0.5, 180.0, 0.0, 135.0], | |||
| [-0.3, -0.6, 0.3, 180.0, 0.0, 90.0], | |||
| ] | |||
| self.current_pose_index = 0 | |||
| print("Pose Publisher initialized") | |||
| def get_next_pose(self): | |||
| """Get the next target pose in sequence.""" | |||
| pose = self.target_poses[self.current_pose_index] | |||
| self.current_pose_index = (self.current_pose_index + 1) % len(self.target_poses) | |||
| return pose | |||
| def main(): | |||
| node = Node("pose_publisher") | |||
| publisher = PosePublisher() | |||
| time.sleep(3) # Allow time for simulation to start | |||
| for event in node: | |||
| if event["type"] == "INPUT" and event["id"] == "tick": | |||
| target_pose = publisher.get_next_pose() | |||
| print(f"Publishing target pose: {target_pose}") | |||
| node.send_output( | |||
| "target_pose", | |||
| pa.array(target_pose, type=pa.float64()), | |||
| metadata={"timestamp": time.time()} | |||
| ) | |||
| if __name__ == "__main__": | |||
| main() | |||
| @@ -0,0 +1,235 @@ | |||
| # dora-mujoco | |||
| A MuJoCo physics simulation node for the Dora dataflow framework. This node provides real-time physics simulation with support for a wide range of robots from the `robot_descriptions` package, as well as custom robot models. Designed for modular robotics control architectures. | |||
| ## Features | |||
| - **Wide Robot Support**: Built-in support for 50+ robot models including quadrupeds, humanoids, arms, drones, and more | |||
| - **Flexible Model Loading**: Load robots by name (via robot_descriptions) or direct XML file paths | |||
| - **Real-time Simulation**: Continuous physics simulation with configurable tick rates | |||
| - **Live Visualization**: Optional MuJoCo viewer for real-time 3D visualization | |||
| - **Generic Control Interface**: Accepts control commands for any robot type | |||
| - **Rich Data Output**: Joint positions, velocities, accelerations, and sensor data | |||
| ## Supported Robot Categories | |||
| | Category | Examples | | |||
| |----------|----------| | |||
| | **Quadrupeds** | Unitree Go1/Go2/A1, ANYmal B/C, Boston Dynamics Spot | | |||
| | **Humanoids** | Unitree G1/H1, Apptronik Apollo, TALOS, JVRC-1 | | |||
| | **Arms** | Franka Panda, KUKA iiwa14, Universal Robots UR5e/UR10e | | |||
| | **Dual Arms** | Aloha 2, Baxter, YuMi | | |||
| | **End Effectors** | Allegro Hand, Shadow Hand, Robotiq 2F-85 | | |||
| | **Drones** | Crazyflie 2.0, Skydio X2 | | |||
| | **Educational** | Double Pendulum | | |||
| ## Getting Started | |||
| ### Quick Start | |||
| 1. **Run a simple simulation**: | |||
| ```bash | |||
| dora build demo.yml | |||
| # Start with Unitree Go2 quadruped | |||
| dora run demo.yml | |||
| ``` | |||
| 2. **Use different robots**: | |||
| ```python | |||
| # In main.py, modify the model name (line ~95) | |||
| model_path_or_name = "franka_panda" # Change this line | |||
| # Examples: "go2", "franka_panda", "g1", "spot", etc. | |||
| ``` | |||
| 3. **Use custom robot models**: | |||
| ```python | |||
| # In main.py, use file path instead of model name | |||
| model_path_or_name = "/path/to/my_robot/scene.xml" | |||
| ``` | |||
| ## Usage Examples | |||
| TODO | |||
| ### Available Robot Models | |||
| The simulator supports all models from the `robot_descriptions` package. Common names include: | |||
| - **Quadrupeds**: `go1`, `go2`, `a1`, `aliengo`, `anymal_b`, `anymal_c`, `spot` | |||
| - **Humanoids**: `g1`, `h1`, `apollo`, `talos`, `jvrc`, `cassie` | |||
| - **Arms**: `panda`, `ur5e`, `ur10e`, `iiwa14`, `gen3`, `sawyer` | |||
| - **Hands**: `allegro_hand`, `shadow_hand`, `leap_hand`, `robotiq_2f85` | |||
| See [`robot_models.json`](dora_mujoco/robot_models.json) for the complete list. | |||
| ## Architecture | |||
| The `dora-mujoco` node is designed to be **robot-agnostic** and work with **robot-specific controllers**: | |||
| ``` | |||
| ┌─────────────────┐ | |||
| │ Command Node │ target_pose/cmd_vel | |||
| │ (Gamepad, etc) │─────────────┐ | |||
| └─────────────────┘ │ | |||
| ▼ | |||
| ┌─────────────────┐ control_input ┌─────────────────┐ | |||
| │ Robot Controller│───────────────────▶│ dora-mujoco │ | |||
| │ (Franka, Husky) │ │ │ | |||
| └─────────────────┘ │ ┌─────────────┐ │ joint_positions | |||
| ▲ │ │ Simulator │ │──────────────────▶ | |||
| │ joint_positions │ │ │ │ joint_velocities | |||
| └───────────────────────────│ │ ┌────────┐ │ │──────────────────▶ | |||
| │ │ │Renderer│ │ │ actuator_controls | |||
| │ │ └────────┘ │ │──────────────────▶ | |||
| │ │ │ │ sensor_data | |||
| │ └─────────────┘ │──────────────────▶ | |||
| └─────────────────┘ | |||
| the control nodes are optional (the robot will still spawn without them) | |||
| ``` | |||
| ## YAML Specification | |||
| ### Node Configuration | |||
| ```yaml | |||
| nodes: | |||
| - id: mujoco_sim | |||
| build: pip install -e . | |||
| path: dora-mujoco | |||
| env: | |||
| MODEL_NAME: "go2" # Robot model name or file path | |||
| inputs: | |||
| TICK: dora/timer/millis/2 # Simulation tick rate (500Hz) | |||
| outputs: | |||
| - joint_positions # Joint position array | |||
| - joint_velocities # Joint velocity array | |||
| - sensor_data # Sensor readings array | |||
| ``` | |||
| ### Input Specification | |||
| | Input | Type | Description | | |||
| |-------|------|-------------| | |||
| | `tick` | Timer | Triggers simulation steps and data output | | |||
| | `control_input` | `pa.array(float64)` | Control commands for actuators | | |||
| **Control Input Format**: | |||
| - **Manipulator Arms**: Joint position commands `[q1, q2, ..., q7, gripper]` | |||
| - **Mobile Robots**: Wheel velocity commands `[wheel1, wheel2, wheel3, wheel4]` | |||
| - **Quadrupeds**: Joint position commands `[hip1, thigh1, calf1, ...]` | |||
| ### Output Specification | |||
| | Output | Type | Description | Metadata | | |||
| |--------|------|-------------|----------| | |||
| | `joint_positions` | `pa.array(float64)` | Joint positions (qpos) | `timestamp` | | |||
| | `joint_velocities` | `pa.array(float64)` | Joint velocities (qvel) | `timestamp` | | |||
| | `actuator_controls` | `pa.array(float64)` | Current actuator commands | `timestamp` | | |||
| | `sensor_data` | `pa.array(float64)` | Sensor readings (if available) | `timestamp` | | |||
| ## Configuration Options | |||
| ### Environment Variables | |||
| - `MODEL_NAME`: Robot model name or XML file path (default: "go2") | |||
| ### Custom Robot Models | |||
| ```python | |||
| # Use direct XML file path | |||
| env: | |||
| MODEL_NAME: "/path/to/my_robot/scene.xml" | |||
| ``` | |||
| ### Simulation Parameters | |||
| - **Physics Timestep**: Fixed at MuJoCo default (0.002s = 500Hz) | |||
| - **Output Rate**: Controlled by `tick` input frequency | |||
| - **Control Rate**: Determined by `control_input` frequency | |||
| ## Development | |||
| ### Code Quality | |||
| ```bash | |||
| # Format code | |||
| uv run ruff check . --fix | |||
| # Lint code | |||
| uv run ruff check . | |||
| # Run tests | |||
| uv pip install pytest | |||
| uv run pytest . | |||
| ``` | |||
| ### Adding New Models | |||
| To add support for new robot models: | |||
| 1. Add the model mapping to [`robot_models.json`](dora_mujoco/robot_models.json): | |||
| ```json | |||
| { | |||
| "category_name": { | |||
| "my_robot": "my_robot_mj_description" | |||
| } | |||
| } | |||
| ``` | |||
| 2. Ensure the model is available in `robot_descriptions` or provide the XML file path directly. | |||
| ### Creating Robot Controllers | |||
| Robot-specific controllers should: | |||
| 1. Subscribe to `joint_positions` from the simulation | |||
| 2. Implement robot-specific control logic (IK, dynamics, etc.) | |||
| 3. Publish `control_input` commands to the simulation | |||
| ## Troubleshooting | |||
| ### Common Issues | |||
| 1. **Model not found**: | |||
| ``` | |||
| ERROR: Model file not found for my_robot | |||
| ``` | |||
| - Check if the model name exists in `robot_models.json` | |||
| - Verify `robot_descriptions` has the model installed | |||
| - Use absolute file path for custom models | |||
| 2. **Scene variant missing**: | |||
| ``` | |||
| WARNING: Failed to load scene variant | |||
| ``` | |||
| - Normal behavior - falls back to robot-only model | |||
| - Scene variants include ground plane and lighting | |||
| 3. **Control dimension mismatch**: | |||
| ``` | |||
| WARNING: Control input size doesn't match actuators | |||
| ``` | |||
| - Check that control commands match the robot's actuator count | |||
| - Use `print(model.nu)` to see expected control dimensions | |||
| 4. **Viewer issues**: | |||
| - Ensure proper OpenGL/graphics drivers | |||
| - Use headless mode for server environments | |||
| ## Examples | |||
| Complete working examples are available in: | |||
| - `dora/examples/mujoco-sim/` | |||
| - Target pose control with Cartesian space control | |||
| - Gamepad control with real-time interaction | |||
| ## License | |||
| This project is released under the MIT License. See the LICENSE file for details. | |||
| ## Contributing | |||
| Contributions are welcome! Please: | |||
| 1. Follow the existing code style (ruff formatting) | |||
| 2. Add tests for new features | |||
| 3. Update documentation as needed | |||
| 4. Submit pull requests with clear descriptions | |||
| ## Related Projects | |||
| - [Dora-rs](https://github.com/dora-rs/dora) - The dataflow framework | |||
| - [MuJoCo](https://github.com/google-deepmind/mujoco) - Physics simulation engine | |||
| - [robot_descriptions](https://github.com/robot-descriptions/robot_descriptions) - Robot model collection | |||
| @@ -0,0 +1,28 @@ | |||
| nodes: | |||
| - id: mujoco_sim | |||
| build: pip install -e . | |||
| path: dora-mujoco | |||
| inputs: | |||
| tick: dora/timer/millis/2 # 500 Hz data collection | |||
| # control_input: controller/output | |||
| outputs: | |||
| - joint_positions | |||
| - joint_velocities | |||
| - actuator_controls | |||
| - sensor_data | |||
| env: | |||
| MODEL_NAME: "go2" # Load go2 from robot-descriptions | |||
| # - id: controller | |||
| # # Your control logic node | |||
| # inputs: | |||
| # joint_positions: mujoco_sim/joint_positions | |||
| # outputs: | |||
| # - output | |||
| # - id: data_collector | |||
| # # Your data collection node | |||
| # inputs: | |||
| # joint_positions: mujoco_sim/joint_positions | |||
| # joint_velocities: mujoco_sim/joint_velocities | |||
| # sensor_data: mujoco_sim/sensor_data | |||
| @@ -0,0 +1,13 @@ | |||
| """TODO: Add docstring.""" | |||
| import os | |||
| # Define the path to the README file relative to the package directory | |||
| readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.md") | |||
| # Read the content of the README file | |||
| try: | |||
| with open(readme_path, encoding="utf-8") as f: | |||
| __doc__ = f.read() | |||
| except FileNotFoundError: | |||
| __doc__ = "README file not found." | |||
| @@ -0,0 +1,6 @@ | |||
| """TODO: Add docstring.""" | |||
| from .main import main | |||
| if __name__ == "__main__": | |||
| main() | |||
| @@ -0,0 +1,194 @@ | |||
| """MuJoCo simulation node for Dora with robot descriptions support.""" | |||
| import numpy as np | |||
| import pyarrow as pa | |||
| import mujoco | |||
| import mujoco.viewer | |||
| from dora import Node | |||
| import json | |||
| import os | |||
| from pathlib import Path | |||
| import time | |||
| from typing import Dict, Any | |||
| from robot_descriptions.loaders.mujoco import load_robot_description | |||
| class MuJoCoSimulator: | |||
| """MuJoCo simulator for Dora.""" | |||
| def __init__(self, model_path_or_name: str = None): | |||
| """Initialize the MuJoCo simulator.""" | |||
| # Check environment variable first, then use parameter, then default | |||
| self.model_path_or_name = ( | |||
| os.getenv("MODEL_NAME") or | |||
| model_path_or_name or | |||
| "go2_mj_description" | |||
| ) | |||
| self.model = None | |||
| self.data = None | |||
| self.viewer = None | |||
| self.state_data = {} | |||
| self.load_model() | |||
| print(f"MuJoCo Simulator initialized with model: {self.model_path_or_name}") | |||
| def load_model(self) -> bool: | |||
| """Load MuJoCo model from path or robot description name.""" | |||
| model_path = Path(self.model_path_or_name) | |||
| if model_path.exists() and model_path.suffix == '.xml': | |||
| print(f"Loading model from direct path: {model_path}") | |||
| self.model = mujoco.MjModel.from_xml_path(str(model_path)) | |||
| else: | |||
| self.model = load_robot_description(self.model_path_or_name, variant="scene") | |||
| # Initialize simulation data | |||
| self.data = mujoco.MjData(self.model) | |||
| # Set control to neutral position | |||
| if self.model.nkey > 0: | |||
| mujoco.mj_resetDataKeyframe(self.model, self.data, 0) | |||
| else: | |||
| mujoco.mj_resetData(self.model, self.data) | |||
| # Print model info for debugging | |||
| print("Model loaded successfully:") | |||
| print(f" DOF (nq): {self.model.nq}") | |||
| print(f" Velocities (nv): {self.model.nv}") | |||
| print(f" Actuators (nu): {self.model.nu}") | |||
| print(f" Control inputs: {len(self.data.ctrl)}") | |||
| # Print actuator info | |||
| if self.model.nu > 0: | |||
| print("Actuators:") | |||
| for i in range(self.model.nu): | |||
| actuator_name = mujoco.mj_id2name(self.model, mujoco.mjtObj.mjOBJ_ACTUATOR, i) | |||
| joint_id = self.model.actuator_trnid[i, 0] # First transmission joint | |||
| joint_name = mujoco.mj_id2name(self.model, mujoco.mjtObj.mjOBJ_JOINT, joint_id) if joint_id >= 0 else "N/A" | |||
| ctrl_range = self.model.actuator_ctrlrange[i] | |||
| print(f" [{i}] {actuator_name or f'actuator_{i}'} -> joint: {joint_name} | range: [{ctrl_range[0]:.2f}, {ctrl_range[1]:.2f}]") | |||
| # Initialize state data | |||
| self._update_state_data() | |||
| return True | |||
| def apply_control(self, control_input: np.ndarray): | |||
| """Apply control input to the simulation. | |||
| Args: | |||
| control_input: Control values for actuators | |||
| """ | |||
| if control_input is None or len(control_input) == 0: | |||
| return | |||
| # Ensure we don't exceed the number of actuators | |||
| n_controls = min(len(control_input), self.model.nu) | |||
| # Apply control directly to actuators | |||
| for i in range(n_controls): | |||
| # Apply joint limits if available | |||
| ctrl_range = self.model.actuator_ctrlrange[i] | |||
| if ctrl_range[0] < ctrl_range[1]: # Valid range | |||
| control_value = np.clip(control_input[i], ctrl_range[0], ctrl_range[1]) | |||
| else: | |||
| control_value = control_input[i] | |||
| self.data.ctrl[i] = control_value | |||
| def _get_available_models(self) -> dict: | |||
| """Get available models from the mapping file.""" | |||
| config_path = Path(__file__).parent / "robot_models.json" | |||
| with open(config_path) as f: | |||
| return json.load(f) | |||
| def step_simulation(self): | |||
| """Step the simulation forward.""" | |||
| mujoco.mj_step(self.model, self.data) | |||
| self._update_state_data() | |||
| def _update_state_data(self): | |||
| """Update state data that can be sent via Dora.""" | |||
| self.state_data = { | |||
| "time": self.data.time, # Current simulation time | |||
| "qpos": self.data.qpos.copy(), # Joint positions | |||
| "qvel": self.data.qvel.copy(), # Joint velocities | |||
| "qacc": self.data.qacc.copy(), # Joint accelerations | |||
| "ctrl": self.data.ctrl.copy(), # Control inputs/actuator commands | |||
| "qfrc_applied": self.data.qfrc_applied.copy(), # External forces applied to joints | |||
| "sensordata": self.data.sensordata.copy() if self.model.nsensor > 0 else np.array([]) # Sensor readings | |||
| } | |||
| def get_state(self) -> Dict[str, Any]: | |||
| """Get current simulation state.""" | |||
| return self.state_data.copy() | |||
| def main(): | |||
| """Run the main Dora node function.""" | |||
| node = Node() | |||
| # Initialize simulator | |||
| simulator = MuJoCoSimulator() | |||
| # Load model | |||
| if not simulator.load_model(): | |||
| print("Failed to load MuJoCo model") | |||
| return | |||
| print("MuJoCo simulation node started") | |||
| # Launch viewer | |||
| with mujoco.viewer.launch_passive(simulator.model, simulator.data) as viewer: | |||
| print("MuJoCo viewer launched - simulation running") | |||
| # Main Dora event loop | |||
| for event in node: | |||
| if event["type"] == "INPUT": | |||
| # Handle control input | |||
| if event["id"] == "control_input": | |||
| control_array = event["value"].to_numpy() | |||
| simulator.apply_control(control_array) | |||
| # print(f"Applied control: {control_array[:7]}") # Show first 7 values | |||
| # Step simulation once per loop iteration | |||
| simulator.step_simulation() | |||
| viewer.sync() | |||
| # Send outputs after stepping | |||
| if event["type"] == "INPUT": | |||
| state = simulator.get_state() | |||
| current_time = state.get("time", time.time()) | |||
| # Send joint positions | |||
| node.send_output( | |||
| "joint_positions", | |||
| pa.array(state["qpos"]), | |||
| {"timestamp": current_time, "encoding": "jointstate"} | |||
| ) | |||
| # Send joint velocities | |||
| node.send_output( | |||
| "joint_velocities", | |||
| pa.array(state["qvel"]), | |||
| {"timestamp": current_time} | |||
| ) | |||
| # Send actuator controls | |||
| node.send_output( | |||
| "actuator_controls", | |||
| pa.array(state["ctrl"]), | |||
| {"timestamp": current_time} | |||
| ) | |||
| # Send sensor data if available | |||
| if len(state["sensordata"]) > 0: | |||
| node.send_output( | |||
| "sensor_data", | |||
| pa.array(state["sensordata"]), | |||
| {"timestamp": current_time} | |||
| ) | |||
| if __name__ == "__main__": | |||
| main() | |||
| @@ -0,0 +1,31 @@ | |||
| [project] | |||
| name = "dora-mujoco" | |||
| version = "0.1.0" | |||
| authors = [{ name = "Your Name", email = "email@email.com" }] | |||
| description = "MuJoCo simulation node for Dora" | |||
| license = { text = "MIT" } | |||
| readme = "README.md" | |||
| requires-python = ">=3.8" | |||
| dependencies = [ | |||
| "dora-rs >= 0.3.9", | |||
| "mujoco >= 3.1.6", | |||
| "numpy >= 1.21.0", | |||
| "pyarrow >= 14.0.1", | |||
| "robot_descriptions >= 1.12.0", | |||
| ] | |||
| [dependency-groups] | |||
| dev = ["pytest >=8.1.1", "ruff >=0.9.1"] | |||
| [project.scripts] | |||
| dora-mujoco = "dora_mujoco.main:main" | |||
| [tool.ruff.lint] | |||
| extend-select = [ | |||
| "UP", # pyupgrade | |||
| "PERF", # Ruff's PERF rule | |||
| "RET", # Ruff's RET rule | |||
| "RSE", # Ruff's RSE rule | |||
| "N", # Ruff's N rule | |||
| ] | |||
| @@ -0,0 +1,9 @@ | |||
| import pytest | |||
| def test_import_main(): | |||
| from dora_mujoco.main import main | |||
| # Check that everything is working, and catch dora Runtime Exception as we're not running in a dora dataflow. | |||
| with pytest.raises(RuntimeError): | |||
| main() | |||
| @@ -9,10 +9,12 @@ import pytorch_kinematics as pk | |||
| import torch | |||
| from dora import Node | |||
| from pytorch_kinematics.transforms.rotation_conversions import matrix_to_euler_angles | |||
| from pathlib import Path | |||
| import importlib | |||
| TRANSFORM = np.array(os.getenv("TRANSFORM", "0. 0. 0. 0. 0. 0. 1.").split(" ")).astype( | |||
| TRANSFORM = np.array(os.getenv("TRANSFORM", "0. 0. 0. 1. 0. 0. 0.").split(" ")).astype( | |||
| np.float32, | |||
| ) | |||
| ) # wxyz format | |||
| pos = torch.tensor([TRANSFORM[0], TRANSFORM[1], TRANSFORM[2]]) | |||
| rot = torch.tensor( | |||
| [ | |||
| @@ -74,25 +76,26 @@ class RobotKinematics: | |||
| def __init__( | |||
| self, | |||
| urdf_path: str, | |||
| urdf: str, | |||
| end_effector_link: str, | |||
| device: Union[str, torch.device] = "cpu", | |||
| ): | |||
| """Initialize the kinematic chain from a URDF. | |||
| Args: | |||
| urdf (str): URDF string data of the URDF | |||
| urdf_path (str): Path to the URDF file. | |||
| end_effector_link (str): Name of the end-effector link. | |||
| device (Union[str, torch.device]): Computation device ('cpu' or 'cuda'). | |||
| """ | |||
| self.device = torch.device(device) | |||
| try: | |||
| # Load kinematic chain (core pytorch_kinematics object) | |||
| self.chain = pk.build_serial_chain_from_urdf( | |||
| open(urdf_path, mode="rb").read(), end_effector_link, | |||
| ).to(device=self.device) | |||
| except Exception as e: | |||
| raise RuntimeError(f"Failed to build chain from URDF: {urdf_path}") from e | |||
| if urdf_path: | |||
| urdf = open(urdf_path, mode="rb").read() | |||
| # Load kinematic chain (core pytorch_kinematics object) | |||
| self.chain = pk.build_serial_chain_from_urdf( | |||
| urdf, end_effector_link, | |||
| ).to(device=self.device) | |||
| self.num_joints = len(self.chain.get_joint_parameter_names(exclude_fixed=True)) | |||
| # Default initial guess for IK if none provided | |||
| @@ -117,15 +120,21 @@ class RobotKinematics: | |||
| ) | |||
| if j.ndim == 1: | |||
| if j.shape[0] != self.num_joints: | |||
| raise ValueError(f"Expected {self.num_joints} joints, got {j.shape[0]}") | |||
| # Handle case with extra joints (e.g., gripper joints) | |||
| if j.shape[0] > self.num_joints: | |||
| j = j[:self.num_joints] # Truncate griper or extra joints | |||
| elif j.shape[0] < self.num_joints: | |||
| raise ValueError(f"Expected at least {self.num_joints} joints, got {j.shape[0]}") | |||
| if batch_dim_required: | |||
| j = j.unsqueeze(0) # Add batch dimension if needed (e.g., shape (1, N)) | |||
| j = j.unsqueeze(0) # Add batch dimension if needed | |||
| elif j.ndim == 2: | |||
| if j.shape[1] != self.num_joints: | |||
| raise ValueError( | |||
| f"Expected {self.num_joints} joints (dim 1), got {j.shape[1]}", | |||
| ) | |||
| # Handle batched input with extra joints | |||
| if j.shape[1] > self.num_joints: | |||
| j = j[:, :self.num_joints] # Truncate griper or extra joints | |||
| elif j.shape[1] < self.num_joints: | |||
| raise ValueError(f"Expected at least {self.num_joints} joints (dim 1), got {j.shape[1]}") | |||
| if batch_dim_required and j.shape[0] != 1: | |||
| # Most common IK solvers expect batch=1 for initial guess, FK can handle batches | |||
| # Relaxing this check slightly, but user should be aware for IK | |||
| @@ -222,78 +231,212 @@ class RobotKinematics: | |||
| return None | |||
| return solution_angles.solutions.detach() | |||
| def compute_jacobian( | |||
| self, joint_angles: Union[List[float], torch.Tensor] | |||
| ) -> torch.Tensor: | |||
| """Compute Jacobian matrix using PyTorch Kinematics. | |||
| Args: | |||
| joint_angles (Union[List[float], torch.Tensor]): Joint angles (radians). | |||
| Shape (num_joints,) or (B, num_joints). | |||
| Returns: | |||
| torch.Tensor: Jacobian matrix of shape (B, 6, num_joints) or (6, num_joints) | |||
| where rows are [linear_vel_x, linear_vel_y, linear_vel_z, | |||
| angular_vel_x, angular_vel_y, angular_vel_z] | |||
| """ | |||
| angles_tensor = self._prepare_joint_tensor( | |||
| joint_angles, batch_dim_required=False | |||
| ) | |||
| # Ensure we have batch dimension for jacobian computation | |||
| if angles_tensor.ndim == 1: | |||
| angles_tensor = angles_tensor.unsqueeze(0) | |||
| squeeze_output = True | |||
| else: | |||
| squeeze_output = False | |||
| # Compute Jacobian (returns shape: (B, 6, num_joints)) | |||
| J = self.chain.jacobian(angles_tensor) | |||
| # Remove batch dimension if input was 1D | |||
| if squeeze_output: | |||
| J = J.squeeze(0) | |||
| return J | |||
| def compute_jacobian_numpy( | |||
| self, joint_angles: Union[List[float], torch.Tensor, np.ndarray] | |||
| ) -> np.ndarray: | |||
| """Compute Jacobian matrix and return as numpy array. | |||
| Args: | |||
| joint_angles: Joint angles (radians). Can be list, torch.Tensor, or np.ndarray. | |||
| Shape (num_joints,) or (B, num_joints). | |||
| Returns: | |||
| np.ndarray: Jacobian matrix as numpy array of shape (6, num_joints) or (B, 6, num_joints) | |||
| """ | |||
| J = self.compute_jacobian(joint_angles) | |||
| return J.cpu().numpy() | |||
| def load_robot_description_with_cache_substitution(robot_name: str) -> str: | |||
| """Load a robot's URDF or MJCF file and replace package:// URIs with cache paths. | |||
| Args: | |||
| robot_name: str (e.g., "iiwa7_description"). The robot name handler | |||
| Returns: | |||
| ------- | |||
| - str: File content with all package:// URIs replaced | |||
| """ | |||
| try: | |||
| # Dynamically import the robot description module | |||
| desc_module = importlib.import_module(f"robot_descriptions.{robot_name}") | |||
| # Find the URDF or MJCF path | |||
| if hasattr(desc_module, "URDF_PATH"): | |||
| file_path = Path(desc_module.URDF_PATH) | |||
| elif hasattr(desc_module, "MJCF_PATH"): | |||
| file_path = Path(desc_module.MJCF_PATH) | |||
| else: | |||
| raise ValueError(f"No URDF or MJCF path found for '{robot_name}'.") | |||
| print(f"Loading robot description from: {file_path}") | |||
| # Read and replace | |||
| with open(file_path) as f: | |||
| content = f.read() | |||
| print("URDF PATH: ", file_path) | |||
| content = content.encode("utf-8") | |||
| return content | |||
| except ModuleNotFoundError: | |||
| raise ValueError(f"Robot '{robot_name}' not found.") | |||
| except Exception as e: | |||
| raise RuntimeError(f"Failed to process robot description: {e}") | |||
| def main(): | |||
| """TODO: Add docstring.""" | |||
| node = Node() | |||
| urdf_path = os.getenv("URDF_PATH") | |||
| model = os.getenv("URDF_PATH") | |||
| end_effector_link = os.getenv("END_EFFECTOR_LINK") | |||
| robot = RobotKinematics(urdf_path, end_effector_link) | |||
| if not model or not Path(model).exists(): | |||
| model_name = os.getenv("MODEL_NAME") | |||
| model = load_robot_description_with_cache_substitution(model_name) | |||
| robot = RobotKinematics(urdf_path="", urdf=model, end_effector_link=end_effector_link) | |||
| else: | |||
| robot = RobotKinematics(urdf_path=model, urdf="", end_effector_link=end_effector_link) | |||
| last_known_state = None | |||
| for event in node: | |||
| if event["type"] == "INPUT": | |||
| metadata = event["metadata"] | |||
| match metadata["encoding"]: | |||
| case "xyzquat": | |||
| # Apply Inverse Kinematics | |||
| if last_known_state is not None: | |||
| target = event["value"].to_numpy() | |||
| target = target.astype(np.float32) | |||
| target = pk.Transform3d( | |||
| pos=target[:3], rot=torch.tensor(target[3:7]), | |||
| ) | |||
| solution = robot.compute_ik(target, last_known_state) | |||
| metadata["encoding"] = "jointstate" | |||
| last_known_state = solution.numpy().ravel() | |||
| solution = pa.array(last_known_state) | |||
| node.send_output(event["id"], solution, metadata=metadata) | |||
| case "xyzrpy": | |||
| # Apply Inverse Kinematics | |||
| if last_known_state is not None: | |||
| target = event["value"].to_numpy() | |||
| target = target.astype(np.float32) | |||
| target = pk.Transform3d( | |||
| pos=target[:3], | |||
| rot=pk.transforms.euler_angles_to_matrix( | |||
| torch.tensor(target[3:6]), convention="XYZ", | |||
| ), | |||
| if event["id"] == "cmd_vel": | |||
| if last_known_state is not None: | |||
| target_vel = event["value"].to_numpy() # expect 100ms | |||
| # Apply Forward Kinematics | |||
| target = robot.compute_fk(last_known_state) | |||
| target = np.array(get_xyz_rpy_array_from_transform3d(target)) + target_vel | |||
| target = pa.array(target.ravel(), type=pa.float32()) | |||
| target = pk.Transform3d( | |||
| pos=target[:3], | |||
| rot=pk.transforms.euler_angles_to_matrix( | |||
| torch.tensor(target[3:6]), convention="XYZ", | |||
| ), | |||
| ) | |||
| rob_target = ROB_TF.inverse().compose(target) | |||
| solution = robot.compute_ik(rob_target, last_known_state) | |||
| if solution is None: | |||
| print( | |||
| "No IK Solution for :", target, "skipping this frame.", | |||
| ) | |||
| rob_target = ROB_TF.inverse().compose(target) | |||
| solution = robot.compute_ik(rob_target, last_known_state) | |||
| if solution is None: | |||
| print( | |||
| "No IK Solution for :", target, "skipping this frame.", | |||
| continue | |||
| solution = solution.numpy().ravel() | |||
| metadata["encoding"] = "jointstate" | |||
| last_known_state = solution | |||
| solution = pa.array(last_known_state) | |||
| node.send_output(event["id"], solution, metadata=metadata) | |||
| else: | |||
| match metadata["encoding"]: | |||
| case "xyzquat": | |||
| # Apply Inverse Kinematics | |||
| if last_known_state is not None: | |||
| target = event["value"].to_numpy() | |||
| target = target.astype(np.float32) | |||
| target = pk.Transform3d( | |||
| pos=target[:3], rot=torch.tensor(target[3:7]), | |||
| ) | |||
| continue | |||
| solution = solution.numpy().ravel() | |||
| delta_angles = solution - last_known_state | |||
| valid = np.all( | |||
| (delta_angles >= -np.pi) & (delta_angles <= np.pi), | |||
| ) | |||
| if not valid: | |||
| print( | |||
| "IK solution is not valid, as the rotation are too wide. skipping.", | |||
| rob_target = ROB_TF.inverse().compose(target) | |||
| solution = robot.compute_ik(rob_target, last_known_state) | |||
| metadata["encoding"] = "jointstate" | |||
| last_known_state = solution.numpy().ravel() | |||
| solution = pa.array(last_known_state) | |||
| node.send_output(event["id"], solution, metadata=metadata) | |||
| case "xyzrpy": | |||
| # Apply Inverse Kinematics | |||
| if last_known_state is not None: | |||
| target = event["value"].to_numpy() | |||
| target = target.astype(np.float32) | |||
| target = pk.Transform3d( | |||
| pos=target[:3], | |||
| rot=pk.transforms.euler_angles_to_matrix( | |||
| torch.tensor(target[3:6]), convention="XYZ", | |||
| ), | |||
| ) | |||
| continue | |||
| metadata["encoding"] = "jointstate" | |||
| last_known_state = solution | |||
| solution = pa.array(last_known_state) | |||
| node.send_output(event["id"], solution, metadata=metadata) | |||
| case "jointstate": | |||
| target = event["value"].to_numpy() | |||
| last_known_state = target | |||
| # Apply Forward Kinematics | |||
| target = robot.compute_fk(target) | |||
| target = np.array(get_xyz_rpy_array_from_transform3d(target)) | |||
| target = pa.array(target.ravel(), type=pa.float32()) | |||
| metadata["encoding"] = "xyzrpy" | |||
| node.send_output(event["id"], target, metadata=metadata) | |||
| rob_target = ROB_TF.inverse().compose(target) | |||
| solution = robot.compute_ik(rob_target, last_known_state) | |||
| if solution is None: | |||
| print( | |||
| "No IK Solution for :", target, "skipping this frame.", | |||
| ) | |||
| continue | |||
| solution = solution.numpy().ravel() | |||
| delta_angles = solution - last_known_state[:len(solution)] # match with dof | |||
| valid = np.all( | |||
| (delta_angles >= -np.pi) & (delta_angles <= np.pi), | |||
| ) | |||
| if not valid: | |||
| print( | |||
| "IK solution is not valid, as the rotation are too wide. skipping.", | |||
| ) | |||
| continue | |||
| metadata["encoding"] = "jointstate" | |||
| last_known_state = solution | |||
| solution = pa.array(last_known_state) | |||
| node.send_output(event["id"], solution, metadata=metadata) | |||
| case "jointstate": | |||
| target = event["value"].to_numpy() | |||
| last_known_state = target | |||
| # Apply Forward Kinematics | |||
| target = robot.compute_fk(target) | |||
| target = np.array(get_xyz_rpy_array_from_transform3d(target)) | |||
| target = pa.array(target.ravel(), type=pa.float32()) | |||
| metadata["encoding"] = "xyzrpy" | |||
| node.send_output(event["id"], target, metadata=metadata) | |||
| case "jacobian": | |||
| # Compute Jacobian matrix | |||
| joint_angles = event["value"].to_numpy() | |||
| jacobian = robot.compute_jacobian_numpy(joint_angles) | |||
| jacobian_flat = jacobian.flatten() | |||
| jacobian_array = pa.array(jacobian_flat, type=pa.float32()) | |||
| metadata["encoding"] = "jacobian_result" | |||
| metadata["jacobian_shape"] = list(jacobian.shape) | |||
| node.send_output(event["id"], jacobian_array, metadata=metadata) | |||
| if __name__ == "__main__": | |||
| main() | |||
| @@ -9,7 +9,9 @@ requires-python = ">=3.8" | |||
| dependencies = [ | |||
| "dora-rs >= 0.3.9", | |||
| "mujoco>=3.2.3", | |||
| "pytorch-kinematics>=0.7.5", | |||
| "robot-descriptions>=1.17.0", | |||
| ] | |||
| [dependency-groups] | |||
| @@ -9,6 +9,7 @@ A Dora framework node that provides universal gamepad control functionality. Whi | |||
| - Configurable axis mapping for different controllers | |||
| - Dead zone handling for precise control | |||
| - Real-time velocity command output | |||
| - Raw gamepad state output for custom processing | |||
| - Clean shutdown with automatic zero velocity | |||
| - Customizable speed limits | |||
| - Default configuration optimized for Logitech F710 | |||
| @@ -20,6 +21,7 @@ A Dora framework node that provides universal gamepad control functionality. Whi | |||
| - Python 3.8 or higher | |||
| - Dora Framework 0.3.6 or higher | |||
| - Any compatible USB/Wireless gamepad | |||
| - Pygame library for joystick handling | |||
| ### Installation | |||
| @@ -54,10 +56,18 @@ sudo chmod +x /dev/input/js* | |||
| ### Controls | |||
| - **Left Stick Y-Axis**: Forward/Backward movement (±2.0 m/s) | |||
| - **Left Stick X-Axis**: Left/Right turning (±1.5 rad/s) | |||
| #### Default Control Mapping (cmd_vel output): | |||
| - **D-pad Vertical**: Linear X movement (forward/backward) | |||
| - **D-pad Horizontal**: Linear Y movement (left/right) | |||
| - **Right Stick Y-Axis**: Linear Z movement (up/down, ±0.1 m/s) | |||
| - **Right Stick X-Axis**: Angular Z rotation (turning, ±0.8 rad/s) | |||
| - **Left Stick Y-Axis**: Angular X rotation (±0.8 rad/s) | |||
| - **Left Stick X-Axis**: Angular Y rotation (±0.8 rad/s) | |||
| - **Deadzone**: 5% to prevent drift | |||
| #### Raw Control Output: | |||
| The node also outputs complete gamepad state including all buttons, axes, and D-pad for custom processing. | |||
| ### YAML Specification | |||
| ```yaml | |||
| @@ -66,20 +76,55 @@ nodes: | |||
| build: pip install -e . | |||
| path: gamepad | |||
| outputs: | |||
| - cmd_vel # Velocity commands [vx, vy, vz, wx, wy, wz] | |||
| - cmd_vel # Velocity commands [vx, vy, vz, wx, wy, wz] | |||
| - raw_control # Complete gamepad state (JSON) | |||
| inputs: | |||
| tick: dora/timer/millis/10 # Update rate | |||
| tick: dora/timer/millis/10 # Update rate (100Hz) | |||
| ``` | |||
| ### Outputs | |||
| The node outputs `cmd_vel` as a 6-element array: | |||
| - `[0]`: Linear X velocity (forward/backward) | |||
| - `[1]`: Linear Y velocity (always 0) | |||
| - `[2]`: Linear Z velocity (always 0) | |||
| - `[3]`: Angular X velocity (always 0) | |||
| - `[4]`: Angular Y velocity (always 0) | |||
| - `[5]`: Angular Z velocity (turning) | |||
| #### 1. `cmd_vel` - 6-element velocity array: | |||
| - `[0]`: Linear X velocity (D-pad vertical) | |||
| - `[1]`: Linear Y velocity (D-pad horizontal) | |||
| - `[2]`: Linear Z velocity (right stick Y) | |||
| - `[3]`: Angular X velocity (left stick Y) | |||
| - `[4]`: Angular Y velocity (left stick X) | |||
| - `[5]`: Angular Z velocity (right stick X) | |||
| #### 2. `raw_control` - Complete gamepad state (JSON): | |||
| ```json | |||
| { | |||
| "axes": [left_x, left_y, right_x, right_y, ...], | |||
| "buttons": [X, A, B, Y, LB, RB, LT, RT, BACK, START, LEFT_STICK, RIGHT_STICK], | |||
| "hats": [[dpad_x, dpad_y]], | |||
| "mapping": { | |||
| "axes": {"LEFT-X": 0, "LEFT-Y": 1, "RIGHT-X": 2, "RIGHT-Y": 3}, | |||
| "buttons": {"X": 0, "A": 1, "B": 2, "Y": 3, "LB": 4, "RB": 5, ...} | |||
| } | |||
| } | |||
| ``` | |||
| ## Controller Configuration | |||
| To use a different controller, modify the `Controller` class mapping: | |||
| ```python | |||
| class Controller: | |||
| def __init__(self): | |||
| # Customize for your controller | |||
| self.axisNames = { | |||
| 'LEFT-X': 0, | |||
| 'LEFT-Y': 1, | |||
| 'RIGHT-X': 2, | |||
| 'RIGHT-Y': 3, | |||
| } | |||
| self.buttonNames = { | |||
| 'X': 0, | |||
| 'A': 1, | |||
| # ... add your button mapping | |||
| } | |||
| ``` | |||
| ## Development | |||
| @@ -97,6 +142,28 @@ Run tests with pytest: | |||
| pytest . | |||
| ``` | |||
| ## Integration Examples | |||
| ### Simple Velocity Control: | |||
| ```python | |||
| for event in node: | |||
| if event["id"] == "cmd_vel": | |||
| velocity = event["value"].to_numpy() | |||
| # Apply velocity to your robot/system | |||
| print(f"Velocity: {velocity}") | |||
| ``` | |||
| ### Custom Button Processing: | |||
| ```python | |||
| import json | |||
| for event in node: | |||
| if event["id"] == "raw_control": | |||
| control_state = json.loads(event["value"].to_pylist()[0]) | |||
| buttons = control_state["buttons"] | |||
| axis = control_state[axis] | |||
| ``` | |||
| ## Troubleshooting | |||
| **No gamepad detected**: | |||
| @@ -104,6 +171,16 @@ pytest . | |||
| - Ensure controller is powered on | |||
| - Verify mode switch position | |||
| **Raw control output empty**: | |||
| - Verify gamepad is responding with `jstest /dev/input/js0` | |||
| - Check pygame initialization | |||
| **Incorrect button mapping**: | |||
| - Different controllers have different mappings | |||
| - Use `jstest` to identify your controller's button/axis numbers | |||
| - Update the `Controller` class accordingly | |||
| **If port is not executable then run:** | |||
| ```bash | |||
| sudo chmod +x /dev/input/js* | |||
| @@ -117,4 +194,5 @@ Released under the MIT License. See LICENSE file for details. | |||
| - Uses Pygame for joystick handling | |||
| - Built with the Dora framework | |||
| - Designed for Logitech F710 gamepad | |||
| - Designed for Logitech F710 gamepad | |||
| - Supports any standard USB/Bluetooth gamepad | |||
| @@ -1,41 +1,36 @@ | |||
| """Gamepad controller node for Dora. | |||
| This module provides a Dora node that reads input from a controller and publishes velocity commands for robot control. | |||
| It handles controller mapping, deadzone filtering, and velocity scaling. | |||
| """ | |||
| """Gamepad controller node for Dora.""" | |||
| from dora import Node | |||
| import pygame | |||
| import pyarrow as pa | |||
| import json | |||
| class Controller: | |||
| """controller mapping.""" | |||
| """Controller mapping.""" | |||
| def __init__(self): | |||
| """Change this according to your controller mapping. Currently Logitech F710.""" | |||
| self.axisNames = { | |||
| 'LEFT-X': 0, | |||
| 'LEFT-Y': 1, | |||
| 'LT': 2, | |||
| 'RIGHT-X': 3, | |||
| 'RIGHT-Y': 4, | |||
| 'RT': 5, | |||
| 'DPAD-X': 6, | |||
| 'DPAD-Y': 7 | |||
| 'LEFT-Y': 1, | |||
| 'RIGHT-X': 2, | |||
| 'RIGHT-Y': 3, | |||
| } | |||
| self.buttonNames = { | |||
| 'A': 0, | |||
| 'B': 1, | |||
| 'X': 2, | |||
| 'X': 0, | |||
| 'A': 1, | |||
| 'B': 2, | |||
| 'Y': 3, | |||
| 'LB': 4, | |||
| 'RB': 5, | |||
| 'BACK': 6, | |||
| 'START': 7, | |||
| 'LOGITECH': 8, | |||
| 'LEFT-STICK': 9, | |||
| 'RIGHT-STICK': 10 | |||
| 'LT': 6, | |||
| 'RT': 7, | |||
| 'BACK': 8, | |||
| 'START': 9, | |||
| 'LEFT-STICK': 10, | |||
| 'RIGHT-STICK': 11, | |||
| } | |||
| self.hatIndex = 0 # Index of the D-pad hat | |||
| def main(): | |||
| node = Node("gamepad") | |||
| @@ -52,50 +47,97 @@ def main(): | |||
| controller = Controller() | |||
| max_linear_speed = 1.0 # Maximum speed in m/s | |||
| max_angular_speed = 1.5 # Maximum angular speed in rad/s | |||
| move_speed = 0.05 # Fixed increment for D-pad | |||
| max_linear_z_speed = 0.1 # Maximum linear Z speed | |||
| max_angular_speed = 0.8 # Maximum angular speed | |||
| print("Gamepad Controls:") | |||
| print("Left Stick Y-Axis: Forward/Backward") | |||
| print("Left Stick X-Axis: Left/Right Turn") | |||
| print("Mode switch should be in 'D' position") | |||
| print(f"Detected controller: {joystick.get_name()}") | |||
| print(f"Number of axes: {joystick.get_numaxes()}") | |||
| print(f"Number of buttons: {joystick.get_numbuttons()}") | |||
| print("Press Ctrl+C to exit") | |||
| try: | |||
| for event in node: | |||
| pygame.event.pump() | |||
| forward = -joystick.get_axis(controller.axisNames['LEFT-Y']) | |||
| turn = -joystick.get_axis(controller.axisNames['LEFT-X']) | |||
| # Get all controller states | |||
| axes = [joystick.get_axis(i) for i in range(joystick.get_numaxes())] | |||
| buttons = [joystick.get_button(i) for i in range(joystick.get_numbuttons())] | |||
| # Get hat state (D-pad) | |||
| dpad_x, dpad_y = 0, 0 | |||
| if joystick.get_numhats() > 0: | |||
| dpad_x, dpad_y = joystick.get_hat(controller.hatIndex) | |||
| # Create raw control state | |||
| raw_control = { | |||
| "axes": axes, | |||
| "buttons": buttons, | |||
| "hats": [[dpad_x, dpad_y]], | |||
| "mapping": { | |||
| "axes": controller.axisNames, | |||
| "buttons": controller.buttonNames | |||
| } | |||
| } | |||
| # cmd_vel processing: | |||
| # 1. D-pad vertical for X axis | |||
| # 2. D-pad horizontal for Y axis | |||
| # 3. Right stick vertical for Z axis | |||
| # 4. Right stick horizontal for rotation around Z | |||
| # 5. Left stick vertical for rotation around X | |||
| # 6. Left stick horizontal for rotation around Y | |||
| deadzone = 0.05 | |||
| forward = 0.0 if abs(forward) < deadzone else forward | |||
| turn = 0.0 if abs(turn) < deadzone else turn | |||
| # Linear X velocity from D-pad vertical | |||
| linear_x = 0.0 | |||
| if dpad_y != 0: | |||
| linear_x = dpad_y * move_speed | |||
| # Linear Y velocity from D-pad horizontal | |||
| linear_y = 0.0 | |||
| if dpad_x != 0: | |||
| linear_y = dpad_x * move_speed | |||
| forward_speed = forward * max_linear_speed | |||
| turn_speed = turn * max_angular_speed | |||
| cmd_vel = [ | |||
| forward_speed, | |||
| 0.0, | |||
| 0.0, | |||
| 0.0, | |||
| 0.0, | |||
| turn_speed | |||
| ] | |||
| # Linear Z velocity from right stick vertical | |||
| right_y = -joystick.get_axis(controller.axisNames['RIGHT-Y']) | |||
| right_y = 0.0 if abs(right_y) < deadzone else right_y | |||
| linear_z = right_y * max_linear_z_speed | |||
| # Angular Z velocity (rotation) from right stick horizontal | |||
| right_x = -joystick.get_axis(controller.axisNames['RIGHT-X']) | |||
| right_x = 0.0 if abs(right_x) < deadzone else right_x | |||
| angular_z = 0 * max_angular_speed # TODO: Make z non zero, but on my gamepad the value is never zero | |||
| # Angular X velocity from left stick vertical | |||
| left_y = -joystick.get_axis(controller.axisNames['LEFT-Y']) | |||
| left_y = 0.0 if abs(left_y) < deadzone else left_y | |||
| angular_x = left_y * max_angular_speed | |||
| node.send_output( | |||
| output_id="cmd_vel", | |||
| data=pa.array(cmd_vel, type=pa.float64()), | |||
| metadata={"type": "cmd_vel"} | |||
| ) | |||
| # Angular Y velocity from left stick horizontal | |||
| left_x = -joystick.get_axis(controller.axisNames['LEFT-X']) | |||
| left_x = 0.0 if abs(left_x) < deadzone else left_x | |||
| angular_y = left_x * max_angular_speed | |||
| cmd_vel = [linear_x, linear_y, linear_z, angular_x, angular_y, angular_z] | |||
| if any(cmd_vel): | |||
| node.send_output( | |||
| output_id="cmd_vel", | |||
| data=pa.array(cmd_vel, type=pa.float64()), | |||
| metadata={"type": "cmd_vel"} | |||
| ) | |||
| node.send_output( | |||
| output_id="raw_control", | |||
| data=pa.array([json.dumps(raw_control)], type=pa.string()), | |||
| metadata={"type": "raw_control"} | |||
| ) | |||
| except KeyboardInterrupt: | |||
| print("\nExiting...") | |||
| finally: | |||
| pygame.quit() | |||
| # Send zero velocity at cleanup | |||
| zero_cmd = [0.0] * 6 | |||
| node.send_output( | |||
| output_id="cmd_vel", | |||
| @@ -103,6 +145,5 @@ def main(): | |||
| metadata={"type": "cmd_vel"} | |||
| ) | |||
| if __name__ == "__main__": | |||
| main() | |||
| main() | |||