Browse Source

Fix positional argument typing

tags/v0.3.4-rc1
haixuanTao 1 year ago
parent
commit
bdf90f7219
2 changed files with 37 additions and 36 deletions
  1. +34
    -34
      apis/python/node/dora/__init__.pyi
  2. +3
    -2
      apis/python/node/generate_stubs.py

+ 34
- 34
apis/python/node/dora/__init__.pyi View File

@@ -22,7 +22,7 @@ from dora import Node
node = Node()
```"""

def __init__(self, /) -> None:
def __init__(self) -> None:
"""The custom node API lets you integrate `dora` into your application.
It allows you to retrieve input and send output in any fashion you want.

@@ -34,16 +34,16 @@ from dora import Node
node = Node()
```"""

def dataflow_descriptor(self, /) -> dict:
def dataflow_descriptor(self) -> dict:
"""Returns the full dataflow descriptor that this node is part of.

This method returns the parsed dataflow YAML file."""

def merge_external_events(self, /, subscription: dora.Ros2Subscription) -> None:
def merge_external_events(self, subscription: dora.Ros2Subscription) -> None:
"""Merge an external event stream with dora main loop.
This currently only work with ROS2."""

def next(self, /, timeout: float=None) -> dora.PyEvent:
def next(self, timeout: float=None) -> dora.PyEvent:
"""`.next()` gives you the next input that the node has received.
It blocks until the next event becomes available.
You can use timeout in seconds to return if no input is available.
@@ -63,7 +63,7 @@ match event["id"]:
case "image":
```"""

def send_output(self, /, output_id: str, data: pyarrow.Array, metadata: dict=None) -> None:
def send_output(self, output_id: str, data: pyarrow.Array, metadata: dict=None) -> None:
"""`send_output` send data from the node.

```python
@@ -79,19 +79,19 @@ ex:
node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
```"""

def __iter__(self, /) -> typing.Any:
def __iter__(self) -> typing.Any:
"""Implement iter(self)."""

def __next__(self, /) -> typing.Any:
def __next__(self) -> typing.Any:
"""Implement next(self)."""

@typing.final
class PyEvent:
"""Dora Event"""

def inner(self, /):...
def inner(self):...

def __getitem__(self, key: typing.Any, /) -> typing.Any:
def __getitem__(self, key: typing.Any) -> typing.Any:
"""Return self[key]."""

@typing.final
@@ -115,7 +115,7 @@ at any point without it being considered a breaking change.
context = Ros2Context()
```"""

def __init__(self, /, ros_paths: typing.List[str]=None) -> None:
def __init__(self, ros_paths: typing.List[str]=None) -> None:
"""ROS2 Context holding all messages definition for receiving and sending messages to ROS2.

By default, Ros2Context will use env `AMENT_PREFIX_PATH` to search for message definition.
@@ -135,7 +135,7 @@ at any point without it being considered a breaking change.
context = Ros2Context()
```"""

def new_node(self, /, name: str, namespace: str, options: dora.Ros2NodeOptions) -> dora.Ros2Node:
def new_node(self, name: str, namespace: str, options: dora.Ros2NodeOptions) -> dora.Ros2Node:
"""Create a new ROS2 node

```python
@@ -150,28 +150,28 @@ Ros2NodeOptions(rosout=True),
class Ros2Durability:
"""DDS 2.2.3.4 DURABILITY"""

def __eq__(self, value: typing.Any, /) -> bool:
def __eq__(self, value: typing.Any) -> bool:
"""Return self==value."""

def __ge__(self, value: typing.Any, /) -> bool:
def __ge__(self, value: typing.Any) -> bool:
"""Return self>=value."""

def __gt__(self, value: typing.Any, /) -> bool:
def __gt__(self, value: typing.Any) -> bool:
"""Return self>value."""

def __int__(self, /) -> None:
def __int__(self) -> None:
"""int(self)"""

def __le__(self, value: typing.Any, /) -> bool:
def __le__(self, value: typing.Any) -> bool:
"""Return self<=value."""

def __lt__(self, value: typing.Any, /) -> bool:
def __lt__(self, value: typing.Any) -> bool:
"""Return self<value."""

def __ne__(self, value: typing.Any, /) -> bool:
def __ne__(self, value: typing.Any) -> bool:
"""Return self!=value."""

def __repr__(self, /) -> str:
def __repr__(self) -> str:
"""Return repr(self)."""
Persistent: Ros2Durability = ...
Transient: Ros2Durability = ...
@@ -182,28 +182,28 @@ class Ros2Durability:
class Ros2Liveliness:
"""DDS 2.2.3.11 LIVELINESS"""

