Browse Source

updated and refactored code

tags/v0.3.12-rc0
ShashwatPatil 7 months ago
parent
commit
3610ffb54f
8 changed files with 662 additions and 352 deletions
  1. +93
    -120
      examples/franka-mujoco-control/02_target_pose_control/nodes/franka_controller_pytorch.py
  2. +2
    -2
      examples/franka-mujoco-control/02_target_pose_control/nodes/pose_publisher.py
  3. +19
    -15
      examples/franka-mujoco-control/02_target_pose_control/target_pose_control_pytorch.yml
  4. +20
    -10
      examples/franka-mujoco-control/03_gamepad_control/gamepad_control.yml
  5. +110
    -173
      examples/franka-mujoco-control/03_gamepad_control/nodes/franka_gamepad_controller.py
  6. +289
    -0
      examples/franka-mujoco-control/kuka_iiwa/model.urdf
  7. +76
    -9
      node-hub/dora-pytorch-kinematics/dora_pytorch_kinematics/main.py
  8. +53
    -23
      node-hub/gamepad/gamepad/main.py

+ 93
- 120
examples/franka-mujoco-control/02_target_pose_control/nodes/franka_controller_pytorch.py View File

@@ -1,198 +1,171 @@
import torch
import numpy as np import numpy as np
import time import time
import pyarrow as pa import pyarrow as pa
from dora import Node from dora import Node
import pytorch_kinematics as pk
from scipy.spatial.transform import Rotation from scipy.spatial.transform import Rotation


class FrankaController: class FrankaController:
def __init__(self): def __init__(self):
""" """
Initialize the Franka controller with differential IK nullspace control
Initialize the Franka controller
""" """
# Load the robot model for IK and FK
urdf_path = "../franka_panda/panda.urdf"
with open(urdf_path, 'rb') as f:
urdf_content = f.read()
# Build serial chain for kinematics
self.chain = pk.build_serial_chain_from_urdf(urdf_content, "panda_hand")
# Move to GPU if available
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.chain = self.chain.to(device=self.device)
# Control parameters # Control parameters
self.integration_dt = 0.1 self.integration_dt = 0.1
self.damping = 1e-4 self.damping = 1e-4
self.Kpos = 0.95 # Position gain self.Kpos = 0.95 # Position gain
self.Kori = 0.95 # Orientation gain self.Kori = 0.95 # Orientation gain
self.Kn = np.array([10.0, 10.0, 10.0, 10.0, 5.0, 5.0, 5.0]) # Nullspace gains
self.max_angvel = 0.785 # Max joint velocity (rad/s)
self.max_angvel = 3.14 # Max joint velocity (rad/s)
# State variables # State variables
self.current_joint_pos = np.array([0.0, -0.785, 0.0, -2.356, 0.0, 1.571, 0.785])
self.current_joint_vel = np.zeros(7)
self.dof = None
self.current_joint_pos = None # Full robot state
self.home_pos = None # Home position for arm joints only
self.target_pos = np.array([0.4, 0.0, 0.3]) # Conservative default target self.target_pos = np.array([0.4, 0.0, 0.3]) # Conservative default target
self.target_rpy = [180.0, 0.0, 90.0] # Downward orientation self.target_rpy = [180.0, 0.0, 90.0] # Downward orientation
self.home_pos = np.array([0, 0, 0, -1.57079, 0, 1.57079, -0.7853]) # joint angles for home position
# Initialize timestamps
self.last_time = time.time()
print("FrankaController initialized with PyTorch Kinematics Differential IK")
# print(f"Chain DOF: {len(self.chain.get_joint_parameter_names())}")
self.current_ee_pose = None # End-effector pose
self.current_jacobian = None # Jacobian matrix
print("FrankaController initialized")
def _initialize_robot(self, positions, jacobian_dof=None):
"""Initialize controller state with appropriate dimensions"""
self.full_joint_count = len(positions)
# Set DOF from Jacobian if available
self.dof = jacobian_dof if jacobian_dof is not None else self.full_joint_count

self.current_joint_pos = positions.copy()
self.home_pos = np.zeros(self.dof)
def get_target_rotation_matrix(self): def get_target_rotation_matrix(self):
"""Convert RPY to rotation matrix.""" """Convert RPY to rotation matrix."""
roll_rad, pitch_rad, yaw_rad = np.radians(self.target_rpy) roll_rad, pitch_rad, yaw_rad = np.radians(self.target_rpy)
desired_rot = Rotation.from_euler('ZYX', [yaw_rad, pitch_rad, roll_rad]) desired_rot = Rotation.from_euler('ZYX', [yaw_rad, pitch_rad, roll_rad])
return desired_rot.as_matrix() return desired_rot.as_matrix()
def compute_jacobian_pytorch(self, joint_angles):
"""Compute Jacobian using PyTorch Kinematics."""
# Convert to torch tensor
q = torch.tensor(joint_angles, device=self.device, dtype=torch.float32).unsqueeze(0)
# Compute Jacobian (returns 6x7 matrix: [linear_vel; angular_vel])
J = self.chain.jacobian(q) # Shape: (1, 6, 7)
return J.squeeze(0).cpu().numpy() # Convert back to numpy (6, 7)
def get_current_ee_pose(self, joint_angles):
"""Get current end-effector pose using forward kinematics."""
# Convert to torch tensor
q = torch.tensor(joint_angles, device=self.device, dtype=torch.float32).unsqueeze(0)
# Forward kinematics
tf = self.chain.forward_kinematics(q) # Returns Transform3d
# Extract position and rotation
transform_matrix = tf.get_matrix().squeeze(0).cpu().numpy() # (4, 4)
position = transform_matrix[:3, 3]
rotation_matrix = transform_matrix[:3, :3]
return position, rotation_matrix
def update_joint_state(self, positions, velocities):
"""Update the current joint state."""
# Filter to only use first 7 joints
self.current_joint_pos = positions[:7]
self.current_joint_vel = velocities[:7]
def set_target_pose(self, pose_array): def set_target_pose(self, pose_array):
"""Set target pose from input array.""" """Set target pose from input array."""
if len(pose_array) >= 3:
self.target_pos = np.array(pose_array[:3])
# print(f"Updated target position: {self.target_pos}")
if len(pose_array) >= 6:
self.target_pos = np.array(pose_array[:3])
if len(pose_array) == 6:
self.target_rpy = list(pose_array[3:6]) self.target_rpy = list(pose_array[3:6])
# print(f"Updated target orientation (RPY): {self.target_rpy}")
else:
self.target_rpy = [180.0, 0.0, 90.0] # Default orientation if not provided
def update_jacobian(self, jacobian_flat, shape):
"""Update current jacobian and initialize DOF if needed."""
jacobian_dof = shape[1]
self.current_jacobian = jacobian_flat.reshape(shape)

