|
|
|
@@ -53,14 +53,14 @@ |
|
|
|
|
|
|
|
这与我们在第二章:`Node` 中看到的结构类似,但现在重点关注 `API` 调用本身: |
|
|
|
|
|
|
|
```py |
|
|
|
from dora import Node # Import the Node class from the API binding |
|
|
|
```py title="API Binding 示例 Python 节点" |
|
|
|
from dora import Node # 从 API 绑定导入 Node 类 |
|
|
|
|
|
|
|
# 1. Initialize the node |
|
|
|
# 1. 初始化节点 |
|
|
|
node = Node() |
|
|
|
print("Python Node initialized. Waiting for events...") |
|
|
|
|
|
|
|
# 2. Enter the event loop to receive events |
|
|
|
# 2. 进入事件循环接收事件 |
|
|
|
for event in node: |
|
|
|
event_type = event["type"] |
|
|
|
event_id = event["id"] # Often the input ID |
|
|
|
@@ -68,32 +68,32 @@ for event in node: |
|
|
|
print(f"Received event: Type={event_type}, ID={event_id}") |
|
|
|
|
|
|
|
if event_type == "INPUT": |
|
|
|
# 3. Access input data and process |
|
|
|
# 3. 访问输入数据和流程 |
|
|
|
if event_id == "my_input": |
|
|
|
input_data = event["value"] |
|
|
|
print(f" -> Processing data from input '{event_id}'...") |
|
|
|
|
|
|
|
# --- Your custom processing logic here --- |
|
|
|
# --- 您的自定义处理逻辑在这里 --- |
|
|
|
processed_data = process_data(input_data) |
|
|
|
# --------------------------------------- |
|
|
|
|
|
|
|
# 4. Send output data |
|
|
|
# 4. 发送输出数据 |
|
|
|
print(" -> Sending processed data on output 'my_output'") |
|
|
|
node.send_output("my_output", processed_data) |
|
|
|
|
|
|
|
elif event_type == "STOP": |
|
|
|
# 5. Handle stop signal |
|
|
|
# 5. 处理停止信号 |
|
|
|
print(" -> Received STOP command. Exiting loop.") |
|
|
|
break |
|
|
|
|
|
|
|
print("Python Node stopping gracefully.") |
|
|
|
|
|
|
|
# Dummy processing function for illustration |
|
|
|
# 用于说明的虚拟处理函数 |
|
|
|
def process_data(data): |
|
|
|
# In a real node, this would transform the data |
|
|
|
# 在实际节点中,这将转换数据 |
|
|
|
print(" (Doing dummy processing...)") |
|
|
|
# Often data is an Arrow Array, need to convert/process it |
|
|
|
# For simplicity, let's just return a dummy byte array |
|
|
|
# 数据通常是一个 Arrow 数组, 需要对其进行转换/处理 |
|
|
|
# 为了简单起见,我们只返回一个虚拟字节数组 |
|
|
|
return b"processed_output_data" |
|
|
|
``` |
|
|
|
|
|
|
|
@@ -107,11 +107,11 @@ def process_data(data): |
|
|
|
|
|
|
|
操作符使用稍微不同的 `API` 结构,专门设计用于 `dora-runtime` 进程。 |
|
|
|
|
|
|
|
```py |
|
|
|
```py title ="operators/my_operator.py" |
|
|
|
# operators/my_operator.py |
|
|
|
from dora import DoraStatus # Import necessary status enum |
|
|
|
from dora import DoraStatus # 导入必要的状态枚举 |
|
|
|
|
|
|
|
# The Operator logic is typically implemented in a class with specific methods |
|
|
|
# 操作符逻辑通常在具有特定方法的类中实现 |
|
|
|
class Operator: |
|
|
|
|
|
|
|
def on_event(self, dora_event, send_output): |
|
|
|
@@ -129,22 +129,22 @@ class Operator: |
|
|
|
input_data = dora_event["value"] |
|
|
|
print(f" -> Processing data from operator input '{event_id}'") |
|
|
|
|
|
|
|
# --- Your custom processing logic here --- |
|
|
|
# --- 您的自定义处理逻辑在这里 --- |
|
|
|
processed_data = process_operator_data(input_data) |
|
|
|
# --------------------------------------- |
|
|
|
|
|
|
|
# Send output data using the provided send_output function |
|
|
|
# 使用提供的 send_output 函数发送输出数据 |
|
|
|
print(" -> Sending processed data on operator output 'operator_output'") |
|
|
|
send_output("operator_output", processed_data, dora_event["metadata"]) # Operators also pass metadata |
|
|
|
send_output("operator_output", processed_data, dora_event["metadata"]) # 操作符传递元数据 |
|
|
|
|
|
|
|
elif event_type == "STOP": |
|
|
|
print(" -> Received STOP command. Operator stopping.") |
|
|
|
return DoraStatus.STOP # Return STOP status to signal shutdown |
|
|
|
return DoraStatus.STOP # 返回 STOP 状态以发出关机信号 |
|
|
|
|
|
|
|
# By default, continue running |
|
|
|
# 默认继续运行 |
|
|
|
return DoraStatus.CONTINUE |
|
|
|
|
|
|
|
# Dummy processing function for illustration |
|
|
|
# 用于说明的虚拟处理函数 |
|
|
|
def process_operator_data(data): |
|
|
|
print(" (Operator doing dummy processing...)") |
|
|
|
return b"processed_operator_data" |
|
|
|
@@ -173,7 +173,7 @@ def process_operator_data(data): |
|
|
|
|
|
|
|
这是一个简化的序列图: |
|
|
|
|
|
|
|
 |
|
|
|
 |
|
|
|
|
|
|
|
**`API 绑定`** 隐藏了这些通道和协议的复杂性,让您可以专注于应用程序逻辑。它将您语言的数据类型转换为适合 `dora` 的格式(例如 `Arrow` ),并管理高效通信所需的低级交互。 |
|
|
|
|
|
|
|
@@ -181,6 +181,6 @@ def process_operator_data(data): |
|
|
|
|
|
|
|
# 总结 |
|
|
|
|
|
|
|
`API 绑定`是必不可少的软件库,它使您能够使用熟悉的编程语言(例如 P`ython`、`Rust`、`C` 和 `C++`)为自定义 `dora` 节点和操作符编写核心逻辑。它们提供了标准化的函数工具包( `init` 、 `send_output` 、事件循环/回调),这些函数抽象了进程间通信、共享内存和事件处理的复杂性,使您可以专注于特定的应用程序任务,同时无缝集成到 `dora` 数据流中。 |
|
|
|
`API 绑定`是必不可少的软件库,它使您能够使用熟悉的编程语言(例如 `Python`、`Rust`、`C` 和 `C++`)为自定义 `dora` 节点和操作符编写核心逻辑。它们提供了标准化的函数工具包( `init` 、 `send_output` 、`事件循环/回调`),这些函数抽象了进程间通信、共享内存和事件处理的复杂性,使您可以专注于特定的应用程序任务,同时无缝集成到 `dora` 数据流中。 |
|
|
|
|
|
|
|
现在您已经了解了如何使用 `API 绑定`为各个组件编写代码,让我们缩小范围并查看用于管理和运行整个数据流的主要工具: **`Dora CLI 命令行`** 。 |