From abf77bfb757107cb042f9d450142fdc1eab12900 Mon Sep 17 00:00:00 2001 From: 7SOMAY Date: Mon, 17 Mar 2025 21:25:38 +0530 Subject: [PATCH] Fixed necessary linting issues --- examples/alexk-lcr/bus.py | 14 ++ examples/alexk-lcr/configure.py | 13 +- .../alexk-lcr/nodes/interpolate_lcr_to_lcr.py | 7 + .../nodes/interpolate_lcr_to_record.py | 3 + .../nodes/interpolate_lcr_to_simu_lcr.py | 7 + .../nodes/interpolate_replay_to_lcr.py | 3 + examples/aloha/benchmark/python/dynamixel.py | 225 +++++++++++++++++- examples/aloha/benchmark/python/robot.py | 69 +++++- .../python/teleoperate_real_robot.py | 6 + examples/aloha/benchmark/ros2/teleop.py | 6 + examples/aloha/nodes/gym_dora_node.py | 29 +++ examples/aloha/nodes/keyboard_node.py | 6 + examples/aloha/nodes/lerobot_webcam_saver.py | 2 + examples/aloha/nodes/llm_op.py | 88 +++++-- examples/aloha/nodes/plot_node.py | 2 + examples/aloha/nodes/policy.py | 23 ++ examples/aloha/nodes/realsense_node.py | 6 + examples/aloha/nodes/replay.py | 2 + examples/aloha/nodes/webcam.py | 6 + examples/aloha/nodes/whisper_node.py | 14 ++ examples/aloha/tests/test_realsense.py | 2 + examples/lebai/nodes/interpolation.py | 6 + examples/lebai/nodes/key_interpolation.py | 6 + examples/lebai/nodes/prompt_interpolation.py | 2 + examples/lebai/nodes/vlm_prompt.py | 4 + examples/lebai/nodes/voice_interpolation.py | 6 + examples/reachy/nodes/gym_dora_node.py | 47 ++++ examples/reachy/nodes/keyboard_node.py | 6 + examples/reachy/nodes/lerobot_webcam_saver.py | 2 + examples/reachy/nodes/plot_node.py | 6 + examples/reachy/nodes/reachy_client.py | 2 + examples/reachy/nodes/reachy_vision_client.py | 2 + examples/reachy/nodes/replay_node.py | 2 + examples/reachy1/nodes/gym_dora_node.py | 10 + examples/reachy1/nodes/key_interpolation.py | 2 + examples/reachy1/nodes/keyboard_node.py | 2 + .../reachy1/nodes/lerobot_webcam_saver.py | 6 + examples/reachy1/nodes/plot_node.py | 6 + examples/reachy1/nodes/reachy_client.py | 6 + .../reachy1/nodes/reachy_vision_client.py | 6 + examples/reachy1/nodes/replay_node.py | 6 + examples/reachy1/nodes/text_interpolation.py | 6 + examples/so100/bus.py | 112 ++++++++- examples/so100/configure.py | 8 +- .../so100/nodes/interpolate_lcr_to_so100.py | 4 +- .../interpolate_lcr_x_so100_to_record.py | 8 +- .../nodes/interpolate_replay_to_so100.py | 8 +- tests/llm/run_script.py | 2 + 48 files changed, 771 insertions(+), 45 deletions(-) diff --git a/examples/alexk-lcr/bus.py b/examples/alexk-lcr/bus.py index 78043e6a..d67ba380 100644 --- a/examples/alexk-lcr/bus.py +++ b/examples/alexk-lcr/bus.py @@ -58,6 +58,7 @@ def wrap_joints_and_values( struct_array = wrap_joints_and_values(joints, value) This example broadcasts the single integer value to all joints and wraps them into a structured array. + """ if isinstance(values, int): values = [values] * len(joints) @@ -171,6 +172,7 @@ class DynamixelBus: description: A dictionary containing the description of the motors connected to the bus. The keys are the motor names and the values are tuples containing the motor id and the motor model. + """ self.port = port self.descriptions = description @@ -211,6 +213,7 @@ class DynamixelBus: Args: data_name: Name of the parameter to write. data: Structured array containing the data to write. + """ motor_ids = [ self.motor_ctrl[motor_name.as_py()]["id"] @@ -288,6 +291,7 @@ class DynamixelBus: Returns: Structured array containing the read data. + """ motor_ids = [ self.motor_ctrl[motor_name.as_py()]["id"] for motor_name in motor_names @@ -336,6 +340,7 @@ class DynamixelBus: Args: torque_mode: Structured array containing the torque mode for each servo. + """ self.write("Torque_Enable", torque_mode) @@ -344,6 +349,7 @@ class DynamixelBus: Args: operating_mode: Structured array containing the operating mode for each servo. + """ self.write("Operating_Mode", operating_mode) @@ -355,6 +361,7 @@ class DynamixelBus: Returns: Structured array containing the current positions. + """ return self.read("Present_Position", motor_names) @@ -366,6 +373,7 @@ class DynamixelBus: Returns: Structured array containing the current velocities. + """ return self.read("Present_Velocity", motor_names) @@ -377,6 +385,7 @@ class DynamixelBus: Returns: Structured array containing the current currents. + """ return self.read("Present_Current", motor_names) @@ -385,6 +394,7 @@ class DynamixelBus: Args: goal_position: Structured array containing the goal positions. + """ self.write("Goal_Position", goal_position) @@ -393,6 +403,7 @@ class DynamixelBus: Args: goal_current: Structured array containing the goal currents. + """ self.write("Goal_Current", goal_current) @@ -401,6 +412,7 @@ class DynamixelBus: Args: position_p_gain: Structured array containing the position P gains. + """ self.write("Position_P_Gain", position_p_gain) @@ -409,6 +421,7 @@ class DynamixelBus: Args: position_i_gain: Structured array containing the position I gains. + """ self.write("Position_I_Gain", position_i_gain) @@ -417,5 +430,6 @@ class DynamixelBus: Args: position_d_gain: Structured array containing the position D gains. + """ self.write("Position_D_Gain", position_d_gain) diff --git a/examples/alexk-lcr/configure.py b/examples/alexk-lcr/configure.py index dd72bd58..e71d4884 100644 --- a/examples/alexk-lcr/configure.py +++ b/examples/alexk-lcr/configure.py @@ -1,4 +1,7 @@ -"""LCR Configuration Tool: This program is used to automatically configure the Low Cost Robot (LCR) for the user. +"""Module for configuring and setting up the Low Cost Robot (LCR) hardware. + +This module provides functionality for initializing and configuring the LCR robot's +servo motors and other hardware components. The program will: 1. Disable all torque motors of provided LCR. @@ -46,10 +49,17 @@ GRIPPER = pa.array(["gripper"], type=pa.string()) def pause(): + """Pause execution and wait for user input to continue.""" input("Press Enter to continue...") def configure_servos(bus: DynamixelBus): + """Configure servo motors with appropriate settings. + + Args: + bus: DynamixelBus instance for servo communication + + """ bus.write_torque_enable( wrap_joints_and_values(FULL_ARM, [TorqueMode.DISABLED.value] * 6), ) @@ -68,6 +78,7 @@ def configure_servos(bus: DynamixelBus): def main(): + """Initialize and configure the LCR robot hardware components.""" parser = argparse.ArgumentParser( description="LCR Auto Configure: This program is used to automatically configure the Low Cost Robot (LCR) for " "the user.", diff --git a/examples/alexk-lcr/nodes/interpolate_lcr_to_lcr.py b/examples/alexk-lcr/nodes/interpolate_lcr_to_lcr.py index c3c42fb3..70db01ae 100644 --- a/examples/alexk-lcr/nodes/interpolate_lcr_to_lcr.py +++ b/examples/alexk-lcr/nodes/interpolate_lcr_to_lcr.py @@ -1,3 +1,9 @@ +"""Module for interpolating between LCR robot configurations. + +This module provides functionality for calculating and interpolating between +different LCR robot configurations to ensure smooth transitions. +""" + import argparse import json import os @@ -14,6 +20,7 @@ from pwm_position_control.transform import ( def main(): + """Initialize and run the LCR interpolation node.""" parser = argparse.ArgumentParser( description="Interpolation LCR Node: This Dora node is used to calculates appropriate goal positions for the " "LCR followers knowing a Leader position and Follower position.", diff --git a/examples/alexk-lcr/nodes/interpolate_lcr_to_record.py b/examples/alexk-lcr/nodes/interpolate_lcr_to_record.py index c04022f8..8ca90704 100644 --- a/examples/alexk-lcr/nodes/interpolate_lcr_to_record.py +++ b/examples/alexk-lcr/nodes/interpolate_lcr_to_record.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import argparse import json import os @@ -13,6 +15,7 @@ from pwm_position_control.transform import ( def main(): + """TODO: Add docstring.""" parser = argparse.ArgumentParser( description="Interpolation LCR Node: This Dora node is used to calculates appropriate goal positions for the " "LCR followers knowing a Leader position and Follower position.", diff --git a/examples/alexk-lcr/nodes/interpolate_lcr_to_simu_lcr.py b/examples/alexk-lcr/nodes/interpolate_lcr_to_simu_lcr.py index 925af277..d4da0ca3 100644 --- a/examples/alexk-lcr/nodes/interpolate_lcr_to_simu_lcr.py +++ b/examples/alexk-lcr/nodes/interpolate_lcr_to_simu_lcr.py @@ -1,3 +1,9 @@ +"""Module for interpolating between LCR and simulated LCR configurations. + +This module provides functionality for calculating and interpolating between +physical LCR robot configurations and their simulated counterparts. +""" + import argparse import json import os @@ -15,6 +21,7 @@ from pwm_position_control.transform import ( def main(): + """Initialize and run the LCR to simulated LCR interpolation node.""" parser = argparse.ArgumentParser( description="Interpolation LCR Node: This Dora node is used to calculates appropriate goal positions for the " "LCR followers knowing a Leader position and Follower position.", diff --git a/examples/alexk-lcr/nodes/interpolate_replay_to_lcr.py b/examples/alexk-lcr/nodes/interpolate_replay_to_lcr.py index b4ee763b..cdd38826 100644 --- a/examples/alexk-lcr/nodes/interpolate_replay_to_lcr.py +++ b/examples/alexk-lcr/nodes/interpolate_replay_to_lcr.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import argparse import json import os @@ -8,6 +10,7 @@ from pwm_position_control.transform import logical_to_pwm_with_offset_arrow def main(): + """TODO: Add docstring.""" parser = argparse.ArgumentParser( description="Interpolation LCR Node: This Dora node is used to calculates appropriate goal positions for the " "LCR followers knowing a Leader position and Follower position.", diff --git a/examples/aloha/benchmark/python/dynamixel.py b/examples/aloha/benchmark/python/dynamixel.py index fefa28f8..67c5a9e5 100644 --- a/examples/aloha/benchmark/python/dynamixel.py +++ b/examples/aloha/benchmark/python/dynamixel.py @@ -1,3 +1,10 @@ +"""Module for managing Dynamixel servo communication and control. + +This module provides functionality for communicating with and controlling Dynamixel servos +through a serial interface. It includes methods for reading and writing servo parameters, +managing operating modes, and handling joint positions and velocities. +""" + from __future__ import annotations import enum @@ -10,6 +17,8 @@ from dynamixel_sdk import * # Uses Dynamixel SDK library class ReadAttribute(enum.Enum): + """Enumeration for Dynamixel servo read attributes.""" + TEMPERATURE = 146 VOLTAGE = 145 VELOCITY = 128 @@ -22,6 +31,8 @@ class ReadAttribute(enum.Enum): class OperatingMode(enum.Enum): + """Enumeration for Dynamixel servo operating modes.""" + VELOCITY = 1 POSITION = 3 CURRENT_CONTROLLED_POSITION = 5 @@ -30,6 +41,8 @@ class OperatingMode(enum.Enum): class Dynamixel: + """Class for managing communication with Dynamixel servos.""" + ADDR_TORQUE_ENABLE = 64 ADDR_GOAL_POSITION = 116 ADDR_VELOCITY_LIMIT = 44 @@ -41,7 +54,10 @@ class Dynamixel: @dataclass class Config: + """Configuration class for Dynamixel servo settings.""" + def instantiate(self): + """Create a new Dynamixel instance with this configuration.""" return Dynamixel(self) baudrate: int = 57600 @@ -50,10 +66,17 @@ class Dynamixel: dynamixel_id: int = 1 def __init__(self, config: Config): + """Initialize the Dynamixel servo connection. + + Args: + config: Configuration object containing connection settings. + + """ self.config = config self.connect() def connect(self): + """Establish connection with the Dynamixel servo.""" if self.config.device_name == "": for port_name in os.listdir("/dev"): if "ttyUSB" in port_name or "ttyACM" in port_name: @@ -77,9 +100,17 @@ class Dynamixel: return True def disconnect(self): + """Close the connection with the Dynamixel servo.""" self.portHandler.closePort() def set_goal_position(self, motor_id, goal_position): + """Set the goal position for the specified servo. + + Args: + motor_id: ID of the servo to control. + goal_position: Target position value. + + """ # if self.operating_modes[motor_id] is not OperatingMode.POSITION: # self._disable_torque(motor_id) # self.set_operating_mode(motor_id, OperatingMode.POSITION) @@ -95,6 +126,14 @@ class Dynamixel: # print(f'set position of motor {motor_id} to {goal_position}') def set_pwm_value(self, motor_id: int, pwm_value, tries=3): + """Set the PWM value for the specified servo. + + Args: + motor_id: ID of the servo to control. + pwm_value: PWM value to set. + tries: Number of attempts to set the value. + + """ if self.operating_modes[motor_id] is not OperatingMode.PWM: self._disable_torque(motor_id) self.set_operating_mode(motor_id, OperatingMode.PWM) @@ -123,9 +162,27 @@ class Dynamixel: ) def read_temperature(self, motor_id: int): + """Read the temperature of the specified servo. + + Args: + motor_id: ID of the servo to read from. + + Returns: + The current temperature value. + + """ return self._read_value(motor_id, ReadAttribute.TEMPERATURE, 1) def read_velocity(self, motor_id: int): + """Read the current velocity of the specified servo. + + Args: + motor_id: ID of the servo to read from. + + Returns: + The current velocity value. + + """ pos = self._read_value(motor_id, ReadAttribute.VELOCITY, 4) if pos > 2**31: pos -= 2**32 @@ -133,6 +190,15 @@ class Dynamixel: return pos def read_position(self, motor_id: int): + """Read the current position of the specified servo. + + Args: + motor_id: ID of the servo to read from. + + Returns: + The current position value. + + """ pos = self._read_value(motor_id, ReadAttribute.POSITION, 4) if pos > 2**31: pos -= 2**32 @@ -140,30 +206,77 @@ class Dynamixel: return pos def read_position_degrees(self, motor_id: int) -> float: + """Read the current position of the specified servo in degrees. + + Args: + motor_id: ID of the servo to read from. + + Returns: + The current position in degrees. + + """ return (self.read_position(motor_id) / 4096) * 360 def read_position_radians(self, motor_id: int) -> float: + """Read the current position of the specified servo in radians. + + Args: + motor_id: ID of the servo to read from. + + Returns: + The current position in radians. + + """ return (self.read_position(motor_id) / 4096) * 2 * math.pi def read_current(self, motor_id: int): + """Read the current value of the specified servo. + + Args: + motor_id: ID of the servo to read from. + + Returns: + The current value. + + """ current = self._read_value(motor_id, ReadAttribute.CURRENT, 2) if current > 2**15: current -= 2**16 return current def read_present_pwm(self, motor_id: int): + """Read the current PWM value of the specified servo. + + Args: + motor_id: ID of the servo to read from. + + Returns: + The current PWM value. + + """ return self._read_value(motor_id, ReadAttribute.PWM, 2) def read_hardware_error_status(self, motor_id: int): + """Read the hardware error status of the specified servo. + + Args: + motor_id: ID of the servo to read from. + + Returns: + The hardware error status value. + + """ return self._read_value(motor_id, ReadAttribute.HARDWARE_ERROR_STATUS, 1) def set_id(self, old_id, new_id, use_broadcast_id: bool = False): - """Sets the id of the dynamixel servo - @param old_id: current id of the servo - @param new_id: new id - @param use_broadcast_id: set ids of all connected dynamixels if True. - If False, change only servo with self.config.id - @return: + """Set the ID of the Dynamixel servo. + + Args: + old_id: Current ID of the servo. + new_id: New ID to set. + use_broadcast_id: If True, set IDs of all connected servos. + If False, change only servo with self.config.id + """ if use_broadcast_id: current_id = 254 @@ -176,6 +289,12 @@ class Dynamixel: self.config.id = id def _enable_torque(self, motor_id): + """Enable torque for the specified servo. + + Args: + motor_id: ID of the servo to control. + + """ dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx( self.portHandler, motor_id, self.ADDR_TORQUE_ENABLE, 1, ) @@ -183,6 +302,12 @@ class Dynamixel: self.torque_enabled[motor_id] = True def _disable_torque(self, motor_id): + """Disable torque for the specified servo. + + Args: + motor_id: ID of the servo to control. + + """ dxl_comm_result, dxl_error = self.packetHandler.write1ByteTxRx( self.portHandler, motor_id, self.ADDR_TORQUE_ENABLE, 0, ) @@ -190,6 +315,18 @@ class Dynamixel: self.torque_enabled[motor_id] = False def _process_response(self, dxl_comm_result: int, dxl_error: int, motor_id: int): + """Process the response from a servo communication. + + Args: + dxl_comm_result: Communication result. + dxl_error: Error value. + motor_id: ID of the servo that was communicated with. + + Raises: + ConnectionError: If communication failed. + RuntimeError: If servo reported an error. + + """ if dxl_comm_result != COMM_SUCCESS: raise ConnectionError( f"dxl_comm_result for motor {motor_id}: {self.packetHandler.getTxRxResult(dxl_comm_result)}", @@ -201,6 +338,13 @@ class Dynamixel: ) def set_operating_mode(self, motor_id: int, operating_mode: OperatingMode): + """Set the operating mode for the specified servo. + + Args: + motor_id: ID of the servo to control. + operating_mode: Operating mode to set. + + """ dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx( self.portHandler, motor_id, self.OPERATING_MODE_ADDR, operating_mode.value, ) @@ -208,30 +352,67 @@ class Dynamixel: self.operating_modes[motor_id] = operating_mode def set_pwm_limit(self, motor_id: int, limit: int): + """Set the PWM limit for the specified servo. + + Args: + motor_id: ID of the servo to control. + limit: PWM limit value to set. + + """ dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx( self.portHandler, motor_id, 36, limit, ) self._process_response(dxl_comm_result, dxl_error, motor_id) def set_velocity_limit(self, motor_id: int, velocity_limit): + """Set the velocity limit for the specified servo. + + Args: + motor_id: ID of the servo to control. + velocity_limit: Velocity limit value to set. + + """ dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx( self.portHandler, motor_id, self.ADDR_VELOCITY_LIMIT, velocity_limit, ) self._process_response(dxl_comm_result, dxl_error, motor_id) def set_P(self, motor_id: int, P: int): + """Set the position P gain for the specified servo. + + Args: + motor_id: ID of the servo to control. + P: Position P gain value to set. + + """ dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx( self.portHandler, motor_id, self.POSITION_P, P, ) self._process_response(dxl_comm_result, dxl_error, motor_id) def set_I(self, motor_id: int, I: int): + """Set the position I gain for the specified servo. + + Args: + motor_id: ID of the servo to control. + I: Position I gain value to set. + + """ dxl_comm_result, dxl_error = self.packetHandler.write2ByteTxRx( self.portHandler, motor_id, self.POSITION_I, I, ) self._process_response(dxl_comm_result, dxl_error, motor_id) def read_home_offset(self, motor_id: int): + """Read the home offset of the specified servo. + + Args: + motor_id: ID of the servo to read from. + + Returns: + The home offset value. + + """ self._disable_torque(motor_id) # dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx(self.portHandler, motor_id, # ReadAttribute.HOMING_OFFSET.value, home_position) @@ -241,6 +422,13 @@ class Dynamixel: return home_offset def set_home_offset(self, motor_id: int, home_position: int): + """Set the home offset for the specified servo. + + Args: + motor_id: ID of the servo to control. + home_position: Home position value to set. + + """ self._disable_torque(motor_id) dxl_comm_result, dxl_error = self.packetHandler.write4ByteTxRx( self.portHandler, motor_id, ReadAttribute.HOMING_OFFSET.value, home_position, @@ -249,6 +437,13 @@ class Dynamixel: self._enable_torque(motor_id) def set_baudrate(self, motor_id: int, baudrate): + """Set the baudrate for the specified servo. + + Args: + motor_id: ID of the servo to control. + baudrate: Baudrate value to set. + + """ # translate baudrate into dynamixel baudrate setting id if baudrate == 57600: baudrate_id = 1 @@ -270,6 +465,18 @@ class Dynamixel: self._process_response(dxl_comm_result, dxl_error, motor_id) def _read_value(self, motor_id, attribute: ReadAttribute, num_bytes: int, tries=10): + """Read a value from the specified servo address. + + Args: + motor_id: ID of the servo to read from. + attribute: Attribute to read. + num_bytes: Number of bytes to read. + tries: Number of attempts to read the value. + + Returns: + The read value. + + """ try: if num_bytes == 1: value, dxl_comm_result, dxl_error = self.packetHandler.read1ByteTxRx( @@ -310,6 +517,12 @@ class Dynamixel: return value def set_home_position(self, motor_id: int): + """Set the home position for the specified servo. + + Args: + motor_id: ID of the servo to control. + + """ print(f"setting home position for motor {motor_id}") self.set_home_offset(motor_id, 0) current_position = self.read_position(motor_id) diff --git a/examples/aloha/benchmark/python/robot.py b/examples/aloha/benchmark/python/robot.py index b2c1b0d6..5f16bb80 100644 --- a/examples/aloha/benchmark/python/robot.py +++ b/examples/aloha/benchmark/python/robot.py @@ -1,3 +1,9 @@ +"""Module for controlling robot hardware and movements. + +This module provides functionality for controlling robot hardware components, +including servo motors and various control modes. +""" + import time from enum import Enum, auto from typing import Union @@ -15,6 +21,12 @@ from dynamixel_sdk import ( class MotorControlType(Enum): + """Enumeration of different motor control modes. + + Defines the various control modes available for the robot's motors, + including PWM control, position control, and disabled states. + """ + PWM = auto() POSITION_CONTROL = auto() DISABLED = auto() @@ -22,8 +34,22 @@ class MotorControlType(Enum): class Robot: + """A class for controlling robot hardware components. + + This class provides methods for controlling robot servos, managing different + control modes, and reading sensor data. + """ + # def __init__(self, device_name: str, baudrate=1_000_000, servo_ids=[1, 2, 3, 4, 5]): def __init__(self, dynamixel, baudrate=1_000_000, servo_ids=[1, 2, 3, 4, 5]): + """Initialize the robot controller. + + Args: + dynamixel: Dynamixel controller instance + baudrate: Communication baudrate for servo control + servo_ids: List of servo IDs to control + + """ self.servo_ids = servo_ids self.dynamixel = dynamixel # self.dynamixel = Dynamixel.Config(baudrate=baudrate, device_name=device_name).instantiate() @@ -66,9 +92,14 @@ class Robot: self.motor_control_state = MotorControlType.DISABLED def read_position(self, tries=2): - """Reads the joint positions of the robot. 2048 is the center position. 0 and 4096 are 180 degrees in each direction. - :param tries: maximum number of tries to read the position - :return: list of joint positions in range [0, 4096] + """Read the current joint positions of the robot. + + Args: + tries: Maximum number of attempts to read positions + + Returns: + list: Joint positions in range [0, 4096] + """ result = self.position_reader.txRxPacket() if result != 0: @@ -84,8 +115,11 @@ class Robot: return positions def read_velocity(self): - """Reads the joint velocities of the robot. - :return: list of joint velocities, + """Read the current joint velocities of the robot. + + Returns: + list: Current joint velocities + """ self.velocity_reader.txRxPacket() velocties = [] @@ -97,7 +131,11 @@ class Robot: return velocties def set_goal_pos(self, action): - """:param action: list or numpy array of target joint positions in range [0, 4096] + """Set goal positions for the robot joints. + + Args: + action: List or numpy array of target joint positions in range [0, 4096] + """ if self.motor_control_state is not MotorControlType.POSITION_CONTROL: self._set_position_control() @@ -113,8 +151,11 @@ class Robot: self.pos_writer.txPacket() def set_pwm(self, action): - """Sets the pwm values for the servos. - :param action: list or numpy array of pwm values in range [0, 885] + """Set PWM values for the servos. + + Args: + action: List or numpy array of PWM values in range [0, 885] + """ if self.motor_control_state is not MotorControlType.PWM: self._set_pwm_control() @@ -128,15 +169,19 @@ class Robot: self.pwm_writer.txPacket() def set_trigger_torque(self): - """Sets a constant torque torque for the last servo in the chain. This is useful for the trigger of the leader arm + """Set a constant torque for the last servo in the chain. + + This is useful for the trigger of the leader arm. """ self.dynamixel._enable_torque(self.servo_ids[-1]) self.dynamixel.set_pwm_value(self.servo_ids[-1], 200) def limit_pwm(self, limit: Union[int, list, np.ndarray]): - """Limits the pwm values for the servos in for position control - @param limit: 0 ~ 885 - @return: + """Limit the PWM values for the servos in position control. + + Args: + limit: PWM limit value in range 0-885 + """ if isinstance(limit, int): limits = [ diff --git a/examples/aloha/benchmark/python/teleoperate_real_robot.py b/examples/aloha/benchmark/python/teleoperate_real_robot.py index 74675656..004059c3 100644 --- a/examples/aloha/benchmark/python/teleoperate_real_robot.py +++ b/examples/aloha/benchmark/python/teleoperate_real_robot.py @@ -1,3 +1,9 @@ +"""Module for teleoperating a physical robot. + +This module provides functionality for controlling a physical robot through +teleoperation, allowing real-time control of robot movements and actions. +""" + from dynamixel import Dynamixel from robot import Robot diff --git a/examples/aloha/benchmark/ros2/teleop.py b/examples/aloha/benchmark/ros2/teleop.py index dc127a67..d3cbbc18 100644 --- a/examples/aloha/benchmark/ros2/teleop.py +++ b/examples/aloha/benchmark/ros2/teleop.py @@ -1,3 +1,9 @@ +"""Module for ROS2-based robot teleoperation. + +This module provides functionality for controlling robots through ROS2, +enabling teleoperation and real-time control of robot movements. +""" + import dora import numpy as np import pyarrow as pa diff --git a/examples/aloha/nodes/gym_dora_node.py b/examples/aloha/nodes/gym_dora_node.py index 70af510d..fd9b3e69 100644 --- a/examples/aloha/nodes/gym_dora_node.py +++ b/examples/aloha/nodes/gym_dora_node.py @@ -1,3 +1,9 @@ +"""Module for integrating gym environments with Dora nodes. + +This module provides functionality for running gym environments as Dora nodes, +including replay capabilities for recorded robot actions. +""" + import time from pathlib import Path @@ -12,7 +18,20 @@ observation = env.reset() class ReplayPolicy: + """A policy class for replaying recorded robot actions. + + This class handles loading and replaying recorded actions from a dataset, + maintaining timing between actions to match the original recording. + """ + def __init__(self, example_path, epidode=0): + """Initialize the replay policy. + + Args: + example_path: Path to the directory containing recorded actions + epidode: Index of the episode to replay + + """ df_action = pd.read_parquet(example_path / "action.parquet") df_episode_index = pd.read_parquet(example_path / "episode_index.parquet") self.df = pd.merge_asof( @@ -29,6 +48,16 @@ class ReplayPolicy: self.finished = False def select_action(self, obs): + """Select the next action to replay. + + Args: + obs: Current observation from the environment (unused) + + Returns: + tuple: (action, finished) where action is the next action to take + and finished indicates if all actions have been replayed + + """ if self.index < len(self.df): self.index += 1 else: diff --git a/examples/aloha/nodes/keyboard_node.py b/examples/aloha/nodes/keyboard_node.py index 32fee0dc..0427213b 100644 --- a/examples/aloha/nodes/keyboard_node.py +++ b/examples/aloha/nodes/keyboard_node.py @@ -1,3 +1,9 @@ +"""Module for handling keyboard input events in robot control. + +This module provides functionality for capturing and processing keyboard input +events to control robot movements and actions. +""" + import pyarrow as pa from dora import Node from pynput import keyboard diff --git a/examples/aloha/nodes/lerobot_webcam_saver.py b/examples/aloha/nodes/lerobot_webcam_saver.py index 840a39fe..c1766115 100644 --- a/examples/aloha/nodes/lerobot_webcam_saver.py +++ b/examples/aloha/nodes/lerobot_webcam_saver.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + #!/usr/bin/env python3 import os diff --git a/examples/aloha/nodes/llm_op.py b/examples/aloha/nodes/llm_op.py index 07797349..9dc8e657 100644 --- a/examples/aloha/nodes/llm_op.py +++ b/examples/aloha/nodes/llm_op.py @@ -1,3 +1,9 @@ +"""Module for language model-based robot control. + +This module provides functionality for controlling robots using natural language +commands processed by large language models. +""" + import gc # garbage collect library import os import re @@ -39,15 +45,14 @@ tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME_OR_PATH, use_fast=True) def extract_python_code_blocks(text): - """Extracts Python code blocks from the given text that are enclosed in triple backticks with a python language identifier. + """Extract Python code blocks from the given text. - Parameters - ---------- - - text: A string that may contain one or more Python code blocks. + Args: + text: A string that may contain one or more Python code blocks. - Returns - ------- - - A list of strings, where each string is a block of Python code extracted from the text. + Returns: + list: A list of strings, where each string is a block of Python code + extracted from the text. """ pattern = r"```python\n(.*?)\n```" @@ -63,15 +68,13 @@ def extract_python_code_blocks(text): def remove_last_line(python_code): - """Removes the last line from a given string of Python code. + """Remove the last line from a given string of Python code. - Parameters - ---------- - - python_code: A string representing Python source code. + Args: + python_code: A string representing Python source code. - Returns - ------- - - A string with the last line removed. + Returns: + str: A string with the last line removed. """ lines = python_code.split("\n") # Split the string into lines @@ -82,7 +85,16 @@ def remove_last_line(python_code): def calculate_similarity(source, target): """Calculate a similarity score between the source and target strings. + This uses the edit distance relative to the length of the strings. + + Args: + source: First string to compare + target: Second string to compare + + Returns: + float: Similarity score between 0 and 1 + """ edit_distance = pylcs.edit_distance(source, target) max_length = max(len(source), len(target)) @@ -92,8 +104,17 @@ def calculate_similarity(source, target): def find_best_match_location(source_code, target_block): - """Find the best match for the target_block within the source_code by searching line by line, - considering blocks of varying lengths. + """Find the best match for the target_block within the source_code. + + This function searches line by line, considering blocks of varying lengths. + + Args: + source_code: The source code to search within + target_block: The code block to find a match for + + Returns: + tuple: (start_index, end_index) of the best matching location + """ source_lines = source_code.split("\n") target_lines = target_block.split("\n") @@ -122,7 +143,15 @@ def find_best_match_location(source_code, target_block): def replace_code_in_source(source_code, replacement_block: str): - """Replace the best matching block in the source_code with the replacement_block, considering variable block lengths. + """Replace the best matching block in the source_code with the replacement_block. + + Args: + source_code: The original source code + replacement_block: The new code block to insert + + Returns: + str: The modified source code + """ replacement_block = extract_python_code_blocks(replacement_block)[0] start_index, end_index = find_best_match_location(source_code, replacement_block) @@ -136,7 +165,14 @@ def replace_code_in_source(source_code, replacement_block: str): class Operator: + """A class for managing LLM-based code modifications. + + This class handles the process of using LLMs to modify code based on + natural language instructions and managing the modification workflow. + """ + def __init__(self) -> None: + """Initialize the operator with policy initialization flag.""" self.policy_init = False def on_event( @@ -144,6 +180,16 @@ class Operator: dora_event, send_output, ) -> DoraStatus: + """Handle incoming events and process LLM-based code modifications. + + Args: + dora_event: Dictionary containing event information + send_output: Function to send output to the dataflow + + Returns: + DoraStatus: Status indicating whether to continue processing + + """ global model, tokenizer if dora_event["type"] == "INPUT" and dora_event["id"] == "text": input = dora_event["value"][0].as_py() @@ -177,7 +223,15 @@ class Operator: return DoraStatus.CONTINUE def ask_llm(self, prompt): + """Send a prompt to the LLM and get its response. + + Args: + prompt: The prompt to send to the LLM + + Returns: + str: The LLM's response + """ # Generate output # prompt = PROMPT_TEMPLATE.format(system_message=system_message, prompt=prompt)) input = tokenizer(prompt, return_tensors="pt") diff --git a/examples/aloha/nodes/plot_node.py b/examples/aloha/nodes/plot_node.py index cfb482d9..6cdec52c 100644 --- a/examples/aloha/nodes/plot_node.py +++ b/examples/aloha/nodes/plot_node.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import os import cv2 diff --git a/examples/aloha/nodes/policy.py b/examples/aloha/nodes/policy.py index 3825c987..73dc5d88 100644 --- a/examples/aloha/nodes/policy.py +++ b/examples/aloha/nodes/policy.py @@ -1,11 +1,34 @@ +"""Module for implementing robot control policies. + +This module provides functionality for implementing and managing robot control policies, +including action selection and state management. +""" + from dora import DoraStatus class Operator: + """A class for implementing robot control policies. + + This class handles the selection and execution of robot actions based on + input events and current state. + """ + def __init__(self): + """Initialize the operator with available actions.""" self.actions = ["get_food", "get_hat"] def on_event(self, event: dict, send_output) -> DoraStatus: + """Handle incoming events and generate appropriate actions. + + Args: + event: Dictionary containing event information + send_output: Function to send output to the dataflow + + Returns: + DoraStatus: Status indicating whether to continue processing + + """ if event["type"] == "INPUT": id = event["id"] # On initialization diff --git a/examples/aloha/nodes/realsense_node.py b/examples/aloha/nodes/realsense_node.py index 0d8998d8..37e81640 100644 --- a/examples/aloha/nodes/realsense_node.py +++ b/examples/aloha/nodes/realsense_node.py @@ -1,3 +1,9 @@ +"""Module for handling Intel RealSense camera data. + +This module provides functionality for capturing and processing data from +Intel RealSense cameras, including depth and color images. +""" + import os import cv2 diff --git a/examples/aloha/nodes/replay.py b/examples/aloha/nodes/replay.py index 755403e1..98440949 100644 --- a/examples/aloha/nodes/replay.py +++ b/examples/aloha/nodes/replay.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import time import pandas as pd diff --git a/examples/aloha/nodes/webcam.py b/examples/aloha/nodes/webcam.py index 219099f5..40f4b463 100644 --- a/examples/aloha/nodes/webcam.py +++ b/examples/aloha/nodes/webcam.py @@ -1,3 +1,9 @@ +"""Module for handling webcam input and processing. + +This module provides functionality for capturing and processing video input +from webcam devices for robot vision applications. +""" + #!/usr/bin/env python3 import os diff --git a/examples/aloha/nodes/whisper_node.py b/examples/aloha/nodes/whisper_node.py index bf3ad1fe..abde9da9 100644 --- a/examples/aloha/nodes/whisper_node.py +++ b/examples/aloha/nodes/whisper_node.py @@ -1,3 +1,9 @@ +"""Module for speech recognition using Whisper. + +This module provides functionality for capturing audio input and converting +it to text using the Whisper speech recognition model. +""" + import numpy as np import pyarrow as pa import sounddevice as sd @@ -14,7 +20,15 @@ node = Node() def get_text(duration) -> str: + """Capture audio and convert it to text using Whisper. + + Args: + duration: Duration of audio to capture in seconds + + Returns: + str: Transcribed text from the audio input + """ ## Microphone audio_data = sd.rec( int(SAMPLE_RATE * duration), diff --git a/examples/aloha/tests/test_realsense.py b/examples/aloha/tests/test_realsense.py index 30ae9d89..9feccb84 100644 --- a/examples/aloha/tests/test_realsense.py +++ b/examples/aloha/tests/test_realsense.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import os import cv2 diff --git a/examples/lebai/nodes/interpolation.py b/examples/lebai/nodes/interpolation.py index 06a40302..b1ea05bd 100644 --- a/examples/lebai/nodes/interpolation.py +++ b/examples/lebai/nodes/interpolation.py @@ -1,3 +1,9 @@ +"""Module for interpolating between robot configurations. + +This module provides functionality for creating smooth transitions between +robot configurations and movements. +""" + from enum import Enum import pyarrow as pa diff --git a/examples/lebai/nodes/key_interpolation.py b/examples/lebai/nodes/key_interpolation.py index 1776fa6b..34653baf 100644 --- a/examples/lebai/nodes/key_interpolation.py +++ b/examples/lebai/nodes/key_interpolation.py @@ -1,3 +1,9 @@ +"""Module for interpolating between keyframe positions. + +This module provides functionality for creating smooth transitions between +keyframe positions in robot motion sequences. +""" + import pyarrow as pa from dora import Node diff --git a/examples/lebai/nodes/prompt_interpolation.py b/examples/lebai/nodes/prompt_interpolation.py index f81f6b69..822f3166 100644 --- a/examples/lebai/nodes/prompt_interpolation.py +++ b/examples/lebai/nodes/prompt_interpolation.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import pyarrow as pa from dora import Node diff --git a/examples/lebai/nodes/vlm_prompt.py b/examples/lebai/nodes/vlm_prompt.py index 8b137891..14ea498e 100644 --- a/examples/lebai/nodes/vlm_prompt.py +++ b/examples/lebai/nodes/vlm_prompt.py @@ -1 +1,5 @@ +"""Module for handling vision-language model prompts. +This module provides functionality for generating and processing prompts +for vision-language models used in robot control. +""" diff --git a/examples/lebai/nodes/voice_interpolation.py b/examples/lebai/nodes/voice_interpolation.py index 72d5c9c2..b940436a 100644 --- a/examples/lebai/nodes/voice_interpolation.py +++ b/examples/lebai/nodes/voice_interpolation.py @@ -1,3 +1,9 @@ +"""Module for interpolating between voice commands. + +This module provides functionality for processing and interpolating between +voice commands used for robot control. +""" + import pyarrow as pa from dora import Node diff --git a/examples/reachy/nodes/gym_dora_node.py b/examples/reachy/nodes/gym_dora_node.py index c1ce0640..1e7f56e5 100644 --- a/examples/reachy/nodes/gym_dora_node.py +++ b/examples/reachy/nodes/gym_dora_node.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import time import gym_dora # noqa: F401 @@ -12,7 +14,20 @@ observation = env.reset() class ReplayPolicy: + """A policy class for replaying recorded robot actions. + + This class handles loading and replaying recorded actions from a dataset, + maintaining timing between actions to match the original recording. + """ + def __init__(self, example_path, epidode=0): + """Initialize the replay policy. + + Args: + example_path: Path to the directory containing recorded actions + epidode: Index of the episode to replay + + """ df_action = pd.read_parquet(example_path / "action.parquet") df_episode_index = pd.read_parquet(example_path / "episode_index.parquet") self.df = pd.merge_asof( @@ -29,6 +44,16 @@ class ReplayPolicy: self.finished = False def select_action(self, obs): + """Select the next action to replay. + + Args: + obs: Current observation from the environment (unused) + + Returns: + tuple: (action, finished) where action is the next action to take + and finished indicates if all actions have been replayed + + """ if self.index < len(self.df): self.index += 1 else: @@ -42,7 +67,19 @@ class ReplayPolicy: class ReplayLeRobotPolicy: + """A policy class for replaying actions from the LeRobot dataset. + + This class handles loading and replaying actions from the LeRobot dataset, + maintaining the sequence of actions from the original recording. + """ + def __init__(self, episode=21): + """Initialize the LeRobot replay policy. + + Args: + episode: Index of the episode to replay from the dataset + + """ self.index = 0 self.finished = False # episode = 1 @@ -53,6 +90,16 @@ class ReplayLeRobotPolicy: self.actions = dataset.hf_dataset["action"][from_index:to_index] def select_action(self, obs): + """Select the next action to replay from the LeRobot dataset. + + Args: + obs: Current observation from the environment (unused) + + Returns: + tuple: (action, finished) where action is the next action to take + and finished indicates if all actions have been replayed + + """ if self.index < len(self.actions): self.index += 1 else: diff --git a/examples/reachy/nodes/keyboard_node.py b/examples/reachy/nodes/keyboard_node.py index 32fee0dc..ebd723f6 100644 --- a/examples/reachy/nodes/keyboard_node.py +++ b/examples/reachy/nodes/keyboard_node.py @@ -1,3 +1,9 @@ +"""Module for handling keyboard input in the Reachy robot control system. + +This module provides functionality for capturing and processing keyboard input +to control the Reachy robot's movements and actions. +""" + import pyarrow as pa from dora import Node from pynput import keyboard diff --git a/examples/reachy/nodes/lerobot_webcam_saver.py b/examples/reachy/nodes/lerobot_webcam_saver.py index 28ba5271..bf17f396 100644 --- a/examples/reachy/nodes/lerobot_webcam_saver.py +++ b/examples/reachy/nodes/lerobot_webcam_saver.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + #!/usr/bin/env python3 import os diff --git a/examples/reachy/nodes/plot_node.py b/examples/reachy/nodes/plot_node.py index ebc8ebd3..4b759411 100644 --- a/examples/reachy/nodes/plot_node.py +++ b/examples/reachy/nodes/plot_node.py @@ -1,3 +1,9 @@ +"""Module for visualizing robot data and states. + +This module provides functionality for plotting and displaying various aspects +of the robot's state, including positions, movements, and sensor data. +""" + import os import cv2 diff --git a/examples/reachy/nodes/reachy_client.py b/examples/reachy/nodes/reachy_client.py index 4477fa72..cea38412 100644 --- a/examples/reachy/nodes/reachy_client.py +++ b/examples/reachy/nodes/reachy_client.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import time # import h5py diff --git a/examples/reachy/nodes/reachy_vision_client.py b/examples/reachy/nodes/reachy_vision_client.py index 1a4cb7d1..b90601fc 100644 --- a/examples/reachy/nodes/reachy_vision_client.py +++ b/examples/reachy/nodes/reachy_vision_client.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import time import numpy as np diff --git a/examples/reachy/nodes/replay_node.py b/examples/reachy/nodes/replay_node.py index 4f56b02d..882f1c38 100644 --- a/examples/reachy/nodes/replay_node.py +++ b/examples/reachy/nodes/replay_node.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import time import pyarrow as pa diff --git a/examples/reachy1/nodes/gym_dora_node.py b/examples/reachy1/nodes/gym_dora_node.py index c1ce0640..7bad5592 100644 --- a/examples/reachy1/nodes/gym_dora_node.py +++ b/examples/reachy1/nodes/gym_dora_node.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import time import gym_dora # noqa: F401 @@ -12,7 +14,10 @@ observation = env.reset() class ReplayPolicy: + """TODO: Add docstring.""" + def __init__(self, example_path, epidode=0): + """TODO: Add docstring.""" df_action = pd.read_parquet(example_path / "action.parquet") df_episode_index = pd.read_parquet(example_path / "episode_index.parquet") self.df = pd.merge_asof( @@ -29,6 +34,7 @@ class ReplayPolicy: self.finished = False def select_action(self, obs): + """TODO: Add docstring.""" if self.index < len(self.df): self.index += 1 else: @@ -42,7 +48,10 @@ class ReplayPolicy: class ReplayLeRobotPolicy: + """TODO: Add docstring.""" + def __init__(self, episode=21): + """TODO: Add docstring.""" self.index = 0 self.finished = False # episode = 1 @@ -53,6 +62,7 @@ class ReplayLeRobotPolicy: self.actions = dataset.hf_dataset["action"][from_index:to_index] def select_action(self, obs): + """TODO: Add docstring.""" if self.index < len(self.actions): self.index += 1 else: diff --git a/examples/reachy1/nodes/key_interpolation.py b/examples/reachy1/nodes/key_interpolation.py index b5107614..b7d050e5 100644 --- a/examples/reachy1/nodes/key_interpolation.py +++ b/examples/reachy1/nodes/key_interpolation.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import pyarrow as pa from dora import Node diff --git a/examples/reachy1/nodes/keyboard_node.py b/examples/reachy1/nodes/keyboard_node.py index 32fee0dc..1a134a19 100644 --- a/examples/reachy1/nodes/keyboard_node.py +++ b/examples/reachy1/nodes/keyboard_node.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import pyarrow as pa from dora import Node from pynput import keyboard diff --git a/examples/reachy1/nodes/lerobot_webcam_saver.py b/examples/reachy1/nodes/lerobot_webcam_saver.py index 28ba5271..d8ffb79b 100644 --- a/examples/reachy1/nodes/lerobot_webcam_saver.py +++ b/examples/reachy1/nodes/lerobot_webcam_saver.py @@ -1,3 +1,9 @@ +"""Module for saving webcam feed data from LeRobot experiments. + +This module provides functionality for capturing and saving webcam feed data +during LeRobot experiments with the Reachy robot. +""" + #!/usr/bin/env python3 import os diff --git a/examples/reachy1/nodes/plot_node.py b/examples/reachy1/nodes/plot_node.py index ebc8ebd3..6910d6a2 100644 --- a/examples/reachy1/nodes/plot_node.py +++ b/examples/reachy1/nodes/plot_node.py @@ -1,3 +1,9 @@ +"""Module for visualizing robot data and states. + +This module provides functionality for creating real-time visualizations of +robot states, movements, and sensor data during operation. +""" + import os import cv2 diff --git a/examples/reachy1/nodes/reachy_client.py b/examples/reachy1/nodes/reachy_client.py index 4477fa72..ccfbdd7f 100644 --- a/examples/reachy1/nodes/reachy_client.py +++ b/examples/reachy1/nodes/reachy_client.py @@ -1,3 +1,9 @@ +"""Module for client-side communication with the Reachy robot. + +This module provides functionality for establishing and managing communication +with the Reachy robot, including sending commands and receiving sensor data. +""" + import time # import h5py diff --git a/examples/reachy1/nodes/reachy_vision_client.py b/examples/reachy1/nodes/reachy_vision_client.py index 1a4cb7d1..425817f5 100644 --- a/examples/reachy1/nodes/reachy_vision_client.py +++ b/examples/reachy1/nodes/reachy_vision_client.py @@ -1,3 +1,9 @@ +"""Module for handling vision-related communication with the Reachy robot. + +This module provides functionality for processing and managing visual data +from the Reachy robot's cameras and vision sensors. +""" + import time import numpy as np diff --git a/examples/reachy1/nodes/replay_node.py b/examples/reachy1/nodes/replay_node.py index 4f56b02d..80e3ec16 100644 --- a/examples/reachy1/nodes/replay_node.py +++ b/examples/reachy1/nodes/replay_node.py @@ -1,3 +1,9 @@ +"""Module for replaying recorded robot actions and movements. + +This module provides functionality for loading and replaying previously recorded +robot actions and movements, allowing for demonstration and analysis of robot behavior. +""" + import time import pyarrow as pa diff --git a/examples/reachy1/nodes/text_interpolation.py b/examples/reachy1/nodes/text_interpolation.py index 2d9a514b..0618c1b8 100644 --- a/examples/reachy1/nodes/text_interpolation.py +++ b/examples/reachy1/nodes/text_interpolation.py @@ -1,3 +1,9 @@ +"""Module for interpolating between text-based robot commands. + +This module provides functionality for processing and interpolating between +text-based commands to create smooth transitions in robot behavior. +""" + import pyarrow as pa from dora import Node diff --git a/examples/so100/bus.py b/examples/so100/bus.py index 569ca9d2..35495146 100644 --- a/examples/so100/bus.py +++ b/examples/so100/bus.py @@ -1,3 +1,9 @@ +"""Module for managing Feetech servo motor communication. + +This module provides functionality for communicating with and controlling +Feetech servo motors through a serial bus interface. +""" + import enum from typing import Union @@ -23,6 +29,16 @@ def wrap_joints_and_values( joints: Union[list[str], pa.Array], values: Union[list[int], pa.Array], ) -> pa.StructArray: + """Create a structured array from joint names and their values. + + Args: + joints: List of joint names or Arrow array of joint names + values: List of values or Arrow array of values + + Returns: + pa.StructArray: Structured array containing joint names and values + + """ return pa.StructArray.from_arrays( arrays=[joints, values], names=["joints", "values"], @@ -30,11 +46,15 @@ def wrap_joints_and_values( class TorqueMode(enum.Enum): + """Enumeration of torque control modes for servo motors.""" + ENABLED = pa.scalar(1, pa.uint32()) DISABLED = pa.scalar(0, pa.uint32()) class OperatingMode(enum.Enum): + """Enumeration of operating modes for servo motors.""" + ONE_TURN = pa.scalar(0, pa.uint32()) @@ -92,12 +112,21 @@ MODEL_CONTROL_TABLE = { class FeetechBus: + """A class for managing communication with Feetech servo motors. + + This class handles the low-level communication with Feetech servo motors + through a serial bus interface, providing methods for reading and writing + motor parameters. + """ def __init__(self, port: str, description: dict[str, (np.uint8, str)]): - """Args: - port: the serial port to connect to the Feetech bus - description: a dictionary containing the description of the motors connected to the bus. The keys are the - motor names and the values are tuples containing the motor id and the motor model. + """Initialize the Feetech bus interface. + + Args: + port: The serial port to connect to the Feetech bus + description: A dictionary containing the description of the motors + connected to the bus. The keys are the motor names and + the values are tuples containing the motor id and model. """ self.port = port @@ -130,9 +159,17 @@ class FeetechBus: self.group_writers = {} def close(self): + """Close the serial port connection.""" self.port_handler.closePort() def write(self, data_name: str, data: pa.StructArray): + """Write data to the specified motor parameters. + + Args: + data_name: Name of the parameter to write + data: Structured array containing the data to write + + """ motor_ids = [ self.motor_ctrl[motor_name.as_py()]["id"] for motor_name in data.field("joints") @@ -199,6 +236,16 @@ class FeetechBus: ) def read(self, data_name: str, motor_names: pa.Array) -> pa.StructArray: + """Read data from the specified motor parameters. + + Args: + data_name: Name of the parameter to read + motor_names: Array of motor names to read from + + Returns: + pa.StructArray: Structured array containing the read data + + """ motor_ids = [ self.motor_ctrl[motor_name.as_py()]["id"] for motor_name in motor_names ] @@ -249,25 +296,82 @@ class FeetechBus: return wrap_joints_and_values(motor_names, values) def write_torque_enable(self, torque_mode: pa.StructArray): + """Enable or disable torque for the specified motors. + + Args: + torque_mode: Structured array containing torque enable/disable values + + """ self.write("Torque_Enable", torque_mode) def write_operating_mode(self, operating_mode: pa.StructArray): + """Set the operating mode for the specified motors. + + Args: + operating_mode: Structured array containing operating mode values + + """ self.write("Mode", operating_mode) def read_position(self, motor_names: pa.Array) -> pa.StructArray: + """Read current position from the specified motors. + + Args: + motor_names: Array of motor names to read positions from + + Returns: + pa.StructArray: Structured array containing position values + + """ return self.read("Present_Position", motor_names) def read_velocity(self, motor_names: pa.Array) -> pa.StructArray: + """Read current velocity from the specified motors. + + Args: + motor_names: Array of motor names to read velocities from + + Returns: + pa.StructArray: Structured array containing velocity values + + """ return self.read("Present_Velocity", motor_names) def read_current(self, motor_names: pa.Array) -> pa.StructArray: + """Read current current from the specified motors. + + Args: + motor_names: Array of motor names to read currents from + + Returns: + pa.StructArray: Structured array containing current values + + """ return self.read("Present_Current", motor_names) def write_goal_position(self, goal_position: pa.StructArray): + """Set goal positions for the specified motors. + + Args: + goal_position: Structured array containing goal position values + + """ self.write("Goal_Position", goal_position) def write_max_angle_limit(self, max_angle_limit: pa.StructArray): + """Set maximum angle limits for the specified motors. + + Args: + max_angle_limit: Structured array containing maximum angle limit values + + """ self.write("Max_Angle_Limit", max_angle_limit) def write_min_angle_limit(self, min_angle_limit: pa.StructArray): + """Set minimum angle limits for the specified motors. + + Args: + min_angle_limit: Structured array containing minimum angle limit values + + """ self.write("Min_Angle_Limit", min_angle_limit) diff --git a/examples/so100/configure.py b/examples/so100/configure.py index b99848bc..247ade59 100644 --- a/examples/so100/configure.py +++ b/examples/so100/configure.py @@ -1,4 +1,7 @@ -"""SO100 Auto Configure: This program is used to automatically configure the SO-ARM100 (SO100) for the user. +"""Module for configuring and setting up the SO100 robot hardware. + +This module provides functionality for initializing and configuring the SO100 robot's +servo motors and other hardware components. The program will: 1. Disable all torque motors of provided SO100. @@ -55,6 +58,7 @@ def configure_servos(bus: FeetechBus): Args: bus: The FeetechBus instance to configure. + """ bus.write_torque_enable( wrap_joints_and_values(FULL_ARM, [TorqueMode.DISABLED.value] * 6), @@ -74,7 +78,7 @@ def configure_servos(bus: FeetechBus): def main(): - """Main function to run the servo configuration process.""" + """Run the servo configuration process.""" parser = argparse.ArgumentParser( description="SO100 Auto Configure: This program is used to automatically configure the Low Cost Robot (SO100) " "for the user.", diff --git a/examples/so100/nodes/interpolate_lcr_to_so100.py b/examples/so100/nodes/interpolate_lcr_to_so100.py index c200cc7f..635f45fd 100644 --- a/examples/so100/nodes/interpolate_lcr_to_so100.py +++ b/examples/so100/nodes/interpolate_lcr_to_so100.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + import argparse import json import os @@ -14,7 +16,7 @@ from pwm_position_control.transform import ( def main(): - """Main function to handle interpolation between LCR and SO100 robot configurations.""" + """Handle interpolation between LCR and SO100 robot configurations.""" # Handle dynamic nodes, ask for the name of the node in the dataflow parser = argparse.ArgumentParser( description="Interpolation LCR Node: This Dora node is used to calculates appropriate goal positions for the " diff --git a/examples/so100/nodes/interpolate_lcr_x_so100_to_record.py b/examples/so100/nodes/interpolate_lcr_x_so100_to_record.py index 540c13d8..e6cd0eef 100644 --- a/examples/so100/nodes/interpolate_lcr_x_so100_to_record.py +++ b/examples/so100/nodes/interpolate_lcr_x_so100_to_record.py @@ -1,3 +1,9 @@ +"""Module for recording and interpolating between LCR and SO100 configurations. + +This module provides functionality for recording robot configurations and +interpolating between LCR and SO100 robots to create smooth motion sequences. +""" + import argparse import json import os @@ -13,7 +19,7 @@ from pwm_position_control.transform import ( def main(): - """Main function to handle interpolation and recording between LCR and SO100 robot configurations.""" + """Handle interpolation and recording between LCR and SO100 configurations.""" parser = argparse.ArgumentParser( description="Interpolation LCR Node: This Dora node is used to calculates appropriate goal positions for the " "LCR followers knowing a Leader position and Follower position.", diff --git a/examples/so100/nodes/interpolate_replay_to_so100.py b/examples/so100/nodes/interpolate_replay_to_so100.py index 3621cde5..6fbac6d8 100644 --- a/examples/so100/nodes/interpolate_replay_to_so100.py +++ b/examples/so100/nodes/interpolate_replay_to_so100.py @@ -1,3 +1,9 @@ +"""Module for interpolating between replay and SO100 robot configurations. + +This module provides functionality for interpolating between recorded robot actions +and SO100 robot configurations to create smooth motion sequences. +""" + import argparse import json import os @@ -8,7 +14,7 @@ from pwm_position_control.transform import logical_to_pwm_with_offset_arrow def main(): - """Main function to handle replay interpolation for SO100 robot configurations.""" + """Handle replay interpolation for SO100 robot configurations.""" # Handle dynamic nodes, ask for the name of the node in the dataflow parser = argparse.ArgumentParser( description="Interpolation LCR Node: This Dora node is used to calculates appropriate goal positions for the " diff --git a/tests/llm/run_script.py b/tests/llm/run_script.py index 5b53f710..58217d47 100644 --- a/tests/llm/run_script.py +++ b/tests/llm/run_script.py @@ -1,3 +1,5 @@ +"""TODO: Add docstring.""" + from dora import run # Make sure to build it first with the CLI.