if self.dof is None:
if self.current_joint_pos is not None:
self._initialize_robot(self.current_joint_pos, jacobian_dof)
else:
self.dof = jacobian_dof
elif self.dof != jacobian_dof:
self.dof = jacobian_dof
self.home_pos = np.zeros(self.dof)
def apply_differential_ik_control(self): def apply_differential_ik_control(self):
"""Apply differential IK control with nullspace projection.""" """Apply differential IK control with nullspace projection."""
current_ee_pos, current_ee_rot = self.get_current_ee_pose(self.current_joint_pos)
if self.current_ee_pose is None or self.current_jacobian is None:
return self.current_joint_pos
current_ee_pos = self.current_ee_pose['position']
current_ee_rpy = self.current_ee_pose['rpy']


# Calculate position and orientation error
pos_error = self.target_pos - current_ee_pos pos_error = self.target_pos - current_ee_pos
twist = np.zeros(6) twist = np.zeros(6)
twist[:3] = self.Kpos * pos_error / self.integration_dt twist[:3] = self.Kpos * pos_error / self.integration_dt
current_rot = Rotation.from_matrix(current_ee_rot)
# Convert current RPY to rotation matrix
current_rot = Rotation.from_euler('XYZ', current_ee_rpy)
desired_rot = Rotation.from_matrix(self.get_target_rotation_matrix()) desired_rot = Rotation.from_matrix(self.get_target_rotation_matrix())
rot_error = (desired_rot * current_rot.inv()).as_rotvec() rot_error = (desired_rot * current_rot.inv()).as_rotvec()
twist[3:] = self.Kori * rot_error / self.integration_dt twist[3:] = self.Kori * rot_error / self.integration_dt
# Get Jacobian using PyTorch Kinematics
jac = self.compute_jacobian_pytorch(self.current_joint_pos) # (6, 7)
jac = self.current_jacobian
# Damped least squares inverse kinematics # Damped least squares inverse kinematics
diag = self.damping * np.eye(6) diag = self.damping * np.eye(6)
dq = jac.T @ np.linalg.solve(jac @ jac.T + diag, twist) dq = jac.T @ np.linalg.solve(jac @ jac.T + diag, twist)
# Nullspace control
jac_pinv = np.linalg.pinv(jac)
N = np.eye(7) - jac_pinv @ jac # Nullspace projection matrix
dq_null = self.Kn * (self.home_pos - self.current_joint_pos) # Nullspace velocity
dq += N @ dq_null # nullspace movement
# # Apply nullspace control
# # Calculate nullspace projection # uncomment to enable nullspace control
# current_arm = self.current_joint_pos[:self.dof]
# jac_pinv = np.linalg.pinv(jac)
# N = np.eye(self.dof) - jac_pinv @ jac
# # Apply gains to pull towards home position
# k_null = np.ones(self.dof) * 5.0
# dq_null = k_null * (self.home_pos - current_arm) # Nullspace velocity
# dq += N @ dq_null # Nullspace movement
# Limit joint velocities # Limit joint velocities
dq_abs_max = np.abs(dq).max() dq_abs_max = np.abs(dq).max()
if dq_abs_max > self.max_angvel: if dq_abs_max > self.max_angvel:
dq *= self.max_angvel / dq_abs_max dq *= self.max_angvel / dq_abs_max
# Integrate to get new joint positions
new_joints = self.current_joint_pos + dq * self.integration_dt
# Apply joint limits (simple clipping - you could load from URDF)
joint_limits = np.array([
[-2.8973, 2.8973],
[-1.7628, 1.7628],
[-2.8973, 2.8973],
[-3.0718, -0.0698],
[-2.8973, 2.8973],
[-0.0175, 3.7525],
[-2.8973, 2.8973]
])
for i in range(7):
new_joints[i] = np.clip(new_joints[i], joint_limits[i][0], joint_limits[i][1])
# Debug output
# pos_error_mag = np.linalg.norm(pos_error)
# rot_error_mag = np.linalg.norm(rot_error)
# print(f"Position error: {pos_error_mag:.4f}, Rotation error: {rot_error_mag:.4f}")
# print(f"Joint velocity magnitude: {np.linalg.norm(dq):.4f}")
# Create full joint command (apply IK result to arm joints, keep other joints unchanged)
new_joints = self.current_joint_pos.copy()
new_joints[:self.dof] += dq * self.integration_dt
return new_joints return new_joints


def main(): def main():
node = Node() node = Node()
controller = FrankaController() controller = FrankaController()
for event in node: for event in node:
if event["type"] == "INPUT": if event["type"] == "INPUT":
if event["id"] == "joint_positions": if event["id"] == "joint_positions":
joint_pos = event["value"].to_numpy() joint_pos = event["value"].to_numpy()
controller.update_joint_state(joint_pos, controller.current_joint_vel)
# Apply differential IK control
joint_commands = controller.apply_differential_ik_control()
# Pad control to match full robot DOF if needed
if len(joint_commands) == 7: # Arm only
# Add zero control for gripper joints
full_control = np.zeros(9)
full_control[:7] = joint_commands
control_to_send = full_control

if controller.current_joint_pos is None:
controller._initialize_robot(joint_pos)
else: else:
control_to_send = joint_commands
controller.current_joint_pos = joint_pos
# Send control commands to the robot
# Request FK computation
node.send_output( node.send_output(
"joint_commands",
pa.array(control_to_send, type=pa.float32()),
metadata={"timestamp": time.time()}
"fk_request",
pa.array(controller.current_joint_pos, type=pa.float32()),
metadata={"encoding": "jointstate", "timestamp": time.time()}
) )
# Send current end-effector position
current_ee_pos, _ = controller.get_current_ee_pose(controller.current_joint_pos)
# Request Jacobian computation
node.send_output( node.send_output(
"ee_position",
pa.array(current_ee_pos, type=pa.float32()),
"jacobian_request",
pa.array(controller.current_joint_pos, type=pa.float32()),
metadata={"encoding": "jacobian", "timestamp": time.time()}
)
joint_commands = controller.apply_differential_ik_control()
# Send control commands to the robot
node.send_output(
"joint_commands",
pa.array(joint_commands, type=pa.float32()),
metadata={"timestamp": time.time()} metadata={"timestamp": time.time()}
) )
if event["id"] == "joint_velocities":
joint_vel = event["value"].to_numpy()
controller.update_joint_state(controller.current_joint_pos, joint_vel)

# Handle target pose updates # Handle target pose updates
if event["id"] == "target_pose": if event["id"] == "target_pose":
target_pose = event["value"].to_numpy() target_pose = event["value"].to_numpy()
# print(f"Received target pose from publisher: {target_pose}")
controller.set_target_pose(target_pose) controller.set_target_pose(target_pose)
# Handle FK results
if event["id"] == "fk_result":
if event["metadata"]["encoding"] == "xyzrpy":
ee_pose = event["value"].to_numpy()
controller.current_ee_pose = {'position': ee_pose[:3],'rpy': ee_pose[3:6]}
# Handle Jacobian results
if event["id"] == "jacobian_result":
if event["metadata"]["encoding"] == "jacobian_result":
jacobian_flat = event["value"].to_numpy()
jacobian_shape = event["metadata"]["jacobian_shape"]
controller.update_jacobian(jacobian_flat, jacobian_shape)


if __name__ == "__main__": if __name__ == "__main__":
main() main()

+ 2
- 2
examples/franka-mujoco-control/02_target_pose_control/nodes/pose_publisher.py View File

