You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

operator.mdx 11 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. # 第三章:操作符
  2. **Author** : **`Leon 李扬`**
  3. 欢迎回来!在第一章:数据流中,我们学习了 `dora` 应用程序的整体蓝图以及它如何连接各个部分。在第二章:节点中,我们深入探讨了这些节点部分,并了解到每个节点通常作为一个独立的进程运行,执行特定的任务,并通过数据流 `YAML` 中定义的输入和输出进行通信。
  4. 现在,让我们探索一种在 `Node` 内部添加更多结构和可重用性的方法: `Operator` 的概念。
  5. ## 节点内部更小、可重复使用的步骤
  6. 假设你有一个负责处理图像的节点。该节点可能需要执行几个不同的步骤:
  7. 1. 将图像转换为灰度。
  8. 2. 将图像调整为特定尺寸。
  9. 3. 对图像应用过滤器。
  10. 4. 在过滤后的图像中寻找边缘。
  11. 你可以在一个节点的单一代码实现中编写所有这些逻辑。但是,如果你之后想在另一个节点或不同的数据流中重用“调整图像大小”或“应用滤镜”逻辑,你就必须复制粘贴代码,这并不理想。
  12. 这时, **`Operator`** 就派上用场了。`Operator` 是一种较小的模块化逻辑,设计用于在一种名为 `Dora Runtime Node` 的特殊节点内运行。`Operator` 允许你将单个节点的工作分解为可重用的独立步骤。
  13. 可以将节点想象成工厂车间的工作站(一栋独立的建筑),而操作符则是该工作站内的专用工具或机器。工作站(节点)是独立的单元,但其内部的工作由其工具集(操作符)完成。
  14. ## 操作符与自定义节点
  15. 让我们快速比较一下:
  16. | 特征 | 自定义节点 | Dora 运行时节点和操作符 |
  17. | :--------------- | :--------------------------------- | :------------------------------- |
  18. | **运行方式** | 独立节点 | 单进程 |
  19. | **包含** | 一段自定义的代码 | 一个或多个操作符 |
  20. | **逻辑单元** | 整个节点代码 | 单个操作符 |
  21. | **通讯**| 使用 `dora Node API` 直接与 `dora` 系统交互 | 节点运行时使用 `dora Node API`; 操作符使用 `dora Operator API` |
  22. | **可重用性** | 重用整个 `Node` 进程 | 重复使用单个操作符逻辑模块 |
  23. 因此,虽然自定义节点是其自己独立的程序( `.py` 、 `.rs` 可执行文件等),但 `Dora 运行时` 节点是一个运行 `dora-runtime` 程序的进程,然后加载和管理数据流 `YAML` 中指定的一个或多个操作员的代码。
  24. ## 在数据流 `YAML` 中定义操作符
  25. 运算符是在 `Dataflow YAML` 中节点的定义内定义的。您无需为节点的实现指定单个 `path` 或 `custom` 块,而是使用 `operators` 符块。
  26. 让我们想象一下之前的图像处理节点,现在使用操作符来实现。
  27. ```py title="dataflow.yml"
  28. nodes:
  29. # ... other nodes like camera ...
  30. # Define our image processing Node that uses Operators
  31. - id: image-processor-node # This is the ID of the Node process
  32. inputs:
  33. image: camera/image # This Node takes image data from the camera Node
  34. outputs:
  35. - processed_image # This Node outputs the final processed image
  36. # This node is a Runtime Node and will load operators
  37. operators:
  38. # First Operator: Convert to Grayscale
  39. - id: grayscale-converter # ID of the operator within THIS node
  40. source: python # How to load this operator's code
  41. path: operators/grayscale_op.py # The code file for this operator
  42. inputs:
  43. input_image: image-processor-node/image # Takes input from the NODE's image input
  44. outputs:
  45. - gray_image # Outputs grayscale image
  46. # Second Operator: Resize
  47. - id: resizer # ID of the second operator
  48. source: python
  49. path: operators/resize_op.py
  50. inputs:
  51. input_image: grayscale-converter/gray_image # Takes input from the 'gray_image' output of 'grayscale-converter' OPERATOR
  52. outputs:
  53. - resized_image # Outputs resized image
  54. # Third Operator: Apply Filter
  55. - id: filter
  56. source: python
  57. path: operators/filter_op.py
  58. inputs:
  59. input_image: resizer/resized_image # Takes input from the 'resized_image' output of 'resizer' OPERATOR
  60. outputs:
  61. - filtered_image # Outputs filtered image
  62. # Fourth Operator: Edge Detector
  63. - id: edge-detector
  64. source: python
  65. path: operators/edge_op.py
  66. inputs:
  67. input_image: filter/filtered_image # Takes input from the 'filtered_image' output of 'filter' OPERATOR
  68. outputs:
  69. - processed_image # Outputs the final processed image
  70. # Notice: the 'processed_image' output of the 'edge-detector' operator
  71. # matches the 'processed_image' output of the 'image-processor-node'.
  72. # The runtime routes this automatically.
  73. ```
  74. 需要注意的关键事项:
  75. - `nodes` 下的 `id` ( `image-processor-node` ) 是 `Node` 进程的 `ID`,和之前一样。
  76. - 这里有一个 `operators` 列表,而不是 `path` 或 `custom` 。
  77. - `operators` 列表中的每个项目定义了在此节点内运行的操作员 。
  78. - 每个 `Operator` 都有自己的 `id` ( 在此节点内唯一)、 `source` (例如 `python` 、 `shared-library` )和指向其代码的 `path` 。
  79. - 操作员也有 `inputs` 和 `outputs` 。
  80. - 连接语法仍然是 `source_id/stream_id` 。
  81. - 节点自己的 `ID`( `image-processor-node/image` ):从外部数据流进入节点的数据被路由到操作员的输入。
  82. - 同一节点内另一个 `Operator` 的 `ID`( `grayscale-converter/gray_image` ):数据在同一节点进程内的 `Operator` 之间直接流动。
  83. - 如果 `Operator` 的输出 `id` 与 `Node` 的某个 `outputs` 匹配,则该输出将自动成为该 `Node` 到外部 `Dataflow` 的输出( `edge-detector/processed_image` 变为 `image-processor-node/processed_image` )。
  84. 这允许您在单个 `Node` 进程中定义一个迷你数据流。
  85. ## 操作员如何工作(操作员 API)
  86. 就像 `Node` 的代码使用 `dora Node API` 一样,`Operator` 的代码也使用 `dora Operator API` 。这个 `API` 更简单,因为 `Operator` 不需要担心网络连接或进程管理之类的事情;`Runtime Node` 会处理这些事情。
  87. 操作员的主要工作是对事件做出反应。最常见的事件是接收输入数据。
  88. 下面是我们的一个示例运算符(`grayscale-converter` 的 `Python` 代码的简化概念图,代码片段非常简短):
  89. ```py title="operators/grayscale_op.py"
  90. # operators/grayscale_op.py
  91. from dora import DoraStatus
  92. class Operator:
  93. """Converts incoming image to grayscale."""
  94. def on_event(
  95. self,
  96. dora_event, # The event received by the operator
  97. send_output, # Function to send outputs
  98. ) -> DoraStatus:
  99. if dora_event["type"] == "INPUT":
  100. # Check if the input is the one we expect
  101. if dora_event["id"] == "input_image":
  102. image_data = dora_event["value"] # Get the incoming image data
  103. print(f"Received image on input_image.")
  104. # --- Operator's specific logic ---
  105. # (Actual image processing code goes here)
  106. grayscale_image_data = convert_to_grayscale(image_data)
  107. # ---------------------------------
  108. # Send the result to the operator's output
  109. send_output("gray_image", grayscale_image_data, dora_event["metadata"])
  110. return DoraStatus.CONTINUE # Keep running
  111. ```
  112. 在此 `Python` 代码片段中:
  113. - `Operator` 代码的结构为 `Python` 类(或 `Rust` 中的函数)。
  114. - 它有一个 `on_event` 方法(或函数),每当发生与该操作符有关的事件(例如接收输入或停止信号)时, `dora-runtime` 就会调用该方法。
  115. - `dora_event` 字典包含有关事件的信息(其 `type` 、 `id` 、 `value` 等。我们将在下一章介绍**事件流** )。
  116. - `send_output` 函数(或方法)由运行时提供,供 `Operator` 在其定义的输出上发送数据。它接受输出 `id` 、 `data` 以及可选的 `metadata` 。
  117. 这个模式( `on_event -> process -> send_output` )是 `Operator` 逻辑的核心。
  118. ## 底层:运行时节点和操作符
  119. 当您使用 `dora run` 和定义了带有操作符的运行时节点的 `Dataflow YAML` 时,下面是发生情况的简化视图:
  120. 1. `dora CLI` 和 `Dora Daemon / Dora Coordinator` 读取数据流 `YAML`。
  121. 2. 他们将 `image-processor-node` 标识为一个节点。
  122. 3. 他们看到它有一个 `operators` 块,表明它是一个运行时节点。
  123. 4. 它们为该节点启动一个单独的进程 。该进程运行 `dora-runtime` 可执行文件。
  124. 5. `dora-runtime` 读取 `image-processor-node` 块中的 Operator 定义。
  125. 6. 它根据 `source` 和 `path` 加载 `grayscale-converter` 、 `resizer` 、 `filter` 和 `edge-detector` 的代码。
  126. 7. 它建立了沟通渠道:
  127. - 从外部数据流的 `camera/image` 到 `image-processor-node` 的 image 输入。
  128. - 内部,从运行时节点的 `image` 输入到 `grayscale-converter` 操作器的 input_image 输入。
  129. - 在内部,从 `grayscale-converter/gray_image` 输出到 `resizer/input_image` 输入。
  130. - 内部,从 `resizer/resized_image` 输出到 `filter/input_image` 输入。
  131. - 内部,从 `filter/filtered_image` 输出到 `edge-detector/input_image` 输入。
  132. - 从 `edge-detector/processed_image` 输出到外部 `Dataflow` 的 `image-processor-node/processed_image` 输出。
  133. 8. 然后, `dora-runtime` 进入事件循环,从外部 `Dataflow` 接收事件(例如新图片),并根据 YAML 中定义的内部连接,将它们分发给相关的 `Operator`。当一个 `Operator` 生成输出时,运行时会将其内部路由到另一个 `Operator`,或者将其作为 `Node` 输出发送出去。
  134. **这是一个简化的序列图:**
  135. ![序列图](/operator01.png)
  136. 这显示了数据如何流入单个运行时节点进程,通过运行时在操作员之间内部传递,然后作为节点输出流出。
  137. ## 总结
  138. 在本章中,我们了解到 `Operator` 是运行在 `Dora Runtime` 节点进程中的模块化逻辑。它们在 `Dataflow YAML` 的 `Node` 块中定义,允许您在单个节点内构建复杂的处理流水线。`Operator` 使用 `Operator API` 接收输入并发送输出, `dora-runtime` 负责加载它们、在它们之间建立内部通信,以及在节点的外部输入/输出和 `Operator` 的输入/输出之间路由数据。这提供了一种创建可复用处理步骤的方法。
  139. 现在我们了解了节点和操作符,让我们看看数据在系统中移动的基本概念: **事件流** 。

Dora中文社区官网

Contributors (1)