|
|
|
@@ -1,7 +1,8 @@ |
|
|
|
"""TODO: Add docstring.""" |
|
|
|
|
|
|
|
import importlib |
|
|
|
import os |
|
|
|
from typing import List, Optional, Tuple, Union |
|
|
|
from pathlib import Path |
|
|
|
|
|
|
|
import numpy as np |
|
|
|
import pyarrow as pa |
|
|
|
@@ -9,12 +10,10 @@ import pytorch_kinematics as pk |
|
|
|
import torch |
|
|
|
from dora import Node |
|
|
|
from pytorch_kinematics.transforms.rotation_conversions import matrix_to_euler_angles |
|
|
|
from pathlib import Path |
|
|
|
import importlib |
|
|
|
|
|
|
|
TRANSFORM = np.array(os.getenv("TRANSFORM", "0. 0. 0. 1. 0. 0. 0.").split(" ")).astype( |
|
|
|
np.float32, |
|
|
|
) # wxyz format |
|
|
|
) # wxyz format |
|
|
|
pos = torch.tensor([TRANSFORM[0], TRANSFORM[1], TRANSFORM[2]]) |
|
|
|
rot = torch.tensor( |
|
|
|
[ |
|
|
|
@@ -28,10 +27,11 @@ ROB_TF = pk.Transform3d(pos=pos, rot=rot) |
|
|
|
|
|
|
|
|
|
|
|
def get_xyz_rpy_array_from_transform3d( |
|
|
|
transform: "pk.transforms.Transform3d", convention: str = "XYZ", |
|
|
|
transform: "pk.transforms.Transform3d", |
|
|
|
convention: str = "XYZ", |
|
|
|
) -> torch.Tensor: |
|
|
|
"""XYZ-RPY conversion. |
|
|
|
|
|
|
|
|
|
|
|
Extract translation (xyz) and rotation (RPY Euler angles in radians) |
|
|
|
from a pytorch_kinematics Transform3d object and returns them concatenated |
|
|
|
into a single tensor. |
|
|
|
@@ -63,7 +63,8 @@ def get_xyz_rpy_array_from_transform3d( |
|
|
|
# Convert the rotation matrix to Euler angles in radians |
|
|
|
# The matrix_to_euler_angles function likely exists based on pytorch3d's structure |
|
|
|
rpy = matrix_to_euler_angles( |
|
|
|
rotation_matrix, convention=convention, |
|
|
|
rotation_matrix, |
|
|
|
convention=convention, |
|
|
|
) # Shape (..., 3) |
|
|
|
|
|
|
|
# Concatenate xyz and rpy along the last dimension |
|
|
|
@@ -78,7 +79,7 @@ class RobotKinematics: |
|
|
|
urdf_path: str, |
|
|
|
urdf: str, |
|
|
|
end_effector_link: str, |
|
|
|
device: Union[str, torch.device] = "cpu", |
|
|
|
device: str | torch.device = "cpu", |
|
|
|
): |
|
|
|
"""Initialize the kinematic chain from a URDF. |
|
|
|
|
|
|
|
@@ -94,18 +95,23 @@ class RobotKinematics: |
|
|
|
urdf = open(urdf_path, mode="rb").read() |
|
|
|
# Load kinematic chain (core pytorch_kinematics object) |
|
|
|
self.chain = pk.build_serial_chain_from_urdf( |
|
|
|
urdf, end_effector_link, |
|
|
|
urdf, |
|
|
|
end_effector_link, |
|
|
|
).to(device=self.device) |
|
|
|
|
|
|
|
self.num_joints = len(self.chain.get_joint_parameter_names(exclude_fixed=True)) |
|
|
|
# Default initial guess for IK if none provided |
|
|
|
self._default_q = torch.zeros( |
|
|
|
(1, self.num_joints), device=self.device, dtype=torch.float32, |
|
|
|
(1, self.num_joints), |
|
|
|
device=self.device, |
|
|
|
dtype=torch.float32, |
|
|
|
) |
|
|
|
# print(f"Initialized RobotKinematicsConcise: {self.num_joints} joints, EE='{end_effector_link}', device='{self.device}'") # Optional print |
|
|
|
|
|
|
|
def _prepare_joint_tensor( |
|
|
|
self, joints: Union[List[float], torch.Tensor], batch_dim_required: bool = True, |
|
|
|
self, |
|
|
|
joints: list[float] | torch.Tensor, |
|
|
|
batch_dim_required: bool = True, |
|
|
|
) -> torch.Tensor: |
|
|
|
"""Validate and formats joint angles input tensor.""" |
|
|
|
if isinstance(joints, list): |
|
|
|
@@ -116,25 +122,30 @@ class RobotKinematics: |
|
|
|
j = joints.to(device=self.device, dtype=torch.float32) |
|
|
|
else: |
|
|
|
raise TypeError( |
|
|
|
"Joints must be a list or torch.Tensor and got: ", joints.type, |
|
|
|
"Joints must be a list or torch.Tensor and got: ", |
|
|
|
joints.type, |
|
|
|
) |
|
|
|
|
|
|
|
if j.ndim == 1: |
|
|
|
# Handle case with extra joints (e.g., gripper joints) |
|
|
|
if j.shape[0] > self.num_joints: |
|
|
|
j = j[:self.num_joints] # Truncate griper or extra joints |
|
|
|
j = j[: self.num_joints] # Truncate griper or extra joints |
|
|
|
elif j.shape[0] < self.num_joints: |
|
|
|
raise ValueError(f"Expected at least {self.num_joints} joints, got {j.shape[0]}") |
|
|
|
|
|
|
|
raise ValueError( |
|
|
|
f"Expected at least {self.num_joints} joints, got {j.shape[0]}" |
|
|
|
) |
|
|
|
|
|
|
|
if batch_dim_required: |
|
|
|
j = j.unsqueeze(0) # Add batch dimension if needed |
|
|
|
elif j.ndim == 2: |
|
|
|
# Handle batched input with extra joints |
|
|
|
if j.shape[1] > self.num_joints: |
|
|
|
j = j[:, :self.num_joints] # Truncate griper or extra joints |
|
|
|
j = j[:, : self.num_joints] # Truncate griper or extra joints |
|
|
|
elif j.shape[1] < self.num_joints: |
|
|
|
raise ValueError(f"Expected at least {self.num_joints} joints (dim 1), got {j.shape[1]}") |
|
|
|
|
|
|
|
raise ValueError( |
|
|
|
f"Expected at least {self.num_joints} joints (dim 1), got {j.shape[1]}" |
|
|
|
) |
|
|
|
|
|
|
|
if batch_dim_required and j.shape[0] != 1: |
|
|
|
# Most common IK solvers expect batch=1 for initial guess, FK can handle batches |
|
|
|
# Relaxing this check slightly, but user should be aware for IK |
|
|
|
@@ -144,7 +155,8 @@ class RobotKinematics: |
|
|
|
return j |
|
|
|
|
|
|
|
def compute_fk( |
|
|
|
self, joint_angles: Union[List[float], torch.Tensor], |
|
|
|
self, |
|
|
|
joint_angles: list[float] | torch.Tensor, |
|
|
|
) -> pk.Transform3d: |
|
|
|
"""Compute Forward Kinematics (FK). |
|
|
|
|
|
|
|
@@ -159,23 +171,26 @@ class RobotKinematics: |
|
|
|
# robot frame |
|
|
|
|
|
|
|
angles_tensor = self._prepare_joint_tensor( |
|
|
|
joint_angles, batch_dim_required=False, |
|
|
|
joint_angles, |
|
|
|
batch_dim_required=False, |
|
|
|
) # FK handles batches naturally |
|
|
|
# Direct call to pytorch_kinematics FK |
|
|
|
goal_in_rob_frame_tf = self.chain.forward_kinematics( |
|
|
|
angles_tensor, end_only=True, |
|
|
|
angles_tensor, |
|
|
|
end_only=True, |
|
|
|
) |
|
|
|
|
|
|
|
goal_tf = ROB_TF.compose(goal_in_rob_frame_tf) |
|
|
|
return goal_tf |
|
|
|
|
|
|
|
def compute_ik( |
|
|
|
self, |
|
|
|
target_pose: pk.Transform3d, |
|
|
|
initial_guess: Optional[Union[List[float], torch.Tensor]] = None, |
|
|
|
initial_guess: list[float] | torch.Tensor | None = None, |
|
|
|
iterations: int = 100, |
|
|
|
lr: float = 0.1, |
|
|
|
error_tolerance: float = 1e-4, |
|
|
|
) -> Tuple[torch.Tensor, bool]: |
|
|
|
) -> tuple[torch.Tensor, bool]: |
|
|
|
"""Compute Inverse Kinematics (IK) using PseudoInverseIK. |
|
|
|
|
|
|
|
Args: |
|
|
|
@@ -203,6 +218,7 @@ class RobotKinematics: |
|
|
|
initial_guess if initial_guess is not None else self._default_q, |
|
|
|
batch_dim_required=True, |
|
|
|
) |
|
|
|
|
|
|
|
if q_init.shape[0] != 1: |
|
|
|
raise ValueError( |
|
|
|
"initial_guess must result in batch size 1 for this IK setup.", |
|
|
|
@@ -232,49 +248,52 @@ class RobotKinematics: |
|
|
|
return solution_angles.solutions.detach() |
|
|
|
|
|
|
|
def compute_jacobian( |
|
|
|
self, joint_angles: Union[List[float], torch.Tensor] |
|
|
|
self, |
|
|
|
joint_angles: list[float] | torch.Tensor, |
|
|
|
) -> torch.Tensor: |
|
|
|
"""Compute Jacobian matrix using PyTorch Kinematics. |
|
|
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
joint_angles (Union[List[float], torch.Tensor]): Joint angles (radians). |
|
|
|
Shape (num_joints,) or (B, num_joints). |
|
|
|
|
|
|
|
|
|
|
|
Returns: |
|
|
|
torch.Tensor: Jacobian matrix of shape (B, 6, num_joints) or (6, num_joints) |
|
|
|
where rows are [linear_vel_x, linear_vel_y, linear_vel_z, |
|
|
|
where rows are [linear_vel_x, linear_vel_y, linear_vel_z, |
|
|
|
angular_vel_x, angular_vel_y, angular_vel_z] |
|
|
|
|
|
|
|
""" |
|
|
|
angles_tensor = self._prepare_joint_tensor( |
|
|
|
joint_angles, batch_dim_required=False |
|
|
|
joint_angles, |
|
|
|
batch_dim_required=False, |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
# Ensure we have batch dimension for jacobian computation |
|
|
|
if angles_tensor.ndim == 1: |
|
|
|
angles_tensor = angles_tensor.unsqueeze(0) |
|
|
|
squeeze_output = True |
|
|
|
else: |
|
|
|
squeeze_output = False |
|
|
|
|
|
|
|
|
|
|
|
# Compute Jacobian (returns shape: (B, 6, num_joints)) |
|
|
|
J = self.chain.jacobian(angles_tensor) |
|
|
|
|
|
|
|
|
|
|
|
# Remove batch dimension if input was 1D |
|
|
|
if squeeze_output: |
|
|
|
J = J.squeeze(0) |
|
|
|
|
|
|
|
|
|
|
|
return J |
|
|
|
|
|
|
|
def compute_jacobian_numpy( |
|
|
|
self, joint_angles: Union[List[float], torch.Tensor, np.ndarray] |
|
|
|
self, |
|
|
|
joint_angles: list[float] | torch.Tensor | np.ndarray, |
|
|
|
) -> np.ndarray: |
|
|
|
"""Compute Jacobian matrix and return as numpy array. |
|
|
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
joint_angles: Joint angles (radians). Can be list, torch.Tensor, or np.ndarray. |
|
|
|
Shape (num_joints,) or (B, num_joints). |
|
|
|
|
|
|
|
|
|
|
|
Returns: |
|
|
|
np.ndarray: Jacobian matrix as numpy array of shape (6, num_joints) or (B, 6, num_joints) |
|
|
|
|
|
|
|
@@ -331,9 +350,13 @@ def main(): |
|
|
|
if not model or not Path(model).exists(): |
|
|
|
model_name = os.getenv("MODEL_NAME") |
|
|
|
model = load_robot_description_with_cache_substitution(model_name) |
|
|
|
robot = RobotKinematics(urdf_path="", urdf=model, end_effector_link=end_effector_link) |
|
|
|
robot = RobotKinematics( |
|
|
|
urdf_path="", urdf=model, end_effector_link=end_effector_link |
|
|
|
) |
|
|
|
else: |
|
|
|
robot = RobotKinematics(urdf_path=model, urdf="", end_effector_link=end_effector_link) |
|
|
|
robot = RobotKinematics( |
|
|
|
urdf_path=model, urdf="", end_effector_link=end_effector_link |
|
|
|
) |
|
|
|
last_known_state = None |
|
|
|
|
|
|
|
for event in node: |
|
|
|
@@ -342,22 +365,28 @@ def main(): |
|
|
|
|
|
|
|
if event["id"] == "cmd_vel": |
|
|
|
if last_known_state is not None: |
|
|
|
target_vel = event["value"].to_numpy() # expect 100ms |
|
|
|
target_vel = event["value"].to_numpy() # expect 100ms |
|
|
|
# Apply Forward Kinematics |
|
|
|
target = robot.compute_fk(last_known_state) |
|
|
|
target = np.array(get_xyz_rpy_array_from_transform3d(target)) + target_vel |
|
|
|
target = robot.compute_fk(last_known_state) |
|
|
|
target = ( |
|
|
|
np.array(get_xyz_rpy_array_from_transform3d(target)) |
|
|
|
+ target_vel |
|
|
|
) |
|
|
|
target = pa.array(target.ravel(), type=pa.float32()) |
|
|
|
target = pk.Transform3d( |
|
|
|
pos=target[:3], |
|
|
|
rot=pk.transforms.euler_angles_to_matrix( |
|
|
|
torch.tensor(target[3:6]), convention="XYZ", |
|
|
|
torch.tensor(target[3:6]), |
|
|
|
convention="XYZ", |
|
|
|
), |
|
|
|
) |
|
|
|
rob_target = ROB_TF.inverse().compose(target) |
|
|
|
solution = robot.compute_ik(rob_target, last_known_state) |
|
|
|
if solution is None: |
|
|
|
print( |
|
|
|
"No IK Solution for :", target, "skipping this frame.", |
|
|
|
"No IK Solution for :", |
|
|
|
target, |
|
|
|
"skipping this frame.", |
|
|
|
) |
|
|
|
continue |
|
|
|
solution = solution.numpy().ravel() |
|
|
|
@@ -373,7 +402,8 @@ def main(): |
|
|
|
target = event["value"].to_numpy() |
|
|
|
target = target.astype(np.float32) |
|
|
|
target = pk.Transform3d( |
|
|
|
pos=target[:3], rot=torch.tensor(target[3:7]), |
|
|
|
pos=target[:3], |
|
|
|
rot=torch.tensor(target[3:7]), |
|
|
|
) |
|
|
|
rob_target = ROB_TF.inverse().compose(target) |
|
|
|
solution = robot.compute_ik(rob_target, last_known_state) |
|
|
|
@@ -389,19 +419,24 @@ def main(): |
|
|
|
target = pk.Transform3d( |
|
|
|
pos=target[:3], |
|
|
|
rot=pk.transforms.euler_angles_to_matrix( |
|
|
|
torch.tensor(target[3:6]), convention="XYZ", |
|
|
|
torch.tensor(target[3:6]), |
|
|
|
convention="XYZ", |
|
|
|
), |
|
|
|
) |
|
|
|
rob_target = ROB_TF.inverse().compose(target) |
|
|
|
solution = robot.compute_ik(rob_target, last_known_state) |
|
|
|
if solution is None: |
|
|
|
print( |
|
|
|
"No IK Solution for :", target, "skipping this frame.", |
|
|
|
"No IK Solution for :", |
|
|
|
target, |
|
|
|
"skipping this frame.", |
|
|
|
) |
|
|
|
continue |
|
|
|
|
|
|
|
solution = solution.numpy().ravel() |
|
|
|
delta_angles = solution - last_known_state[:len(solution)] # match with dof |
|
|
|
delta_angles = ( |
|
|
|
solution - last_known_state[: len(solution)] |
|
|
|
) # match with dof |
|
|
|
|
|
|
|
valid = np.all( |
|
|
|
(delta_angles >= -np.pi) & (delta_angles <= np.pi), |
|
|
|
@@ -428,15 +463,15 @@ def main(): |
|
|
|
# Compute Jacobian matrix |
|
|
|
joint_angles = event["value"].to_numpy() |
|
|
|
jacobian = robot.compute_jacobian_numpy(joint_angles) |
|
|
|
|
|
|
|
|
|
|
|
jacobian_flat = jacobian.flatten() |
|
|
|
jacobian_array = pa.array(jacobian_flat, type=pa.float32()) |
|
|
|
|
|
|
|
|
|
|
|
metadata["encoding"] = "jacobian_result" |
|
|
|
metadata["jacobian_shape"] = list(jacobian.shape) |
|
|
|
|
|
|
|
|
|
|
|
node.send_output(event["id"], jacobian_array, metadata=metadata) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
main() |