@@ -12,8 +12,8 @@ class PosePublisher:
[0.5, 0.5, 0.3, 180.0, 0.0, 90.0], [0.5, 0.5, 0.3, 180.0, 0.0, 90.0],
[0.6, 0.2, 0.5, 180.0, 0.0, 45.0], [0.6, 0.2, 0.5, 180.0, 0.0, 45.0],
[0.7, 0.1, 0.4, 90.0, 90.0, 90.0], [0.7, 0.1, 0.4, 90.0, 90.0, 90.0],
[0.5, -0.3, 0.6, 180.0, 0.0, 135.0],
[-0.3, -0.7, 0.2, 180.0, 0.0, 90.0],
[0.4, -0.3, 0.5, 180.0, 0.0, 135.0],
[-0.3, -0.6, 0.3, 180.0, 0.0, 90.0],
] ]
self.current_pose_index = 0 self.current_pose_index = 0


+ 19
- 15
examples/franka-mujoco-control/02_target_pose_control/target_pose_control_pytorch.yml View File

@@ -4,7 +4,7 @@ nodes:
path: dora-mujoco path: dora-mujoco
inputs: inputs:
tick: dora/timer/millis/2 tick: dora/timer/millis/2
control_input: franka_controller_pytorch/joint_commands
control_input: kuka_controller_pytorch/joint_commands
outputs: outputs:
- joint_positions - joint_positions
- joint_velocities - joint_velocities
@@ -13,32 +13,36 @@ nodes:
env: env:
MODEL_NAME: "panda" MODEL_NAME: "panda"


- id: franka_controller_pytorch
- id: kuka_controller_pytorch
path: nodes/franka_controller_pytorch.py path: nodes/franka_controller_pytorch.py
inputs: inputs:
joint_positions: mujoco_sim/joint_positions joint_positions: mujoco_sim/joint_positions
joint_velocities: mujoco_sim/joint_velocities joint_velocities: mujoco_sim/joint_velocities
target_pose: pose_publisher/target_pose target_pose: pose_publisher/target_pose
# fk_result: pytorch_kinematics/fk_result # Make sure this matches the ACTUAL output name
fk_result: pytorch_kinematics/fk_request
jacobian_result: pytorch_kinematics/jacobian_request
outputs: outputs:
- joint_commands - joint_commands
- fk_request - fk_request
- ee_position
- jacobian_request


# - id: pytorch_kinematics
# build: pip install -e ../../../node-hub/dora-pytorch-kinematics
# path: dora-pytorch-kinematics
# inputs:
# fk_request: franka_controller_pytorch/fk_request
# outputs:
# - fk_result # Check this output name matches what this node produces
# env:
# URDF_PATH: "/home/shashwatgpatil/Downloads/panda.urdf"
# END_EFFECTOR_LINK: "panda_hand"
- id: pytorch_kinematics
build: pip install -e ../../../node-hub/dora-pytorch-kinematics
path: dora-pytorch-kinematics
inputs:
fk_request: kuka_controller_pytorch/fk_request
jacobian_request: kuka_controller_pytorch/jacobian_request
outputs:
- fk_request
- jacobian_request
env:
URDF_PATH: "/home/shashwatgpatil/Dora-rs/dora/examples/franka-mujoco-control/franka_panda/panda.urdf"
END_EFFECTOR_LINK: "panda_hand"
TRANSFORM: "0. 0. 0. 1. 0. 0. 0." # Pytorch kinematics uses wxyz format for quaternion


- id: pose_publisher - id: pose_publisher
path: nodes/pose_publisher.py path: nodes/pose_publisher.py
inputs: inputs:
tick: dora/timer/millis/10000
tick: dora/timer/millis/5000
outputs: outputs:
- target_pose - target_pose

+ 20
- 10
examples/franka-mujoco-control/03_gamepad_control/gamepad_control.yml View File

@@ -20,22 +20,32 @@ nodes:
- actuator_controls - actuator_controls
- sensor_data - sensor_data
env: env:
MODEL_NAME: "panda"
MODEL_NAME: "kuka_iiwa14"


- id: franka_gamepad_controller - id: franka_gamepad_controller
path: nodes/franka_gamepad_controller.py path: nodes/franka_gamepad_controller.py
inputs: inputs:
joint_positions: mujoco_sim/joint_positions joint_positions: mujoco_sim/joint_positions
joint_velocities: mujoco_sim/joint_velocities
raw_control: gamepad/raw_control raw_control: gamepad/raw_control
# target_pose: pose_publisher/target_pose # Optional
cmd_vel: gamepad/cmd_vel
fk_result: pytorch_kinematics/fk_request
jacobian_result: pytorch_kinematics/jacobian_request
outputs: outputs:
- joint_commands - joint_commands
- ee_position
- fk_request
- jacobian_request


# Optional: uncomment to also have programmatic control
# - id: pose_publisher
# path: ../02_target_pose_control/nodes/pose_publisher.py
# inputs:
# tick: dora/timer/millis/20000 # New pose every 20 seconds
# outputs:
# - target_pose
- id: pytorch_kinematics
build: pip install -e ../../../node-hub/dora-pytorch-kinematics
path: dora-pytorch-kinematics
inputs:
fk_request: franka_gamepad_controller/fk_request
jacobian_request: franka_gamepad_controller/jacobian_request
outputs:
- fk_request
- jacobian_request
env:
URDF_PATH: "/home/shashwatgpatil/Dora-rs/dora/examples/franka-mujoco-control/kuka_iiwa/model.urdf"
END_EFFECTOR_LINK: "lbr_iiwa_link_7"
TRANSFORM: "0. 0. 0. 1. 0. 0. 0."

+ 110
- 173
examples/franka-mujoco-control/03_gamepad_control/nodes/franka_gamepad_controller.py View File

@@ -1,104 +1,60 @@
"""Enhanced Franka controller with gamepad support using PyTorch Kinematics.
"""

import json import json
import time import time
import numpy as np import numpy as np
import pyarrow as pa import pyarrow as pa
import torch
from dora import Node from dora import Node
import pytorch_kinematics as pk
from scipy.spatial.transform import Rotation from scipy.spatial.transform import Rotation


class EnhancedFrankaController:
"""Franka controller with gamepad and target pose support using PyTorch Kinematics."""
class GamepadFrankaController:
def __init__(self): def __init__(self):
"""Initialize controller with PyTorch Kinematics."""
# Load the robot model for IK and FK
urdf_path = "../franka_panda/panda.urdf"
with open(urdf_path, 'rb') as f:
urdf_content = f.read()
# Build serial chain for kinematics
self.chain = pk.build_serial_chain_from_urdf(urdf_content, "panda_hand")
# Move to GPU if available
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.chain = self.chain.to(device=self.device)
# Control parameters # Control parameters
self.integration_dt = 0.1 self.integration_dt = 0.1
self.damping = 1e-4 self.damping = 1e-4
self.Kpos = 0.95 # Position gain self.Kpos = 0.95 # Position gain
self.Kori = 0.95 # Orientation gain self.Kori = 0.95 # Orientation gain
self.Kn = np.array([10.0, 10.0, 10.0, 10.0, 5.0, 5.0, 5.0]) # Nullspace gains
self.max_angvel = 0.785 # Max joint velocity (rad/s)
self.max_angvel = 1.57 # Max joint velocity (rad/s)

