From 3610ffb54f5cb262dc5059e178b772b5fa923e90 Mon Sep 17 00:00:00 2001 From: ShashwatPatil Date: Fri, 13 Jun 2025 17:45:52 +0530 Subject: [PATCH] updated and refactored code --- .../nodes/franka_controller_pytorch.py | 213 ++++++------- .../nodes/pose_publisher.py | 4 +- .../target_pose_control_pytorch.yml | 34 ++- .../03_gamepad_control/gamepad_control.yml | 30 +- .../nodes/franka_gamepad_controller.py | 283 +++++++---------- .../kuka_iiwa/model.urdf | 289 ++++++++++++++++++ .../dora_pytorch_kinematics/main.py | 85 +++++- node-hub/gamepad/gamepad/main.py | 76 +++-- 8 files changed, 662 insertions(+), 352 deletions(-) create mode 100644 examples/franka-mujoco-control/kuka_iiwa/model.urdf diff --git a/examples/franka-mujoco-control/02_target_pose_control/nodes/franka_controller_pytorch.py b/examples/franka-mujoco-control/02_target_pose_control/nodes/franka_controller_pytorch.py index dc89d5c9..18cd3832 100644 --- a/examples/franka-mujoco-control/02_target_pose_control/nodes/franka_controller_pytorch.py +++ b/examples/franka-mujoco-control/02_target_pose_control/nodes/franka_controller_pytorch.py @@ -1,198 +1,171 @@ -import torch import numpy as np import time import pyarrow as pa from dora import Node -import pytorch_kinematics as pk from scipy.spatial.transform import Rotation class FrankaController: def __init__(self): """ - Initialize the Franka controller with differential IK nullspace control + Initialize the Franka controller """ - # Load the robot model for IK and FK - urdf_path = "../franka_panda/panda.urdf" - with open(urdf_path, 'rb') as f: - urdf_content = f.read() - - # Build serial chain for kinematics - self.chain = pk.build_serial_chain_from_urdf(urdf_content, "panda_hand") - - # Move to GPU if available - self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - self.chain = self.chain.to(device=self.device) - # Control parameters self.integration_dt = 0.1 self.damping = 1e-4 self.Kpos = 0.95 # Position gain self.Kori = 0.95 # Orientation gain - self.Kn = np.array([10.0, 10.0, 10.0, 10.0, 5.0, 5.0, 5.0]) # Nullspace gains - self.max_angvel = 0.785 # Max joint velocity (rad/s) + self.max_angvel = 3.14 # Max joint velocity (rad/s) # State variables - self.current_joint_pos = np.array([0.0, -0.785, 0.0, -2.356, 0.0, 1.571, 0.785]) - self.current_joint_vel = np.zeros(7) + self.dof = None + self.current_joint_pos = None # Full robot state + self.home_pos = None # Home position for arm joints only + self.target_pos = np.array([0.4, 0.0, 0.3]) # Conservative default target self.target_rpy = [180.0, 0.0, 90.0] # Downward orientation - self.home_pos = np.array([0, 0, 0, -1.57079, 0, 1.57079, -0.7853]) # joint angles for home position - - # Initialize timestamps - self.last_time = time.time() - print("FrankaController initialized with PyTorch Kinematics Differential IK") - # print(f"Chain DOF: {len(self.chain.get_joint_parameter_names())}") + self.current_ee_pose = None # End-effector pose + self.current_jacobian = None # Jacobian matrix + print("FrankaController initialized") + + def _initialize_robot(self, positions, jacobian_dof=None): + """Initialize controller state with appropriate dimensions""" + self.full_joint_count = len(positions) + # Set DOF from Jacobian if available + self.dof = jacobian_dof if jacobian_dof is not None else self.full_joint_count + + self.current_joint_pos = positions.copy() + self.home_pos = np.zeros(self.dof) + def get_target_rotation_matrix(self): """Convert RPY to rotation matrix.""" roll_rad, pitch_rad, yaw_rad = np.radians(self.target_rpy) desired_rot = Rotation.from_euler('ZYX', [yaw_rad, pitch_rad, roll_rad]) return desired_rot.as_matrix() - def compute_jacobian_pytorch(self, joint_angles): - """Compute Jacobian using PyTorch Kinematics.""" - # Convert to torch tensor - q = torch.tensor(joint_angles, device=self.device, dtype=torch.float32).unsqueeze(0) - - # Compute Jacobian (returns 6x7 matrix: [linear_vel; angular_vel]) - J = self.chain.jacobian(q) # Shape: (1, 6, 7) - - return J.squeeze(0).cpu().numpy() # Convert back to numpy (6, 7) - - def get_current_ee_pose(self, joint_angles): - """Get current end-effector pose using forward kinematics.""" - # Convert to torch tensor - q = torch.tensor(joint_angles, device=self.device, dtype=torch.float32).unsqueeze(0) - - # Forward kinematics - tf = self.chain.forward_kinematics(q) # Returns Transform3d - - # Extract position and rotation - transform_matrix = tf.get_matrix().squeeze(0).cpu().numpy() # (4, 4) - position = transform_matrix[:3, 3] - rotation_matrix = transform_matrix[:3, :3] - - return position, rotation_matrix - - def update_joint_state(self, positions, velocities): - """Update the current joint state.""" - # Filter to only use first 7 joints - self.current_joint_pos = positions[:7] - self.current_joint_vel = velocities[:7] - def set_target_pose(self, pose_array): """Set target pose from input array.""" - if len(pose_array) >= 3: - self.target_pos = np.array(pose_array[:3]) - # print(f"Updated target position: {self.target_pos}") - - if len(pose_array) >= 6: + self.target_pos = np.array(pose_array[:3]) + if len(pose_array) == 6: self.target_rpy = list(pose_array[3:6]) - # print(f"Updated target orientation (RPY): {self.target_rpy}") + else: + self.target_rpy = [180.0, 0.0, 90.0] # Default orientation if not provided + + def update_jacobian(self, jacobian_flat, shape): + """Update current jacobian and initialize DOF if needed.""" + jacobian_dof = shape[1] + self.current_jacobian = jacobian_flat.reshape(shape) + + if self.dof is None: + if self.current_joint_pos is not None: + self._initialize_robot(self.current_joint_pos, jacobian_dof) + else: + self.dof = jacobian_dof + elif self.dof != jacobian_dof: + self.dof = jacobian_dof + self.home_pos = np.zeros(self.dof) def apply_differential_ik_control(self): """Apply differential IK control with nullspace projection.""" - current_ee_pos, current_ee_rot = self.get_current_ee_pose(self.current_joint_pos) + if self.current_ee_pose is None or self.current_jacobian is None: + return self.current_joint_pos + + current_ee_pos = self.current_ee_pose['position'] + current_ee_rpy = self.current_ee_pose['rpy'] + # Calculate position and orientation error pos_error = self.target_pos - current_ee_pos twist = np.zeros(6) twist[:3] = self.Kpos * pos_error / self.integration_dt - current_rot = Rotation.from_matrix(current_ee_rot) + # Convert current RPY to rotation matrix + current_rot = Rotation.from_euler('XYZ', current_ee_rpy) desired_rot = Rotation.from_matrix(self.get_target_rotation_matrix()) rot_error = (desired_rot * current_rot.inv()).as_rotvec() twist[3:] = self.Kori * rot_error / self.integration_dt - - # Get Jacobian using PyTorch Kinematics - jac = self.compute_jacobian_pytorch(self.current_joint_pos) # (6, 7) + jac = self.current_jacobian # Damped least squares inverse kinematics diag = self.damping * np.eye(6) dq = jac.T @ np.linalg.solve(jac @ jac.T + diag, twist) - # Nullspace control - jac_pinv = np.linalg.pinv(jac) - N = np.eye(7) - jac_pinv @ jac # Nullspace projection matrix - dq_null = self.Kn * (self.home_pos - self.current_joint_pos) # Nullspace velocity - dq += N @ dq_null # nullspace movement + # # Apply nullspace control + # # Calculate nullspace projection # uncomment to enable nullspace control + + # current_arm = self.current_joint_pos[:self.dof] + # jac_pinv = np.linalg.pinv(jac) + # N = np.eye(self.dof) - jac_pinv @ jac + + # # Apply gains to pull towards home position + # k_null = np.ones(self.dof) * 5.0 + # dq_null = k_null * (self.home_pos - current_arm) # Nullspace velocity + # dq += N @ dq_null # Nullspace movement # Limit joint velocities dq_abs_max = np.abs(dq).max() if dq_abs_max > self.max_angvel: dq *= self.max_angvel / dq_abs_max - # Integrate to get new joint positions - new_joints = self.current_joint_pos + dq * self.integration_dt - - # Apply joint limits (simple clipping - you could load from URDF) - joint_limits = np.array([ - [-2.8973, 2.8973], - [-1.7628, 1.7628], - [-2.8973, 2.8973], - [-3.0718, -0.0698], - [-2.8973, 2.8973], - [-0.0175, 3.7525], - [-2.8973, 2.8973] - ]) - - for i in range(7): - new_joints[i] = np.clip(new_joints[i], joint_limits[i][0], joint_limits[i][1]) - - # Debug output - # pos_error_mag = np.linalg.norm(pos_error) - # rot_error_mag = np.linalg.norm(rot_error) - # print(f"Position error: {pos_error_mag:.4f}, Rotation error: {rot_error_mag:.4f}") - # print(f"Joint velocity magnitude: {np.linalg.norm(dq):.4f}") + # Create full joint command (apply IK result to arm joints, keep other joints unchanged) + new_joints = self.current_joint_pos.copy() + new_joints[:self.dof] += dq * self.integration_dt return new_joints def main(): node = Node() - controller = FrankaController() for event in node: if event["type"] == "INPUT": if event["id"] == "joint_positions": joint_pos = event["value"].to_numpy() - controller.update_joint_state(joint_pos, controller.current_joint_vel) - # Apply differential IK control - joint_commands = controller.apply_differential_ik_control() - - # Pad control to match full robot DOF if needed - if len(joint_commands) == 7: # Arm only - # Add zero control for gripper joints - full_control = np.zeros(9) - full_control[:7] = joint_commands - control_to_send = full_control + + if controller.current_joint_pos is None: + controller._initialize_robot(joint_pos) else: - control_to_send = joint_commands + controller.current_joint_pos = joint_pos - # Send control commands to the robot + # Request FK computation node.send_output( - "joint_commands", - pa.array(control_to_send, type=pa.float32()), - metadata={"timestamp": time.time()} + "fk_request", + pa.array(controller.current_joint_pos, type=pa.float32()), + metadata={"encoding": "jointstate", "timestamp": time.time()} ) - # Send current end-effector position - current_ee_pos, _ = controller.get_current_ee_pose(controller.current_joint_pos) + # Request Jacobian computation node.send_output( - "ee_position", - pa.array(current_ee_pos, type=pa.float32()), + "jacobian_request", + pa.array(controller.current_joint_pos, type=pa.float32()), + metadata={"encoding": "jacobian", "timestamp": time.time()} + ) + joint_commands = controller.apply_differential_ik_control() + + # Send control commands to the robot + node.send_output( + "joint_commands", + pa.array(joint_commands, type=pa.float32()), metadata={"timestamp": time.time()} ) - - if event["id"] == "joint_velocities": - joint_vel = event["value"].to_numpy() - controller.update_joint_state(controller.current_joint_pos, joint_vel) - + # Handle target pose updates if event["id"] == "target_pose": target_pose = event["value"].to_numpy() - # print(f"Received target pose from publisher: {target_pose}") controller.set_target_pose(target_pose) + + # Handle FK results + if event["id"] == "fk_result": + if event["metadata"]["encoding"] == "xyzrpy": + ee_pose = event["value"].to_numpy() + controller.current_ee_pose = {'position': ee_pose[:3],'rpy': ee_pose[3:6]} + + # Handle Jacobian results + if event["id"] == "jacobian_result": + if event["metadata"]["encoding"] == "jacobian_result": + jacobian_flat = event["value"].to_numpy() + jacobian_shape = event["metadata"]["jacobian_shape"] + controller.update_jacobian(jacobian_flat, jacobian_shape) if __name__ == "__main__": main() \ No newline at end of file diff --git a/examples/franka-mujoco-control/02_target_pose_control/nodes/pose_publisher.py b/examples/franka-mujoco-control/02_target_pose_control/nodes/pose_publisher.py index 6b964483..355dc01f 100644 --- a/examples/franka-mujoco-control/02_target_pose_control/nodes/pose_publisher.py +++ b/examples/franka-mujoco-control/02_target_pose_control/nodes/pose_publisher.py @@ -12,8 +12,8 @@ class PosePublisher: [0.5, 0.5, 0.3, 180.0, 0.0, 90.0], [0.6, 0.2, 0.5, 180.0, 0.0, 45.0], [0.7, 0.1, 0.4, 90.0, 90.0, 90.0], - [0.5, -0.3, 0.6, 180.0, 0.0, 135.0], - [-0.3, -0.7, 0.2, 180.0, 0.0, 90.0], + [0.4, -0.3, 0.5, 180.0, 0.0, 135.0], + [-0.3, -0.6, 0.3, 180.0, 0.0, 90.0], ] self.current_pose_index = 0 diff --git a/examples/franka-mujoco-control/02_target_pose_control/target_pose_control_pytorch.yml b/examples/franka-mujoco-control/02_target_pose_control/target_pose_control_pytorch.yml index 1166e35d..aa9d8fa5 100644 --- a/examples/franka-mujoco-control/02_target_pose_control/target_pose_control_pytorch.yml +++ b/examples/franka-mujoco-control/02_target_pose_control/target_pose_control_pytorch.yml @@ -4,7 +4,7 @@ nodes: path: dora-mujoco inputs: tick: dora/timer/millis/2 - control_input: franka_controller_pytorch/joint_commands + control_input: kuka_controller_pytorch/joint_commands outputs: - joint_positions - joint_velocities @@ -13,32 +13,36 @@ nodes: env: MODEL_NAME: "panda" - - id: franka_controller_pytorch + - id: kuka_controller_pytorch path: nodes/franka_controller_pytorch.py inputs: joint_positions: mujoco_sim/joint_positions joint_velocities: mujoco_sim/joint_velocities target_pose: pose_publisher/target_pose - # fk_result: pytorch_kinematics/fk_result # Make sure this matches the ACTUAL output name + fk_result: pytorch_kinematics/fk_request + jacobian_result: pytorch_kinematics/jacobian_request outputs: - joint_commands - fk_request - - ee_position + - jacobian_request - # - id: pytorch_kinematics - # build: pip install -e ../../../node-hub/dora-pytorch-kinematics - # path: dora-pytorch-kinematics - # inputs: - # fk_request: franka_controller_pytorch/fk_request - # outputs: - # - fk_result # Check this output name matches what this node produces - # env: - # URDF_PATH: "/home/shashwatgpatil/Downloads/panda.urdf" - # END_EFFECTOR_LINK: "panda_hand" + - id: pytorch_kinematics + build: pip install -e ../../../node-hub/dora-pytorch-kinematics + path: dora-pytorch-kinematics + inputs: + fk_request: kuka_controller_pytorch/fk_request + jacobian_request: kuka_controller_pytorch/jacobian_request + outputs: + - fk_request + - jacobian_request + env: + URDF_PATH: "/home/shashwatgpatil/Dora-rs/dora/examples/franka-mujoco-control/franka_panda/panda.urdf" + END_EFFECTOR_LINK: "panda_hand" + TRANSFORM: "0. 0. 0. 1. 0. 0. 0." # Pytorch kinematics uses wxyz format for quaternion - id: pose_publisher path: nodes/pose_publisher.py inputs: - tick: dora/timer/millis/10000 + tick: dora/timer/millis/5000 outputs: - target_pose \ No newline at end of file diff --git a/examples/franka-mujoco-control/03_gamepad_control/gamepad_control.yml b/examples/franka-mujoco-control/03_gamepad_control/gamepad_control.yml index 5fa4a493..e62cac4b 100644 --- a/examples/franka-mujoco-control/03_gamepad_control/gamepad_control.yml +++ b/examples/franka-mujoco-control/03_gamepad_control/gamepad_control.yml @@ -20,22 +20,32 @@ nodes: - actuator_controls - sensor_data env: - MODEL_NAME: "panda" + MODEL_NAME: "kuka_iiwa14" - id: franka_gamepad_controller path: nodes/franka_gamepad_controller.py inputs: joint_positions: mujoco_sim/joint_positions + joint_velocities: mujoco_sim/joint_velocities raw_control: gamepad/raw_control - # target_pose: pose_publisher/target_pose # Optional + cmd_vel: gamepad/cmd_vel + fk_result: pytorch_kinematics/fk_request + jacobian_result: pytorch_kinematics/jacobian_request outputs: - joint_commands - - ee_position + - fk_request + - jacobian_request - # Optional: uncomment to also have programmatic control - # - id: pose_publisher - # path: ../02_target_pose_control/nodes/pose_publisher.py - # inputs: - # tick: dora/timer/millis/20000 # New pose every 20 seconds - # outputs: - # - target_pose \ No newline at end of file + - id: pytorch_kinematics + build: pip install -e ../../../node-hub/dora-pytorch-kinematics + path: dora-pytorch-kinematics + inputs: + fk_request: franka_gamepad_controller/fk_request + jacobian_request: franka_gamepad_controller/jacobian_request + outputs: + - fk_request + - jacobian_request + env: + URDF_PATH: "/home/shashwatgpatil/Dora-rs/dora/examples/franka-mujoco-control/kuka_iiwa/model.urdf" + END_EFFECTOR_LINK: "lbr_iiwa_link_7" + TRANSFORM: "0. 0. 0. 1. 0. 0. 0." diff --git a/examples/franka-mujoco-control/03_gamepad_control/nodes/franka_gamepad_controller.py b/examples/franka-mujoco-control/03_gamepad_control/nodes/franka_gamepad_controller.py index 7443136e..ae8ba8e2 100644 --- a/examples/franka-mujoco-control/03_gamepad_control/nodes/franka_gamepad_controller.py +++ b/examples/franka-mujoco-control/03_gamepad_control/nodes/franka_gamepad_controller.py @@ -1,104 +1,60 @@ -"""Enhanced Franka controller with gamepad support using PyTorch Kinematics. -""" - import json import time import numpy as np import pyarrow as pa -import torch from dora import Node -import pytorch_kinematics as pk from scipy.spatial.transform import Rotation -class EnhancedFrankaController: - """Franka controller with gamepad and target pose support using PyTorch Kinematics.""" - +class GamepadFrankaController: def __init__(self): - """Initialize controller with PyTorch Kinematics.""" - # Load the robot model for IK and FK - urdf_path = "../franka_panda/panda.urdf" - with open(urdf_path, 'rb') as f: - urdf_content = f.read() - - # Build serial chain for kinematics - self.chain = pk.build_serial_chain_from_urdf(urdf_content, "panda_hand") - - # Move to GPU if available - self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - self.chain = self.chain.to(device=self.device) - # Control parameters self.integration_dt = 0.1 self.damping = 1e-4 self.Kpos = 0.95 # Position gain self.Kori = 0.95 # Orientation gain - self.Kn = np.array([10.0, 10.0, 10.0, 10.0, 5.0, 5.0, 5.0]) # Nullspace gains - self.max_angvel = 0.785 # Max joint velocity (rad/s) - + self.max_angvel = 1.57 # Max joint velocity (rad/s) + # Gamepad control parameters self.speed_scale = 0.5 - self.deadzone = 0.05 - self.last_input_source = None - # Gripper control parameters - self.gripper_range = [0, 255] - self.gripper_state = 0.0 # (0=closed, 1=open) + # Robot state variables + self.dof = None + self.current_joint_pos = None # Full robot state + self.home_pos = None # Home position for arm joints only - # Robot state - self.current_joint_pos = None - self.target_pos = np.array([0.55, 0.0, 0.6]) # Conservative default target + # Target pose (independent of DOF) + self.target_pos = np.array([0.4, 0.0, 0.3]) # Conservative default target self.target_rpy = [180.0, 0.0, 90.0] # Downward orientation - self.home_pos = np.array([0, 0, 0, -1.67079, 0, 1.64079, -0.7853]) - print("Enhanced Franka Controller initialized with PyTorch Kinematics") - # print(f"Chain DOF: {len(self.chain.get_joint_parameter_names())}") - - def apply_deadzone(self, value, deadzone=None): - """Apply deadzone to joystick input.""" - deadzone = deadzone or self.deadzone - return 0.0 if abs(value) < deadzone else value - - def get_current_ee_pose(self, joint_angles): - """Get current end-effector pose using PyTorch Kinematics forward kinematics.""" - q = torch.tensor(joint_angles, device=self.device, dtype=torch.float32).unsqueeze(0) - # Forward kinematics - tf = self.chain.forward_kinematics(q) # Returns Transform3d - # Extract position and rotation - transform_matrix = tf.get_matrix().squeeze(0).cpu().numpy() # (4, 4) - position = transform_matrix[:3, 3] - rotation_matrix = transform_matrix[:3, :3] + # Cache for external computations + self.current_ee_pose = None # End-effector pose + self.current_jacobian = None # Jacobian matrix - return position, rotation_matrix + print("Gamepad Franka Controller initialized") - def compute_jacobian_pytorch(self, joint_angles): - """Compute Jacobian using PyTorch Kinematics.""" - q = torch.tensor(joint_angles, device=self.device, dtype=torch.float32).unsqueeze(0) - # Compute Jacobian (returns 6x7 matrix: [linear_vel; angular_vel]) - J = self.chain.jacobian(q) # Shape: (1, 6, 7) + def _initialize_robot(self, positions, jacobian_dof=None): + self.full_joint_count = len(positions) + self.dof = jacobian_dof if jacobian_dof is not None else self.full_joint_count + self.current_joint_pos = positions.copy() + self.home_pos = np.zeros(self.dof) + + def process_cmd_vel(self, cmd_vel): + print(f"Processing cmd_vel: {cmd_vel}") + dx = cmd_vel[0] * self.speed_scale * 0.1 + dy = cmd_vel[1] * self.speed_scale * 0.1 + dz = cmd_vel[2] * self.speed_scale * 0.1 - return J.squeeze(0).cpu().numpy() # Convert back to numpy (6, 7) + # Update target position incrementally + if abs(dx) > 0 or abs(dy) > 0 or abs(dz) > 0: + self.target_pos += np.array([dx, dy, dz]) def process_gamepad_input(self, raw_control): - """Process gamepad input exactly like the original.""" - axes = raw_control["axes"] buttons = raw_control["buttons"] # Reset position with START button if len(buttons) > 9 and buttons[9]: - # Reset target to a position based on home joint angles - if self.current_joint_pos is not None: - # Use current joint positions to get home EE position - home_ee_pos, _ = self.get_current_ee_pose(self.home_pos) - self.target_pos = home_ee_pos.copy() - print("Reset target to home position") - - # Gripper control with X and Y buttons (exactly like original) - if len(buttons) > 0 and buttons[0]: # X button - Close gripper - self.gripper_state = 0.0 - print("Gripper: CLOSED") - elif len(buttons) > 3 and buttons[3]: # Y button - Open gripper - self.gripper_state = 1.0 - print("Gripper: OPEN") + self.target_pos = np.array([0.4, 0.0, 0.3]) + print("Reset target to home position") # Speed scaling with bumpers (LB/RB) if len(buttons) > 4 and buttons[4]: # LB @@ -107,149 +63,130 @@ class EnhancedFrankaController: if len(buttons) > 5 and buttons[5]: # RB self.speed_scale = min(1.0, self.speed_scale + 0.1) print(f"Speed: {self.speed_scale:.1f}") - - # Get cartesian control inputs with deadzone - dx = self.apply_deadzone(axes[0], self.deadzone) * self.speed_scale * 0.1 - dy = -self.apply_deadzone(axes[1], self.deadzone) * self.speed_scale * 0.1 - dz = -self.apply_deadzone(axes[3], self.deadzone) * self.speed_scale * 0.1 - - # Update target position incrementally - if abs(dx) > 0 or abs(dy) > 0 or abs(dz) > 0: - self.target_pos += np.array([dx, dy, dz]) - self.last_input_source = "gamepad" - - # print(f"Gamepad control: dx={dx:.3f}, dy={dy:.3f}, dz={dz:.3f}") - # print(f"New target: {self.target_pos}") - - def process_target_pose(self, pose_array): - """Process target pose command.""" - if len(pose_array) >= 3: - self.target_pos = np.array(pose_array[:3]) - print(f"Updated target position: {self.target_pos}") - - if len(pose_array) >= 6: - self.target_rpy = list(pose_array[3:6]) - print(f"Updated target orientation (RPY): {self.target_rpy}") - - self.last_input_source = "target_pose" def get_target_rotation_matrix(self): - """Convert RPY to rotation matrix.""" roll_rad, pitch_rad, yaw_rad = np.radians(self.target_rpy) desired_rot = Rotation.from_euler('ZYX', [yaw_rad, pitch_rad, roll_rad]) return desired_rot.as_matrix() - def apply_cartesian_control(self, current_joints): - """Apply Cartesian control using PyTorch Kinematics differential IK.""" - self.current_joint_pos = current_joints[:7] - # Get current end-effector pose using PyTorch Kinematics FK - current_ee_pos, current_ee_rot = self.get_current_ee_pose(self.current_joint_pos) - - # Calculate position error - pos_error = self.target_pos - current_ee_pos + def update_jacobian(self, jacobian_flat, shape): + jacobian_dof = shape[1] + self.current_jacobian = jacobian_flat.reshape(shape) + + if self.dof is None: + if self.current_joint_pos is not None: + self._initialize_robot(self.current_joint_pos, jacobian_dof) + else: + self.dof = jacobian_dof + elif self.dof != jacobian_dof: + self.dof = jacobian_dof + self.home_pos = np.zeros(self.dof) + + def apply_differential_ik_control(self): + if self.current_ee_pose is None or self.current_jacobian is None: + return self.current_joint_pos + + current_ee_pos = self.current_ee_pose['position'] + current_ee_rpy = self.current_ee_pose['rpy'] - # Construct 6D twist (3 position + 3 orientation) + pos_error = self.target_pos - current_ee_pos twist = np.zeros(6) twist[:3] = self.Kpos * pos_error / self.integration_dt - # Orientation control - current_rot = Rotation.from_matrix(current_ee_rot) + # Convert current RPY to rotation matrix + current_rot = Rotation.from_euler('XYZ', current_ee_rpy) desired_rot = Rotation.from_matrix(self.get_target_rotation_matrix()) rot_error = (desired_rot * current_rot.inv()).as_rotvec() twist[3:] = self.Kori * rot_error / self.integration_dt - jac = self.compute_jacobian_pytorch(self.current_joint_pos) # (6, 7) + jac = self.current_jacobian + # Damped least squares inverse kinematics diag = self.damping * np.eye(6) dq = jac.T @ np.linalg.solve(jac @ jac.T + diag, twist) - # Nullspace control - drive towards home position in nullspace - jac_pinv = np.linalg.pinv(jac) - N = np.eye(7) - jac_pinv @ jac # Nullspace projection matrix - dq_null = self.Kn * (self.home_pos - self.current_joint_pos) # Nullspace velocity - dq += N @ dq_null # Add nullspace movement + # Apply nullspace control if we have redundant DOF + if self.dof > 6: + current_arm = self.current_joint_pos[:self.dof] + jac_pinv = np.linalg.pinv(jac) + N = np.eye(self.dof) - jac_pinv @ jac + k_null = np.ones(self.dof) * 5.0 + dq_null = k_null * (self.home_pos - current_arm) + dq += N @ dq_null # Limit joint velocities dq_abs_max = np.abs(dq).max() if dq_abs_max > self.max_angvel: dq *= self.max_angvel / dq_abs_max - # Integrate to get new joint positions - new_joints = self.current_joint_pos + dq * self.integration_dt - - # Apply joint limits (Franka Panda limits) - joint_limits = np.array([ - [-2.8973, 2.8973], - [-1.7628, 1.7628], - [-2.8973, 2.8973], - [-3.0718, -0.0698], - [-2.8973, 2.8973], - [-0.0175, 3.7525], - [-2.8973, 2.8973] - ]) - - for i in range(7): - new_joints[i] = np.clip(new_joints[i], joint_limits[i][0], joint_limits[i][1]) - - # Return 8-dimensional control: 7 arm joints + gripper - full_commands = np.zeros(8) - full_commands[:7] = new_joints - # Map gripper state to control range - full_commands[7] = self.gripper_range[0] if self.gripper_state < 0.5 else self.gripper_range[1] + # Create full joint command + new_joints = self.current_joint_pos.copy() + new_joints[:self.dof] += dq * self.integration_dt - # Debug output - # pos_error_mag = np.linalg.norm(pos_error) - # if pos_error_mag > 0.01: # Only print if there's significant error - # print(f"Position error: {pos_error_mag:.4f}") - - return full_commands + return new_joints def main(): - node = Node("franka_controller") - controller = EnhancedFrankaController() + node = Node() + controller = GamepadFrankaController() - print("Enhanced Franka Controller Node Started") - print("\nGamepad Controls:") - print(" Left Stick X/Y: Move in X/Y plane") - print(" Right Stick Y: Move up/down (Z)") - print(" LB/RB: Decrease/Increase speed") - print(" START: Reset to home position") - print(" X: Close gripper") - print(" Y: Open gripper") - print("\nAlso accepts target_pose commands") - print("Ready to receive inputs...") + print("Gamepad Franka Controller Started") for event in node: if event["type"] == "INPUT": if event["id"] == "joint_positions": - # Update current state and compute commands - joint_positions = event["value"].to_numpy() - full_commands = controller.apply_cartesian_control(joint_positions) + joint_pos = event["value"].to_numpy() + + if controller.current_joint_pos is None: + controller._initialize_robot(joint_pos) + else: + controller.current_joint_pos = joint_pos + # Request FK computation node.send_output( - "joint_commands", - pa.array(full_commands, type=pa.float64()), - metadata={"timestamp": time.time()} + "fk_request", + pa.array(controller.current_joint_pos, type=pa.float32()), + metadata={"encoding": "jointstate", "timestamp": time.time()} + ) + + # Request Jacobian computation + node.send_output( + "jacobian_request", + pa.array(controller.current_joint_pos, type=pa.float32()), + metadata={"encoding": "jacobian", "timestamp": time.time()} ) - # Send current end-effector position - if controller.current_joint_pos is not None: - current_ee_pos, _ = controller.get_current_ee_pose(controller.current_joint_pos) - node.send_output( - "ee_position", - pa.array(current_ee_pos, type=pa.float64()), - metadata={"timestamp": time.time()} - ) + # Apply differential IK control + joint_commands = controller.apply_differential_ik_control() + + # Send control commands to the robot + node.send_output( + "joint_commands", + pa.array(joint_commands, type=pa.float32()), + metadata={"timestamp": time.time()} + ) elif event["id"] == "raw_control": raw_control = json.loads(event["value"].to_pylist()[0]) controller.process_gamepad_input(raw_control) - elif event["id"] == "target_pose": - pose_array = event["value"].to_numpy() - controller.process_target_pose(pose_array) + elif event["id"] == "cmd_vel": + cmd_vel = event["value"].to_numpy() + controller.process_cmd_vel(cmd_vel) + + # Handle FK results + if event["id"] == "fk_result": + if event["metadata"].get("encoding") == "xyzrpy": + ee_pose = event["value"].to_numpy() + controller.current_ee_pose = {'position': ee_pose[:3], 'rpy': ee_pose[3:6]} + + # Handle Jacobian results + if event["id"] == "jacobian_result": + if event["metadata"].get("encoding") == "jacobian_result": + jacobian_flat = event["value"].to_numpy() + jacobian_shape = event["metadata"]["jacobian_shape"] + controller.update_jacobian(jacobian_flat, jacobian_shape) if __name__ == "__main__": main() \ No newline at end of file diff --git a/examples/franka-mujoco-control/kuka_iiwa/model.urdf b/examples/franka-mujoco-control/kuka_iiwa/model.urdf new file mode 100644 index 00000000..98e54cb3 --- /dev/null +++ b/examples/franka-mujoco-control/kuka_iiwa/model.urdf @@ -0,0 +1,289 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/node-hub/dora-pytorch-kinematics/dora_pytorch_kinematics/main.py b/node-hub/dora-pytorch-kinematics/dora_pytorch_kinematics/main.py index 244787be..f51babb0 100644 --- a/node-hub/dora-pytorch-kinematics/dora_pytorch_kinematics/main.py +++ b/node-hub/dora-pytorch-kinematics/dora_pytorch_kinematics/main.py @@ -10,9 +10,9 @@ import torch from dora import Node from pytorch_kinematics.transforms.rotation_conversions import matrix_to_euler_angles -TRANSFORM = np.array(os.getenv("TRANSFORM", "0. 0. 0. 0. 0. 0. 1.").split(" ")).astype( +TRANSFORM = np.array(os.getenv("TRANSFORM", "0. 0. 0. 1. 0. 0. 0.").split(" ")).astype( np.float32, -) +) # wxyz format pos = torch.tensor([TRANSFORM[0], TRANSFORM[1], TRANSFORM[2]]) rot = torch.tensor( [ @@ -117,15 +117,21 @@ class RobotKinematics: ) if j.ndim == 1: - if j.shape[0] != self.num_joints: - raise ValueError(f"Expected {self.num_joints} joints, got {j.shape[0]}") + # Handle case with extra joints (e.g., gripper joints) + if j.shape[0] > self.num_joints: + j = j[:self.num_joints] # Truncate griper or extra joints + elif j.shape[0] < self.num_joints: + raise ValueError(f"Expected at least {self.num_joints} joints, got {j.shape[0]}") + if batch_dim_required: - j = j.unsqueeze(0) # Add batch dimension if needed (e.g., shape (1, N)) + j = j.unsqueeze(0) # Add batch dimension if needed elif j.ndim == 2: - if j.shape[1] != self.num_joints: - raise ValueError( - f"Expected {self.num_joints} joints (dim 1), got {j.shape[1]}", - ) + # Handle batched input with extra joints + if j.shape[1] > self.num_joints: + j = j[:, :self.num_joints] # Truncate griper or extra joints + elif j.shape[1] < self.num_joints: + raise ValueError(f"Expected at least {self.num_joints} joints (dim 1), got {j.shape[1]}") + if batch_dim_required and j.shape[0] != 1: # Most common IK solvers expect batch=1 for initial guess, FK can handle batches # Relaxing this check slightly, but user should be aware for IK @@ -222,6 +228,55 @@ class RobotKinematics: return None return solution_angles.solutions.detach() + def compute_jacobian( + self, joint_angles: Union[List[float], torch.Tensor] + ) -> torch.Tensor: + """Compute Jacobian matrix using PyTorch Kinematics. + + Args: + joint_angles (Union[List[float], torch.Tensor]): Joint angles (radians). + Shape (num_joints,) or (B, num_joints). + + Returns: + torch.Tensor: Jacobian matrix of shape (B, 6, num_joints) or (6, num_joints) + where rows are [linear_vel_x, linear_vel_y, linear_vel_z, + angular_vel_x, angular_vel_y, angular_vel_z] + """ + angles_tensor = self._prepare_joint_tensor( + joint_angles, batch_dim_required=False + ) + + # Ensure we have batch dimension for jacobian computation + if angles_tensor.ndim == 1: + angles_tensor = angles_tensor.unsqueeze(0) + squeeze_output = True + else: + squeeze_output = False + + # Compute Jacobian (returns shape: (B, 6, num_joints)) + J = self.chain.jacobian(angles_tensor) + + # Remove batch dimension if input was 1D + if squeeze_output: + J = J.squeeze(0) + + return J + + def compute_jacobian_numpy( + self, joint_angles: Union[List[float], torch.Tensor, np.ndarray] + ) -> np.ndarray: + """Compute Jacobian matrix and return as numpy array. + + Args: + joint_angles: Joint angles (radians). Can be list, torch.Tensor, or np.ndarray. + Shape (num_joints,) or (B, num_joints). + + Returns: + np.ndarray: Jacobian matrix as numpy array of shape (6, num_joints) or (B, 6, num_joints) + """ + J = self.compute_jacobian(joint_angles) + return J.cpu().numpy() + def main(): """TODO: Add docstring.""" @@ -293,6 +348,18 @@ def main(): target = pa.array(target.ravel(), type=pa.float32()) metadata["encoding"] = "xyzrpy" node.send_output(event["id"], target, metadata=metadata) + case "jacobian": + # Compute Jacobian matrix + joint_angles = event["value"].to_numpy() + jacobian = robot.compute_jacobian_numpy(joint_angles) + + jacobian_flat = jacobian.flatten() + jacobian_array = pa.array(jacobian_flat, type=pa.float32()) + + metadata["encoding"] = "jacobian_result" + metadata["jacobian_shape"] = list(jacobian.shape) + + node.send_output(event["id"], jacobian_array, metadata=metadata) if __name__ == "__main__": diff --git a/node-hub/gamepad/gamepad/main.py b/node-hub/gamepad/gamepad/main.py index 56a55578..47acf19b 100644 --- a/node-hub/gamepad/gamepad/main.py +++ b/node-hub/gamepad/gamepad/main.py @@ -1,9 +1,4 @@ -"""Gamepad controller node for Dora. - -This module provides a Dora node that reads input from a controller and publishes: -1. velocity commands for robot control -2. raw controller state for debugging and custom mappings -""" +"""Gamepad controller node for Dora.""" from dora import Node import pygame @@ -11,7 +6,7 @@ import pyarrow as pa import json class Controller: - """controller mapping.""" + """Controller mapping.""" def __init__(self): """Change this according to your controller mapping. Currently Logitech F710.""" @@ -33,8 +28,9 @@ class Controller: 'BACK': 8, 'START': 9, 'LEFT-STICK': 10, - 'RIGHT-STICK': 11 + 'RIGHT-STICK': 11, } + self.hatIndex = 0 # Index of the D-pad hat def main(): node = Node("gamepad") @@ -51,8 +47,9 @@ def main(): controller = Controller() - max_linear_speed = 1.0 # Maximum speed in m/s - max_angular_speed = 1.5 # Maximum angular speed in rad/s + move_speed = 0.05 # Fixed increment for D-pad + max_linear_z_speed = 0.1 # Maximum linear Z speed + max_angular_speed = 0.8 # Maximum angular speed print(f"Detected controller: {joystick.get_name()}") print(f"Number of axes: {joystick.get_numaxes()}") @@ -66,30 +63,64 @@ def main(): # Get all controller states axes = [joystick.get_axis(i) for i in range(joystick.get_numaxes())] buttons = [joystick.get_button(i) for i in range(joystick.get_numbuttons())] + + # Get hat state (D-pad) + dpad_x, dpad_y = 0, 0 + if joystick.get_numhats() > 0: + dpad_x, dpad_y = joystick.get_hat(controller.hatIndex) # Create raw control state raw_control = { "axes": axes, "buttons": buttons, + "hats": [[dpad_x, dpad_y]], "mapping": { "axes": controller.axisNames, "buttons": controller.buttonNames } } - # print("raw_control:", raw_control) # uncomment for debugging and re-map - # Regular cmd_vel processing - forward = -joystick.get_axis(controller.axisNames['LEFT-Y']) - turn = -joystick.get_axis(controller.axisNames['LEFT-X']) - + # cmd_vel processing: + # 1. D-pad vertical for X axis + # 2. D-pad horizontal for Y axis + # 3. Right stick vertical for Z axis + # 4. Right stick horizontal for rotation around Z + # 5. Left stick vertical for rotation around X + # 6. Left stick horizontal for rotation around Y + deadzone = 0.05 - forward = 0.0 if abs(forward) < deadzone else forward - turn = 0.0 if abs(turn) < deadzone else turn + + # Linear X velocity from D-pad vertical + linear_x = 0.0 + if dpad_y != 0: + linear_x = dpad_y * move_speed + + # Linear Y velocity from D-pad horizontal + linear_y = 0.0 + if dpad_x != 0: + linear_y = dpad_x * move_speed - forward_speed = forward * max_linear_speed - turn_speed = turn * max_angular_speed - - cmd_vel = [forward_speed, 0.0, 0.0, 0.0, 0.0, turn_speed] + # Linear Z velocity from right stick vertical + right_y = -joystick.get_axis(controller.axisNames['RIGHT-Y']) + right_y = 0.0 if abs(right_y) < deadzone else right_y + linear_z = right_y * max_linear_z_speed + + # Angular Z velocity (rotation) from right stick horizontal + right_x = -joystick.get_axis(controller.axisNames['RIGHT-X']) + right_x = 0.0 if abs(right_x) < deadzone else right_x + angular_z = right_x * max_angular_speed + + # Angular X velocity from left stick vertical + left_y = -joystick.get_axis(controller.axisNames['LEFT-Y']) + left_y = 0.0 if abs(left_y) < deadzone else left_y + angular_x = left_y * max_angular_speed + + # Angular Y velocity from left stick horizontal + left_x = -joystick.get_axis(controller.axisNames['LEFT-X']) + left_x = 0.0 if abs(left_x) < deadzone else left_x + angular_y = left_x * max_angular_speed + + cmd_vel = [linear_x, linear_y, linear_z, angular_x, angular_y, angular_z] node.send_output( output_id="cmd_vel", @@ -107,7 +138,6 @@ def main(): print("\nExiting...") finally: pygame.quit() - # Send zero velocity at cleanup zero_cmd = [0.0] * 6 node.send_output( output_id="cmd_vel", @@ -116,4 +146,4 @@ def main(): ) if __name__ == "__main__": - main() + main() \ No newline at end of file