diff --git a/node-hub/dora-reachy1/README.md b/node-hub/dora-reachy1/README.md new file mode 100644 index 00000000..04b4dda8 --- /dev/null +++ b/node-hub/dora-reachy1/README.md @@ -0,0 +1 @@ +## Dora reachy client diff --git a/node-hub/dora-reachy1/dora_reachy1/__init__,py b/node-hub/dora-reachy1/dora_reachy1/__init__,py new file mode 100644 index 00000000..e69de29b diff --git a/node-hub/dora-reachy1/dora_reachy1/main.py b/node-hub/dora-reachy1/dora_reachy1/main.py new file mode 100644 index 00000000..7305b285 --- /dev/null +++ b/node-hub/dora-reachy1/dora_reachy1/main.py @@ -0,0 +1,125 @@ +from reachy_sdk import ReachySDK +from reachy_sdk.trajectory import goto, goto_async +import os +from dora import Node +import numpy as np +import time + + +def r_arm_inverse_kinematics(reachy, pose, action) -> list: + A = np.array( + [ + [0, 0, -1, pose[0] + action[0]], + [0, 1, 0, pose[1] + action[1]], + [1, 0, 0, pose[2] + action[2]], + [0, 0, 0, 1], + ] + ) + return reachy.r_arm.inverse_kinematics(A) + + +def happy_antennas(reachy): + reachy.head.l_antenna.speed_limit = 480.0 + reachy.head.r_antenna.speed_limit = 480.0 + + for _ in range(1): + reachy.head.l_antenna.goal_position = 10.0 + reachy.head.r_antenna.goal_position = -10.0 + + time.sleep(0.1) + + reachy.head.l_antenna.goal_position = -10.0 + reachy.head.r_antenna.goal_position = 10.0 + + time.sleep(0.1) + + reachy.head.l_antenna.goal_position = 0.0 + reachy.head.r_antenna.goal_position = 0.0 + + +def sad_antennas(reachy): + reachy.head.l_antenna.speed_limit = 360.0 + reachy.head.r_antenna.speed_limit = 360.0 + + reachy.head.l_antenna.goal_position = 140.0 + reachy.head.r_antenna.goal_position = -140.0 + + +def main(): + + node = Node() + + ROBOT_IP = os.getenv("ROBOT_IP", "10.42.0.24") + MAX_R_ARM_POSE = [0.35, -0.46, -0.42] + + reachy = ReachySDK(ROBOT_IP, with_mobile_base=False) + reachy.turn_on("r_arm") + reachy.turn_on("head") + + r_arm_pose = [0.2, -0.46, -0.42] + head_pose = [ + reachy.head.neck_roll.present_position, + reachy.head.neck_yaw.present_position, + reachy.head.neck_pitch.present_position, + ] + + default_pose = r_arm_inverse_kinematics(reachy, r_arm_pose, [0, 0, 0]) + + goto( + {joint: pos for joint, pos in zip(reachy.r_arm.joints.values(), default_pose)}, + duration=3, + ) + + for event in node: + if event["type"] != "INPUT": + continue + + if event["id"] == "r_arm_action": + action = event["value"].to_pylist() + joint_pose = r_arm_inverse_kinematics(reachy, r_arm_pose, action) + goto( + { + joint: pos + for joint, pos in zip(reachy.r_arm.joints.values(), joint_pose) + }, + duration=0.200, + ) + r_arm_pose = np.array(r_arm_pose) + np.array(action) + elif event["id"] == "head_action": + action = event["value"].to_pylist() + for i in range(5): + head_pose = np.array(head_pose) + np.array(action) / 5 + reachy.head.neck_roll.goal_position = head_pose[0] + reachy.head.neck_yaw.goal_position = head_pose[1] + reachy.head.neck_pitch.goal_position = head_pose[2] + time.sleep(0.03) + elif event["id"] == "antenna_action": + text = event["value"].to_pylist()[0] + if text == "smile": + happy_antennas(reachy) + elif text == "cry": + sad_antennas(reachy) + elif event["id"] == "gripper_action": + action = event["value"].to_pylist()[0] + step = (action - reachy.joints.r_gripper.present_position) / 10 + goal = reachy.joints.r_gripper.present_position + for i in range(10): + goal += step + goal = np.clip(goal, -100, 100) + reachy.joints.r_gripper.goal_position = goal + time.sleep(0.02) + + # When openning the gripper always go to default pose + if action == -100: + goto( + { + joint: pos + for joint, pos in zip( + reachy.r_arm.joints.values(), default_pose + ) + }, + duration=3, + ) + r_arm_pose = [0.2, -0.46, -0.42] + reachy.turn_off_smoothly("r_arm") + reachy.turn_off_smoothly("head") diff --git a/node-hub/dora-reachy1/dora_reachy1_vision/main.py b/node-hub/dora-reachy1/dora_reachy1_vision/main.py new file mode 100644 index 00000000..3f3d36bc --- /dev/null +++ b/node-hub/dora-reachy1/dora_reachy1_vision/main.py @@ -0,0 +1,31 @@ +from reachy_sdk import ReachySDK +from reachy_sdk.trajectory import goto, goto_async +import os +from dora import Node +import numpy as np +import pyarrow as pa + + +def main(): + + node = Node() + + ROBOT_IP = os.getenv("ROBOT_IP", "10.42.0.24") + CAMERA = os.getenv("CAMERA", "right") + + reachy = ReachySDK(ROBOT_IP, with_mobile_base=False) + + for event in node: + if event["type"] == "INPUT": + if CAMERA == "right": + frame = reachy.right_camera.last_frame + else: + frame = reachy.left_camera.last_frame + encoding = "bgr8" + metadata = {} + metadata["width"] = int(frame.shape[1]) + metadata["height"] = int(frame.shape[0]) + metadata["encoding"] = encoding + + storage = pa.array(frame.ravel()) + node.send_output("image", storage, metadata) diff --git a/node-hub/dora-reachy1/pyproject.toml b/node-hub/dora-reachy1/pyproject.toml new file mode 100644 index 00000000..0bdf44cf --- /dev/null +++ b/node-hub/dora-reachy1/pyproject.toml @@ -0,0 +1,23 @@ +[tool.poetry] +name = "dora-reachy1" +version = "0.3.6" +authors = [ + "Haixuan Xavier Tao ", + "Enzo Le Van ", +] +description = "Dora Node for controlling reachy1" +readme = "README.md" + +packages = [{ include = "dora_reachy1" }, { include = "dora_reachy1_vision" }] + +[tool.poetry.dependencies] +dora-rs = "^0.3.6" +reachy-sdk = "^0.7.0" + +[tool.poetry.scripts] +dora-reachy1 = "dora_reachy1.main:main" +dora-reachy1-vision = "dora_reachy1_vision.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" diff --git a/node-hub/dynamixel-client/README.md b/node-hub/dynamixel-client/README.md new file mode 100644 index 00000000..6b26261c --- /dev/null +++ b/node-hub/dynamixel-client/README.md @@ -0,0 +1,107 @@ +## DynamixelClient for XL motors + +This node is a client for the Dynamixel motors. It is based on the Dynamixel SDK and is used to control the motors. It +is a Python node that communicates with the motors via the USB port. + +## YAML Configuration + +````YAML +nodes: + - id: dynamixel_client + path: client.py # modify this to the relative path from the graph file to the client script + inputs: + pull_position: dora/timer/millis/10 # pull the present position every 10ms + pull_velocity: dora/timer/millis/10 # pull the present velocity every 10ms + pull_current: dora/timer/millis/10 # pull the present current every 10ms + + # write_goal_position: some goal position from other node + # write_goal_current: some goal current from other node + + # end: some end signal from other node + outputs: + - position # regarding 'pull_position' input, it will output the position every 10ms + - velocity # regarding 'pull_velocity' input, it will output the velocity every 10ms + - current # regarding 'pull_current' input, it will output the current every 10ms + + env: + PORT: COM9 # e.g. /dev/ttyUSB0 or COM9 + CONFIG: config.json # the configuration file for the motors +```` + +## Arrow format + +### Outputs + +Arrow **StructArray** with two fields, **joints** and **values**: + +```Python +import pyarrow as pa + +# Create a StructArray from a list of joints (py_list, numpy_array or pyarrow_array) and a list of values (py_list, numpy_array or pyarrow_array) +arrow_struct = pa.StructArray.from_arrays( + arrays=[joints, values], + names=["joints", "values"] +) + +# Send the StructArray to the dataflow +node.send_output("output_name", arrow_struct, None) + +# Receive the StructArray from the dataflow +event = node.next() +arrow_struct = event["value"] +joints = arrow_struct.field("joints") # PyArrow Array of Strings +values = arrow_struct.field("values") # PyArrow Array of Int32/Uint32/Float32... +``` + +### Inputs + +Arrow **StructArray** with two fields, **joints** and **values**: + +```Python +import pyarrow as pa + +# Create a StructArray from a list of joints (py_list, numpy_array or pyarrow_array) and a list of values (py_list, numpy_array or pyarrow_array) +arrow_struct = pa.StructArray.from_arrays( + arrays=[joints, values], + names=["joints", "values"] +) + +# Send the StructArray to the dataflow +node.send_output("output_name", arrow_struct, None) + +# Receive the StructArray from the dataflow +event = node.next() +arrow_struct = event["value"] +joints = arrow_struct.field("joints") # PyArrow Array of Strings +values = arrow_struct.field("values") # PyArrow Array of Int32/Uint32/Float32... +``` + +**Note**: The zero-copy is available for numpy arrays (with no None values) and pyarrow arrays. + +## Configuration + +The configuration file that should be passed to the node is a JSON file that contains the configuration for the motors: + +```JSON +{ + "shoulder_pan": { + "id": 1, + "model": "x_series", + "torque": true, + "P": 800, + "I": 0, + "D": 0, + "goal_current": null + } +} +``` + +The configuration file starts by the **joint** name of the servo. **id**: the id of the motor in the bus, **model**: the +model of the motor, **torque**: whether the motor should be +in torque mode or not (at the beginning), **P**: the proportional gain for position control mode, **I**: the integral +gain for position control mode, **D**: the derivative gain for position control mode, **goal_current**: the goal current +for the motor at the beginning, null if you don't want to set it. + +## License + +This library is licensed under the [Apache License 2.0](../../LICENSE). \ No newline at end of file diff --git a/node-hub/dynamixel-client/config.template.json b/node-hub/dynamixel-client/config.template.json new file mode 100644 index 00000000..62e7b7aa --- /dev/null +++ b/node-hub/dynamixel-client/config.template.json @@ -0,0 +1,20 @@ +{ + "shoulder_pan": { + "id": 1, + "model": "x_series", + "torque": true, + "P": 800, + "I": 0, + "D": 0, + "goal_current": null + }, + "shoulder_lift": { + "id": 2, + "model": "x_series", + "torque": true, + "P": 800, + "I": 0, + "D": 0, + "goal_current": null + } +} \ No newline at end of file diff --git a/node-hub/dynamixel-client/dynamixel_client/__init__.py b/node-hub/dynamixel-client/dynamixel_client/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/node-hub/dynamixel-client/dynamixel_client/bus.py b/node-hub/dynamixel-client/dynamixel_client/bus.py new file mode 100644 index 00000000..d123fa67 --- /dev/null +++ b/node-hub/dynamixel-client/dynamixel_client/bus.py @@ -0,0 +1,328 @@ +import enum + +import pyarrow as pa + +from typing import Union + +from dynamixel_sdk import ( + PacketHandler, + PortHandler, + COMM_SUCCESS, + GroupSyncRead, + GroupSyncWrite, +) +from dynamixel_sdk import DXL_HIBYTE, DXL_HIWORD, DXL_LOBYTE, DXL_LOWORD + +PROTOCOL_VERSION = 2.0 +BAUD_RATE = 1_000_000 +TIMEOUT_MS = 1000 + + +def wrap_joints_and_values( + joints: Union[list[str], pa.Array], + values: Union[int, list[int], pa.Array], +) -> pa.StructArray: + """ + Wraps joints and their corresponding values into a structured array. + + :param joints: A list, numpy array, or pyarrow array of joint names. + :type joints: Union[list[str], np.array, pa.Array] + :param values: A single integer value, or a list, numpy array, or pyarrow array of integer values. + If a single integer is provided, it will be broadcasted to all joints. + :type values: Union[int, list[int], np.array, pa.Array] + + :return: A structured array with two fields: + - "joints": A string field containing the names of the joints. + - "values": An Int32Array containing the values corresponding to the joints. + :rtype: pa.StructArray + + :raises ValueError: If lengths of joints and values do not match. + + Example: + -------- + joints = ["shoulder_pan", "shoulder_lift", "elbow_flex"] + values = [100, 200, 300] + struct_array = wrap_joints_and_values(joints, values) + + This example wraps the given joints and their corresponding values into a structured array. + + Another example with a single integer value: + joints = ["shoulder_pan", "shoulder_lift", "elbow_flex"] + value = 150 + struct_array = wrap_joints_and_values(joints, value) + + This example broadcasts the single integer value to all joints and wraps them into a structured array. + """ + + if isinstance(values, int): + values = [values] * len(joints) + + if len(joints) != len(values): + raise ValueError("joints and values must have the same length") + + mask = pa.array([False] * len(values), type=pa.bool_()) + + if isinstance(values, list): + mask = pa.array([value is None for value in values]) + + if isinstance(values, pa.Array): + mask = values.is_null() + + return pa.StructArray.from_arrays( + arrays=[joints, values], names=["joints", "values"], mask=mask + ).drop_null() + + +class TorqueMode(enum.Enum): + ENABLED = pa.scalar(1, pa.uint32()) + DISABLED = pa.scalar(0, pa.uint32()) + + +class OperatingMode(enum.Enum): + VELOCITY = pa.scalar(1, pa.uint32()) + POSITION = pa.scalar(3, pa.uint32()) + EXTENDED_POSITION = pa.scalar(4, pa.uint32()) + CURRENT_CONTROLLED_POSITION = pa.scalar(5, pa.uint32()) + PWM = pa.scalar(16, pa.uint32()) + + +X_SERIES_CONTROL_TABLE = [ + ("Model_Number", 0, 2), + ("Model_Information", 2, 4), + ("Firmware_Version", 6, 1), + ("ID", 7, 1), + ("Baud_Rate", 8, 1), + ("Return_Delay_Time", 9, 1), + ("Drive_Mode", 10, 1), + ("Operating_Mode", 11, 1), + ("Secondary_ID", 12, 1), + ("Protocol_Type", 13, 1), + ("Homing_Offset", 20, 4), + ("Moving_Threshold", 24, 4), + ("Temperature_Limit", 31, 1), + ("Max_Voltage_Limit", 32, 2), + ("Min_Voltage_Limit", 34, 2), + ("PWM_Limit", 36, 2), + ("Current_Limit", 38, 2), + ("Acceleration_Limit", 40, 4), + ("Velocity_Limit", 44, 4), + ("Max_Position_Limit", 48, 4), + ("Min_Position_Limit", 52, 4), + ("Shutdown", 63, 1), + ("Torque_Enable", 64, 1), + ("LED", 65, 1), + ("Status_Return_Level", 68, 1), + ("Registered_Instruction", 69, 1), + ("Hardware_Error_Status", 70, 1), + ("Velocity_I_Gain", 76, 2), + ("Velocity_P_Gain", 78, 2), + ("Position_D_Gain", 80, 2), + ("Position_I_Gain", 82, 2), + ("Position_P_Gain", 84, 2), + ("Feedforward_2nd_Gain", 88, 2), + ("Feedforward_1st_Gain", 90, 2), + ("Bus_Watchdog", 98, 1), + ("Goal_PWM", 100, 2), + ("Goal_Current", 102, 2), + ("Goal_Velocity", 104, 4), + ("Profile_Acceleration", 108, 4), + ("Profile_Velocity", 112, 4), + ("Goal_Position", 116, 4), + ("Realtime_Tick", 120, 2), + ("Moving", 122, 1), + ("Moving_Status", 123, 1), + ("Present_PWM", 124, 2), + ("Present_Current", 126, 2), + ("Present_Velocity", 128, 4), + ("Present_Position", 132, 4), + ("Velocity_Trajectory", 136, 4), + ("Position_Trajectory", 140, 4), + ("Present_Input_Voltage", 144, 2), + ("Present_Temperature", 146, 1), +] + +MODEL_CONTROL_TABLE = { + "x_series": X_SERIES_CONTROL_TABLE, + "xl330-m077": X_SERIES_CONTROL_TABLE, + "xl330-m288": X_SERIES_CONTROL_TABLE, + "xl430-w250": X_SERIES_CONTROL_TABLE, + "xm430-w350": X_SERIES_CONTROL_TABLE, + "xm540-w270": X_SERIES_CONTROL_TABLE, +} + + +class DynamixelBus: + + def __init__(self, port: str, description: dict[str, (int, str)]): + self.port = port + self.descriptions = description + self.motor_ctrl = {} + + for motor_name, (motor_id, motor_model) in description.items(): + if motor_model not in MODEL_CONTROL_TABLE: + raise ValueError(f"Model {motor_model} is not supported.") + + self.motor_ctrl[motor_name] = {} + + self.motor_ctrl[motor_name]["id"] = motor_id + for data_name, address, bytes_size in MODEL_CONTROL_TABLE[motor_model]: + self.motor_ctrl[motor_name][data_name] = { + "addr": address, + "bytes_size": bytes_size, + } + + self.port_handler = PortHandler(self.port) + self.packet_handler = PacketHandler(PROTOCOL_VERSION) + + if not self.port_handler.openPort(): + raise OSError(f"Failed to open port {self.port}") + + self.port_handler.setBaudRate(BAUD_RATE) + self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS) + + self.group_readers = {} + self.group_writers = {} + + def close(self): + self.port_handler.closePort() + + def write(self, data_name: str, data: pa.StructArray): + motor_ids = [ + self.motor_ctrl[motor_name.as_py()]["id"] + for motor_name in data.field("joints") + ] + + values = pa.Array.from_buffers( + pa.uint32(), + length=len(data.field("values")), + buffers=data.field("values").buffers(), + ) + + group_key = f"{data_name}_" + "_".join([str(idx) for idx in motor_ids]) + + first_motor_name = list(self.motor_ctrl.keys())[0] + + packet_address = self.motor_ctrl[first_motor_name][data_name]["addr"] + packet_bytes_size = self.motor_ctrl[first_motor_name][data_name]["bytes_size"] + + init_group = data_name not in self.group_readers + + if init_group: + self.group_writers[group_key] = GroupSyncWrite( + self.port_handler, + self.packet_handler, + packet_address, + packet_bytes_size, + ) + + for idx, value in zip(motor_ids, values): + value = value.as_py() + if value is None: + continue + + if packet_bytes_size == 1: + data = [ + DXL_LOBYTE(DXL_LOWORD(value)), + ] + elif packet_bytes_size == 2: + data = [ + DXL_LOBYTE(DXL_LOWORD(value)), + DXL_HIBYTE(DXL_LOWORD(value)), + ] + elif packet_bytes_size == 4: + data = [ + DXL_LOBYTE(DXL_LOWORD(value)), + DXL_HIBYTE(DXL_LOWORD(value)), + DXL_LOBYTE(DXL_HIWORD(value)), + DXL_HIBYTE(DXL_HIWORD(value)), + ] + else: + raise NotImplementedError( + f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but {packet_bytes_size} " + f"is provided instead." + ) + + if init_group: + self.group_writers[group_key].addParam(idx, data) + else: + self.group_writers[group_key].changeParam(idx, data) + + comm = self.group_writers[group_key].txPacket() + if comm != COMM_SUCCESS: + raise ConnectionError( + f"Write failed due to communication error on port {self.port} for group_key {group_key}: " + f"{self.packet_handler.getTxRxResult(comm)}" + ) + + def read(self, data_name: str, motor_names: pa.Array) -> pa.StructArray: + motor_ids = [ + self.motor_ctrl[motor_name.as_py()]["id"] for motor_name in motor_names + ] + + group_key = f"{data_name}_" + "_".join([str(idx) for idx in motor_ids]) + + first_motor_name = list(self.motor_ctrl.keys())[0] + + packet_address = self.motor_ctrl[first_motor_name][data_name]["addr"] + packet_bytes_size = self.motor_ctrl[first_motor_name][data_name]["bytes_size"] + + if data_name not in self.group_readers: + self.group_readers[group_key] = GroupSyncRead( + self.port_handler, + self.packet_handler, + packet_address, + packet_bytes_size, + ) + + for idx in motor_ids: + self.group_readers[group_key].addParam(idx) + + comm = self.group_readers[group_key].txRxPacket() + if comm != COMM_SUCCESS: + raise ConnectionError( + f"Read failed due to communication error on port {self.port} for group_key {group_key}: " + f"{self.packet_handler.getTxRxResult(comm)}" + ) + + values = pa.array( + [ + self.group_readers[group_key].getData( + idx, packet_address, packet_bytes_size + ) + for idx in motor_ids + ], + type=pa.uint32(), + ) + values = values.from_buffers(pa.int32(), len(values), values.buffers()) + + return wrap_joints_and_values(motor_names, values) + + def write_torque_enable(self, torque_mode: pa.StructArray): + self.write("Torque_Enable", torque_mode) + + def write_operating_mode(self, operating_mode: pa.StructArray): + self.write("Operating_Mode", operating_mode) + + def read_position(self, motor_names: pa.Array) -> pa.StructArray: + return self.read("Present_Position", motor_names) + + def read_velocity(self, motor_names: pa.Array) -> pa.StructArray: + return self.read("Present_Velocity", motor_names) + + def read_current(self, motor_names: pa.Array) -> pa.StructArray: + return self.read("Present_Current", motor_names) + + def write_goal_position(self, goal_position: pa.StructArray): + self.write("Goal_Position", goal_position) + + def write_goal_current(self, goal_current: pa.StructArray): + self.write("Goal_Current", goal_current) + + def write_position_p_gain(self, position_p_gain: pa.StructArray): + self.write("Position_P_Gain", position_p_gain) + + def write_position_i_gain(self, position_i_gain: pa.StructArray): + self.write("Position_I_Gain", position_i_gain) + + def write_position_d_gain(self, position_d_gain: pa.StructArray): + self.write("Position_D_Gain", position_d_gain) diff --git a/node-hub/dynamixel-client/dynamixel_client/main.py b/node-hub/dynamixel-client/dynamixel_client/main.py new file mode 100644 index 00000000..e1a565fa --- /dev/null +++ b/node-hub/dynamixel-client/dynamixel_client/main.py @@ -0,0 +1,222 @@ +""" +Dynamixel Client: This node is used to represent a chain of dynamixel motors. It can be used to read positions, +velocities, currents, and set goal positions and currents. +""" + +import os +import time +import argparse +import json + +import pyarrow as pa + +from dora import Node + +from .bus import DynamixelBus, TorqueMode, wrap_joints_and_values + + +class Client: + + def __init__(self, config: dict[str, any]): + self.config = config + + description = {} + for i in range(len(config["ids"])): + description[config["joints"][i]] = (config["ids"][i], config["models"][i]) + + self.config["joints"] = pa.array(config["joints"], pa.string()) + self.bus = DynamixelBus(config["port"], description) + + # Set client configuration values, raise errors if the values are not set to indicate that the motors are not + # configured correctly + + self.bus.write_torque_enable(self.config["torque"]) + self.bus.write_goal_current(self.config["goal_current"]) + + time.sleep(0.1) + self.bus.write_position_d_gain(self.config["D"]) + + time.sleep(0.1) + self.bus.write_position_i_gain(self.config["I"]) + + time.sleep(0.1) + self.bus.write_position_p_gain(self.config["P"]) + + self.node = Node(config["name"]) + + def run(self): + for event in self.node: + event_type = event["type"] + + if event_type == "INPUT": + event_id = event["id"] + + if event_id == "pull_position": + self.pull_position(self.node, event["metadata"]) + elif event_id == "pull_velocity": + self.pull_velocity(self.node, event["metadata"]) + elif event_id == "pull_current": + self.pull_current(self.node, event["metadata"]) + elif event_id == "write_goal_position": + self.write_goal_position(event["value"]) + elif event_id == "write_goal_current": + self.write_goal_current(event["value"]) + elif event_id == "end": + break + + elif event_type == "ERROR": + raise ValueError("An error occurred in the dataflow: " + event["error"]) + + def close(self): + self.bus.write_torque_enable( + wrap_joints_and_values( + self.config["joints"], + [TorqueMode.DISABLED.value] * len(self.config["joints"]), + ) + ) + + def pull_position(self, node, metadata): + try: + node.send_output( + "position", + self.bus.read_position(self.config["joints"]), + metadata, + ) + + except ConnectionError as e: + print("Error reading position:", e) + + def pull_velocity(self, node, metadata): + try: + node.send_output( + "velocity", + self.bus.read_velocity(self.config["joints"]), + metadata, + ) + except ConnectionError as e: + print("Error reading velocity:", e) + + def pull_current(self, node, metadata): + try: + node.send_output( + "current", + self.bus.read_current(self.config["joints"]), + metadata, + ) + except ConnectionError as e: + print("Error reading current:", e) + + def write_goal_position(self, goal_position: pa.StructArray): + try: + self.bus.write_goal_position(goal_position) + except ConnectionError as e: + print("Error writing goal position:", e) + + def write_goal_current(self, goal_current: pa.StructArray): + try: + self.bus.write_goal_current(goal_current) + except ConnectionError as e: + print("Error writing goal current:", e) + + +def main(): + # Handle dynamic nodes, ask for the name of the node in the dataflow + parser = argparse.ArgumentParser( + description="Dynamixel Client: This node is used to represent a chain of dynamixel motors. It can be used to " + "read positions, velocities, currents, and set goal positions and currents." + ) + + parser.add_argument( + "--name", + type=str, + required=False, + help="The name of the node in the dataflow.", + default="dynamixel_client", + ) + parser.add_argument( + "--port", + type=str, + required=False, + help="The port of the dynamixel motors.", + default=None, + ) + parser.add_argument( + "--config", + type=str, + help="The configuration of the dynamixel motors.", + default=None, + ) + + args = parser.parse_args() + + # Check if port is set + if not os.environ.get("PORT") and args.port is None: + raise ValueError( + "The port is not set. Please set the port of the dynamixel motors in the environment variables or as an " + "argument." + ) + + port = os.environ.get("PORT") if args.port is None else args.port + + # Check if config is set + if not os.environ.get("CONFIG") and args.config is None: + raise ValueError( + "The configuration is not set. Please set the configuration of the dynamixel motors in the environment " + "variables or as an argument." + ) + + with open(os.environ.get("CONFIG") if args.config is None else args.config) as file: + config = json.load(file) + + joints = config.keys() + + # Create configuration + bus = { + "name": args.name, + "port": port, # (e.g. "/dev/ttyUSB0", "COM3") + "ids": [config[joint]["id"] for joint in joints], + "joints": list(config.keys()), + "models": [config[joint]["model"] for joint in joints], + "torque": wrap_joints_and_values( + pa.array(config.keys(), pa.string()), + pa.array( + [ + ( + TorqueMode.ENABLED.value + if config[joint]["torque"] + else TorqueMode.DISABLED.value + ) + for joint in joints + ], + type=pa.uint32(), + ), + ), + "goal_current": wrap_joints_and_values( + pa.array(config.keys(), pa.string()), + pa.array( + [config[joint]["goal_current"] for joint in joints], type=pa.uint32() + ), + ), + "P": wrap_joints_and_values( + pa.array(config.keys(), pa.string()), + pa.array([config[joint]["P"] for joint in joints], type=pa.uint32()), + ), + "I": wrap_joints_and_values( + pa.array(config.keys(), pa.string()), + pa.array([config[joint]["I"] for joint in joints], type=pa.uint32()), + ), + "D": wrap_joints_and_values( + pa.array(config.keys(), pa.string()), + pa.array([config[joint]["D"] for joint in joints], type=pa.uint32()), + ), + } + + print("Dynamixel Client Configuration: ", bus, flush=True) + + client = Client(bus) + client.run() + client.close() + + +if __name__ == "__main__": + main() diff --git a/node-hub/dynamixel-client/pyproject.toml b/node-hub/dynamixel-client/pyproject.toml new file mode 100644 index 00000000..85a3a8d1 --- /dev/null +++ b/node-hub/dynamixel-client/pyproject.toml @@ -0,0 +1,20 @@ +[tool.poetry] +name = "dynamixel-client" +version = "0.1" +authors = ["Hennzau "] +description = "Dora Node client for dynamixel motors." +readme = "README.md" + +packages = [{ include = "dynamixel_client" }] + +[tool.poetry.dependencies] +python = "^3.9" +dora-rs = "0.3.5" +dynamixel-sdk = "3.7.31" + +[tool.poetry.scripts] +dynamixel-client = "dynamixel_client.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" \ No newline at end of file diff --git a/node-hub/feetech-client/README.md b/node-hub/feetech-client/README.md new file mode 100644 index 00000000..1c878127 --- /dev/null +++ b/node-hub/feetech-client/README.md @@ -0,0 +1,100 @@ +## FeetechClient for SCS/STS motors + +This node is a client for the Feetech motors. It is based on the Dynamixel SDK and is used to control the motors. It +is a Python node that communicates with the motors via the USB port. + +## YAML Configuration + +```YAML +nodes: + - id: feetech_client + path: client.py # modify this to the relative path from the graph file to the client script + inputs: + pull_position: dora/timer/millis/10 # pull the present position every 10ms + pull_velocity: dora/timer/millis/10 # pull the present velocity every 10ms + pull_current: dora/timer/millis/10 # pull the present current every 10ms + + # write_goal_position: some goal position from other node + + # end: some end signal from other node + outputs: + - position # regarding 'pull_position' input, it will output the position every 10ms + - velocity # regarding 'pull_velocity' input, it will output the velocity every 10ms + - current # regarding 'pull_current' input, it will output the current every 10ms + + env: + PORT: COM9 # e.g. /dev/ttyUSB0 or COM9 + CONFIG: config.json # the configuration file for the motors +``` + +## Arrow format + +### Outputs + +Arrow **StructArray** with two fields, **joints** and **values**: + +```Python +import pyarrow as pa + +# Create a StructArray from a list of joints (py_list, numpy_array or pyarrow_array) and a list of values (py_list, numpy_array or pyarrow_array) +arrow_struct = pa.StructArray.from_arrays( + arrays=[joints, values], + names=["joints", "values"] +) + +# Send the StructArray to the dataflow +node.send_output("output_name", arrow_struct, None) + +# Receive the StructArray from the dataflow +event = node.next() +arrow_struct = event["value"] +joints = arrow_struct.field("joints") # PyArrow Array of Strings +values = arrow_struct.field("values") # PyArrow Array of Int32/Uint32/Float32... +``` + +### Inputs + +Arrow **StructArray** with two fields, **joints** and **values**: + +```Python +import pyarrow as pa + +# Create a StructArray from a list of joints (py_list, numpy_array or pyarrow_array) and a list of values (py_list, numpy_array or pyarrow_array) +arrow_struct = pa.StructArray.from_arrays( + arrays=[joints, values], + names=["joints", "values"] +) + +# Send the StructArray to the dataflow +node.send_output("output_name", arrow_struct, None) + +# Receive the StructArray from the dataflow +event = node.next() +arrow_struct = event["value"] +joints = arrow_struct.field("joints") # PyArrow Array of Strings +values = arrow_struct.field("values") # PyArrow Array of Int32/Uint32/Float32... +``` + +**Note**: The zero-copy is available for numpy arrays (with no None values) and pyarrow arrays. + +## Configuration + +The configuration file that should be passed to the node is a JSON file that contains the configuration for the motors: + +```JSON +{ + "shoulder_pan": { + "id": 1, + "model": "scs_series", + "torque": true + } +} +``` + +The configuration file starts by the **joint** name of the servo. **id**: the id of the motor in the bus, **model**: the +model of the motor, **torque**: whether the motor should be in torque mode or not (at the beginning), **goal_current**: +the goal current for the motor at the beginning, null if you don't want to set it. + +## License + +This library is licensed under the [Apache License 2.0](../../LICENSE). \ No newline at end of file diff --git a/node-hub/feetech-client/config.template.json b/node-hub/feetech-client/config.template.json new file mode 100644 index 00000000..493f3ad1 --- /dev/null +++ b/node-hub/feetech-client/config.template.json @@ -0,0 +1,12 @@ +{ + "shoulder_pan": { + "id": 1, + "model": "scs_series", + "torque": true + }, + "shoulder_lift": { + "id": 2, + "model": "scs_series", + "torque": true + } +} \ No newline at end of file diff --git a/node-hub/feetech-client/feetech_client/__init__.py b/node-hub/feetech-client/feetech_client/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/node-hub/feetech-client/feetech_client/bus.py b/node-hub/feetech-client/feetech_client/bus.py new file mode 100644 index 00000000..2b231615 --- /dev/null +++ b/node-hub/feetech-client/feetech_client/bus.py @@ -0,0 +1,273 @@ +import enum + +import pyarrow as pa + +from typing import Union + +from scservo_sdk import ( + PacketHandler, + PortHandler, + COMM_SUCCESS, + GroupSyncRead, + GroupSyncWrite, +) +from scservo_sdk import SCS_HIBYTE, SCS_HIWORD, SCS_LOBYTE, SCS_LOWORD + +PROTOCOL_VERSION = 0 +BAUD_RATE = 1_000_000 +TIMEOUT_MS = 1000 + + +def wrap_joints_and_values( + joints: Union[list[str], pa.Array], + values: Union[list[int], pa.Array], +) -> pa.StructArray: + return pa.StructArray.from_arrays( + arrays=[joints, values], + names=["joints", "values"], + ) + + +class TorqueMode(enum.Enum): + ENABLED = pa.scalar(1, pa.uint32()) + DISABLED = pa.scalar(0, pa.uint32()) + + +class OperatingMode(enum.Enum): + ONE_TURN = pa.scalar(0, pa.uint32()) + + +SCS_SERIES_CONTROL_TABLE = [ + ("Model", 3, 2), + ("ID", 5, 1), + ("Baud_Rate", 6, 1), + ("Return_Delay", 7, 1), + ("Response_Status_Level", 8, 1), + ("Min_Angle_Limit", 9, 2), + ("Max_Angle_Limit", 11, 2), + ("Max_Temperature_Limit", 13, 1), + ("Max_Voltage_Limit", 14, 1), + ("Min_Voltage_Limit", 15, 1), + ("Max_Torque_Limit", 16, 2), + ("Phase", 18, 1), + ("Unloading_Condition", 19, 1), + ("LED_Alarm_Condition", 20, 1), + ("P_Coefficient", 21, 1), + ("D_Coefficient", 22, 1), + ("I_Coefficient", 23, 1), + ("Minimum_Startup_Force", 24, 2), + ("CW_Dead_Zone", 26, 1), + ("CCW_Dead_Zone", 27, 1), + ("Protection_Current", 28, 2), + ("Angular_Resolution", 30, 1), + ("Offset", 31, 2), + ("Mode", 33, 1), + ("Protective_Torque", 34, 1), + ("Protection_Time", 35, 1), + ("Overload_Torque", 36, 1), + ("Speed_closed_loop_P_proportional_coefficient", 37, 1), + ("Over_Current_Protection_Time", 38, 1), + ("Velocity_closed_loop_I_integral_coefficient", 39, 1), + ("Torque_Enable", 40, 1), + ("Acceleration", 41, 1), + ("Goal_Position", 42, 2), + ("Goal_Time", 44, 2), + ("Goal_Speed", 46, 2), + ("Lock", 55, 1), + ("Present_Position", 56, 2), + ("Present_Speed", 58, 2), + ("Present_Load", 60, 2), + ("Present_Voltage", 62, 1), + ("Present_Temperature", 63, 1), + ("Status", 65, 1), + ("Moving", 66, 1), + ("Present_Current", 69, 2), +] + +MODEL_CONTROL_TABLE = { + "scs_series": SCS_SERIES_CONTROL_TABLE, + "sts3215": SCS_SERIES_CONTROL_TABLE, +} + + +class FeetechBus: + + def __init__(self, port: str, description: dict[str, (np.uint8, str)]): + """ + Args: + port: the serial port to connect to the Feetech bus + description: a dictionary containing the description of the motors connected to the bus. The keys are the + motor names and the values are tuples containing the motor id and the motor model. + """ + + self.port = port + self.descriptions = description + self.motor_ctrl = {} + + for motor_name, (motor_id, motor_model) in description.items(): + if motor_model not in MODEL_CONTROL_TABLE: + raise ValueError(f"Model {motor_model} is not supported.") + + self.motor_ctrl[motor_name] = {} + + self.motor_ctrl[motor_name]["id"] = motor_id + for data_name, address, bytes_size in MODEL_CONTROL_TABLE[motor_model]: + self.motor_ctrl[motor_name][data_name] = { + "addr": address, + "bytes_size": bytes_size, + } + + self.port_handler = PortHandler(self.port) + self.packet_handler = PacketHandler(PROTOCOL_VERSION) + + if not self.port_handler.openPort(): + raise OSError(f"Failed to open port {self.port}") + + self.port_handler.setBaudRate(BAUD_RATE) + self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS) + + self.group_readers = {} + self.group_writers = {} + + def close(self): + self.port_handler.closePort() + + def write(self, data_name: str, data: pa.StructArray): + motor_ids = [ + self.motor_ctrl[motor_name.as_py()]["id"] + for motor_name in data.field("joints") + ] + + values = [ + np.uint32(32767 - value.as_py()) if value < 0 else np.uint32(value.as_py()) + for value in data.field("values") + ] + + group_key = f"{data_name}_" + "_".join([str(idx) for idx in motor_ids]) + + first_motor_name = list(self.motor_ctrl.keys())[0] + + packet_address = self.motor_ctrl[first_motor_name][data_name]["addr"] + packet_bytes_size = self.motor_ctrl[first_motor_name][data_name]["bytes_size"] + + init_group = data_name not in self.group_readers + + if init_group: + self.group_writers[group_key] = GroupSyncWrite( + self.port_handler, + self.packet_handler, + packet_address, + packet_bytes_size, + ) + + for idx, value in zip(motor_ids, values): + if value is None: + continue + + if packet_bytes_size == 1: + data = [ + SCS_LOBYTE(SCS_LOWORD(value)), + ] + elif packet_bytes_size == 2: + data = [ + SCS_LOBYTE(SCS_LOWORD(value)), + SCS_HIBYTE(SCS_LOWORD(value)), + ] + elif packet_bytes_size == 4: + data = [ + SCS_LOBYTE(SCS_LOWORD(value)), + SCS_HIBYTE(SCS_LOWORD(value)), + SCS_LOBYTE(SCS_HIWORD(value)), + SCS_HIBYTE(SCS_HIWORD(value)), + ] + else: + raise NotImplementedError( + f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but {packet_bytes_size} " + f"is provided instead." + ) + + if init_group: + self.group_writers[group_key].addParam(idx, data) + else: + self.group_writers[group_key].changeParam(idx, data) + + comm = self.group_writers[group_key].txPacket() + if comm != COMM_SUCCESS: + raise ConnectionError( + f"Write failed due to communication error on port {self.port} for group_key {group_key}: " + f"{self.packet_handler.getTxRxResult(comm)}" + ) + + def read(self, data_name: str, motor_names: pa.Array) -> pa.StructArray: + motor_ids = [ + self.motor_ctrl[motor_name.as_py()]["id"] for motor_name in motor_names + ] + + group_key = f"{data_name}_" + "_".join([str(idx) for idx in motor_ids]) + + first_motor_name = list(self.motor_ctrl.keys())[0] + + packet_address = self.motor_ctrl[first_motor_name][data_name]["addr"] + packet_bytes_size = self.motor_ctrl[first_motor_name][data_name]["bytes_size"] + + if data_name not in self.group_readers: + self.group_readers[group_key] = GroupSyncRead( + self.port_handler, + self.packet_handler, + packet_address, + packet_bytes_size, + ) + + for idx in motor_ids: + self.group_readers[group_key].addParam(idx) + + comm = self.group_readers[group_key].txRxPacket() + if comm != COMM_SUCCESS: + raise ConnectionError( + f"Read failed due to communication error on port {self.port} for group_key {group_key}: " + f"{self.packet_handler.getTxRxResult(comm)}" + ) + + values = pa.array( + [ + self.group_readers[group_key].getData( + idx, packet_address, packet_bytes_size + ) + for idx in motor_ids + ], + type=pa.uint32(), + ) + + values = pa.array( + [ + value.as_py() if value.as_py() < 32767 else 32767 - value.as_py() + for value in values + ], + type=pa.int32(), + ) + + return wrap_joints_and_values(motor_names, values) + + def write_torque_enable(self, torque_mode: pa.StructArray): + self.write("Torque_Enable", torque_mode) + + def write_operating_mode(self, operating_mode: pa.StructArray): + self.write("Mode", operating_mode) + + def read_position(self, motor_names: pa.Array) -> pa.StructArray: + return self.read("Present_Position", motor_names) + + def read_velocity(self, motor_names: pa.Array) -> pa.StructArray: + return self.read("Present_Velocity", motor_names) + + def read_current(self, motor_names: pa.Array) -> pa.StructArray: + return self.read("Present_Current", motor_names) + + def write_goal_position(self, goal_position: pa.StructArray): + self.write("Goal_Position", goal_position) + + def write_max_angle_limit(self, max_angle_limit: pa.StructArray): + self.write("Max_Angle_Limit", max_angle_limit) + + def write_min_angle_limit(self, min_angle_limit: pa.StructArray): + self.write("Min_Angle_Limit", min_angle_limit) diff --git a/node-hub/feetech-client/feetech_client/main.py b/node-hub/feetech-client/feetech_client/main.py new file mode 100644 index 00000000..1e5cfdd2 --- /dev/null +++ b/node-hub/feetech-client/feetech_client/main.py @@ -0,0 +1,186 @@ +""" +Feetech Client: This node is used to represent a chain of feetech motors. It can be used to read positions, +velocities, currents, and set goal positions and currents. +""" + +import os +import argparse +import json + +import pyarrow as pa + +from dora import Node + +from .bus import FeetechBus, TorqueMode, wrap_joints_and_values + + +class Client: + + def __init__(self, config: dict[str, any]): + self.config = config + + description = {} + for i in range(len(config["ids"])): + description[config["joints"][i]] = (config["ids"][i], config["models"][i]) + + self.config["joints"] = pa.array(config["joints"], pa.string()) + self.bus = FeetechBus(config["port"], description) + + # Set client configuration values and raise errors if the values are not set to indicate that the motors are not + # configured correctly + + self.bus.write_torque_enable(self.config["torque"]) + + self.node = Node(config["name"]) + + def run(self): + for event in self.node: + event_type = event["type"] + + if event_type == "INPUT": + event_id = event["id"] + + if event_id == "pull_position": + self.pull_position(self.node, event["metadata"]) + elif event_id == "pull_velocity": + self.pull_velocity(self.node, event["metadata"]) + elif event_id == "pull_current": + self.pull_current(self.node, event["metadata"]) + elif event_id == "write_goal_position": + self.write_goal_position(event["value"]) + elif event_id == "end": + break + + elif event_type == "ERROR": + raise ValueError("An error occurred in the dataflow: " + event["error"]) + + def close(self): + self.bus.write_torque_enable( + wrap_joints_and_values( + self.config["joints"], + [TorqueMode.DISABLED.value] * len(self.config["joints"]), + ) + ) + + def pull_position(self, node, metadata): + try: + node.send_output( + "position", + self.bus.read_position(self.config["joints"]), + metadata, + ) + + except ConnectionError as e: + print("Error reading position:", e) + + def pull_velocity(self, node, metadata): + try: + node.send_output( + "velocity", + self.bus.read_velocity(self.config["joints"]), + metadata, + ) + except ConnectionError as e: + print("Error reading velocity:", e) + + def pull_current(self, node, metadata): + try: + node.send_output( + "current", + self.bus.read_current(self.config["joints"]), + metadata, + ) + except ConnectionError as e: + print("Error reading current:", e) + + def write_goal_position(self, goal_position: pa.StructArray): + try: + self.bus.write_goal_position(goal_position) + except ConnectionError as e: + print("Error writing goal position:", e) + + +def main(): + # Handle dynamic nodes, ask for the name of the node in the dataflow + parser = argparse.ArgumentParser( + description="Feetech Client: This node is used to represent a chain of feetech motors. " + "It can be used to read " + "positions, velocities, currents, and set goal positions and currents." + ) + + parser.add_argument( + "--name", + type=str, + required=False, + help="The name of the node in the dataflow.", + default="feetech_client", + ) + parser.add_argument( + "--port", + type=str, + required=False, + help="The port of the feetech motors.", + default=None, + ) + parser.add_argument( + "--config", + type=str, + help="The configuration of the feetech motors.", + default=None, + ) + + args = parser.parse_args() + + # Check if port is set + if not os.environ.get("PORT") and args.port is None: + raise ValueError( + "The port is not set. Please set the port of the feetech motors in the environment variables or as an " + "argument." + ) + + port = os.environ.get("PORT") if args.port is None else args.port + + # Check if config is set + if not os.environ.get("CONFIG") and args.config is None: + raise ValueError( + "The configuration is not set. Please set the configuration of the feetech motors in the environment " + "variables or as an argument." + ) + + with open(os.environ.get("CONFIG") if args.config is None else args.config) as file: + config = json.load(file) + + joints = config.keys() + + # Create configuration + bus = { + "name": args.name, + "port": port, # (e.g. "/dev/ttyUSB0", "COM3") + "ids": [config[joint]["id"] for joint in joints], + "joints": list(config.keys()), + "models": [config[joint]["model"] for joint in joints], + "torque": wrap_joints_and_values( + pa.array(config.keys(), pa.string()), + pa.array( + [ + ( + TorqueMode.ENABLED.value + if config[joint]["torque"] + else TorqueMode.DISABLED.value + ) + for joint in joints + ], + type=pa.uint32(), + ), + ), + } + + print("Feetech Client Configuration: ", bus, flush=True) + + client = Client(bus) + client.run() + client.close() + + +if __name__ == "__main__": + main() diff --git a/node-hub/feetech-client/pyproject.toml b/node-hub/feetech-client/pyproject.toml new file mode 100644 index 00000000..9e65b724 --- /dev/null +++ b/node-hub/feetech-client/pyproject.toml @@ -0,0 +1,20 @@ +[tool.poetry] +name = "feetech-client" +version = "0.1" +authors = ["Hennzau "] +description = "Dora Node client for feetech motors." +readme = "README.md" + +packages = [{ include = "feetech_client" }] + +[tool.poetry.dependencies] +python = "^3.9" +dora-rs = "0.3.5" +feetech-servo-sdk = "1.0.0" + +[tool.poetry.scripts] +feetech-client = "feetech_client.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" \ No newline at end of file diff --git a/node-hub/lebai-client/README.md b/node-hub/lebai-client/README.md new file mode 100644 index 00000000..4661f43e --- /dev/null +++ b/node-hub/lebai-client/README.md @@ -0,0 +1,3 @@ +# Lebai client + +This is an experimental client for lebai robotic arms! diff --git a/node-hub/lebai-client/lebai_client/main.py b/node-hub/lebai-client/lebai_client/main.py new file mode 100644 index 00000000..651daa4d --- /dev/null +++ b/node-hub/lebai-client/lebai_client/main.py @@ -0,0 +1,186 @@ +import lebai_sdk +import numpy as np +import pyarrow as pa +from dora import Node +import json +import os +import time + + +def load_json_file(file_path): + """Load JSON file and return the dictionary.""" + if os.path.exists(file_path): + with open(file_path, "r") as file: + data = json.load(file) + else: + # Return an empty dictionary if file does not exist + data = {"recording": {}, "pose": {}} + return data + + +def save_json_file(file_path, data): + """Save the dictionary back to the JSON file.""" + with open(file_path, "w") as file: + json.dump(data, file, indent=4) + + +SAVED_POSE_PATH = "pose_library.json" + +lebai_sdk.init() +ROBOT_IP = os.getenv( + "LEBAI_IP", "10.42.0.253" +) # 设定机器人ip地址,需要根据机器人实际ip地址修改 + + +def main(): + # Load the JSON file + pose_library = load_json_file(SAVED_POSE_PATH) + lebai = lebai_sdk.connect(ROBOT_IP, False) # 创建实例 + + lebai.start_sys() # 启动手臂 + node = Node() + recording = False + teaching = False + recording_name = None + data = lebai.get_kin_data() + [x, y, z, rx, ry, rz] = list(data["actual_tcp_pose"].values()) + joint_position = data["actual_joint_pose"] + t = 0.15 + + for event in node: + if event["type"] == "INPUT": + event_id = event["id"] + if event_id == "claw": + [claw] = event["value"].tolist() + lebai.set_claw(10, claw) + elif event_id == "movec": + if teaching: + continue + [dx, dy, dz, drx, dry, drz, t] = event["value"].tolist() + + cartesian_pose = { + "x": x + dx, + "y": y + dy, + "z": z + dz, + "rx": rx + drx, + "ry": ry + dry, + "rz": rz + drz, + } # 目标位姿笛卡尔数据 + + t = 0.25 # 运动时间 (s)。 当 t > 0 时,参数速度 v 和加速度 a 无效 + try: + joint_position = lebai.kinematics_inverse(cartesian_pose) + except TypeError: + print("could not compute inverse kinematics") + continue + [x, y, z, rx, ry, rz] = list(cartesian_pose.values()) + lebai.move_pvat( + joint_position, + [0.05, 0.05, 0.05, 0.05, 0.05, 0.05], + [0.05, 0.05, 0.05, 0.05, 0.05, 0.05], + t, + ) # 直线运动 https://help.lebai.ltd/sdk/motion.html#%E7%9B%B4%E7%BA%BF%E8%BF%90%E5%8A%A8 + elif event_id == "movej": + if teaching: + continue + relative_joint_position = event["value"].to_numpy() + joint_position = np.array(joint_position) + joint_position += np.array(relative_joint_position[:6]) + cartesian_pose = lebai.kinematics_forward(list(joint_position)) + [x, y, z, rx, ry, rz] = list(cartesian_pose.values()) + + t = 0.15 # 运动时间 (s)。 当 t > 0 时,参数速度 v 和加速度 a 无效 + + lebai.move_pvat( + list(joint_position), + [0.1, 0.1, 0.1, 0.1, 0.1, 0.1], + [0.1, 0.1, 0.1, 0.1, 0.1, 0.1], + t, + ) # 直线运动 https://help.lebai.ltd/sdk/motion.html#%E7%9B%B4%E7%BA%BF%E8%BF%90%E5%8A%A8 + elif event_id == "stop": + lebai.stop_move() + data = lebai.get_kin_data() + [x, y, z, rx, ry, rz] = list(data["actual_tcp_pose"].values()) + joint_position = list(data["actual_joint_pose"]) + elif event_id == "save": + name = event["value"][0].as_py() + lebai.stop_move() + data = lebai.get_kin_data() + [x, y, z, rx, ry, rz] = list(data["actual_tcp_pose"].values()) + joint_position = list(data["actual_joint_pose"]) + pose_library["pose"][name] = list(joint_position) + elif event_id == "go_to": + if teaching: + continue + name = event["value"][0].as_py() + lebai.stop_move() + retrieved_pose = pose_library["pose"].get(name) + if retrieved_pose is not None: + joint_position = retrieved_pose + t = 2 + lebai.move_pvat( + list(joint_position), + [0.1, 0.1, 0.1, 0.1, 0.1, 0.1], + [0.1, 0.1, 0.1, 0.1, 0.1, 0.1], + t, + ) # 直线运动 https://help.lebai.ltd/sdk/motion.html#%E7%9B%B4%E7%BA%BF%E8%BF%90%E5%8A%A8 + lebai.wait_move() + data = lebai.get_kin_data() + [x, y, z, rx, ry, rz] = list(data["actual_tcp_pose"].values()) + joint_position = list(data["actual_joint_pose"]) + elif event_id == "record": + name = event["value"][0].as_py() + recording = True + + recording_name = name + pose_library["recording"][recording_name] = [] + start_time = time.time() + data = lebai.get_kin_data() + [x, y, z, rx, ry, rz] = list(data["actual_tcp_pose"].values()) + joint_position = list(data["actual_joint_pose"]) + elif event_id == "cut": + recording = False + elif event_id == "teach": + if teaching: + teaching = False + continue + lebai.teach_mode() + teaching = True + elif event_id == "end_teach": + teaching = False + lebai.end_teach_mode() + elif event_id == "play": + name = event["value"][0].as_py() + if name in pose_library["recording"]: + for event in pose_library["recording"][name]: + print(event, flush=True) + lebai.move_pvat( + list(event["joint_position"]), + [0.1, 0.1, 0.1, 0.1, 0.1, 0.1], + [0.1, 0.1, 0.1, 0.1, 0.1, 0.1], + event["t"], + ) + event = node.next(timeout=event["duration"]) + if event is not None: + print(event) + if event["type"] == "INPUT" and event["id"] == "stop": + lebai.stop_move() + break + + else: + pass + if recording and ( + event_id == "movej" or event_id == "movec" or event_id == "go_to" + ): + if len(pose_library["recording"][recording_name]) == 0: + t = 2 + pose_library["recording"][recording_name] += [ + { + "duration": time.time() - start_time, + "joint_position": joint_position, + "t": t * 2 if t == 0.1 else t, + } + ] + start_time = time.time() + + save_json_file(SAVED_POSE_PATH, pose_library) diff --git a/node-hub/lebai-client/pyproject.toml b/node-hub/lebai-client/pyproject.toml new file mode 100644 index 00000000..dbb49cad --- /dev/null +++ b/node-hub/lebai-client/pyproject.toml @@ -0,0 +1,20 @@ +[tool.poetry] +name = "lebai-client" +version = "0.1" +authors = ["dora-rs team"] +description = "Dora Node client for lebai robotic arms." +readme = "README.md" + +packages = [{ include = "lebai_client" }] + +[tool.poetry.dependencies] +python = "^3.9" +dora-rs = "^0.3.6rc0" +lebai-sdk = "^0.2.17" + +[tool.poetry.scripts] +lebai-client = "lebai_client.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" diff --git a/node-hub/lebai-client/test/test.py b/node-hub/lebai-client/test/test.py new file mode 100644 index 00000000..f8711c19 --- /dev/null +++ b/node-hub/lebai-client/test/test.py @@ -0,0 +1,35 @@ +import lebai_sdk + + +lebai_sdk.init() + + +def main(): + # print(lebai_sdk.discover_devices(2)) #发现同一局域网内的机器人 + + robot_ip = "10.20.17.1" # 设定机器人ip地址,需要根据机器人实际ip地址修改 + lebai = lebai_sdk.connect(robot_ip, False) # 创建实例 + lebai.start_sys() # 启动手臂 + cartesian_pose = { + "x": -0.383, + "y": -0.121, + "z": 0.36, + "rz": -1.57, + "ry": 0, + "rx": 1.57, + } # 目标位姿笛卡尔数据 + a = 0.3 # 空间加速度 (m/s2) + v = 0.1 # 空间速度 (m/s) + t = 0 # 运动时间 (s)。 当 t > 0 时,参数速度 v 和加速度 a 无效 + r = 0 # 交融半径 (m)。用于指定路径的平滑效果 + + lebai.movel( + cartesian_pose, a, v, t, r + ) # 直线运动 https://help.lebai.ltd/sdk/motion.html#%E7%9B%B4%E7%BA%BF%E8%BF%90%E5%8A%A8 + lebai.wait_move() # 等待运动完成 + # scene_number = "10000" #需要调用的场景编号 + # lebai.start_task(scene_number, None, None, False, 1) #调用场景 + lebai.stop_sys() # 停止手臂 + + +main() diff --git a/node-hub/lerobot-dashboard/README.md b/node-hub/lerobot-dashboard/README.md new file mode 100644 index 00000000..7b5cfab9 --- /dev/null +++ b/node-hub/lerobot-dashboard/README.md @@ -0,0 +1,40 @@ +## LeRobot Record Interface + +Simple Interface that uses Pygame to display images and texts. It also manages keyboard events. +This simple interface can only display two images of the same size side by side and a text in the middle bottom of the +screen. + +## YAML Configuration + +````YAML +nodes: + - id: lerobot_record + path: record.py # modify this to the relative path from the graph file to the client script + inputs: + tick: dora/timer/millis/16 # update the interface every 16ms (= 60fps) + + # image_left: some image from other node + # image_right: some image from other node + outputs: + - text + - episode + - failed + - end # end signal that can be sent to other nodes (sent when the window is closed) + + env: + WINDOW_WIDTH: 1280 # window width (default is 640) + WINDOW_HEIGHT: 1080 # window height (default is 480) +```` + +## Inputs + +## Outputs: + +- `text` : Array containing 1 element, the text in Arrow format. +- `episode` : Array containing 1 element, the episode number in Arrow format (or -1, marks episode end). +- `failed` : Array containing 1 element, the episode number failed in Arrow format. +- `end` : Array containing 1 empty element, indicates the end of recording to the dataflow. + +## License + +This library is licensed under the [Apache License 2.0](../../LICENSE). \ No newline at end of file diff --git a/node-hub/lerobot-dashboard/lerobot_dashboard/__init__.py b/node-hub/lerobot-dashboard/lerobot_dashboard/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/node-hub/lerobot-dashboard/lerobot_dashboard/main.py b/node-hub/lerobot-dashboard/lerobot_dashboard/main.py new file mode 100644 index 00000000..d45e7fb4 --- /dev/null +++ b/node-hub/lerobot-dashboard/lerobot_dashboard/main.py @@ -0,0 +1,204 @@ +""" +This Dora node is a minimalistic interface that shows two images and text in a Pygame window. +""" + +import os +import argparse + +import numpy as np +import pygame + +import pyarrow as pa + +from dora import Node + + +def main(): + # Handle dynamic nodes, ask for the name of the node in the dataflow + parser = argparse.ArgumentParser( + description="LeRobot Record: This node is used to record episodes of a robot interacting with the environment." + ) + + parser.add_argument( + "--name", + type=str, + required=False, + help="The name of the node in the dataflow.", + default="lerobot_record", + ) + parser.add_argument( + "--window-width", + type=int, + required=False, + help="The width of the window.", + default=640, + ) + parser.add_argument( + "--window-height", + type=int, + required=False, + help="The height of the window.", + default=480, + ) + + args = parser.parse_args() + + window_width = int(os.getenv("WINDOW_WIDTH", args.window_width)) + window_height = int(os.getenv("WINDOW_HEIGHT", args.window_height)) + + image_left = pygame.Surface((int(window_width // 2), window_height // 2)) + image_right = pygame.Surface((int(window_width // 2), window_height // 2)) + + pygame.font.init() + font = pygame.font.SysFont("Comic Sans MS", 30) + text = font.render("No text to render", True, (255, 255, 255)) + + pygame.init() + + screen = pygame.display.set_mode((window_width, window_height + text.get_height())) + + pygame.display.set_caption("Pygame minimalistic interface") + + node = Node(args.name) + + episode_index = 1 + recording = False + + for event in node: + event_type = event["type"] + if event_type == "STOP": + break + + elif event_type == "INPUT": + event_id = event["id"] + + if event_id == "image_left": + arrow_image = event["value"][0] + + image = { + "width": arrow_image["width"].as_py(), + "height": arrow_image["height"].as_py(), + "channels": arrow_image["channels"].as_py(), + "data": arrow_image["data"].values.to_numpy(), + } + + image_left = pygame.image.frombuffer( + image["data"], (image["width"], image["height"]), "BGR" + ) + + elif event_id == "image_right": + arrow_image = event["value"][0] + image = { + "width": arrow_image["width"].as_py(), + "height": arrow_image["height"].as_py(), + "channels": arrow_image["channels"].as_py(), + "data": arrow_image["data"].values.to_numpy(), + } + + image_right = pygame.image.frombuffer( + image["data"], (image["width"], image["height"]), "BGR" + ) + + elif event_id == "tick": + node.send_output("tick", pa.array([]), event["metadata"]) + + running = True + for pygame_event in pygame.event.get(): + if pygame_event.type == pygame.QUIT: + running = False + break + elif pygame_event.type == pygame.KEYDOWN: + key = pygame.key.name(pygame_event.key) + + if key == "space": + recording = not recording + if recording: + text = font.render( + f"Recording episode {episode_index}", + True, + (255, 255, 255), + ) + + node.send_output( + "episode", + pa.array([episode_index]), + event["metadata"], + ) + else: + text = font.render( + f"Stopped recording episode {episode_index}", + True, + (255, 255, 255), + ) + + node.send_output( + "episode", + pa.array([-1]), + event["metadata"], + ) + + episode_index += 1 + + elif key == "return": + if recording: + recording = not recording + text = font.render( + f"Failed episode {episode_index}", + True, + (255, 255, 255), + ) + + node.send_output( + "failed", + pa.array([episode_index]), + event["metadata"], + ) + episode_index += 1 + node.send_output( + "episode", + pa.array([-1]), + event["metadata"], + ) + + elif episode_index >= 2: + text = font.render( + f"Failed episode {episode_index - 1}", + True, + (255, 255, 255), + ) + + node.send_output( + "failed", + pa.array([episode_index - 1]), + event["metadata"], + ) + + if not running: + break + + screen.fill((0, 0, 0)) + + # Draw the left image + screen.blit(image_left, (0, 0)) + + # Draw the right image + screen.blit(image_right, (window_width // 2, 0)) + + # Draw the text bottom center + screen.blit( + text, + (window_width // 2 - text.get_width() // 2, int(window_height)), + ) + + pygame.display.flip() + + elif event_type == "ERROR": + raise ValueError("An error occurred in the dataflow: " + event["error"]) + + node.send_output("end", pa.array([])) + + pygame.quit() + + +if __name__ == "__main__": + main() diff --git a/node-hub/lerobot-dashboard/pyproject.toml b/node-hub/lerobot-dashboard/pyproject.toml new file mode 100644 index 00000000..11ae4f27 --- /dev/null +++ b/node-hub/lerobot-dashboard/pyproject.toml @@ -0,0 +1,22 @@ +[tool.poetry] +name = "lerobot-dashboard" +version = "0.1" +authors = ["Hennzau "] +description = "Dora Node Dashboard for LeRobot dataset recording." +readme = "README.md" + +packages = [{ include = "lerobot_dashboard" }] + +[tool.poetry.dependencies] +python = "^3.9" +dora-rs = "0.3.5" +numpy = "< 2.0.0" +opencv-python = ">= 4.1.1" +pygame = "~2.6.0" + +[tool.poetry.scripts] +lerobot-dashboard = "lerobot_dashboard.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" \ No newline at end of file diff --git a/node-hub/mujoco-client/README.md b/node-hub/mujoco-client/README.md new file mode 100644 index 00000000..95b6aa52 --- /dev/null +++ b/node-hub/mujoco-client/README.md @@ -0,0 +1,31 @@ +## Mujoco Client + +This node is a client for a Mujoco simulation. + +## YAML Configuration + +````YAML +nodes: + - id: mujoco_client + path: client.py # modify this to the relative path from the graph file to the client script + inputs: + pull_position: dora/timer/millis/10 # pull the present position every 10ms + + # write_goal_position: some goal position from other node + # end: some end signal from other node + outputs: + - position # regarding 'pull_position' input, it will output the position every 10ms + - end # end signal that can be sent to other nodes (sent when the simulation ends) + + env: + SCENE: scene.xml # the scene file to be used in the simulation modify this to the relative path from the graph file to the scene file + CONFIG: config.json # the configuration file for the simulated arm (only retrieve joints names) +```` + +## Inputs + +## Outputs + +## License + +This library is licensed under the [Apache License 2.0](../../LICENSE). \ No newline at end of file diff --git a/node-hub/mujoco-client/mujoco_client/__init__.py b/node-hub/mujoco-client/mujoco_client/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/node-hub/mujoco-client/mujoco_client/main.py b/node-hub/mujoco-client/mujoco_client/main.py new file mode 100644 index 00000000..7ab8bd38 --- /dev/null +++ b/node-hub/mujoco-client/mujoco_client/main.py @@ -0,0 +1,151 @@ +""" +Mujoco Client: This node is used to represent simulated robot, it can be used to read virtual positions, +or can be controlled +""" + +import os +import argparse +import time +import json + +import pyarrow as pa + +from dora import Node + +import mujoco +import mujoco.viewer + + +class Client: + + def __init__(self, config: dict[str, any]): + self.config = config + + self.m = mujoco.MjModel.from_xml_path(filename=config["scene"]) + self.data = mujoco.MjData(self.m) + + self.node = Node(config["name"]) + + def run(self): + with mujoco.viewer.launch_passive(self.m, self.data) as viewer: + for event in self.node: + event_type = event["type"] + + if event_type == "INPUT": + event_id = event["id"] + + if event_id == "tick": + self.node.send_output("tick", pa.array([]), event["metadata"]) + + if not viewer.is_running(): + break + + step_start = time.time() + + # Step the simulation forward + mujoco.mj_step(self.m, self.data) + viewer.sync() + + # Rudimentary time keeping, will drift relative to wall clock. + time_until_next_step = self.m.opt.timestep - ( + time.time() - step_start + ) + if time_until_next_step > 0: + time.sleep(time_until_next_step) + + elif event_id == "pull_position": + self.pull_position(self.node, event["metadata"]) + elif event_id == "pull_velocity": + self.pull_velocity(self.node, event["metadata"]) + elif event_id == "pull_current": + self.pull_current(self.node, event["metadata"]) + elif event_id == "write_goal_position": + self.write_goal_position(event["value"]) + elif event_id == "end": + break + + elif event_type == "ERROR": + raise ValueError( + "An error occurred in the dataflow: " + event["error"] + ) + + self.node.send_output("end", pa.array([])) + + def pull_position(self, node, metadata): + pass + + def pull_velocity(self, node, metadata): + pass + + def pull_current(self, node, metadata): + pass + + def write_goal_position(self, goal_position_with_joints): + joints = goal_position_with_joints.field("joints") + goal_position = goal_position_with_joints.field("values") + + for i, joint in enumerate(joints): + self.data.joint(joint.as_py()).qpos[0] = goal_position[i].as_py() + + +def main(): + # Handle dynamic nodes, ask for the name of the node in the dataflow + parser = argparse.ArgumentParser( + description="MujoCo Client: This node is used to represent a MuJoCo simulation. It can be used instead of a " + "follower arm to test the dataflow." + ) + + parser.add_argument( + "--name", + type=str, + required=False, + help="The name of the node in the dataflow.", + default="mujoco_client", + ) + parser.add_argument( + "--scene", + type=str, + required=False, + help="The scene file of the MuJoCo simulation.", + ) + + parser.add_argument( + "--config", type=str, help="The configuration of the joints.", default=None + ) + + args = parser.parse_args() + + if not os.getenv("SCENE") and args.scene is None: + raise ValueError( + "Please set the SCENE environment variable or pass the --scene argument." + ) + + scene = os.getenv("SCENE", args.scene) + + # Check if config is set + if not os.environ.get("CONFIG") and args.config is None: + raise ValueError( + "The configuration is not set. Please set the configuration of the simulated motors in the environment " + "variables or as an argument." + ) + + with open(os.environ.get("CONFIG") if args.config is None else args.config) as file: + config = json.load(file) + + joints = config.keys() + + # Create configuration + bus = { + "name": args.name, + "scene": scene, + "joints": pa.array(joints, pa.string()), + } + + print("Mujoco Client Configuration: ", bus, flush=True) + + client = Client(bus) + client.run() + + +if __name__ == "__main__": + main() diff --git a/node-hub/mujoco-client/pyproject.toml b/node-hub/mujoco-client/pyproject.toml new file mode 100644 index 00000000..0969e055 --- /dev/null +++ b/node-hub/mujoco-client/pyproject.toml @@ -0,0 +1,21 @@ +[tool.poetry] +name = "mujoco-client" +version = "0.1" +authors = ["Hennzau "] +description = "Dora Node client for mujoco simulated robots." +readme = "README.md" + +packages = [{ include = "mujoco_client" }] + +[tool.poetry.dependencies] +python = "^3.9" +dora-rs = "0.3.5" +mujoco = "~3.1.6" +PyOpenGL = "~3.1.1a1" + +[tool.poetry.scripts] +mujoco-client = "mujoco_client.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" \ No newline at end of file diff --git a/node-hub/replay-client/README.md b/node-hub/replay-client/README.md new file mode 100644 index 00000000..3e5e53bc --- /dev/null +++ b/node-hub/replay-client/README.md @@ -0,0 +1,25 @@ +## Replay + +This node is a replay client. + +## YAML Configuration + +````YAML +nodes: + - id: replay_client + path: client.py # modify this to the relative path from the graph file to the client script + inputs: + pull_position: dora/timer/millis/10 + + outputs: + - position + - end # end signal that can be sent to other nodes (sent when the replay ends) + + env: + PATH: /path/to/record + EPISODE: 1 +```` + +## License + +This library is licensed under the [Apache License 2.0](../../LICENSE). \ No newline at end of file diff --git a/node-hub/replay-client/pyproject.toml b/node-hub/replay-client/pyproject.toml new file mode 100644 index 00000000..561b700f --- /dev/null +++ b/node-hub/replay-client/pyproject.toml @@ -0,0 +1,20 @@ +[tool.poetry] +name = "replay-client" +version = "0.1" +authors = ["Hennzau "] +description = "Dora Node client for replaying recorded data." +readme = "README.md" + +packages = [{ include = "replay_client" }] + +[tool.poetry.dependencies] +python = "^3.9" +dora-rs = "0.3.5" +pandas = "~2.2.2" + +[tool.poetry.scripts] +replay-client = "replay_client.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" \ No newline at end of file diff --git a/node-hub/replay-client/replay_client/__init__.py b/node-hub/replay-client/replay_client/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/node-hub/replay-client/replay_client/main.py b/node-hub/replay-client/replay_client/main.py new file mode 100644 index 00000000..901d3daa --- /dev/null +++ b/node-hub/replay-client/replay_client/main.py @@ -0,0 +1,125 @@ +""" +Replay Client: This node is used to represent a leader robot and send a sequence of goals to the dataflow, +reading a dataset of actions and joints from a specific episode. +""" + +import os +import argparse + +import pyarrow as pa +import pandas as pd + +from dora import Node + + +def joints_values_to_arrow(joints, values): + return pa.StructArray.from_arrays( + arrays=[joints, values], + names=["joints", "values"], + fields=None, + mask=None, + memory_pool=None, + ) + + +class Client: + + def __init__(self, config: dict[str, any]): + self.config = config + + self.node = Node(config["name"]) + + dataset = pd.read_parquet(config["episode_path"] + "/dataset.parquet") + + # Filter the dataset to only keep rows from the episode + dataset = dataset[dataset["episode_index"] == config["episode_id"]] + + self.action = dataset["action"] + self.joints = dataset["joints"] + self.frame = 0 + + def run(self): + for event in self.node: + event_type = event["type"] + + if event_type == "INPUT": + event_id = event["id"] + + if event_id == "pull_position": + if self.pull_position(self.node, event["metadata"]): + break + elif event_id == "end": + break + + elif event_type == "ERROR": + raise ValueError("An error occurred in the dataflow: " + event["error"]) + + self.node.send_output("end", pa.array([])) + + def pull_position(self, node, metadata) -> bool: + if self.frame >= len(self.action): + return True + + action = self.action.iloc[self.frame] + joints = self.joints.iloc[self.frame] + + position = joints_values_to_arrow(joints, pa.array(action, type=pa.float32())) + + self.frame += 1 + + node.send_output("position", position, metadata) + + +def main(): + # Handle dynamic nodes, ask for the name of the node in the dataflow + parser = argparse.ArgumentParser( + description="Replay Client: This node is used to replay a sequence of goals for a followee robot." + ) + + parser.add_argument( + "--name", + type=str, + required=False, + help="The name of the node in the dataflow.", + default="replay_client", + ) + parser.add_argument( + "--path", + type=str, + required=False, + help="The path to the episode dataset.", + default=None, + ) + parser.add_argument( + "--episode", + type=int, + required=False, + help="The episode id to replay.", + default=None, + ) + + args = parser.parse_args() + + if (not os.getenv("PATH") and args.path is None) or ( + not os.getenv("EPISODE") and args.episode is None + ): + raise ValueError("The environment variables PATH and EPISODE_ID must be set.") + + if not isinstance(int(os.getenv("EPISODE")), int): + raise ValueError("The environment variable EPISODE_ID must be an integer.") + + # Create configuration + config = { + "name": args.name, + "episode_path": os.getenv("PATH", args.path), + "episode_id": int(os.getenv("EPISODE", args.episode)), + } + + print("Replay Client Configuration: ", config, flush=True) + + client = Client(config) + client.run() + + +if __name__ == "__main__": + main() diff --git a/node-hub/video-encoder/README.md b/node-hub/video-encoder/README.md new file mode 100644 index 00000000..1f826beb --- /dev/null +++ b/node-hub/video-encoder/README.md @@ -0,0 +1,28 @@ +## Camera Node for OpenCV compatible cameras + +Simple Camera node that uses OpenCV to capture images from a camera. The node can be configured to use a specific camera +id, width and height. +It then sends the images to the dataflow. + +## YAML Configuration + +````YAML +nodes: + - id: video_encoder + path: encoder.py # modify this to the relative path from the graph file to the client script + inputs: + # image: some image from other node + # episode_index: some episode index from other node + outputs: + - image + + env: + VIDEO_NAME: cam_up + VIDEO_WIDTH: 640 + VIDEO_HEIGHT: 480 + FPS: 30 +```` + +## License + +This library is licensed under the [Apache License 2.0](../../LICENSE). \ No newline at end of file diff --git a/node-hub/video-encoder/pyproject.toml b/node-hub/video-encoder/pyproject.toml new file mode 100644 index 00000000..8d1bbe63 --- /dev/null +++ b/node-hub/video-encoder/pyproject.toml @@ -0,0 +1,22 @@ +[tool.poetry] +name = "video-encoder" +version = "0.1" +authors = ["Hennzau "] +description = "Dora Node for video encoding (LeRobot compatible)." +readme = "README.md" + +packages = [{ include = "video_encoder" }] + +[tool.poetry.dependencies] +python = "^3.9" +dora-rs = "0.3.5" +numpy = "< 2.0.0" +opencv-python = ">= 4.1.1" +python-ffmpeg = "~2.0.12" + +[tool.poetry.scripts] +video-encoder = "video_encoder.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" \ No newline at end of file diff --git a/node-hub/video-encoder/video_encoder/__init__.py b/node-hub/video-encoder/video_encoder/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/node-hub/video-encoder/video_encoder/main.py b/node-hub/video-encoder/video_encoder/main.py new file mode 100644 index 00000000..26f528f1 --- /dev/null +++ b/node-hub/video-encoder/video_encoder/main.py @@ -0,0 +1,133 @@ +import os +from pathlib import Path + +import cv2 +import argparse + +import numpy as np +import pyarrow as pa + +from dora import Node +from ffmpeg import FFmpeg + + +def main(): + # Handle dynamic nodes, ask for the name of the node in the dataflow + parser = argparse.ArgumentParser( + description="Video Encoder: This node is used to record episodes of a robot interacting with the environment." + ) + + parser.add_argument( + "--name", + type=str, + required=False, + help="The name of the node in the dataflow.", + default="video_encoder", + ) + + if not os.getenv("VIDEO_NAME") or not os.getenv("FPS"): + raise ValueError("Please set the VIDEO_NAME and FPS environment variables.") + + video_name = os.getenv("VIDEO_NAME") + fps = int(os.getenv("FPS")) + + args = parser.parse_args() + + node = Node(args.name) + + recording = False + episode_index = 1 + + dataflow_id = node.dataflow_id() + + base = Path("out") / dataflow_id / "videos" + out_dir = base / f"{video_name}_episode_{episode_index:06d}" + name = f"{video_name}_episode_{episode_index:06d}.mp4" + + if not out_dir.exists(): + out_dir.mkdir(parents=True) + + # We initialize the output canal with a first data + node.send_output( + "image", + pa.array([{"path": f"videos/{name}", "timestamp": float(0) / fps}]), + ) + + frame_count = 0 + for event in node: + event_type = event["type"] + + if event_type == "INPUT": + event_id = event["id"] + + if event_id == "image": + if recording: + base = Path("out") / dataflow_id / "videos" + out_dir = base / f"{video_name}_episode_{episode_index:06d}" + name = f"{video_name}_episode_{episode_index:06d}.mp4" + + if not out_dir.exists(): + out_dir.mkdir(parents=True) + + node.send_output( + "image", + pa.array( + [ + { + "path": f"videos/{name}", + "timestamp": float(frame_count) / fps, + } + ] + ), + event["metadata"], + ) + + arrow_image = event["value"][0] + image = { + "width": arrow_image["width"].as_py(), + "height": arrow_image["height"].as_py(), + "channels": arrow_image["channels"].as_py(), + "data": arrow_image["data"].values.to_numpy().astype(np.uint8), + } + + data = image["data"].reshape( + (image["height"], image["width"], image["channels"]) + ) + + path = str(out_dir / f"frame_{frame_count:06d}.png") + cv2.imwrite(path, data) + + frame_count += 1 + + elif event_id == "episode_index": + episode = event["value"][0].as_py() + recording = episode != -1 + + if recording: + episode_index = episode + else: + # save all the frames into a video + base = Path("out") / dataflow_id / "videos" + out_dir = base / f"{video_name}_episode_{episode_index:06d}" + name = f"{video_name}_episode_{episode_index:06d}.mp4" + video_path = base / name + + ffmpeg = ( + FFmpeg() + .option("y") + .input(str(out_dir / "frame_%06d.png"), f="image2", r=fps) + .output( + str(video_path), vcodec="libx264", g=2, pix_fmt="yuv444p" + ) + ) + + ffmpeg.execute() + + frame_count = 0 + + elif event_type == "ERROR": + raise ValueError("An error occurred in the dataflow: " + event["error"]) + + +if __name__ == "__main__": + main()