# Gamepad control parameters # Gamepad control parameters
self.speed_scale = 0.5 self.speed_scale = 0.5
self.deadzone = 0.05
self.last_input_source = None
# Gripper control parameters
self.gripper_range = [0, 255]
self.gripper_state = 0.0 # (0=closed, 1=open)
# Robot state variables
self.dof = None
self.current_joint_pos = None # Full robot state
self.home_pos = None # Home position for arm joints only
# Robot state
self.current_joint_pos = None
self.target_pos = np.array([0.55, 0.0, 0.6]) # Conservative default target
# Target pose (independent of DOF)
self.target_pos = np.array([0.4, 0.0, 0.3]) # Conservative default target
self.target_rpy = [180.0, 0.0, 90.0] # Downward orientation self.target_rpy = [180.0, 0.0, 90.0] # Downward orientation
self.home_pos = np.array([0, 0, 0, -1.67079, 0, 1.64079, -0.7853])
print("Enhanced Franka Controller initialized with PyTorch Kinematics")
# print(f"Chain DOF: {len(self.chain.get_joint_parameter_names())}")
def apply_deadzone(self, value, deadzone=None):
"""Apply deadzone to joystick input."""
deadzone = deadzone or self.deadzone
return 0.0 if abs(value) < deadzone else value
def get_current_ee_pose(self, joint_angles):
"""Get current end-effector pose using PyTorch Kinematics forward kinematics."""
q = torch.tensor(joint_angles, device=self.device, dtype=torch.float32).unsqueeze(0)
# Forward kinematics
tf = self.chain.forward_kinematics(q) # Returns Transform3d
# Extract position and rotation
transform_matrix = tf.get_matrix().squeeze(0).cpu().numpy() # (4, 4)
position = transform_matrix[:3, 3]
rotation_matrix = transform_matrix[:3, :3]
# Cache for external computations
self.current_ee_pose = None # End-effector pose
self.current_jacobian = None # Jacobian matrix
return position, rotation_matrix
print("Gamepad Franka Controller initialized")
def compute_jacobian_pytorch(self, joint_angles):
"""Compute Jacobian using PyTorch Kinematics."""
q = torch.tensor(joint_angles, device=self.device, dtype=torch.float32).unsqueeze(0)
# Compute Jacobian (returns 6x7 matrix: [linear_vel; angular_vel])
J = self.chain.jacobian(q) # Shape: (1, 6, 7)
def _initialize_robot(self, positions, jacobian_dof=None):
self.full_joint_count = len(positions)
self.dof = jacobian_dof if jacobian_dof is not None else self.full_joint_count
self.current_joint_pos = positions.copy()
self.home_pos = np.zeros(self.dof)
def process_cmd_vel(self, cmd_vel):
print(f"Processing cmd_vel: {cmd_vel}")
dx = cmd_vel[0] * self.speed_scale * 0.1
dy = cmd_vel[1] * self.speed_scale * 0.1
dz = cmd_vel[2] * self.speed_scale * 0.1
return J.squeeze(0).cpu().numpy() # Convert back to numpy (6, 7)
# Update target position incrementally
if abs(dx) > 0 or abs(dy) > 0 or abs(dz) > 0:
self.target_pos += np.array([dx, dy, dz])
def process_gamepad_input(self, raw_control): def process_gamepad_input(self, raw_control):
"""Process gamepad input exactly like the original."""
axes = raw_control["axes"]
buttons = raw_control["buttons"] buttons = raw_control["buttons"]


# Reset position with START button # Reset position with START button
if len(buttons) > 9 and buttons[9]: if len(buttons) > 9 and buttons[9]:
# Reset target to a position based on home joint angles
if self.current_joint_pos is not None:
# Use current joint positions to get home EE position
home_ee_pos, _ = self.get_current_ee_pose(self.home_pos)
self.target_pos = home_ee_pos.copy()
print("Reset target to home position")
# Gripper control with X and Y buttons (exactly like original)
if len(buttons) > 0 and buttons[0]: # X button - Close gripper
self.gripper_state = 0.0
print("Gripper: CLOSED")
elif len(buttons) > 3 and buttons[3]: # Y button - Open gripper
self.gripper_state = 1.0
print("Gripper: OPEN")
self.target_pos = np.array([0.4, 0.0, 0.3])
print("Reset target to home position")
# Speed scaling with bumpers (LB/RB) # Speed scaling with bumpers (LB/RB)
if len(buttons) > 4 and buttons[4]: # LB if len(buttons) > 4 and buttons[4]: # LB
@@ -107,149 +63,130 @@ class EnhancedFrankaController:
if len(buttons) > 5 and buttons[5]: # RB if len(buttons) > 5 and buttons[5]: # RB
self.speed_scale = min(1.0, self.speed_scale + 0.1) self.speed_scale = min(1.0, self.speed_scale + 0.1)
print(f"Speed: {self.speed_scale:.1f}") print(f"Speed: {self.speed_scale:.1f}")
# Get cartesian control inputs with deadzone
dx = self.apply_deadzone(axes[0], self.deadzone) * self.speed_scale * 0.1
dy = -self.apply_deadzone(axes[1], self.deadzone) * self.speed_scale * 0.1
dz = -self.apply_deadzone(axes[3], self.deadzone) * self.speed_scale * 0.1
# Update target position incrementally
if abs(dx) > 0 or abs(dy) > 0 or abs(dz) > 0:
self.target_pos += np.array([dx, dy, dz])
self.last_input_source = "gamepad"
# print(f"Gamepad control: dx={dx:.3f}, dy={dy:.3f}, dz={dz:.3f}")
# print(f"New target: {self.target_pos}")

def process_target_pose(self, pose_array):
"""Process target pose command."""
if len(pose_array) >= 3:
self.target_pos = np.array(pose_array[:3])
print(f"Updated target position: {self.target_pos}")
if len(pose_array) >= 6:
self.target_rpy = list(pose_array[3:6])
print(f"Updated target orientation (RPY): {self.target_rpy}")
self.last_input_source = "target_pose"
def get_target_rotation_matrix(self): def get_target_rotation_matrix(self):
"""Convert RPY to rotation matrix."""
roll_rad, pitch_rad, yaw_rad = np.radians(self.target_rpy) roll_rad, pitch_rad, yaw_rad = np.radians(self.target_rpy)
desired_rot = Rotation.from_euler('ZYX', [yaw_rad, pitch_rad, roll_rad]) desired_rot = Rotation.from_euler('ZYX', [yaw_rad, pitch_rad, roll_rad])
return desired_rot.as_matrix() return desired_rot.as_matrix()
def apply_cartesian_control(self, current_joints):
"""Apply Cartesian control using PyTorch Kinematics differential IK."""
self.current_joint_pos = current_joints[:7]
# Get current end-effector pose using PyTorch Kinematics FK
current_ee_pos, current_ee_rot = self.get_current_ee_pose(self.current_joint_pos)
# Calculate position error
pos_error = self.target_pos - current_ee_pos
def update_jacobian(self, jacobian_flat, shape):
jacobian_dof = shape[1]
self.current_jacobian = jacobian_flat.reshape(shape)

