| @@ -1,7 +1,8 @@ | |||
| """TODO: Add docstring.""" | |||
| import importlib | |||
| import os | |||
| from typing import List, Optional, Tuple, Union | |||
| from pathlib import Path | |||
| import numpy as np | |||
| import pyarrow as pa | |||
| @@ -9,12 +10,10 @@ import pytorch_kinematics as pk | |||
| import torch | |||
| from dora import Node | |||
| from pytorch_kinematics.transforms.rotation_conversions import matrix_to_euler_angles | |||
| from pathlib import Path | |||
| import importlib | |||
| TRANSFORM = np.array(os.getenv("TRANSFORM", "0. 0. 0. 1. 0. 0. 0.").split(" ")).astype( | |||
| np.float32, | |||
| ) # wxyz format | |||
| ) # wxyz format | |||
| pos = torch.tensor([TRANSFORM[0], TRANSFORM[1], TRANSFORM[2]]) | |||
| rot = torch.tensor( | |||
| [ | |||
| @@ -28,10 +27,11 @@ ROB_TF = pk.Transform3d(pos=pos, rot=rot) | |||
| def get_xyz_rpy_array_from_transform3d( | |||
| transform: "pk.transforms.Transform3d", convention: str = "XYZ", | |||
| transform: "pk.transforms.Transform3d", | |||
| convention: str = "XYZ", | |||
| ) -> torch.Tensor: | |||
| """XYZ-RPY conversion. | |||
| Extract translation (xyz) and rotation (RPY Euler angles in radians) | |||
| from a pytorch_kinematics Transform3d object and returns them concatenated | |||
| into a single tensor. | |||
| @@ -63,7 +63,8 @@ def get_xyz_rpy_array_from_transform3d( | |||
| # Convert the rotation matrix to Euler angles in radians | |||
| # The matrix_to_euler_angles function likely exists based on pytorch3d's structure | |||
| rpy = matrix_to_euler_angles( | |||
| rotation_matrix, convention=convention, | |||
| rotation_matrix, | |||
| convention=convention, | |||
| ) # Shape (..., 3) | |||
| # Concatenate xyz and rpy along the last dimension | |||
| @@ -78,7 +79,7 @@ class RobotKinematics: | |||
| urdf_path: str, | |||
| urdf: str, | |||
| end_effector_link: str, | |||
| device: Union[str, torch.device] = "cpu", | |||
| device: str | torch.device = "cpu", | |||
| ): | |||
| """Initialize the kinematic chain from a URDF. | |||
| @@ -94,18 +95,23 @@ class RobotKinematics: | |||
| urdf = open(urdf_path, mode="rb").read() | |||
| # Load kinematic chain (core pytorch_kinematics object) | |||
| self.chain = pk.build_serial_chain_from_urdf( | |||
| urdf, end_effector_link, | |||
| urdf, | |||
| end_effector_link, | |||
| ).to(device=self.device) | |||
| self.num_joints = len(self.chain.get_joint_parameter_names(exclude_fixed=True)) | |||
| # Default initial guess for IK if none provided | |||
| self._default_q = torch.zeros( | |||
| (1, self.num_joints), device=self.device, dtype=torch.float32, | |||
| (1, self.num_joints), | |||
| device=self.device, | |||
| dtype=torch.float32, | |||
| ) | |||
| # print(f"Initialized RobotKinematicsConcise: {self.num_joints} joints, EE='{end_effector_link}', device='{self.device}'") # Optional print | |||
| def _prepare_joint_tensor( | |||
| self, joints: Union[List[float], torch.Tensor], batch_dim_required: bool = True, | |||
| self, | |||
| joints: list[float] | torch.Tensor, | |||
| batch_dim_required: bool = True, | |||
| ) -> torch.Tensor: | |||
| """Validate and formats joint angles input tensor.""" | |||
| if isinstance(joints, list): | |||
| @@ -116,25 +122,30 @@ class RobotKinematics: | |||
| j = joints.to(device=self.device, dtype=torch.float32) | |||
| else: | |||
| raise TypeError( | |||
| "Joints must be a list or torch.Tensor and got: ", joints.type, | |||
| "Joints must be a list or torch.Tensor and got: ", | |||
| joints.type, | |||
| ) | |||
| if j.ndim == 1: | |||
| # Handle case with extra joints (e.g., gripper joints) | |||
| if j.shape[0] > self.num_joints: | |||
| j = j[:self.num_joints] # Truncate griper or extra joints | |||
| j = j[: self.num_joints] # Truncate griper or extra joints | |||
| elif j.shape[0] < self.num_joints: | |||
| raise ValueError(f"Expected at least {self.num_joints} joints, got {j.shape[0]}") | |||
| raise ValueError( | |||
| f"Expected at least {self.num_joints} joints, got {j.shape[0]}" | |||
| ) | |||
| if batch_dim_required: | |||
| j = j.unsqueeze(0) # Add batch dimension if needed | |||
| elif j.ndim == 2: | |||
| # Handle batched input with extra joints | |||
| if j.shape[1] > self.num_joints: | |||
| j = j[:, :self.num_joints] # Truncate griper or extra joints | |||
| j = j[:, : self.num_joints] # Truncate griper or extra joints | |||
| elif j.shape[1] < self.num_joints: | |||
| raise ValueError(f"Expected at least {self.num_joints} joints (dim 1), got {j.shape[1]}") | |||
| raise ValueError( | |||
| f"Expected at least {self.num_joints} joints (dim 1), got {j.shape[1]}" | |||
| ) | |||
| if batch_dim_required and j.shape[0] != 1: | |||
| # Most common IK solvers expect batch=1 for initial guess, FK can handle batches | |||
| # Relaxing this check slightly, but user should be aware for IK | |||
| @@ -144,7 +155,8 @@ class RobotKinematics: | |||
| return j | |||
| def compute_fk( | |||
| self, joint_angles: Union[List[float], torch.Tensor], | |||
| self, | |||
| joint_angles: list[float] | torch.Tensor, | |||
| ) -> pk.Transform3d: | |||
| """Compute Forward Kinematics (FK). | |||
| @@ -159,23 +171,26 @@ class RobotKinematics: | |||
| # robot frame | |||
| angles_tensor = self._prepare_joint_tensor( | |||
| joint_angles, batch_dim_required=False, | |||
| joint_angles, | |||
| batch_dim_required=False, | |||
| ) # FK handles batches naturally | |||
| # Direct call to pytorch_kinematics FK | |||
| goal_in_rob_frame_tf = self.chain.forward_kinematics( | |||
| angles_tensor, end_only=True, | |||
| angles_tensor, | |||
| end_only=True, | |||
| ) | |||
| goal_tf = ROB_TF.compose(goal_in_rob_frame_tf) | |||
| return goal_tf | |||
| def compute_ik( | |||
| self, | |||
| target_pose: pk.Transform3d, | |||
| initial_guess: Optional[Union[List[float], torch.Tensor]] = None, | |||
| initial_guess: list[float] | torch.Tensor | None = None, | |||
| iterations: int = 100, | |||
| lr: float = 0.1, | |||
| error_tolerance: float = 1e-4, | |||
| ) -> Tuple[torch.Tensor, bool]: | |||
| ) -> tuple[torch.Tensor, bool]: | |||
| """Compute Inverse Kinematics (IK) using PseudoInverseIK. | |||
| Args: | |||
| @@ -203,6 +218,7 @@ class RobotKinematics: | |||
| initial_guess if initial_guess is not None else self._default_q, | |||
| batch_dim_required=True, | |||
| ) | |||
| if q_init.shape[0] != 1: | |||
| raise ValueError( | |||
| "initial_guess must result in batch size 1 for this IK setup.", | |||
| @@ -232,49 +248,52 @@ class RobotKinematics: | |||
| return solution_angles.solutions.detach() | |||
| def compute_jacobian( | |||
| self, joint_angles: Union[List[float], torch.Tensor] | |||
| self, | |||
| joint_angles: list[float] | torch.Tensor, | |||
| ) -> torch.Tensor: | |||
| """Compute Jacobian matrix using PyTorch Kinematics. | |||
| Args: | |||
| joint_angles (Union[List[float], torch.Tensor]): Joint angles (radians). | |||
| Shape (num_joints,) or (B, num_joints). | |||
| Returns: | |||
| torch.Tensor: Jacobian matrix of shape (B, 6, num_joints) or (6, num_joints) | |||
| where rows are [linear_vel_x, linear_vel_y, linear_vel_z, | |||
| where rows are [linear_vel_x, linear_vel_y, linear_vel_z, | |||
| angular_vel_x, angular_vel_y, angular_vel_z] | |||
| """ | |||
| angles_tensor = self._prepare_joint_tensor( | |||
| joint_angles, batch_dim_required=False | |||
| joint_angles, | |||
| batch_dim_required=False, | |||
| ) | |||
| # Ensure we have batch dimension for jacobian computation | |||
| if angles_tensor.ndim == 1: | |||
| angles_tensor = angles_tensor.unsqueeze(0) | |||
| squeeze_output = True | |||
| else: | |||
| squeeze_output = False | |||
| # Compute Jacobian (returns shape: (B, 6, num_joints)) | |||
| J = self.chain.jacobian(angles_tensor) | |||
| # Remove batch dimension if input was 1D | |||
| if squeeze_output: | |||
| J = J.squeeze(0) | |||
| return J | |||
| def compute_jacobian_numpy( | |||
| self, joint_angles: Union[List[float], torch.Tensor, np.ndarray] | |||
| self, | |||
| joint_angles: list[float] | torch.Tensor | np.ndarray, | |||
| ) -> np.ndarray: | |||
| """Compute Jacobian matrix and return as numpy array. | |||
| Args: | |||
| joint_angles: Joint angles (radians). Can be list, torch.Tensor, or np.ndarray. | |||
| Shape (num_joints,) or (B, num_joints). | |||
| Returns: | |||
| np.ndarray: Jacobian matrix as numpy array of shape (6, num_joints) or (B, 6, num_joints) | |||
| @@ -331,9 +350,13 @@ def main(): | |||
| if not model or not Path(model).exists(): | |||
| model_name = os.getenv("MODEL_NAME") | |||
| model = load_robot_description_with_cache_substitution(model_name) | |||
| robot = RobotKinematics(urdf_path="", urdf=model, end_effector_link=end_effector_link) | |||
| robot = RobotKinematics( | |||
| urdf_path="", urdf=model, end_effector_link=end_effector_link | |||
| ) | |||
| else: | |||
| robot = RobotKinematics(urdf_path=model, urdf="", end_effector_link=end_effector_link) | |||
| robot = RobotKinematics( | |||
| urdf_path=model, urdf="", end_effector_link=end_effector_link | |||
| ) | |||
| last_known_state = None | |||
| for event in node: | |||
| @@ -342,22 +365,28 @@ def main(): | |||
| if event["id"] == "cmd_vel": | |||
| if last_known_state is not None: | |||
| target_vel = event["value"].to_numpy() # expect 100ms | |||
| target_vel = event["value"].to_numpy() # expect 100ms | |||
| # Apply Forward Kinematics | |||
| target = robot.compute_fk(last_known_state) | |||
| target = np.array(get_xyz_rpy_array_from_transform3d(target)) + target_vel | |||
| target = robot.compute_fk(last_known_state) | |||
| target = ( | |||
| np.array(get_xyz_rpy_array_from_transform3d(target)) | |||
| + target_vel | |||
| ) | |||
| target = pa.array(target.ravel(), type=pa.float32()) | |||
| target = pk.Transform3d( | |||
| pos=target[:3], | |||
| rot=pk.transforms.euler_angles_to_matrix( | |||
| torch.tensor(target[3:6]), convention="XYZ", | |||
| torch.tensor(target[3:6]), | |||
| convention="XYZ", | |||
| ), | |||
| ) | |||
| rob_target = ROB_TF.inverse().compose(target) | |||
| solution = robot.compute_ik(rob_target, last_known_state) | |||
| if solution is None: | |||
| print( | |||
| "No IK Solution for :", target, "skipping this frame.", | |||
| "No IK Solution for :", | |||
| target, | |||
| "skipping this frame.", | |||
| ) | |||
| continue | |||
| solution = solution.numpy().ravel() | |||
| @@ -373,7 +402,8 @@ def main(): | |||
| target = event["value"].to_numpy() | |||
| target = target.astype(np.float32) | |||
| target = pk.Transform3d( | |||
| pos=target[:3], rot=torch.tensor(target[3:7]), | |||
| pos=target[:3], | |||
| rot=torch.tensor(target[3:7]), | |||
| ) | |||
| rob_target = ROB_TF.inverse().compose(target) | |||
| solution = robot.compute_ik(rob_target, last_known_state) | |||
| @@ -389,19 +419,24 @@ def main(): | |||
| target = pk.Transform3d( | |||
| pos=target[:3], | |||
| rot=pk.transforms.euler_angles_to_matrix( | |||
| torch.tensor(target[3:6]), convention="XYZ", | |||
| torch.tensor(target[3:6]), | |||
| convention="XYZ", | |||
| ), | |||
| ) | |||
| rob_target = ROB_TF.inverse().compose(target) | |||
| solution = robot.compute_ik(rob_target, last_known_state) | |||
| if solution is None: | |||
| print( | |||
| "No IK Solution for :", target, "skipping this frame.", | |||
| "No IK Solution for :", | |||
| target, | |||
| "skipping this frame.", | |||
| ) | |||
| continue | |||
| solution = solution.numpy().ravel() | |||
| delta_angles = solution - last_known_state[:len(solution)] # match with dof | |||
| delta_angles = ( | |||
| solution - last_known_state[: len(solution)] | |||
| ) # match with dof | |||
| valid = np.all( | |||
| (delta_angles >= -np.pi) & (delta_angles <= np.pi), | |||
| @@ -428,15 +463,15 @@ def main(): | |||
| # Compute Jacobian matrix | |||
| joint_angles = event["value"].to_numpy() | |||
| jacobian = robot.compute_jacobian_numpy(joint_angles) | |||
| jacobian_flat = jacobian.flatten() | |||
| jacobian_array = pa.array(jacobian_flat, type=pa.float32()) | |||
| metadata["encoding"] = "jacobian_result" | |||
| metadata["jacobian_shape"] = list(jacobian.shape) | |||
| node.send_output(event["id"], jacobian_array, metadata=metadata) | |||
| if __name__ == "__main__": | |||
| main() | |||