Python API
Operator
The operator API is a framework for you to implement. The implemented operator is managed by dora. This framework enables us to make optimisations and provide advanced features. It is the recommended way of using dora.
An operator requires an on_input method, which must return a DoraStatus of 0 (continue) or 1 (stop), depending on whether the operator needs to continue or stop.
class Operator:
    def on_input(
        self,
        input_id: str,
        value: bytes,
        send_output: Callable[[str, bytes], None],
    ) -> DoraStatus:
        ...
For Python, we recommend allocating each operator to its own runtime. Operators that share a runtime also share the same GIL, which makes them run almost sequentially. See: https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks
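The on_input contract described above can be tried out without dora. The following is a minimal sketch under stated assumptions: `DoraStatus` is redefined locally, `StopAfter` is a hypothetical operator, and `drive` is a hypothetical stand-in for the runtime loop, not dora's actual implementation.

```python
from enum import Enum
from typing import Callable, Iterable, List, Tuple


class DoraStatus(Enum):
    # Local stand-in for the status values described above.
    CONTINUE = 0
    STOP = 1


class StopAfter:
    """Hypothetical operator: echoes each input and stops after `limit` calls."""

    def __init__(self, limit: int):
        self.limit = limit
        self.seen = 0

    def on_input(
        self,
        input_id: str,
        value: bytes,
        send_output: Callable[[str, bytes], None],
    ) -> DoraStatus:
        self.seen += 1
        send_output("echo", value)
        if self.seen >= self.limit:
            return DoraStatus.STOP
        return DoraStatus.CONTINUE


def drive(operator, inputs: Iterable[Tuple[str, bytes]]) -> List[Tuple[str, bytes]]:
    """Hypothetical runtime loop: feed inputs until the operator returns STOP."""
    outputs: List[Tuple[str, bytes]] = []

    def send_output(output_id: str, data: bytes) -> None:
        # Record outputs instead of sending them to a dataflow.
        outputs.append((output_id, data))

    for input_id, value in inputs:
        status = operator.on_input(input_id, value, send_output)
        if status is DoraStatus.STOP:
            break
    return outputs


sent = drive(StopAfter(limit=2), [("tick", b"a"), ("tick", b"b"), ("tick", b"c")])
print(sent)
```

The third input is never delivered because the operator returned STOP on its second call.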
Try it out!
- Create an operator Python file called op.py:
from typing import Callable
from enum import Enum


class DoraStatus(Enum):
    CONTINUE = 0
    STOP = 1


class Operator:
    """
    Example operator incrementing a counter every time it is called.
    The current value of the counter is sent back to dora on `counter`.
    """

    def __init__(self, counter=0):
        self.counter = counter

    def on_input(
        self,
        input_id: str,
        value: bytes,
        send_output: Callable[[str, bytes], None],
    ) -> DoraStatus:
        """Handle input by incrementing count by one.

        Args:
            input_id (str): Id of the input declared in the yaml configuration
            value (bytes): Bytes message of the input
            send_output (Callable[[str, bytes]]): Function for sending output back to dora.
        """
        val_len = len(value)
        print(f"PYTHON received input {input_id}; value length: {val_len}")
        # Send the counter as a single byte, so it wraps around after 255.
        send_output("counter", (self.counter % 256).to_bytes(1, "little"))
        self.counter = self.counter + 1
        # Keep the operator running.
        return DoraStatus.CONTINUE
- Link it in your graph as:
{{#include ../../binaries/coordinator/examples/graphs/mini-dataflow.yml:67:73}}
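The example operator packs its counter into a single little-endian byte, so the value a receiver sees wraps around after 255. A minimal sketch of how a downstream consumer could decode it, using only the standard library; `encode_counter` and `decode_counter` are hypothetical helper names, not part of dora:

```python
def encode_counter(counter: int) -> bytes:
    # Same expression as in op.py: keep the low 8 bits, emit one byte.
    return (counter % 256).to_bytes(1, "little")


def decode_counter(value: bytes) -> int:
    # Inverse of encode_counter for the single-byte payload.
    return int.from_bytes(value, "little")


for counter in (0, 255, 256, 257):
    print(counter, "->", decode_counter(encode_counter(counter)))
```

If the full counter value matters downstream, a wider encoding (for example 8 bytes) would avoid the wraparound.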
Custom Node
The custom node API allows you to integrate dora into your application. It lets you retrieve inputs and send outputs in any fashion you want.
Node()
Node() initiates a node from the environment variables set by dora-coordinator.
from dora import Node
node = Node()
.next() or __next__() as an iterator
.next() gives you the next input that the node has received. It blocks until the next input becomes available. It returns None when all senders have been dropped.
input_id, value = node.next()
# or
for input_id, value in node:
    print(input_id, value)  # handle each input here
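Because .next() returns None once all senders have been dropped, unpacking its result directly fails at the end of the stream. The sketch below illustrates a loop that checks for None first. `FakeNode` is a hypothetical stand-in written for this example so that it runs without dora; it is not dora's Node, and its iterator behaviour is an assumption.

```python
from collections import deque
from typing import Deque, Optional, Tuple


class FakeNode:
    """Hypothetical stand-in for dora's Node, used only for this illustration."""

    def __init__(self, inputs):
        self._queue: Deque[Tuple[str, bytes]] = deque(inputs)

    def next(self) -> Optional[Tuple[str, bytes]]:
        # A real node would block here; this fake returns None once it is
        # empty, mimicking "all senders have been dropped".
        return self._queue.popleft() if self._queue else None

    def __iter__(self):
        return self

    def __next__(self) -> Tuple[str, bytes]:
        item = self.next()
        if item is None:
            raise StopIteration
        return item


node = FakeNode([("string", b"hello"), ("time", b"\x01")])

received = []
while True:
    item = node.next()
    if item is None:  # no more inputs: stop instead of unpacking None
        break
    input_id, value = item
    received.append((input_id, value))

print(received)
```

The `for input_id, value in node:` form avoids the explicit None check, since iteration simply ends.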
.send_output(output_id, data)
send_output sends data from the node.
node.send_output("string", b"string")
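Assuming payloads are passed as bytes, as in the call above, other values need to be serialized first. The following sketch shows a few standard-library encodings. The local `send_output` function is a hypothetical recorder standing in for node.send_output so the example runs without dora, and the output ids are made up for illustration.

```python
import json
import struct

sent = []


def send_output(output_id: str, data: bytes) -> None:
    # Hypothetical recorder standing in for node.send_output.
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes")
    sent.append((output_id, data))


# Text: encode to UTF-8.
send_output("text", "hello".encode("utf-8"))
# A single 32-bit float, little-endian.
send_output("reading", struct.pack("<f", 1.5))
# Structured data: JSON, then UTF-8.
send_output("config", json.dumps({"rate": 10}).encode("utf-8"))

# The receiving side reverses each encoding.
text = sent[0][1].decode("utf-8")
(reading,) = struct.unpack("<f", sent[1][1])
config = json.loads(sent[2][1])
print(text, reading, config)
```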
Try it out!
- Install the Python node API:
cd apis/python/node
python3 -m venv .env
source .env/bin/activate
pip install maturin
maturin develop
- Create a Python file called printer.py:
from dora import Node

node = Node()

for input_id, value in node:
    # Only print inputs that carry data.
    if len(value) > 0:
        print(f"From Python, id: {input_id}, value: {value}")

print("printer finished")
- Link it in your graph as:
- id: python-printer
  custom:
    run: python examples/nodes/python/printer.py
    inputs:
      string: static-string/string
      time2: rust-timer/time