if self.dof is None:
if self.current_joint_pos is not None:
self._initialize_robot(self.current_joint_pos, jacobian_dof)
else:
self.dof = jacobian_dof
elif self.dof != jacobian_dof:
self.dof = jacobian_dof
self.home_pos = np.zeros(self.dof)
def apply_differential_ik_control(self):
if self.current_ee_pose is None or self.current_jacobian is None:
return self.current_joint_pos
current_ee_pos = self.current_ee_pose['position']
current_ee_rpy = self.current_ee_pose['rpy']


# Construct 6D twist (3 position + 3 orientation)
pos_error = self.target_pos - current_ee_pos
twist = np.zeros(6) twist = np.zeros(6)
twist[:3] = self.Kpos * pos_error / self.integration_dt twist[:3] = self.Kpos * pos_error / self.integration_dt
# Orientation control
current_rot = Rotation.from_matrix(current_ee_rot)
# Convert current RPY to rotation matrix
current_rot = Rotation.from_euler('XYZ', current_ee_rpy)
desired_rot = Rotation.from_matrix(self.get_target_rotation_matrix()) desired_rot = Rotation.from_matrix(self.get_target_rotation_matrix())
rot_error = (desired_rot * current_rot.inv()).as_rotvec() rot_error = (desired_rot * current_rot.inv()).as_rotvec()
twist[3:] = self.Kori * rot_error / self.integration_dt twist[3:] = self.Kori * rot_error / self.integration_dt
jac = self.compute_jacobian_pytorch(self.current_joint_pos) # (6, 7)
jac = self.current_jacobian
# Damped least squares inverse kinematics # Damped least squares inverse kinematics
diag = self.damping * np.eye(6) diag = self.damping * np.eye(6)
dq = jac.T @ np.linalg.solve(jac @ jac.T + diag, twist) dq = jac.T @ np.linalg.solve(jac @ jac.T + diag, twist)
# Nullspace control - drive towards home position in nullspace
jac_pinv = np.linalg.pinv(jac)
N = np.eye(7) - jac_pinv @ jac # Nullspace projection matrix
dq_null = self.Kn * (self.home_pos - self.current_joint_pos) # Nullspace velocity
dq += N @ dq_null # Add nullspace movement
# Apply nullspace control if we have redundant DOF
if self.dof > 6:
current_arm = self.current_joint_pos[:self.dof]
jac_pinv = np.linalg.pinv(jac)
N = np.eye(self.dof) - jac_pinv @ jac
k_null = np.ones(self.dof) * 5.0
dq_null = k_null * (self.home_pos - current_arm)
dq += N @ dq_null
# Limit joint velocities # Limit joint velocities
dq_abs_max = np.abs(dq).max() dq_abs_max = np.abs(dq).max()
if dq_abs_max > self.max_angvel: if dq_abs_max > self.max_angvel:
dq *= self.max_angvel / dq_abs_max dq *= self.max_angvel / dq_abs_max
# Integrate to get new joint positions
new_joints = self.current_joint_pos + dq * self.integration_dt
# Apply joint limits (Franka Panda limits)
joint_limits = np.array([
[-2.8973, 2.8973],
[-1.7628, 1.7628],
[-2.8973, 2.8973],
[-3.0718, -0.0698],
[-2.8973, 2.8973],
[-0.0175, 3.7525],
[-2.8973, 2.8973]
])
for i in range(7):
new_joints[i] = np.clip(new_joints[i], joint_limits[i][0], joint_limits[i][1])
# Return 8-dimensional control: 7 arm joints + gripper
full_commands = np.zeros(8)
full_commands[:7] = new_joints
# Map gripper state to control range
full_commands[7] = self.gripper_range[0] if self.gripper_state < 0.5 else self.gripper_range[1]
# Create full joint command
new_joints = self.current_joint_pos.copy()
new_joints[:self.dof] += dq * self.integration_dt
# Debug output
# pos_error_mag = np.linalg.norm(pos_error)
# if pos_error_mag > 0.01: # Only print if there's significant error
# print(f"Position error: {pos_error_mag:.4f}")
return full_commands
return new_joints




def main(): def main():
node = Node("franka_controller")
controller = EnhancedFrankaController()
node = Node()
controller = GamepadFrankaController()
print("Enhanced Franka Controller Node Started")
print("\nGamepad Controls:")
print(" Left Stick X/Y: Move in X/Y plane")
print(" Right Stick Y: Move up/down (Z)")
print(" LB/RB: Decrease/Increase speed")
print(" START: Reset to home position")
print(" X: Close gripper")
print(" Y: Open gripper")
print("\nAlso accepts target_pose commands")
print("Ready to receive inputs...")
print("Gamepad Franka Controller Started")
for event in node: for event in node:
if event["type"] == "INPUT": if event["type"] == "INPUT":
if event["id"] == "joint_positions": if event["id"] == "joint_positions":
# Update current state and compute commands
joint_positions = event["value"].to_numpy()
full_commands = controller.apply_cartesian_control(joint_positions)
joint_pos = event["value"].to_numpy()
if controller.current_joint_pos is None:
controller._initialize_robot(joint_pos)
else:
controller.current_joint_pos = joint_pos
# Request FK computation
node.send_output( node.send_output(
"joint_commands",
pa.array(full_commands, type=pa.float64()),
metadata={"timestamp": time.time()}
"fk_request",
pa.array(controller.current_joint_pos, type=pa.float32()),
metadata={"encoding": "jointstate", "timestamp": time.time()}
)
# Request Jacobian computation
node.send_output(
"jacobian_request",
pa.array(controller.current_joint_pos, type=pa.float32()),
metadata={"encoding": "jacobian", "timestamp": time.time()}
) )
# Send current end-effector position
if controller.current_joint_pos is not None:
current_ee_pos, _ = controller.get_current_ee_pose(controller.current_joint_pos)
node.send_output(
"ee_position",
pa.array(current_ee_pos, type=pa.float64()),
metadata={"timestamp": time.time()}
)
# Apply differential IK control
joint_commands = controller.apply_differential_ik_control()
# Send control commands to the robot
node.send_output(
"joint_commands",
pa.array(joint_commands, type=pa.float32()),
metadata={"timestamp": time.time()}
)
elif event["id"] == "raw_control": elif event["id"] == "raw_control":
raw_control = json.loads(event["value"].to_pylist()[0]) raw_control = json.loads(event["value"].to_pylist()[0])
controller.process_gamepad_input(raw_control) controller.process_gamepad_input(raw_control)
elif event["id"] == "target_pose":
pose_array = event["value"].to_numpy()
controller.process_target_pose(pose_array)
elif event["id"] == "cmd_vel":
cmd_vel = event["value"].to_numpy()
controller.process_cmd_vel(cmd_vel)
# Handle FK results
if event["id"] == "fk_result":
if event["metadata"].get("encoding") == "xyzrpy":
ee_pose = event["value"].to_numpy()
controller.current_ee_pose = {'position': ee_pose[:3], 'rpy': ee_pose[3:6]}
# Handle Jacobian results
if event["id"] == "jacobian_result":
if event["metadata"].get("encoding") == "jacobian_result":
jacobian_flat = event["value"].to_numpy()
jacobian_shape = event["metadata"]["jacobian_shape"]
controller.update_jacobian(jacobian_flat, jacobian_shape)


