|
- # Dora 数据流节点终极指南
-
- ## 引言:欢迎来到 Dora 的自动化工厂
-
- 欢迎您,未来的数据流架构师!请将 **`dora`** 想象成一个高度智能的自动化工厂,而您,就是这家工厂的总设计师。您的核心任务是设计一条或多条高效的**流水线 (Dataflow)**,用于处理、转换和传输数据。
-
- 在这条流水线上,每一个独立的工作站,我们称之为 **节点 (Node)**。
-
- ### 什么是节点 (Node)?
-
- 简单来说,一个节点就是一个**独立的工人**。它具备以下特征:
-
- * **身份**:拥有唯一的**工牌号** (`id`) 和供人阅读的**职位描述**。
- * **技能**:掌握一项特定的**技能** (`path` 或 `operator`),即它要执行的程序逻辑。
- * **协作**:能从上游工位**接收零件** (`inputs`),并在完成自己的工作后,将**新产品**放到传送带上,交给下游工位 (`outputs`)。
-
- 您的全部设计工作,都将体现在一份名为 `dataflow.yml` 的**工厂总蓝图**中。这份文档将是您学习如何绘制这份蓝图的终极指南。
-
- 根据蓝图的设计,工人主要有两种工作模式:
-
- 1. **独立工人 (Standalone Process Node)**: 单独在一个工位上工作,拥有独立的进程空间。
- 2. **工人团队 (Runtime Node)**: 一位**工位管理员**带领着一队**操作符 (Operators)** 在一个大的共享工位里高效协作。
-
- 现在,让我们深入每一个细节,解构节点的全部属性。
-
-
- ## 节点属性深度解析
-
- 我们将节点的所有属性划分为几个逻辑部分,并对每个属性进行“通俗讲解”和“参数详解”两个层面的分析。
-
- ### 第一部分:身份与元数据 (Identity & Metadata)
-
- 这些属性定义了工人的基本信息。
-
- #### `id`
-
- * **通俗讲解** 📛
- 每个工人都必须有一个独一无二的“工牌号”,就像我们的身份证号码一样。这是工厂调度系统识别和找到他的唯一凭证。
- ```yaml
- id: camera_source_node
- ```
- * **参数详解**
- * **类型**: `NodeId` (String)
- * **含义**: 节点的唯一标识符,是其在数据流拓扑中的主键。
- * **限制条件**: **必需字段**;在整个数据流中必须**唯一**;禁止包含正斜杠 (`/`) 字符。
- * **配合参数**: 被其他节点的 `inputs` 字段引用,用于建立数据流连接。
-
- #### `name` & `description`
-
- * **通俗讲解** 💬
- 除了工牌号,我们还可以给工人起一个好听的“花名” (`name`),并写一份详细的“职位描述” (`description`)。这不会改变工人的技能,但能让工厂蓝图更容易被人类读懂。
- ```yaml
- name: "高清摄像头画面采集器"
- description: "负责从 RealSense D435 深度相机捕获 1080p 的视频流。"
- ```
- * **参数详解**
- * **类型**: `Option<String>`
- * **含义**: `name` 是简短的可读名称,`description` 是详细的功能说明。主要用于增强文档、日志和监控的可读性。
- * **限制条件**: 均为可选字段,不要求唯一性。
-
-
- ### 第二部分:执行与行为 (Execution & Behavior)
-
- 这些属性决定了工人如何工作,以及他具体执行什么任务。
-
- #### `path`
-
- * **通俗讲解** 📜
- 这是“独立工人”的“技能手册”。它直接告诉工人应该运行哪个程序或脚本。如果一个节点有 `path`,它就是一个独立工作的工人。
- ```yaml
- path: nodes/video_processing.py
- ```
- * **参数详解**
- * **类型**: `Option<String>`
- * **含义**: 定义独立进程节点的可执行文件或脚本的路径。
- * **限制条件**: 与 `operators` 和 `operator` 字段**互斥**。
- * **配合参数**: `args`, `env`, `build`。若与 `git` 配合,此路径通常相对于 Git 仓库的根目录。
-
- #### `args`
-
- * **通俗讲解** 🗣️
- 给工人下达的“临时指令”。就像你告诉他:“嘿,今天的工作,请按照'加急模式'来办!”
- ```yaml
- args: "--mode fast --quality high"
- ```
- * **参数详解**
- * **类型**: `Option<String>`
- * **含义**: 传递给 `path` 指定的可执行文件的命令行参数字符串。
- * **限制条件**: 仅对定义了 `path` 的独立进程节点有效。
-
- #### `env`
-
- * **通俗讲解** 📋
- 工厂里的“共享公告板”。你可以在上面写一些所有工人都需要知道的信息,比如“今天的生产目标:1000件”或“紧急出口在此方向”。
- ```yaml
- env:
- API_KEY: "a_very_secret_key"
- MAX_RETRIES: 5
- ```
- * **参数详解**
- * **类型**: `Option<BTreeMap<String, EnvValue>>`
- * **含义**: 为节点的构建和执行环境设置环境变量。
- * **限制条件**: `EnvValue` 支持字符串、布尔值和数字。是传递敏感信息(如密钥)的最佳方式。
-
- #### `operators` & `operator`
-
- * **通俗讲解** 👨👩👧👦
- 这是“工人团队”模式的核心。一个节点如果没有 `path`,但有 `operators` 或 `operator`,那它就不是一个干活的工人,而是一个“工位管理员”(运行时节点)。他负责管理一整个**操作符 (Operator)** 团队在同一个大工位里高效协作。`operator` 是 `operators` 只有一个成员时的简化写法。
- ```yaml
- # 一个管理员,带领两个操作符
- operators:
- - id: detector
- python: detect.py
- - id: tracker
- python: track.py
- ```
- * **参数详解**
- * **类型**: `Option<RuntimeNode>` / `Option<SingleOperatorDefinition>`
- * **含义**: 定义在运行时节点内部署的一个或多个轻量级**操作符**。它的存在标志着该节点是一个操作符宿主。
- * **限制条件**: 与 `path` 字段**互斥**;`operators` 和 `operator` 自身也**互斥**。
-
-
- ### 第三部分:数据流连接 (Dataflow Connectivity)
-
- 这些属性定义了传送带,让零件在工人之间流动。
-
- #### `outputs`
-
- * **通俗讲解** 📤
- 工人在这里“晒出”他能生产的所有产品类型清单。比如:“我能生产'切割好的钢板'和'废料'两种东西。”
- ```yaml
- outputs:
- - processed_video
- - detected_objects
- ```
- * **参数详解**
- * **类型**: `BTreeSet<DataId>` (Set of Strings)
- * **含义**: 声明该节点可以产生的所有数据输出流的唯一标识符 (ID)。
- * **限制条件**: 必需,但可为空 `[]`。下游节点的 `inputs` 必须引用此列表中声明的 ID。
-
- #### `inputs`
-
- * **通俗讲解** 📥
- 工人在这里“下订单”,说明他需要哪些上游工人的哪些产品来完成自己的工作。这是连接流水线的关键。
- ```yaml
- # 我需要一个叫 `raw_video` 的零件,它来自 `camera_node` 工人生产的 `video_stream`
- inputs:
- raw_video: camera_source_node/video_stream
- ```
- * **参数详解**
- * **类型**: `BTreeMap<DataId, Input>` (Map)
- * **含义**: 定义节点的输入订阅。将一个本地输入 ID 映射到一个上游输出。
- * **语法**: `local_input_id: upstream_node_id/upstream_output_id`。
- * **限制条件**: 引用的上游节点 ID 和输出 ID 必须在数据流中真实存在。
-
- #### `send_stdout_as`
-
- * **通俗讲解** 🎤
- 给工人安装一个“工作日志麦克风”。它能捕获工人在工作时的所有“碎碎念”(程序打印的日志),并把这些“声音”也当作一种产品放到传送带上,供其他工人(如质检员)分析。
- ```yaml
- outputs:
- - main_product
- - work_logs # 别忘了在 outputs 里也声明这个日志产品
- send_stdout_as: work_logs
- ```
- * **参数详解**
- * **类型**: `Option<String>`
- * **含义**: 将节点的标准输出 (`stdout`) 和标准错误 (`stderr`) 流重定向到一个具名的数据流输出通道。
- * **限制条件**: 此处指定的名称**必须**同时在 `outputs` 列表中声明。
- * **配合参数**: `outputs`。
-
-
- ### 第四部分:源码管理与构建 (Source Management & Build)
-
- 这些属性让你的工厂实现了“云采购”和“岗前自动化培训”。
-
- #### `git`, `branch`, `tag`, `rev`
-
- * **通俗讲解** 📚
- 与其把技能手册 (`path`) 手动放到每个工位,不如让工人直接从中央“技能图书馆” (`git`) 下载。你可以指定下载最新的“草稿” (`branch`),某个“发行版” (`tag`),或者精确到某一页的“修订版” (`rev`)。
- ```yaml
- git: https://github.com/dora-rs/dora.git
- tag: v0.4.0 # 推荐使用 tag 或 rev,保证版本稳定
- ```
- * **参数详解**
- * **类型**: `Option<String>`
- * **含义**: `git` 指定仓库 URL;`branch`, `tag`, `rev` 用于版本控制,分别对应分支、标签和提交哈希。
- * **限制条件**: `branch`, `tag`, `rev` **互斥**,且必须与 `git` 配合使用。
-
- #### `build`
-
- * **通俗讲解** 🛠️
- “岗前培训”。有些技能手册是“散装的零件”(源代码),需要先“组装”(编译)成可用的工具。`build` 就是这个组装指令。
- ```yaml
- build: cargo build --release
- ```
- * **参数详解**
- * **类型**: `Option<String>`
- * **含义**: 在 `dora build` 阶段执行的构建命令。
- * **限制条件**: `env` 中定义的环境变量对此命令生效。
-
- ## 源码分析
-
- 在 `dora` 的核心库中详细定义了 `Node` 节点的参数。
-
- 文件 [`libraries/message/src/descriptor.rs`](https://github.com/dora-rs/dora/blob/main/libraries/message/src/descriptor.rs#L115) 第 `115行`
-
- ```rust title="descriptor.rs"
- /// # Dora Node Configuration
- ///
- /// A node represents a computational unit in a Dora dataflow. Each node runs as a
- /// separate process and can communicate with other nodes through inputs and outputs.
- #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
- #[serde(deny_unknown_fields)]
- pub struct Node {
- pub id: NodeId,
- pub name: Option<String>,
- pub description: Option<String>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub path: Option<String>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub args: Option<String>,
- pub env: Option<BTreeMap<String, EnvValue>>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub operators: Option<RuntimeNode>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub operator: Option<SingleOperatorDefinition>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub custom: Option<CustomNode>,
- #[serde(default)]
- pub outputs: BTreeSet<DataId>,
- #[serde(default)]
- pub inputs: BTreeMap<DataId, Input>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub send_stdout_as: Option<String>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub build: Option<String>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub git: Option<String>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub branch: Option<String>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub tag: Option<String>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub rev: Option<String>,
- #[schemars(skip)]
- #[serde(rename = "_unstable_deploy")]
- pub deploy: Option<Deploy>,
- }
- ```
-
- ### 详细字段分析
-
- #### 核心身份信息 (Identity)
-
- ### `id: NodeId`
- * **作用**: **必需的唯一标识符**。这是节点在数据流图中的“身份证”,所有节点的 `id` 必须唯一,且不能包含 `/` 字符。`inputs` 字段用它来路由数据。
-
- ### `name: Option<String>`
- * **作用**: 可选的、人类可读的名称。主要用于文档和日志,以提高可读性。
-
- ### `description: Option<String>`
- * **作用**: 可选的、对节点功能的详细文字描述。进一步增强了数据流图的可读性和可维护性。
-
- #### 独立进程节点配置 (Standalone Node Configuration)
-
- ### `path: Option<String>`
- * **作用**: **定义独立进程节点的关键**。指向一个可执行文件(如 Rust 编译产物)或脚本(如 Python 脚本)的路径。`dora` 会运行这个文件。如果一个 `Node` 定义了 `path`,它就是一个独立进程节点。
-
- ### `args: Option<String>`
- * **作用**: 传递给 `path` 指定的可执行文件的命令行参数。
-
- ### `env: Option<BTreeMap<String, EnvValue>>`
- * **作用**: 为节点进程(包括构建和运行阶段)设置环境变量。
-
- #### 运行时与操作符配置 (Runtime & Operator Configuration)
-
- ### `operators: Option<RuntimeNode>`
- * **作用**: **定义运行时节点的关键**。包含一个或多个操作符(Operator)的定义。如果一个 `Node` 定义了 `operators` 字段(而没有 `path` 字段),它就成为一个运行时节点。
-
- ### `operator: Option<SingleOperatorDefinition>`
- * **作用**: `operators` 字段的便利版。当一个运行时节点只包含**一个**操作符时,可以使用此字段来简化 YAML 配置。它和 `operators` 是互斥的。
-
- #### 数据流连接 (Dataflow Connectivity)
-
- ### `outputs: BTreeSet<DataId>`
- * **作用**: **声明该节点产生的所有输出流的 ID**。这是一个集合,列出了所有该节点可能发送数据的“通道名称”。下游节点通过这些 ID 来订阅数据。
-
- ### `inputs: BTreeMap<DataId, Input>`
- * **作用**: **定义该节点的输入流连接**。这是一个映射表,key 是本节点内部使用的输入名称 (`input_id`),value (`Input`) 则指明了该输入来源于哪个上游节点 (`source_node_id`) 的哪个输出 (`source_node_output_id`)。这是构建数据流图(DAG)连接关系的核心。
-
- ### `send_stdout_as: Option<String>`
- * **作用**: 一个非常实用的功能,可以将节点进程的标准输出(stdout)和标准错误(stderr)重定向到一个 `dora` 的数据输出流中。这使得日志和调试信息可以像普通数据一样在数据流中传递。
-
- #### 构建与源码管理 (Build & Source Management)
-
- ### `build: Option<String>`
- * **作用**: 定义在 `dora build` 期间需要执行的构建命令。例如 `cargo build` 或 `pip install`。这使得 `dora` 可以直接从源代码构建节点。
-
- ### `git: Option<String>`
- * **作用**: 指定一个 Git 仓库地址。`dora build` 会自动克隆这个仓库,使得数据流可以轻松分发和复现。
-
- ### `branch: Option<String>`, `tag: Option<String>`, `rev: Option<String>`
- * **作用**: 与 `git` 字段配合使用,用于检出(checkout)指定的 Git 分支、标签或提交版本。这确保了构建的一致性。
-
- #### 其他字段
-
- ### `custom: Option<CustomNode>`
- * **作用**: **已弃用 (deprecated)** 的旧版字段。源码注释明确指出应使用顶层的 `path`, `args` 等字段。
-
- ### `deploy: Option<Deploy>`
- * **作用**: 用于机器部署的非稳定(unstable)配置。从 `#[schemars(skip)]` 属性看,它不包含在公开的配置 schema 中,属于内部或实验性功能。
-
- ---
-
- ### 技术方案总结
-
- 这份最新的 `Node` 定义展示了一个成熟且高度灵活的设计方案:
-
- 1. **声明式与配置驱动**: 整个数据流图完全通过 YAML 文件进行声明式定义。`Node` 结构体是这个声明的核心,它是一个纯粹的数据容器,可以被 `serde` 完美地解析。
- 2. **隐式模式切换**: 通过检查 `path` 与 `operators`/`operator` 字段的存在与否,`dora` 能够智能地推断出节点是应作为独立进程运行,还是作为轻量级操作符的宿主运行。这种设计使得 YAML 语法非常直观和扁平。
- 3. **构建与源码集成**: 通过 `git` 和 `build` 字段,`dora` 将节点的源码管理和构建过程无缝集成到其工作流中,极大地提升了项目的可移植性和可复现性。
- 4. **清晰的关注点分离**:
- * **身份 (who)**: `id`, `name`, `description`
- * **行为 (what)**: `path` 或 `operators`
- * **连接 (how)**: `inputs`, `outputs`
- * **准备 (prepare)**: `build`, `git`
-
- ## 最佳实践与最终建议
-
- 现在你已经是 dora 节点配置专家了!一个全功能的节点定义可能看起来像下面这样,但别怕,你已经理解了每一部分!
-
- ```yaml title="dataflow.yml"
- # 这是一个 dora 数据流的综合示例文件 (dataflow.yml)
- # 它展示了如何使用 Node 的各种属性来定义一个复杂的多节点应用。
-
- nodes:
- # --------------------------------------------------------------------------
- # 节点 1: 独立进程节点 (Standalone Rust Node) - 从 Git 构建
- # 职责: 模拟从摄像头捕获视频帧。
- # 特点: 使用 git, tag, build, env, 和 send_stdout_as 字段。
- # --------------------------------------------------------------------------
- - id: video_capture
- name: "Camera Capture Node (Rust)"
- description: "从 Git 仓库构建的 Rust 节点,模拟捕获视频帧并发送。同时,它的标准输出会被重定向为 'log_stream' 输出。"
-
- # 源码管理: 指定从哪个 Git 仓库获取源代码
- git: https://github.com/dora-rs/dora.git
- # 版本控制: 锁定到 v0.4.0 标签,确保构建的可复现性
- tag: v0.4.0
-
- # 构建命令: 在 dora build 期间执行的命令
- build: cargo build --release -p camera-source
-
- # 环境变量: 为构建和运行环境设置环境变量
- env:
- RUST_LOG: info
- CAMERA_ID: 0
-
- # 运行配置: 构建完成后,要执行的程序路径
- # 注意:路径是相对于 git 仓库的根目录
- path: target/release/camera-source
-
- # 输出: 声明此节点会产生两个输出流
- outputs:
- - video_frame # 主要的数据输出
- - log_stream # 用于接收标准输出的流
-
- # 特殊功能: 将此节点的 stdout 和 stderr 重定向到名为 'log_stream' 的输出流
- send_stdout_as: log_stream
-
- # --------------------------------------------------------------------------
- # 节点 2: 独立进程节点 (Standalone Python Node)
- # 职责: 调整接收到的视频帧尺寸。
- # 特点: 使用 args, inputs, 和 branch 字段 (演示与 tag 的不同)。
- # --------------------------------------------------------------------------
- - id: frame_resizer
- name: "Image Resizer (Python)"
- description: "一个 Python 节点,用于调整图像大小。从开发分支获取代码。"
-
- # 使用同一个 git 仓库,但切换到主开发分支
- git: https://github.com/dora-rs/dora.git
- branch: main
-
- # 运行配置: 直接指定 Python 脚本路径和命令行参数
- path: examples/python-dataflow/nodes/resize.py
- args: "--width 640 --height 480"
-
- # 输入: 定义数据来源
- # 'image' 输入流 -> 连接到 'video_capture' 节点的 'video_frame' 输出
- inputs:
- image: video_capture/video_frame
-
- # 输出: 声明会产生一个名为 'resized_image' 的输出
- outputs:
- - resized_image
-
- # --------------------------------------------------------------------------
- # 节点 3: 运行时节点 (Runtime Node) - 托管多个操作符
- # 职责: 作为宿主进程,运行两个轻量级操作符。
- # 特点: 使用 operators 字段,没有 path 字段。
- # --------------------------------------------------------------------------
- - id: processing_hub
- name: "AI & Data Processing Hub"
- description: "这是一个运行时节点,它本身不执行逻辑,而是为其内部的操作符提供运行环境。"
-
- # 注意:没有 path, args, build 等字段,因为它不是一个独立的进程节点。
-
- # 操作符定义:
- operators:
- # 运行时内的第一个操作符
- - id: yolo_detector
- name: "YOLOv8 Detection Operator"
- description: "对输入的图片运行目标检测算法。"
- # 操作符的实现代码
- python: path/to/yolo_operator.py
- # 操作符的输入
- inputs:
- image: frame_resizer/resized_image
- # 操作符的输出
- outputs:
- - detections # 检测结果
- - metadata # 附加元数据
-
- # 运行时内的第二个操作符 (设想中的 Rust 操作符)
- - id: aggregator
- name: "Data Aggregator Operator"
- description: "聚合来自多个源的数据。"
- # 设想中的 Rust 共享库操作符
- shared_library: path/to/libaggregator.so
- # 操作符的输入
- inputs:
- # 可以订阅来自其他操作符或其他节点的数据
- yolo_meta: yolo_detector/metadata
- camera_log: video_capture/log_stream
- # 操作符的输出
- outputs:
- - summary_report
-
- # --------------------------------------------------------------------------
- # 节点 4: 最终的日志和存储节点 (Sink Node)
- # 职责: 收集所有最终结果并打印日志。
- # 特点: 拥有多个复杂的输入,是数据流的终点。
- # --------------------------------------------------------------------------
- - id: sink_logger
- name: "Final Logger"
- description: "接收所有处理结果并将其打印到控制台。"
- path: examples/python-dataflow/nodes/logger.py
-
- # 输入: 演示如何订阅来自不同节点和操作符的多个数据流
- inputs:
- detections_input: processing_hub/yolo_detector/detections
- report_input: processing_hub/aggregator/summary_report
- raw_logs_input: video_capture/log_stream
-
- # 没有 outputs 字段,因为它是数据流的终点(Sink)。
- ```
-
- 1. **ID 清晰唯一**: 这是数据流的基石,请认真命名。
- 2. **文档即代码**: 善用 `name` 和 `description`,让你的数据流蓝图自己会说话。
- 3. **拥抱版本控制**: 在生产环境中,**永远使用 `git` + `tag` 或 `rev`** 的组合,杜绝不确定性。
- 4. **配置外部化**: 尽量使用 `env` 和 `args` 传入配置,保持代码的通用性。
- 5. **调试利器**: 对于关键或复杂的节点,毫不犹豫地使用 `send_stdout_as`,它会在未来为你节省大量调试时间。
- 6. **合理选择模式**: 从简单的**独立工人**模式开始。当遇到性能瓶颈或多个节点需要紧密、低延迟通信时,再重构为**工人团队**(运行时 + 操作符)模式。
-
- 现在,您已经拥有了设计 `dora` 节点所需的所有知识,从高层概念到技术细节。是时候打开您的 `dataflow.yml`,开始构建属于您自己的、高效、智能的自动化数据工厂了!
|