Browse Source

Merge pull request #493 from dora-rs/python-ide-linting

Adding python IDE typing
tags/v0.3.4-rc1
Haixuan Xavier Tao GitHub 1 year ago
parent
commit
d6f13d08bd
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
11 changed files with 1095 additions and 25 deletions
  1. +15
    -0
      README.md
  2. +9
    -0
      apis/python/node/README.md
  3. +16
    -6
      apis/python/node/dora/__init__.py
  4. +317
    -0
      apis/python/node/dora/__init__.pyi
  5. +517
    -0
      apis/python/node/generate_stubs.py
  6. +48
    -9
      apis/python/node/src/lib.rs
  7. +12
    -1
      apis/python/operator/src/lib.rs
  8. +4
    -7
      examples/python-ros2-dataflow/random_turtle.py
  9. +1
    -0
      libraries/core/src/descriptor/mod.rs
  10. +135
    -1
      libraries/extensions/ros2-bridge/python/src/lib.rs
  11. +21
    -1
      libraries/extensions/ros2-bridge/python/src/qos.rs

+ 15
- 0
README.md View File

@@ -263,6 +263,21 @@ We also have [a contributing guide](CONTRIBUTING.md).
| **Supported Platforms (ARM)** | macOS, Linux |
| **Configuration** | YAML |

### Unstable functionality

`dora-rs` Ros2 Bridge is marked as **unstable**.
There are a number of reasons functionality may be marked as unstable:

- We are unsure about the exact API. The name, function signature, or implementation are likely to change in the future.
- The functionality is not tested extensively yet. Bugs may pop up when used in real-world scenarios.
- The functionality does not integrate well with the full dora-rs API. You may find it works in one context but not in another.

Releasing functionality as unstable allows us to gather important feedback from users that use dora-rs in real-world scenarios.
This helps us fine-tune things before giving it the final stamp of approval.
Users are only interested in solid, well-tested functionality can avoid this part of the API.

Functionality marked as unstable may change at any point without it being considered a breaking change.

## License

This project is licensed under Apache-2.0. Check out [NOTICE.md](NOTICE.md) for more information.

+ 9
- 0
apis/python/node/README.md View File

@@ -10,3 +10,12 @@ source .env/bin/activate
pip install maturin
maturin develop
```

## Type hinting

Type hinting requires to run a second step

```bash
python generate_stubs.py dora dora/__init__.pyi
maturin develop
```

+ 16
- 6
apis/python/node/dora/__init__.py View File

@@ -1,10 +1,7 @@
"""
# dora-rs

This is the dora python client for interacting with dora dataflow.

You can install it via:

```bash
pip install dora-rs
```
@@ -14,14 +11,27 @@ from enum import Enum

from .dora import *

__author__ = "Dora-rs Authors"
__version__ = "0.3.3"
from .dora import (
Node,
PyEvent,
Ros2Context,
Ros2Node,
Ros2NodeOptions,
Ros2Topic,
Ros2Publisher,
Ros2Subscription,
start_runtime,
__version__,
__author__,
Ros2QosPolicies,
Ros2Durability,
Ros2Liveliness,
)


class DoraStatus(Enum):
"""Dora status to indicate if operator `on_input` loop
should be stopped.

Args:
Enum (u8): Status signaling to dora operator to
stop or continue the operator.


+ 317
- 0
apis/python/node/dora/__init__.pyi View File

@@ -0,0 +1,317 @@
import dora
import pyarrow
import typing

@typing.final
class Enum:
"""Generic enumeration.

Derive from this class to define new enumerations."""
__members__: mappingproxy = ...

@typing.final
class Node:
"""The custom node API lets you integrate `dora` into your application.
It allows you to retrieve input and send output in any fashion you want.

Use with:

```python
from dora import Node

node = Node()
```"""

def __init__(self) -> None:
"""The custom node API lets you integrate `dora` into your application.
It allows you to retrieve input and send output in any fashion you want.

Use with:

```python
from dora import Node