if __name__ == "__main__": if __name__ == "__main__":
main() main()

+ 289
- 0
examples/franka-mujoco-control/kuka_iiwa/model.urdf View File

@@ -0,0 +1,289 @@
<?xml version="1.0" ?>
<!-- ======================================================================= -->
<!-- | This document was autogenerated by xacro from lbr_iiwa.urdf.xacro | -->
<!-- | Original xacro: https://github.com/rtkg/lbr_iiwa/archive/master.zip | -->
<!-- | EDITING THIS FILE BY HAND IS NOT RECOMMENDED | -->
<!-- | Changes (kohlhoff): | -->
<!-- | * Removed gazebo tags. | -->
<!-- | * Removed unused materials. | -->
<!-- | * Made mesh paths relative. | -->
<!-- | * Removed material fields from collision segments. | -->
<!-- | * Removed the self_collision_checking segment. | -->
<!-- | * Removed transmission segments, since they didn't match the | -->
<!-- | convention, will have to added back later. | -->
<!-- ======================================================================= -->
<!--LICENSE: -->
<!--Copyright (c) 2015, Robert Krug & Todor Stoyanov, AASS Research Center, -->
<!--Orebro University, Sweden -->
<!--All rights reserved. -->
<!-- -->
<!--Redistribution and use in source and binary forms, with or without -->
<!--modification, are permitted provided that the following conditions are -->
<!--met: -->
<!-- -->
<!--1. Redistributions of source code must retain the above copyright notice,-->
<!-- this list of conditions and the following disclaimer. -->
<!-- -->
<!--2. Redistributions in binary form must reproduce the above copyright -->
<!-- notice, this list of conditions and the following disclaimer in the -->
<!-- documentation and/or other materials provided with the distribution. -->
<!-- -->
<!--THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS -->
<!--IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,-->
<!--THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -->
<!--PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR -->
<!--CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -->
<!--EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -->
<!--PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -->
<!--PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -->
<!--LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -->
<!--NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -->
<!--SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -->
<robot name="lbr_iiwa" xmlns:xacro="http://www.ros.org/wiki/xacro">
<!-- Import Rviz colors -->
<material name="Grey">
<color rgba="0.2 0.2 0.2 1.0"/>
</material>
<material name="Orange">
<color rgba="1.0 0.423529411765 0.0392156862745 1.0"/>
</material>
<material name="Blue">
<color rgba="0.5 0.7 1.0 1.0"/>
</material>

<!--Import the lbr iiwa macro -->
<!--Import Transmissions -->
<!--Include Utilities -->
<!--The following macros are adapted from the LWR 4 definitions of the RCPRG - https://github.com/RCPRG-ros-pkg/lwr_robot -->
<!--Little helper macros to define the inertia matrix needed for links.-->
<!--Cuboid-->
<!--Cylinder: length is along the y-axis! -->
<!--lbr-->
<link name="lbr_iiwa_link_0">
<inertial>
<origin rpy="0 0 0" xyz="-0.1 0 0.07"/>
<!--Increase mass from 5 Kg original to provide a stable base to carry the
arm.-->
<mass value="0.0"/>
<inertia ixx="0.05" ixy="0" ixz="0" iyy="0.06" iyz="0" izz="0.03"/>
</inertial>
<visual>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_0.obj"/>
</geometry>
<material name="Grey"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_0.stl"/>
</geometry>
</collision>
</link>
<!-- joint between link_0 and link_1 -->
<joint name="lbr_iiwa_joint_1" type="revolute">
<parent link="lbr_iiwa_link_0"/>
<child link="lbr_iiwa_link_1"/>
<origin rpy="0 0 0" xyz="0 0 0.1575"/>
<axis xyz="0 0 1"/>
<limit effort="300" lower="-2.96705972839" upper="2.96705972839" velocity="10"/>
<dynamics damping="0.5"/>
</joint>
<link name="lbr_iiwa_link_1">
<inertial>
<origin rpy="0 0 0" xyz="0 -0.03 0.12"/>
<mass value="4"/>
<inertia ixx="0.1" ixy="0" ixz="0" iyy="0.09" iyz="0" izz="0.02"/>
</inertial>
<visual>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_1.obj"/>
</geometry>
<material name="Blue"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_1.stl"/>
</geometry>
</collision>
</link>
<!-- joint between link_1 and link_2 -->
<joint name="lbr_iiwa_joint_2" type="revolute">
<parent link="lbr_iiwa_link_1"/>
<child link="lbr_iiwa_link_2"/>
<origin rpy="1.57079632679 0 3.14159265359" xyz="0 0 0.2025"/>
<axis xyz="0 0 1"/>
<limit effort="300" lower="-2.09439510239" upper="2.09439510239" velocity="10"/>
<dynamics damping="0.5"/>
</joint>
<link name="lbr_iiwa_link_2">
<inertial>
<origin rpy="0 0 0" xyz="0.0003 0.059 0.042"/>
<mass value="4"/>
<inertia ixx="0.05" ixy="0" ixz="0" iyy="0.018" iyz="0" izz="0.044"/>
</inertial>
<visual>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_2.obj"/>
</geometry>
<material name="Blue"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_2.stl"/>
</geometry>
</collision>
</link>
<!-- joint between link_2 and link_3 -->
<joint name="lbr_iiwa_joint_3" type="revolute">
<parent link="lbr_iiwa_link_2"/>
<child link="lbr_iiwa_link_3"/>
<origin rpy="1.57079632679 0 3.14159265359" xyz="0 0.2045 0"/>
<axis xyz="0 0 1"/>
<limit effort="300" lower="-2.96705972839" upper="2.96705972839" velocity="10"/>
<dynamics damping="0.5"/>
</joint>
<link name="lbr_iiwa_link_3">
<inertial>
<origin rpy="0 0 0" xyz="0 0.03 0.13"/>
<mass value="3"/>
<inertia ixx="0.08" ixy="0" ixz="0" iyy="0.075" iyz="0" izz="0.01"/>
</inertial>
<visual>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_3.obj"/>
</geometry>
<material name="Orange"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_3.stl"/>
</geometry>
</collision>
</link>
<!-- joint between link_3 and link_4 -->
<joint name="lbr_iiwa_joint_4" type="revolute">
<parent link="lbr_iiwa_link_3"/>
<child link="lbr_iiwa_link_4"/>
<origin rpy="1.57079632679 0 0" xyz="0 0 0.2155"/>
<axis xyz="0 0 1"/>
<limit effort="300" lower="-2.09439510239" upper="2.09439510239" velocity="10"/>
<dynamics damping="0.5"/>
</joint>
<link name="lbr_iiwa_link_4">
<inertial>
<origin rpy="0 0 0" xyz="0 0.067 0.034"/>
<mass value="2.7"/>
<inertia ixx="0.03" ixy="0" ixz="0" iyy="0.01" iyz="0" izz="0.029"/>
</inertial>
<visual>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_4.obj"/>
</geometry>
<material name="Blue"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_4.stl"/>
</geometry>
</collision>
</link>
<!-- joint between link_4 and link_5 -->
<joint name="lbr_iiwa_joint_5" type="revolute">
<parent link="lbr_iiwa_link_4"/>
<child link="lbr_iiwa_link_5"/>
<origin rpy="-1.57079632679 3.14159265359 0" xyz="0 0.1845 0"/>
<axis xyz="0 0 1"/>
<limit effort="300" lower="-2.96705972839" upper="2.96705972839" velocity="10"/>
<dynamics damping="0.5"/>
</joint>
<link name="lbr_iiwa_link_5">
<inertial>
<origin rpy="0 0 0" xyz="0.0001 0.021 0.076"/>
<mass value="1.7"/>
<inertia ixx="0.02" ixy="0" ixz="0" iyy="0.018" iyz="0" izz="0.005"/>
</inertial>
<visual>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_5.obj"/>
</geometry>
<material name="Blue"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_5.stl"/>
</geometry>
</collision>
</link>
<!-- joint between link_5 and link_6 -->
<joint name="lbr_iiwa_joint_6" type="revolute">
<parent link="lbr_iiwa_link_5"/>
<child link="lbr_iiwa_link_6"/>
<origin rpy="1.57079632679 0 0" xyz="0 0 0.2155"/>
<axis xyz="0 0 1"/>
<limit effort="300" lower="-2.09439510239" upper="2.09439510239" velocity="10"/>
<dynamics damping="0.5"/>
</joint>
<link name="lbr_iiwa_link_6">
<inertial>
<origin rpy="0 0 0" xyz="0 0.0006 0.0004"/>
<mass value="1.8"/>
<inertia ixx="0.005" ixy="0" ixz="0" iyy="0.0036" iyz="0" izz="0.0047"/>
</inertial>
<visual>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_6.obj"/>
</geometry>
<material name="Orange"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_6.stl"/>
</geometry>
</collision>
</link>
<!-- joint between link_6 and link_7 -->
<joint name="lbr_iiwa_joint_7" type="revolute">
<parent link="lbr_iiwa_link_6"/>
<child link="lbr_iiwa_link_7"/>
<origin rpy="-1.57079632679 3.14159265359 0" xyz="0 0.081 0"/>
<axis xyz="0 0 1"/>
<limit effort="300" lower="-3.05432619099" upper="3.05432619099" velocity="10"/>
<dynamics damping="0.5"/>
</joint>
<link name="lbr_iiwa_link_7">
<inertial>
<origin rpy="0 0 0" xyz="0 0 0.02"/>
<mass value="0.3"/>
<inertia ixx="0.001" ixy="0" ixz="0" iyy="0.001" iyz="0" izz="0.001"/>
</inertial>
<visual>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_7.obj"/>
</geometry>
<material name="Grey"/>
</visual>
<collision>
<origin rpy="0 0 0" xyz="0 0 0"/>
<geometry>
<mesh filename="meshes/link_7.stl"/>
</geometry>
</collision>
</link>
</robot>


