|
- use crate::{
- config::{CommunicationConfig, Input, InputMapping, NodeRunConfig},
- id::{DataId, NodeId, OperatorId},
- };
- use schemars::JsonSchema;
- use serde::{Deserialize, Serialize};
- use serde_with_expand_env::with_expand_envs;
- use std::{
- collections::{BTreeMap, BTreeSet},
- fmt,
- path::PathBuf,
- };
-
/// The literal string `"shell"`.
///
/// NOTE(review): presumably a reserved source keyword that call sites compare
/// against; the comparison is not in this part of the file, so confirm there.
pub const SHELL_SOURCE: &str = "shell";

/// The literal string `"dynamic"`.
///
/// NOTE(review): presumably a reserved source keyword that call sites compare
/// against; the comparison is not in this part of the file, so confirm there.
pub const DYNAMIC_SOURCE: &str = "dynamic";
-
- /// Dataflow description
- #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
- #[serde(deny_unknown_fields)]
- #[schemars(title = "dora-rs specification")]
- pub struct Descriptor {
- #[schemars(skip)]
- #[serde(default)]
- pub communication: CommunicationConfig,
- #[schemars(skip)]
- #[serde(default, rename = "_unstable_deploy")]
- pub deploy: Deploy,
- pub nodes: Vec<Node>,
- }
-
- #[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
- #[serde(deny_unknown_fields)]
- pub struct Deploy {
- pub machine: Option<String>,
- }
-
- /// Dora Node
- #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
- #[serde(deny_unknown_fields)]
- pub struct Node {
- /// Node identifier
- pub id: NodeId,
- /// Node name
- pub name: Option<String>,
- /// Description of the node
- pub description: Option<String>,
- /// Environment variables
- pub env: Option<BTreeMap<String, EnvValue>>,
-
- /// Unstable machine deployment configuration
- #[schemars(skip)]
- #[serde(default, rename = "_unstable_deploy")]
- pub deploy: Deploy,
-
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub operators: Option<RuntimeNode>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub custom: Option<CustomNode>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub operator: Option<SingleOperatorDefinition>,
-
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub path: Option<String>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub args: Option<String>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub build: Option<String>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub send_stdout_as: Option<String>,
- #[serde(default)]
- pub inputs: BTreeMap<DataId, Input>,
- #[serde(default)]
- pub outputs: BTreeSet<DataId>,
- }
-
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub struct ResolvedNode {
- pub id: NodeId,
- pub name: Option<String>,
- pub description: Option<String>,
- pub env: Option<BTreeMap<String, EnvValue>>,
-
- #[serde(default)]
- pub deploy: ResolvedDeploy,
-
- #[serde(flatten)]
- pub kind: CoreNodeKind,
- }
-
- #[derive(Debug, Clone, Default, Serialize, Deserialize)]
- pub struct ResolvedDeploy {
- pub machine: String,
- }
-
- #[derive(Debug, Clone, Serialize, Deserialize)]
- #[serde(rename_all = "lowercase")]
- pub enum CoreNodeKind {
- /// Dora runtime node
- #[serde(rename = "operators")]
- Runtime(RuntimeNode),
- Custom(CustomNode),
- }
-
- #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
- #[serde(transparent)]
- pub struct RuntimeNode {
- pub operators: Vec<OperatorDefinition>,
- }
-
- #[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
- pub struct OperatorDefinition {
- pub id: OperatorId,
- #[serde(flatten)]
- pub config: OperatorConfig,
- }
-
- #[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
- pub struct SingleOperatorDefinition {
- /// ID is optional if there is only a single operator.
- pub id: Option<OperatorId>,
- #[serde(flatten)]
- pub config: OperatorConfig,
- }
-
- #[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
- pub struct OperatorConfig {
- pub name: Option<String>,
- pub description: Option<String>,
-
- #[serde(default)]
- pub inputs: BTreeMap<DataId, Input>,
- #[serde(default)]
- pub outputs: BTreeSet<DataId>,
-
- #[serde(flatten)]
- pub source: OperatorSource,
-
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub build: Option<String>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub send_stdout_as: Option<String>,
- }
-
- #[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
- #[serde(rename_all = "kebab-case")]
- pub enum OperatorSource {
- SharedLibrary(String),
- Python(PythonSource),
- #[schemars(skip)]
- Wasm(String),
- }
- #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
- #[serde(from = "PythonSourceDef", into = "PythonSourceDef")]
- pub struct PythonSource {
- pub source: String,
- pub conda_env: Option<String>,
- }
-
- #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
- #[serde(untagged)]
- pub enum PythonSourceDef {
- SourceOnly(String),
- WithOptions {
- source: String,
- conda_env: Option<String>,
- },
- }
-
- impl From<PythonSource> for PythonSourceDef {
- fn from(input: PythonSource) -> Self {
- match input {
- PythonSource {
- source,
- conda_env: None,
- } => Self::SourceOnly(source),
- PythonSource { source, conda_env } => Self::WithOptions { source, conda_env },
- }
- }
- }
-
- impl From<PythonSourceDef> for PythonSource {
- fn from(value: PythonSourceDef) -> Self {
- match value {
- PythonSourceDef::SourceOnly(source) => Self {
- source,
- conda_env: None,
- },
- PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env },
- }
- }
- }
-
- #[derive(Debug, Serialize, Deserialize, Clone)]
- #[serde(deny_unknown_fields)]
- pub struct PythonOperatorConfig {
- pub path: PathBuf,
- #[serde(default)]
- pub inputs: BTreeMap<DataId, InputMapping>,
- #[serde(default)]
- pub outputs: BTreeSet<DataId>,
- }
-
- #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
- pub struct CustomNode {
- /// Path of the source code
- ///
- /// If you want to use a specific `conda` environment.
- /// Provide the python path within the source.
- ///
- /// source: /home/peter/miniconda3/bin/python
- ///
- /// args: some_node.py
- ///
- /// Source can match any executable in PATH.
- pub source: String,
- /// Args for the executable.
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub args: Option<String>,
- /// Environment variables for the custom nodes
- ///
- /// Deprecated, use outer-level `env` field instead.
- pub envs: Option<BTreeMap<String, EnvValue>>,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- pub build: Option<String>,
- /// Send stdout and stderr to another node
- #[serde(skip_serializing_if = "Option::is_none")]
- pub send_stdout_as: Option<String>,
-
- #[serde(flatten)]
- pub run_config: NodeRunConfig,
- }
-
- #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
- #[serde(untagged)]
- pub enum EnvValue {
- #[serde(deserialize_with = "with_expand_envs")]
- Bool(bool),
- #[serde(deserialize_with = "with_expand_envs")]
- Integer(u64),
- #[serde(deserialize_with = "with_expand_envs")]
- String(String),
- }
-
- impl fmt::Display for EnvValue {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- match self {
- EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()),
- EnvValue::Integer(u64) => fmt.write_str(&u64.to_string()),
- EnvValue::String(str) => fmt.write_str(str),
- }
- }
- }
|