def __eq__(self, value: typing.Any, /) -> bool:
def __eq__(self, value: typing.Any) -> bool:
"""Return self==value."""

def __ge__(self, value: typing.Any, /) -> bool:
def __ge__(self, value: typing.Any) -> bool:
"""Return self>=value."""

def __gt__(self, value: typing.Any, /) -> bool:
def __gt__(self, value: typing.Any) -> bool:
"""Return self>value."""

def __int__(self, /) -> None:
def __int__(self) -> None:
"""int(self)"""

def __le__(self, value: typing.Any, /) -> bool:
def __le__(self, value: typing.Any) -> bool:
"""Return self<=value."""

def __lt__(self, value: typing.Any, /) -> bool:
def __lt__(self, value: typing.Any) -> bool:
"""Return self<value."""

def __ne__(self, value: typing.Any, /) -> bool:
def __ne__(self, value: typing.Any) -> bool:
"""Return self!=value."""

def __repr__(self, /) -> str:
def __repr__(self) -> str:
"""Return repr(self)."""
Automatic: Ros2Liveliness = ...
ManualByParticipant: Ros2Liveliness = ...
@@ -217,21 +217,21 @@ warnings::
- There's a known issue about ROS2 nodes not being discoverable by ROS2
See: https://github.com/jhelovuo/ros2-client/issues/4"""

def create_publisher(self, /, topic: dora.Ros2Topic, qos: dora.Ros2QosPolicies=None) -> dora.Ros2Publisher:
def create_publisher(self, topic: dora.Ros2Topic, qos: dora.Ros2QosPolicies=None) -> dora.Ros2Publisher:
"""Create a ROS2 publisher

```python
pose_publisher = ros2_node.create_publisher(turtle_pose_topic)
```"""

def create_subscription(self, /, topic: dora.Ros2Topic, qos: dora.Ros2QosPolicies=None) -> dora.Ros2Subscription:
def create_subscription(self, topic: dora.Ros2Topic, qos: dora.Ros2QosPolicies=None) -> dora.Ros2Subscription:
"""Create a ROS2 subscription

```python
pose_reader = ros2_node.create_subscription(turtle_pose_topic)
```"""

def create_topic(self, /, name: str, message_type: str, qos: dora.Ros2QosPolicies) -> dora.Ros2Topic:
def create_topic(self, name: str, message_type: str, qos: dora.Ros2QosPolicies) -> dora.Ros2Topic:
"""Create a ROS2 topic to connect to.

```python
@@ -244,14 +244,14 @@ turtle_twist_topic = ros2_node.create_topic(
class Ros2NodeOptions:
"""ROS2 Node Options"""

def __init__(self, /, rosout: bool=None) -> None:
def __init__(self, rosout: bool=None) -> None:
"""ROS2 Node Options"""

@typing.final
class Ros2Publisher:
"""ROS2 Publisher"""

def publish(self, /, data: pyarrow.Array) -> None:
def publish(self, data: pyarrow.Array) -> None:
"""Publish a message into ROS2 topic.

Remember that the data format should respect the structure of the ROS2 message usinng an arrow Structure.
@@ -274,14 +274,14 @@ pa.array(
class Ros2QosPolicies:
"""ROS2 QoS Policy"""

def __init__(self, /, durability: dora.Ros2Durability=None, liveliness: dora.Ros2Liveliness=None, reliable: bool=None, keep_all: bool=None, lease_duration: float=None, max_blocking_time: float=None, keep_last: int=None) -> dora.Ros2QoSPolicies:
def __init__(self, durability: dora.Ros2Durability=None, liveliness: dora.Ros2Liveliness=None, reliable: bool=None, keep_all: bool=None, lease_duration: float=None, max_blocking_time: float=None, keep_last: int=None) -> dora.Ros2QoSPolicies:
"""ROS2 QoS Policy"""

@typing.final
class Ros2Subscription:
"""ROS2 Subscription"""

def next(self, /):...
def next(self):...

@typing.final
class Ros2Topic:


+ 3
- 2
apis/python/node/generate_stubs.py View File

@@ -333,8 +333,9 @@ def arguments_stub(
)

if param.kind == param.POSITIONAL_ONLY:
posonlyargs.append(param_ast)
defaults.append(default_ast)
args.append(param_ast)
# posonlyargs.append(param_ast)
# defaults.append(default_ast)
elif param.kind == param.POSITIONAL_OR_KEYWORD:
args.append(param_ast)
defaults.append(default_ast)


Loading…
Cancel
Save