+ 76
- 9
node-hub/dora-pytorch-kinematics/dora_pytorch_kinematics/main.py View File

@@ -10,9 +10,9 @@ import torch
from dora import Node from dora import Node
from pytorch_kinematics.transforms.rotation_conversions import matrix_to_euler_angles from pytorch_kinematics.transforms.rotation_conversions import matrix_to_euler_angles


TRANSFORM = np.array(os.getenv("TRANSFORM", "0. 0. 0. 0. 0. 0. 1.").split(" ")).astype(
TRANSFORM = np.array(os.getenv("TRANSFORM", "0. 0. 0. 1. 0. 0. 0.").split(" ")).astype(
np.float32, np.float32,
)
) # wxyz format
pos = torch.tensor([TRANSFORM[0], TRANSFORM[1], TRANSFORM[2]]) pos = torch.tensor([TRANSFORM[0], TRANSFORM[1], TRANSFORM[2]])
rot = torch.tensor( rot = torch.tensor(
[ [
@@ -117,15 +117,21 @@ class RobotKinematics:
) )


if j.ndim == 1: if j.ndim == 1:
if j.shape[0] != self.num_joints:
raise ValueError(f"Expected {self.num_joints} joints, got {j.shape[0]}")
# Handle case with extra joints (e.g., gripper joints)
if j.shape[0] > self.num_joints:
j = j[:self.num_joints] # Truncate griper or extra joints
elif j.shape[0] < self.num_joints:
raise ValueError(f"Expected at least {self.num_joints} joints, got {j.shape[0]}")
if batch_dim_required: if batch_dim_required:
j = j.unsqueeze(0) # Add batch dimension if needed (e.g., shape (1, N))
j = j.unsqueeze(0) # Add batch dimension if needed
elif j.ndim == 2: elif j.ndim == 2:
if j.shape[1] != self.num_joints:
raise ValueError(
f"Expected {self.num_joints} joints (dim 1), got {j.shape[1]}",
)
# Handle batched input with extra joints
if j.shape[1] > self.num_joints:
j = j[:, :self.num_joints] # Truncate griper or extra joints
elif j.shape[1] < self.num_joints:
raise ValueError(f"Expected at least {self.num_joints} joints (dim 1), got {j.shape[1]}")
if batch_dim_required and j.shape[0] != 1: if batch_dim_required and j.shape[0] != 1:
# Most common IK solvers expect batch=1 for initial guess, FK can handle batches # Most common IK solvers expect batch=1 for initial guess, FK can handle batches
# Relaxing this check slightly, but user should be aware for IK # Relaxing this check slightly, but user should be aware for IK
@@ -222,6 +228,55 @@ class RobotKinematics:
return None return None
return solution_angles.solutions.detach() return solution_angles.solutions.detach()


def compute_jacobian(
self, joint_angles: Union[List[float], torch.Tensor]
) -> torch.Tensor:
"""Compute Jacobian matrix using PyTorch Kinematics.
Args:
joint_angles (Union[List[float], torch.Tensor]): Joint angles (radians).
Shape (num_joints,) or (B, num_joints).
Returns:
torch.Tensor: Jacobian matrix of shape (B, 6, num_joints) or (6, num_joints)
where rows are [linear_vel_x, linear_vel_y, linear_vel_z,
angular_vel_x, angular_vel_y, angular_vel_z]
"""
angles_tensor = self._prepare_joint_tensor(
joint_angles, batch_dim_required=False
)
# Ensure we have batch dimension for jacobian computation
if angles_tensor.ndim == 1:
angles_tensor = angles_tensor.unsqueeze(0)
squeeze_output = True
else:
squeeze_output = False
# Compute Jacobian (returns shape: (B, 6, num_joints))
J = self.chain.jacobian(angles_tensor)
# Remove batch dimension if input was 1D
if squeeze_output:
J = J.squeeze(0)
return J

def compute_jacobian_numpy(
self, joint_angles: Union[List[float], torch.Tensor, np.ndarray]
) -> np.ndarray:
"""Compute Jacobian matrix and return as numpy array.
Args:
joint_angles: Joint angles (radians). Can be list, torch.Tensor, or np.ndarray.
Shape (num_joints,) or (B, num_joints).
Returns:
np.ndarray: Jacobian matrix as numpy array of shape (6, num_joints) or (B, 6, num_joints)
"""
J = self.compute_jacobian(joint_angles)
return J.cpu().numpy()



def main(): def main():
"""TODO: Add docstring.""" """TODO: Add docstring."""
@@ -293,6 +348,18 @@ def main():
target = pa.array(target.ravel(), type=pa.float32()) target = pa.array(target.ravel(), type=pa.float32())
metadata["encoding"] = "xyzrpy" metadata["encoding"] = "xyzrpy"
node.send_output(event["id"], target, metadata=metadata) node.send_output(event["id"], target, metadata=metadata)
case "jacobian":
# Compute Jacobian matrix
joint_angles = event["value"].to_numpy()
jacobian = robot.compute_jacobian_numpy(joint_angles)
jacobian_flat = jacobian.flatten()
jacobian_array = pa.array(jacobian_flat, type=pa.float32())
metadata["encoding"] = "jacobian_result"
metadata["jacobian_shape"] = list(jacobian.shape)
node.send_output(event["id"], jacobian_array, metadata=metadata)




if __name__ == "__main__": if __name__ == "__main__":


+ 53
- 23
node-hub/gamepad/gamepad/main.py View File

@@ -1,9 +1,4 @@
"""Gamepad controller node for Dora.

This module provides a Dora node that reads input from a controller and publishes:
1. velocity commands for robot control
2. raw controller state for debugging and custom mappings
"""
"""Gamepad controller node for Dora."""


from dora import Node from dora import Node
import pygame import pygame
@@ -11,7 +6,7 @@ import pyarrow as pa
import json import json


class Controller: class Controller:
"""controller mapping."""
"""Controller mapping."""


def __init__(self): def __init__(self):
"""Change this according to your controller mapping. Currently Logitech F710.""" """Change this according to your controller mapping. Currently Logitech F710."""
@@ -33,8 +28,9 @@ class Controller:
'BACK': 8, 'BACK': 8,
'START': 9, 'START': 9,
'LEFT-STICK': 10, 'LEFT-STICK': 10,
'RIGHT-STICK': 11
'RIGHT-STICK': 11,
} }
self.hatIndex = 0 # Index of the D-pad hat