node = Node()
```"""

def dataflow_descriptor(self) -> dict:
"""Returns the full dataflow descriptor that this node is part of.

This method returns the parsed dataflow YAML file."""

def merge_external_events(self, subscription: dora.Ros2Subscription) -> None:
"""Merge an external event stream with dora main loop.
This currently only work with ROS2."""

def next(self, timeout: float=None) -> dora.PyEvent:
"""`.next()` gives you the next input that the node has received.
It blocks until the next event becomes available.
You can use timeout in seconds to return if no input is available.
It will return `None` when all senders has been dropped.

```python
event = node.next()
```

You can also iterate over the event stream with a loop

```python
for event in node:
match event["type"]:
case "INPUT":
match event["id"]:
case "image":
```"""

def send_output(self, output_id: str, data: pyarrow.Array, metadata: dict=None) -> None:
"""`send_output` send data from the node.

```python
Args:
output_id: str,
data: pyarrow.Array,
metadata: Option[Dict],
```

ex:

```python
node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
```"""

def __iter__(self) -> typing.Any:
"""Implement iter(self)."""

def __next__(self) -> typing.Any:
"""Implement next(self)."""

@typing.final
class PyEvent:
"""Dora Event"""

def inner(self):...

def __getitem__(self, key: typing.Any) -> typing.Any:
"""Return self[key]."""

@typing.final
class Ros2Context:
"""ROS2 Context holding all messages definition for receiving and sending messages to ROS2.

By default, Ros2Context will use env `AMENT_PREFIX_PATH` to search for message definition.

AMENT_PREFIX_PATH folder structure should be the following:

- For messages: <namespace>/msg/<name>.msg
- For services: <namespace>/srv/<name>.srv

You can also use `ros_paths` if you don't want to use env variable.

warning::
dora Ros2 bridge functionality is considered **unstable**. It may be changed
at any point without it being considered a breaking change.

```python
context = Ros2Context()
```"""

def __init__(self, ros_paths: typing.List[str]=None) -> None:
"""ROS2 Context holding all messages definition for receiving and sending messages to ROS2.

By default, Ros2Context will use env `AMENT_PREFIX_PATH` to search for message definition.

AMENT_PREFIX_PATH folder structure should be the following:

- For messages: <namespace>/msg/<name>.msg
- For services: <namespace>/srv/<name>.srv

You can also use `ros_paths` if you don't want to use env variable.

warning::
dora Ros2 bridge functionality is considered **unstable**. It may be changed
at any point without it being considered a breaking change.

```python
context = Ros2Context()
```"""

def new_node(self, name: str, namespace: str, options: dora.Ros2NodeOptions) -> dora.Ros2Node:
"""Create a new ROS2 node

```python
ros2_node = ros2_context.new_node(
"turtle_teleop",
"/ros2_demo",
Ros2NodeOptions(rosout=True),
)
```

warning::
dora Ros2 bridge functionality is considered **unstable**. It may be changed
at any point without it being considered a breaking change."""

@typing.final
class Ros2Durability:
"""DDS 2.2.3.4 DURABILITY"""

def __eq__(self, value: typing.Any) -> bool:
"""Return self==value."""

def __ge__(self, value: typing.Any) -> bool:
"""Return self>=value."""

def __gt__(self, value: typing.Any) -> bool:
"""Return self>value."""

def __int__(self) -> None:
"""int(self)"""

def __le__(self, value: typing.Any) -> bool:
"""Return self<=value."""

def __lt__(self, value: typing.Any) -> bool:
"""Return self<value."""

def __ne__(self, value: typing.Any) -> bool:
"""Return self!=value."""

def __repr__(self) -> str:
"""Return repr(self)."""
Persistent: Ros2Durability = ...
Transient: Ros2Durability = ...
TransientLocal: Ros2Durability = ...
Volatile: Ros2Durability = ...

@typing.final
class Ros2Liveliness:
"""DDS 2.2.3.11 LIVELINESS"""

def __eq__(self, value: typing.Any) -> bool:
"""Return self==value."""

def __ge__(self, value: typing.Any) -> bool:
"""Return self>=value."""

def __gt__(self, value: typing.Any) -> bool:
"""Return self>value."""

def __int__(self) -> None:
"""int(self)"""

def __le__(self, value: typing.Any) -> bool:
"""Return self<=value."""

def __lt__(self, value: typing.Any) -> bool:
"""Return self<value."""

def __ne__(self, value: typing.Any) -> bool:
"""Return self!=value."""

def __repr__(self) -> str:
"""Return repr(self)."""
Automatic: Ros2Liveliness = ...
ManualByParticipant: Ros2Liveliness = ...
ManualByTopic: Ros2Liveliness = ...

@typing.final
class Ros2Node:
"""ROS2 Node

warnings::
- dora Ros2 bridge functionality is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
- There's a known issue about ROS2 nodes not being discoverable by ROS2
See: https://github.com/jhelovuo/ros2-client/issues/4"""

def create_publisher(self, topic: dora.Ros2Topic, qos: dora.Ros2QosPolicies=None) -> dora.Ros2Publisher:
"""Create a ROS2 publisher

```python
pose_publisher = ros2_node.create_publisher(turtle_pose_topic)
```
warnings:
- dora Ros2 bridge functionality is considered **unstable**. It may be changed
at any point without it being considered a breaking change."""

def create_subscription(self, topic: dora.Ros2Topic, qos: dora.Ros2QosPolicies=None) -> dora.Ros2Subscription:
"""Create a ROS2 subscription

```python
pose_reader = ros2_node.create_subscription(turtle_pose_topic)
```

warnings:
- dora Ros2 bridge functionality is considered **unstable**. It may be changed
at any point without it being considered a breaking change."""

def create_topic(self, name: str, message_type: str, qos: dora.Ros2QosPolicies) -> dora.Ros2Topic:
"""Create a ROS2 topic to connect to.

```python
turtle_twist_topic = ros2_node.create_topic(
"/turtle1/cmd_vel", "geometry_msgs/Twist", topic_qos
)
```"""

@typing.final
class Ros2NodeOptions:
"""ROS2 Node Options"""

def __init__(self, rosout: bool=None) -> None:
"""ROS2 Node Options"""

@typing.final
class Ros2Publisher:
"""ROS2 Publisher

warnings:
- dora Ros2 bridge functionality is considered **unstable**. It may be changed
at any point without it being considered a breaking change."""

def publish(self, data: pyarrow.Array) -> None:
"""Publish a message into ROS2 topic.

Remember that the data format should respect the structure of the ROS2 message usinng an arrow Structure.

ex:
```python
gripper_command.publish(
pa.array(
[
{
"name": "gripper",
"cmd": np.float32(5),
}
]
),
)
```"""

@typing.final
class Ros2QosPolicies:
"""ROS2 QoS Policy"""

def __init__(self, durability: dora.Ros2Durability=None, liveliness: dora.Ros2Liveliness=None, reliable: bool=None, keep_all: bool=None, lease_duration: float=None, max_blocking_time: float=None, keep_last: int=None) -> dora.Ros2QoSPolicies:
"""ROS2 QoS Policy"""

@typing.final
class Ros2Subscription:
"""ROS2 Subscription


warnings:
- dora Ros2 bridge functionality is considered **unstable**. It may be changed
at any point without it being considered a breaking change."""

def next(self):...

@typing.final
class Ros2Topic:
"""ROS2 Topic

warnings:
- dora Ros2 bridge functionality is considered **unstable**. It may be changed
at any point without it being considered a breaking change."""

def start_runtime() -> None:
"""Start a runtime for Operators"""

+ 517
- 0
apis/python/node/generate_stubs.py View File

@@ -0,0 +1,517 @@
import argparse
import ast
import importlib
import inspect
import logging
import re
import subprocess
from functools import reduce
from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, Union


def path_to_type(*elements: str) -> ast.AST:
base: ast.AST = ast.Name(id=elements[0], ctx=ast.Load())
for e in elements[1:]:
base = ast.Attribute(value=base, attr=e, ctx=ast.Load())
return base


OBJECT_MEMBERS = dict(inspect.getmembers(object))
BUILTINS: Dict[str, Union[None, Tuple[List[ast.AST], ast.AST]]] = {
"__annotations__": None,
"__bool__": ([], path_to_type("bool")),
"__bytes__": ([], path_to_type("bytes")),
"__class__": None,
"__contains__": ([path_to_type("typing", "Any")], path_to_type("bool")),
"__del__": None,
"__delattr__": ([path_to_type("str")], path_to_type("None")),
"__delitem__": ([path_to_type("typing", "Any")], path_to_type("typing", "Any")),
"__dict__": None,
"__dir__": None,
"__doc__": None,
"__eq__": ([path_to_type("typing", "Any")], path_to_type("bool")),
"__format__": ([path_to_type("str")], path_to_type("str")),
"__ge__": ([path_to_type("typing", "Any")], path_to_type("bool")),
"__getattribute__": ([path_to_type("str")], path_to_type("typing", "Any")),
"__getitem__": ([path_to_type("typing", "Any")], path_to_type("typing", "Any")),
"__gt__": ([path_to_type("typing", "Any")], path_to_type("bool")),
"__hash__": ([], path_to_type("int")),
"__init__": ([], path_to_type("None")),
"__init_subclass__": None,
"__iter__": ([], path_to_type("typing", "Any")),
"__le__": ([path_to_type("typing", "Any")], path_to_type("bool")),
"__len__": ([], path_to_type("int")),
"__lt__": ([path_to_type("typing", "Any")], path_to_type("bool")),
"__module__": None,
"__ne__": ([path_to_type("typing", "Any")], path_to_type("bool")),
"__new__": None,
"__next__": ([], path_to_type("typing", "Any")),
"__int__": ([], path_to_type("None")),
"__reduce__": None,
"__reduce_ex__": None,
"__repr__": ([], path_to_type("str")),
"__setattr__": (
[path_to_type("str"), path_to_type("typing", "Any")],
path_to_type("None"),
),
"__setitem__": (
[path_to_type("typing", "Any"), path_to_type("typing", "Any")],
path_to_type("typing", "Any"),
),
"__sizeof__": None,
"__str__": ([], path_to_type("str")),
"__subclasshook__": None,
}


def module_stubs(module: Any) -> ast.Module:
types_to_import = {"typing"}
classes = []
functions = []
for member_name, member_value in inspect.getmembers(module):
element_path = [module.__name__, member_name]
if member_name.startswith("__"):
pass
elif member_name.startswith("DoraStatus"):
pass
elif inspect.isclass(member_value):
classes.append(
class_stubs(member_name, member_value, element_path, types_to_import)
)
elif inspect.isbuiltin(member_value):
functions.append(
function_stub(
member_name,
member_value,
element_path,
types_to_import,
in_class=False,
)
)
else:
logging.warning(f"Unsupported root construction {member_name}")
return ast.Module(
body=[ast.Import(names=[ast.alias(name=t)]) for t in sorted(types_to_import)]
+ classes
+ functions,
type_ignores=[],
)


def class_stubs(
cls_name: str, cls_def: Any, element_path: List[str], types_to_import: Set[str]
) -> ast.ClassDef:
attributes: List[ast.AST] = []
methods: List[ast.AST] = []
magic_methods: List[ast.AST] = []
constants: List[ast.AST] = []
for member_name, member_value in inspect.getmembers(cls_def):
current_element_path = [*element_path, member_name]
if member_name == "__init__":
try:
inspect.signature(cls_def) # we check it actually exists
methods = [
function_stub(
member_name,
cls_def,
current_element_path,
types_to_import,
in_class=True,
),
*methods,
]
except ValueError as e:
if "no signature found" not in str(e):
raise ValueError(
f"Error while parsing signature of {cls_name}.__init_"
) from e
elif (
member_value == OBJECT_MEMBERS.get(member_name)
or BUILTINS.get(member_name, ()) is None
):
pass
elif inspect.isdatadescriptor(member_value):
attributes.extend(
data_descriptor_stub(
member_name, member_value, current_element_path, types_to_import
)
)
elif inspect.isroutine(member_value):
(magic_methods if member_name.startswith("__") else methods).append(
function_stub(
member_name,
member_value,
current_element_path,
types_to_import,
in_class=True,
)
)
elif member_name == "__match_args__":
constants.append(
ast.AnnAssign(
target=ast.Name(id=member_name, ctx=ast.Store()),
annotation=ast.Subscript(
value=path_to_type("tuple"),
slice=ast.Tuple(
elts=[path_to_type("str"), ast.Ellipsis()], ctx=ast.Load()
),
ctx=ast.Load(),
),
value=ast.Constant(member_value),
simple=1,
)
)
elif member_value is not None:
constants.append(
ast.AnnAssign(
target=ast.Name(id=member_name, ctx=ast.Store()),
annotation=concatenated_path_to_type(
member_value.__class__.__name__, element_path, types_to_import
),
value=ast.Ellipsis(),
simple=1,
)
)
else:
logging.warning(
f"Unsupported member {member_name} of class {'.'.join(element_path)}"
)

doc = inspect.getdoc(cls_def)
doc_comment = build_doc_comment(doc) if doc else None
return ast.ClassDef(
cls_name,
bases=[],
keywords=[],
body=(
([doc_comment] if doc_comment else [])
+ attributes
+ methods
+ magic_methods
+ constants
)
or [ast.Ellipsis()],
decorator_list=[path_to_type("typing", "final")],
)


def data_descriptor_stub(
data_desc_name: str,
data_desc_def: Any,
element_path: List[str],
types_to_import: Set[str],
) -> Union[Tuple[ast.AnnAssign, ast.Expr], Tuple[ast.AnnAssign]]:
annotation = None
doc_comment = None

doc = inspect.getdoc(data_desc_def)
if doc is not None:
annotation = returns_stub(data_desc_name, doc, element_path, types_to_import)
m = re.findall(r"^ *:return: *(.*) *$", doc, re.MULTILINE)
if len(m) == 1:
doc_comment = m[0]
elif len(m) > 1:
raise ValueError(
f"Multiple return annotations found with :return: in {'.'.join(element_path)} documentation"
)

assign = ast.AnnAssign(
target=ast.Name(id=data_desc_name, ctx=ast.Store()),
annotation=annotation or path_to_type("typing", "Any"),
simple=1,
)
doc_comment = build_doc_comment(doc_comment) if doc_comment else None
return (assign, doc_comment) if doc_comment else (assign,)


def function_stub(
fn_name: str,
fn_def: Any,
element_path: List[str],
types_to_import: Set[str],
*,
in_class: bool,
) -> ast.FunctionDef:
body: List[ast.AST] = []
doc = inspect.getdoc(fn_def)
if doc is not None:
doc_comment = build_doc_comment(doc)
if doc_comment is not None:
body.append(doc_comment)

decorator_list = []
if in_class and hasattr(fn_def, "__self__"):
decorator_list.append(ast.Name("staticmethod"))

return ast.FunctionDef(
fn_name,
arguments_stub(fn_name, fn_def, doc or "", element_path, types_to_import),
body or [ast.Ellipsis()],
decorator_list=decorator_list,
returns=(
returns_stub(fn_name, doc, element_path, types_to_import) if doc else None
),
lineno=0,
)


def arguments_stub(
callable_name: str,
callable_def: Any,
doc: str,
element_path: List[str],
types_to_import: Set[str],
) -> ast.arguments:
real_parameters: Mapping[str, inspect.Parameter] = inspect.signature(
callable_def
).parameters
if callable_name == "__init__":
real_parameters = {
"self": inspect.Parameter("self", inspect.Parameter.POSITIONAL_ONLY),
**real_parameters,
}

parsed_param_types = {}
optional_params = set()

# Types for magic functions types
builtin = BUILTINS.get(callable_name)
if isinstance(builtin, tuple):
param_names = list(real_parameters.keys())
if param_names and param_names[0] == "self":
del param_names[0]
for name, t in zip(param_names, builtin[0]):
parsed_param_types[name] = t

# Types from comment
for match in re.findall(
r"^ *:type *([a-zA-Z0-9_]+): ([^\n]*) *$", doc, re.MULTILINE
):
if match[0] not in real_parameters:
raise ValueError(
f"The parameter {match[0]} of {'.'.join(element_path)} "
"is defined in the documentation but not in the function signature"
)
type = match[1]
if type.endswith(", optional"):
optional_params.add(match[0])
type = type[:-10]
parsed_param_types[match[0]] = convert_type_from_doc(
type, element_path, types_to_import
)

# we parse the parameters
posonlyargs = []
args = []
vararg = None
kwonlyargs = []
kw_defaults = []
kwarg = None
defaults = []
for param in real_parameters.values():
if param.name != "self" and param.name not in parsed_param_types:
raise ValueError(
f"The parameter {param.name} of {'.'.join(element_path)} "
"has no type definition in the function documentation"
)
param_ast = ast.arg(
arg=param.name, annotation=parsed_param_types.get(param.name)
)

default_ast = None
if param.default != param.empty:
default_ast = ast.Constant(param.default)
if param.name not in optional_params:
raise ValueError(
f"Parameter {param.name} of {'.'.join(element_path)} "
"is optional according to the type but not flagged as such in the doc"
)
elif param.name in optional_params:
raise ValueError(
f"Parameter {param.name} of {'.'.join(element_path)} "
"is optional according to the documentation but has no default value"
)

if param.kind == param.POSITIONAL_ONLY:
args.append(param_ast)
# posonlyargs.append(param_ast)
# defaults.append(default_ast)
elif param.kind == param.POSITIONAL_OR_KEYWORD:
args.append(param_ast)
defaults.append(default_ast)
elif param.kind == param.VAR_POSITIONAL:
vararg = param_ast
elif param.kind == param.KEYWORD_ONLY:
kwonlyargs.append(param_ast)
kw_defaults.append(default_ast)
elif param.kind == param.VAR_KEYWORD:
kwarg = param_ast

return ast.arguments(
posonlyargs=posonlyargs,
args=args,
vararg=vararg,
kwonlyargs=kwonlyargs,
kw_defaults=kw_defaults,
defaults=defaults,
kwarg=kwarg,
)


def returns_stub(
callable_name: str, doc: str, element_path: List[str], types_to_import: Set[str]
) -> Optional[ast.AST]:
m = re.findall(r"^ *:rtype: *([^\n]*) *$", doc, re.MULTILINE)
if len(m) == 0:
builtin = BUILTINS.get(callable_name)
if isinstance(builtin, tuple) and builtin[1] is not None:
return builtin[1]
raise ValueError(
f"The return type of {'.'.join(element_path)} "
"has no type definition using :rtype: in the function documentation"
)
if len(m) > 1:
raise ValueError(
f"Multiple return type annotations found with :rtype: for {'.'.join(element_path)}"
)
return convert_type_from_doc(m[0], element_path, types_to_import)


def convert_type_from_doc(
type_str: str, element_path: List[str], types_to_import: Set[str]
) -> ast.AST:
type_str = type_str.strip()
return parse_type_to_ast(type_str, element_path, types_to_import)


def parse_type_to_ast(
type_str: str, element_path: List[str], types_to_import: Set[str]
) -> ast.AST:
# let's tokenize
tokens = []
current_token = ""
for c in type_str:
if "a" <= c <= "z" or "A" <= c <= "Z" or c == ".":
current_token += c
else:
if current_token:
tokens.append(current_token)
current_token = ""
if c != " ":
tokens.append(c)
if current_token:
tokens.append(current_token)

# let's first parse nested parenthesis
stack: List[List[Any]] = [[]]
for token in tokens:
if token == "[":
children: List[str] = []
stack[-1].append(children)
stack.append(children)
elif token == "]":
stack.pop()
else:
stack[-1].append(token)

# then it's easy
def parse_sequence(sequence: List[Any]) -> ast.AST:
# we split based on "or"
or_groups: List[List[str]] = [[]]
print(sequence)
# TODO: Fix sequence
if "Ros" in sequence and "2" in sequence:
sequence = ["".join(sequence)]
elif "dora.Ros" in sequence and "2" in sequence:
sequence = ["".join(sequence)]

for e in sequence:
if e == "or":
or_groups.append([])
else:
or_groups[-1].append(e)
if any(not g for g in or_groups):
raise ValueError(
f"Not able to parse type '{type_str}' used by {'.'.join(element_path)}"
)

new_elements: List[ast.AST] = []
for group in or_groups:
if len(group) == 1 and isinstance(group[0], str):
new_elements.append(
concatenated_path_to_type(group[0], element_path, types_to_import)
)
elif (
len(group) == 2
and isinstance(group[0], str)
and isinstance(group[1], list)
):
new_elements.append(
ast.Subscript(
value=concatenated_path_to_type(
group[0], element_path, types_to_import
),
slice=parse_sequence(group[1]),
ctx=ast.Load(),
)
)
else:
raise ValueError(
f"Not able to parse type '{type_str}' used by {'.'.join(element_path)}"
)
return reduce(
lambda left, right: ast.BinOp(left=left, op=ast.BitOr(), right=right),
new_elements,
)

return parse_sequence(stack[0])


def concatenated_path_to_type(
path: str, element_path: List[str], types_to_import: Set[str]
) -> ast.AST:
parts = path.split(".")
if any(not p for p in parts):
raise ValueError(
f"Not able to parse type '{path}' used by {'.'.join(element_path)}"
)
if len(parts) > 1:
types_to_import.add(".".join(parts[:-1]))
return path_to_type(*parts)


def build_doc_comment(doc: str) -> Optional[ast.Expr]:
lines = [line.strip() for line in doc.split("\n")]
clean_lines = []
for line in lines:
if line.startswith((":type", ":rtype")):
continue
clean_lines.append(line)
text = "\n".join(clean_lines).strip()
return ast.Expr(value=ast.Constant(text)) if text else None


def format_with_ruff(file: str) -> None:
subprocess.check_call(["python", "-m", "ruff", "format", file])


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Extract Python type stub from a python module."
)
parser.add_argument(
"module_name", help="Name of the Python module for which generate stubs"
)
parser.add_argument(
"out",
help="Name of the Python stub file to write to",
type=argparse.FileType("wt"),
)
parser.add_argument(
"--ruff", help="Formats the generated stubs using Ruff", action="store_true"
)
args = parser.parse_args()
stub_content = ast.unparse(module_stubs(importlib.import_module(args.module_name)))
args.out.write(stub_content)
if args.ruff:
format_with_ruff(args.out.name)

+ 48
- 9
apis/python/node/src/lib.rs View File

@@ -43,6 +43,7 @@ impl Node {

/// `.next()` gives you the next input that the node has received.
/// It blocks until the next event becomes available.
/// You can use timeout in seconds to return if no input is available.
/// It will return `None` when all senders has been dropped.
///
/// ```python
@@ -58,17 +59,42 @@ impl Node {
/// match event["id"]:
/// case "image":
/// ```
///
/// :type timeout: float, optional
/// :rtype: dora.PyEvent
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self, py: Python, timeout: Option<f32>) -> PyResult<Option<PyEvent>> {
let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32)));
Ok(event)
}

