|
|
|
@@ -1,36 +0,0 @@ |
|
|
|
from typing import Callable |
|
|
|
from enum import Enum |
|
|
|
|
|
|
|
class DoraStatus(Enum): |
|
|
|
CONTINUE = 0 |
|
|
|
STOP = 1 |
|
|
|
|
|
|
|
class Operator: |
|
|
|
""" |
|
|
|
Example operator incrementing a counter every times its been called. |
|
|
|
|
|
|
|
The current value of the counter is sent back to dora on `counter`. |
|
|
|
""" |
|
|
|
|
|
|
|
def __init__(self, counter=0): |
|
|
|
self.counter = counter |
|
|
|
|
|
|
|
def on_input( |
|
|
|
self, |
|
|
|
input_id: str, |
|
|
|
value: bytes, |
|
|
|
send_output: Callable[[str, bytes], None], |
|
|
|
): |
|
|
|
"""Handle input by incrementing count by one. |
|
|
|
|
|
|
|
Args: |
|
|
|
input_id (str): Id of the input declared in the yaml configuration |
|
|
|
value (bytes): Bytes message of the input |
|
|
|
send_output (Callable[[str, bytes]]): Function enabling sending output back to dora. |
|
|
|
""" |
|
|
|
val_len = len(value) |
|
|
|
print(f"PYTHON received input {input_id}; value length: {val_len}") |
|
|
|
send_output("counter", (self.counter % 256).to_bytes(1, "little")) |
|
|
|
self.counter = self.counter + 1 |
|
|
|
|
|
|
|
return DoraStatus.OK |