def main(): def main():
node = Node("gamepad") node = Node("gamepad")
@@ -51,8 +47,9 @@ def main():


controller = Controller() controller = Controller()
max_linear_speed = 1.0 # Maximum speed in m/s
max_angular_speed = 1.5 # Maximum angular speed in rad/s
move_speed = 0.05 # Fixed increment for D-pad
max_linear_z_speed = 0.1 # Maximum linear Z speed
max_angular_speed = 0.8 # Maximum angular speed
print(f"Detected controller: {joystick.get_name()}") print(f"Detected controller: {joystick.get_name()}")
print(f"Number of axes: {joystick.get_numaxes()}") print(f"Number of axes: {joystick.get_numaxes()}")
@@ -66,30 +63,64 @@ def main():
# Get all controller states # Get all controller states
axes = [joystick.get_axis(i) for i in range(joystick.get_numaxes())] axes = [joystick.get_axis(i) for i in range(joystick.get_numaxes())]
buttons = [joystick.get_button(i) for i in range(joystick.get_numbuttons())] buttons = [joystick.get_button(i) for i in range(joystick.get_numbuttons())]
# Get hat state (D-pad)
dpad_x, dpad_y = 0, 0
if joystick.get_numhats() > 0:
dpad_x, dpad_y = joystick.get_hat(controller.hatIndex)


# Create raw control state # Create raw control state
raw_control = { raw_control = {
"axes": axes, "axes": axes,
"buttons": buttons, "buttons": buttons,
"hats": [[dpad_x, dpad_y]],
"mapping": { "mapping": {
"axes": controller.axisNames, "axes": controller.axisNames,
"buttons": controller.buttonNames "buttons": controller.buttonNames
} }
} }
# print("raw_control:", raw_control) # uncomment for debugging and re-map


# Regular cmd_vel processing
forward = -joystick.get_axis(controller.axisNames['LEFT-Y'])
turn = -joystick.get_axis(controller.axisNames['LEFT-X'])
# cmd_vel processing:
# 1. D-pad vertical for X axis
# 2. D-pad horizontal for Y axis
# 3. Right stick vertical for Z axis
# 4. Right stick horizontal for rotation around Z
# 5. Left stick vertical for rotation around X
# 6. Left stick horizontal for rotation around Y

deadzone = 0.05 deadzone = 0.05
forward = 0.0 if abs(forward) < deadzone else forward
turn = 0.0 if abs(turn) < deadzone else turn

# Linear X velocity from D-pad vertical
linear_x = 0.0
if dpad_y != 0:
linear_x = dpad_y * move_speed

# Linear Y velocity from D-pad horizontal
linear_y = 0.0
if dpad_x != 0:
linear_y = dpad_x * move_speed
forward_speed = forward * max_linear_speed
turn_speed = turn * max_angular_speed
cmd_vel = [forward_speed, 0.0, 0.0, 0.0, 0.0, turn_speed]
# Linear Z velocity from right stick vertical
right_y = -joystick.get_axis(controller.axisNames['RIGHT-Y'])
right_y = 0.0 if abs(right_y) < deadzone else right_y
linear_z = right_y * max_linear_z_speed
# Angular Z velocity (rotation) from right stick horizontal
right_x = -joystick.get_axis(controller.axisNames['RIGHT-X'])
right_x = 0.0 if abs(right_x) < deadzone else right_x
angular_z = right_x * max_angular_speed
# Angular X velocity from left stick vertical
left_y = -joystick.get_axis(controller.axisNames['LEFT-Y'])
left_y = 0.0 if abs(left_y) < deadzone else left_y
angular_x = left_y * max_angular_speed
# Angular Y velocity from left stick horizontal
left_x = -joystick.get_axis(controller.axisNames['LEFT-X'])
left_x = 0.0 if abs(left_x) < deadzone else left_x
angular_y = left_x * max_angular_speed
cmd_vel = [linear_x, linear_y, linear_z, angular_x, angular_y, angular_z]
node.send_output( node.send_output(
output_id="cmd_vel", output_id="cmd_vel",
@@ -107,7 +138,6 @@ def main():
print("\nExiting...") print("\nExiting...")
finally: finally:
pygame.quit() pygame.quit()
# Send zero velocity at cleanup
zero_cmd = [0.0] * 6 zero_cmd = [0.0] * 6
node.send_output( node.send_output(
output_id="cmd_vel", output_id="cmd_vel",
@@ -116,4 +146,4 @@ def main():
) )


if __name__ == "__main__": if __name__ == "__main__":
main()
main()

Loading…
Cancel
Save