/// You can iterate over the event stream with a loop
///
/// ```python
/// for event in node:
/// match event["type"]:
/// case "INPUT":
/// match event["id"]:
/// case "image":
/// ```
///
/// :rtype: dora.PyEvent
pub fn __next__(&mut self, py: Python) -> PyResult<Option<PyEvent>> {
let event = py.allow_threads(|| self.events.recv(None));
Ok(event)
}

/// You can iterate over the event stream with a loop
///
/// ```python
/// for event in node:
/// match event["type"]:
/// case "INPUT":
/// match event["id"]:
/// case "image":
/// ```
///
/// :rtype: dora.PyEvent
fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
@@ -78,14 +104,20 @@ impl Node {
/// ```python
/// Args:
/// output_id: str,
/// data: Bytes|Arrow,
/// data: pyarrow.Array,
/// metadata: Option[Dict],
/// ```
///
/// ex:
///
/// ```python
/// node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
/// ```
///
/// :type output_id: str
/// :type data: pyarrow.Array
/// :type metadata: dict, optional
/// :rtype: None
pub fn send_output(
&mut self,
output_id: String,
@@ -116,10 +148,17 @@ impl Node {
/// Returns the full dataflow descriptor that this node is part of.
///
/// This method returns the parsed dataflow YAML file.
///
/// :rtype: dict
pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result<PyObject> {
pythonize::pythonize(py, self.node.dataflow_descriptor())
}

/// Merge an external event stream with dora main loop.
/// This currently only work with ROS2.
///
/// :type subscription: dora.Ros2Subscription
/// :rtype: None
pub fn merge_external_events(
&mut self,
subscription: &mut Ros2Subscription,
@@ -197,21 +236,21 @@ impl Node {
}

/// Start a runtime for Operators
///
/// :rtype: None
#[pyfunction]
pub fn start_runtime() -> eyre::Result<()> {
dora_runtime::main().wrap_err("Dora Runtime raised an error.")
}

#[pymodule]
fn dora(py: Python, m: &PyModule) -> PyResult<()> {
fn dora(_py: Python, m: &PyModule) -> PyResult<()> {
dora_ros2_bridge_python::create_dora_ros2_bridge_module(m)?;
m.add_function(wrap_pyfunction!(start_runtime, m)?)?;
m.add_class::<Node>().unwrap();

let ros2_bridge = PyModule::new(py, "ros2_bridge")?;
dora_ros2_bridge_python::create_dora_ros2_bridge_module(ros2_bridge)?;
let experimental = PyModule::new(py, "experimental")?;
experimental.add_submodule(ros2_bridge)?;
m.add_submodule(experimental)?;
m.add_class::<Node>()?;
m.add_class::<PyEvent>()?;
m.setattr("__version__", env!("CARGO_PKG_VERSION"))?;
m.setattr("__author__", "Dora-rs Authors")?;

Ok(())
}

+ 12
- 1
apis/python/operator/src/lib.rs View File

@@ -3,14 +3,18 @@ use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters};
use eyre::{Context, Result};
use pyo3::{exceptions::PyLookupError, prelude::*, types::PyDict};

/// Dora Event
#[pyclass]
pub struct PyEvent {
event: MergedEvent<PyObject>,
data: Option<ArrayRef>,
}

// Dora Event
#[pymethods]
impl PyEvent {
///
/// :rtype: dora.PyObject
pub fn __getitem__(&self, key: &str, py: Python<'_>) -> PyResult<Option<PyObject>> {
if key == "kind" {
let kind = match &self.event {
@@ -35,7 +39,14 @@ impl PyEvent {
};
Ok(value)
}
MergedEvent::External(event) => event.call_method1(py, "__getitem__", (key,)).map(Some),
MergedEvent::External(event) => {
let value = match key {
"value" => event,
_ => todo!(),
};

Ok(Some(value.clone()))
}
}
}



+ 4
- 7
examples/python-ros2-dataflow/random_turtle.py View File

@@ -1,23 +1,20 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import dora
from dora import Node
from dora import Node, Ros2Context, Ros2NodeOptions, Ros2QosPolicies

CHECK_TICK = 50

# Create a ROS2 Context
ros2_context = dora.experimental.ros2_bridge.Ros2Context()
ros2_context = Ros2Context()
ros2_node = ros2_context.new_node(
"turtle_teleop",
"/ros2_demo",
dora.experimental.ros2_bridge.Ros2NodeOptions(rosout=True),
Ros2NodeOptions(rosout=True),
)

# Define a ROS2 QOS
topic_qos = dora.experimental.ros2_bridge.Ros2QosPolicies(
reliable=True, max_blocking_time=0.1
)
topic_qos = Ros2QosPolicies(reliable=True, max_blocking_time=0.1)

# Create a publisher to cmd_vel topic
turtle_twist_topic = ros2_node.create_topic(


+ 1
- 0
libraries/core/src/descriptor/mod.rs View File

@@ -17,6 +17,7 @@ mod validate;
mod visualize;
pub const SHELL_SOURCE: &str = "shell";

/// Dataflow description
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Descriptor {


+ 135
- 1
libraries/extensions/ros2-bridge/python/src/lib.rs View File

@@ -11,7 +11,7 @@ use arrow::{
pyarrow::{FromPyArrow, ToPyArrow},
};
use dora_ros2_bridge_msg_gen::types::Message;
use eyre::{eyre, Context, ContextCompat};
use eyre::{eyre, Context, ContextCompat, Result};
use futures::{Stream, StreamExt};
use pyo3::{
prelude::{pyclass, pymethods},
@@ -23,6 +23,27 @@ use typed::{deserialize::StructDeserializer, TypeInfo, TypedValue};
pub mod qos;
pub mod typed;

/// ROS2 Context holding all messages definition for receiving and sending messages to ROS2.
///
/// By default, Ros2Context will use env `AMENT_PREFIX_PATH` to search for message definition.
///
/// AMENT_PREFIX_PATH folder structure should be the following:
///
/// - For messages: <namespace>/msg/<name>.msg
/// - For services: <namespace>/srv/<name>.srv
///
/// You can also use `ros_paths` if you don't want to use env variable.
///
/// warning::
/// dora Ros2 bridge functionality is considered **unstable**. It may be changed
/// at any point without it being considered a breaking change.
///
/// ```python
/// context = Ros2Context()
/// ```
///
/// :type ros_paths: typing.List[str], optional
///
#[pyclass]
pub struct Ros2Context {
context: ros2_client::Context,
@@ -31,8 +52,18 @@ pub struct Ros2Context {

#[pymethods]
impl Ros2Context {
/// Create a new context
#[new]
pub fn new(ros_paths: Option<Vec<PathBuf>>) -> eyre::Result<Self> {
Python::with_gil(|py| -> Result<()> {
let warnings = py
.import("warnings")
.wrap_err("failed to import `warnings` module")?;
warnings
.call_method1("warn", ("dora-rs ROS2 Bridge is unstable and may change at any point without it being considered a breaking change",))
.wrap_err("failed to call `warnings.warn` module")?;
Ok(())
})?;
let ament_prefix_path = std::env::var("AMENT_PREFIX_PATH");
let empty = String::new();

@@ -72,6 +103,23 @@ impl Ros2Context {
}

/// Create a new ROS2 node
///
/// ```python
/// ros2_node = ros2_context.new_node(
/// "turtle_teleop",
/// "/ros2_demo",
/// Ros2NodeOptions(rosout=True),
/// )
/// ```
///
/// warning::
/// dora Ros2 bridge functionality is considered **unstable**. It may be changed
/// at any point without it being considered a breaking change.
///
/// :type name: str
/// :type namespace: str
/// :type options: dora.Ros2NodeOptions
/// :rtype: dora.Ros2Node
pub fn new_node(
&self,
name: &str,
@@ -90,6 +138,14 @@ impl Ros2Context {
}
}

/// ROS2 Node
///
/// warnings::
/// - dora Ros2 bridge functionality is considered **unstable**. It may be changed
/// at any point without it being considered a breaking change.
/// - There's a known issue about ROS2 nodes not being discoverable by ROS2
/// See: https://github.com/jhelovuo/ros2-client/issues/4
///
#[pyclass]
pub struct Ros2Node {
node: ros2_client::Node,
@@ -98,6 +154,18 @@ pub struct Ros2Node {

#[pymethods]
impl Ros2Node {
/// Create a ROS2 topic to connect to.
///
/// ```python
/// turtle_twist_topic = ros2_node.create_topic(
/// "/turtle1/cmd_vel", "geometry_msgs/Twist", topic_qos
/// )
/// ```
///
/// :type name: str
/// :type message_type: str
/// :type qos: dora.Ros2QosPolicies
/// :rtype: dora.Ros2Topic
pub fn create_topic(
&self,
name: &str,
@@ -126,6 +194,18 @@ impl Ros2Node {
Ok(Ros2Topic { topic, type_info })
}

/// Create a ROS2 publisher
///
/// ```python
/// pose_publisher = ros2_node.create_publisher(turtle_pose_topic)
/// ```
/// warnings:
/// - dora Ros2 bridge functionality is considered **unstable**. It may be changed
/// at any point without it being considered a breaking change.
///
/// :type topic: dora.Ros2Topic
/// :type qos: dora.Ros2QosPolicies, optional
/// :rtype: dora.Ros2Publisher
pub fn create_publisher(
&mut self,
topic: &Ros2Topic,
@@ -140,6 +220,19 @@ impl Ros2Node {
})
}

/// Create a ROS2 subscription
///
/// ```python
/// pose_reader = ros2_node.create_subscription(turtle_pose_topic)
/// ```
///
/// warnings:
/// - dora Ros2 bridge functionality is considered **unstable**. It may be changed
/// at any point without it being considered a breaking change.
///
/// :type topic: dora.Ros2Topic
/// :type qos: dora.Ros2QosPolicies, optional
/// :rtype: dora.Ros2Subscription
pub fn create_subscription(
&mut self,
topic: &Ros2Topic,
@@ -155,6 +248,9 @@ impl Ros2Node {
}
}

/// ROS2 Node Options
/// :type rosout: bool, optional
///
#[derive(Debug, Clone, Default)]
#[pyclass]
#[non_exhaustive]
@@ -178,6 +274,12 @@ impl From<Ros2NodeOptions> for ros2_client::NodeOptions {
}
}

/// ROS2 Topic
/// :type rosout: bool, optional
///
/// warnings:
/// - dora Ros2 bridge functionality is considered **unstable**. It may be changed
/// at any point without it being considered a breaking change.
#[pyclass]
#[non_exhaustive]
pub struct Ros2Topic {
@@ -185,6 +287,11 @@ pub struct Ros2Topic {
type_info: TypeInfo<'static>,
}

/// ROS2 Publisher
///
/// warnings:
/// - dora Ros2 bridge functionality is considered **unstable**. It may be changed
/// at any point without it being considered a breaking change.
#[pyclass]
#[non_exhaustive]
pub struct Ros2Publisher {
@@ -194,6 +301,27 @@ pub struct Ros2Publisher {

#[pymethods]
impl Ros2Publisher {
/// Publish a message into ROS2 topic.
///
/// Remember that the data format should respect the structure of the ROS2 message usinng an arrow Structure.
///
/// ex:
/// ```python
/// gripper_command.publish(
/// pa.array(
/// [
/// {
/// "name": "gripper",
/// "cmd": np.float32(5),
/// }
/// ]
/// ),
/// )
/// ```
///
/// :type data: pyarrow.Array
/// :rtype: None
///
pub fn publish(&self, data: &PyAny) -> eyre::Result<()> {
let pyarrow = PyModule::import(data.py(), "pyarrow")?;

@@ -228,6 +356,12 @@ impl Ros2Publisher {
}
}

/// ROS2 Subscription
///
///
/// warnings:
/// - dora Ros2 bridge functionality is considered **unstable**. It may be changed
/// at any point without it being considered a breaking change.
#[pyclass]
#[non_exhaustive]
pub struct Ros2Subscription {


+ 21
- 1
libraries/extensions/ros2-bridge/python/src/qos.rs View File

@@ -1,8 +1,19 @@
use ::dora_ros2_bridge::rustdds::{self, policy};
use pyo3::prelude::{pyclass, pymethods};

/// ROS2 QoS Policy
///
/// :type durability: dora.Ros2Durability, optional
/// :type liveliness: dora.Ros2Liveliness, optional
/// :type reliable: bool, optional
/// :type keep_all: bool, optional
/// :type lease_duration: float, optional
/// :type max_blocking_time: float, optional
/// :type keep_last: int, optional
/// :rtype: dora.Ros2QoSPolicies
///
#[derive(Debug, Clone)]
#[pyclass(get_all, set_all)]
#[pyclass]
#[non_exhaustive]
pub struct Ros2QosPolicies {
pub durability: Ros2Durability,
@@ -64,6 +75,8 @@ impl From<Ros2QosPolicies> for rustdds::QosPolicies {
}

/// DDS 2.2.3.4 DURABILITY
///
/// :rtype: dora.Ros2Durability
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[pyclass]
pub enum Ros2Durability {
@@ -73,7 +86,11 @@ pub enum Ros2Durability {
Persistent,
}

/// :type value: dora.Ros2Durability
/// :rtype: dora.Ros2Durability
impl From<Ros2Durability> for policy::Durability {
/// :type value: dora.Ros2Durability
/// :rtype: dora.Ros2Durability
fn from(value: Ros2Durability) -> Self {
match value {
Ros2Durability::Volatile => policy::Durability::Volatile,
@@ -85,6 +102,7 @@ impl From<Ros2Durability> for policy::Durability {
}

/// DDS 2.2.3.11 LIVELINESS
/// :rtype: dora.Ros2Liveliness
#[derive(Copy, Clone, Debug, PartialEq)]
#[pyclass]
pub enum Ros2Liveliness {
@@ -94,6 +112,8 @@ pub enum Ros2Liveliness {
}

impl Ros2Liveliness {
/// :type lease_duration: float
/// :rtype: dora.Ros2Liveliness
fn convert(self, lease_duration: f64) -> policy::Liveliness {
let lease_duration = if lease_duration.is_infinite() {
rustdds::Duration::INFINITE


Loading…
Cancel
Save