diff --git a/apis/rust/node/src/config.rs b/apis/rust/node/src/config.rs index 6130e22e..18bda253 100644 --- a/apis/rust/node/src/config.rs +++ b/apis/rust/node/src/config.rs @@ -6,7 +6,7 @@ use std::{ str::FromStr, }; -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct NodeRunConfig { #[serde(default)] diff --git a/binaries/coordinator/examples/mini-dataflow.yml b/binaries/coordinator/examples/mini-dataflow.yml index 8579a5db..9d5af611 100644 --- a/binaries/coordinator/examples/mini-dataflow.yml +++ b/binaries/coordinator/examples/mini-dataflow.yml @@ -54,5 +54,14 @@ nodes: python: ../runtime/examples/python-operator/op.py inputs: time: timer/time + test: python-operator/counter outputs: - counter + + - id: python-operator + operator: + python: ../runtime/examples/python-operator/op.py + inputs: + time: timer/time + outputs: + - counter diff --git a/binaries/coordinator/src/main.rs b/binaries/coordinator/src/main.rs index 376638a6..f6a743f0 100644 --- a/binaries/coordinator/src/main.rs +++ b/binaries/coordinator/src/main.rs @@ -1,4 +1,4 @@ -use dora_core::descriptor::{self, Descriptor, NodeKind}; +use dora_core::descriptor::{self, CoreNodeKind, Descriptor}; use dora_node_api::config::NodeId; use eyre::{bail, eyre, WrapErr}; use futures::{stream::FuturesUnordered, StreamExt}; @@ -49,17 +49,21 @@ async fn main() -> eyre::Result<()> { } async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> { - let Descriptor { - mut communication, - nodes, - } = read_descriptor(&dataflow_path).await.wrap_err_with(|| { + let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| { format!( "failed to read dataflow descriptor at {}", dataflow_path.display() ) })?; - if nodes.iter().any(|n| matches!(n.kind, NodeKind::Runtime(_))) && !runtime.is_file() { + let nodes = descriptor.resolve_aliases(); + let mut communication = descriptor.communication; + + if nodes + .iter() + .any(|n| matches!(n.kind, CoreNodeKind::Runtime(_))) + && !runtime.is_file() + { bail!( "There is no runtime at {}, or it is not a file", runtime.display() @@ -74,12 +78,12 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() let node_id = node.id.clone(); match node.kind { - descriptor::NodeKind::Custom(node) => { + descriptor::CoreNodeKind::Custom(node) => { let result = spawn_custom_node(node_id.clone(), &node, &communication) .wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?; tasks.push(result); } - descriptor::NodeKind::Runtime(node) => { + descriptor::CoreNodeKind::Runtime(node) => { if !node.operators.is_empty() { let result = spawn_runtime_node(runtime, node_id.clone(), &node, &communication) diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index f6132618..0838d644 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -1,6 +1,6 @@ #![warn(unsafe_op_in_unsafe_fn)] -use dora_core::descriptor::OperatorConfig; +use dora_core::descriptor::OperatorDefinition; use dora_node_api::{ self, communication::{self, CommunicationLayer}, @@ -37,7 +37,7 @@ async fn main() -> eyre::Result<()> { .wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?; serde_yaml::from_str(&raw).context("failed to deserialize communication config")? }; - let operators: Vec = { + let operators: Vec = { let raw = std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?; serde_yaml::from_str(&raw).context("failed to deserialize operator config")? @@ -107,7 +107,7 @@ async fn main() -> eyre::Result<()> { .ok_or_else(|| eyre!("received event from unknown operator {id}"))?; match event { OperatorEvent::Output { id: data_id, value } => { - if !operator.config().outputs.contains(&data_id) { + if !operator.definition().config.outputs.contains(&data_id) { eyre::bail!("unknown output {data_id} for operator {id}"); } publish(&node_id, id, data_id, &value, communication.as_ref()) @@ -154,7 +154,7 @@ async fn main() -> eyre::Result<()> { } async fn subscribe<'a>( - operators: &'a [OperatorConfig], + operators: &'a [OperatorDefinition], communication: &'a dyn CommunicationLayer, ) -> eyre::Result + 'a> { let mut streams = Vec::new(); @@ -168,11 +168,11 @@ async fn subscribe<'a>( } async fn subscribe_operator<'a>( - operator: &'a OperatorConfig, + operator: &'a OperatorDefinition, communication: &'a dyn CommunicationLayer, ) -> Result + 'a, eyre::Error> { let stop_messages = FuturesUnordered::new(); - for input in operator.inputs.values() { + for input in operator.config.inputs.values() { let InputMapping { source, operator, .. } = input; @@ -189,7 +189,7 @@ async fn subscribe_operator<'a>( let finished = Box::pin(stop_messages.all(|_| async { true }).shared()); let mut streams = Vec::new(); - for (input, mapping) in &operator.inputs { + for (input, mapping) in &operator.config.inputs { let InputMapping { source, operator: source_operator, diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index d3c3d900..299347e2 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -1,4 +1,4 @@ -use dora_core::descriptor::{OperatorConfig, OperatorSource}; +use dora_core::descriptor::{OperatorDefinition, OperatorSource}; use dora_node_api::config::DataId; use eyre::{eyre, Context}; use std::any::Any; @@ -9,28 +9,31 @@ mod shared_lib; pub struct Operator { operator_task: Sender, - config: OperatorConfig, + definition: OperatorDefinition, } impl Operator { pub async fn init( - operator_config: OperatorConfig, + operator_definition: OperatorDefinition, events_tx: Sender, ) -> eyre::Result { let (operator_task, operator_rx) = mpsc::channel(10); - match &operator_config.source { + match &operator_definition.config.source { OperatorSource::SharedLibrary(path) => { shared_lib::spawn(path, events_tx, operator_rx).wrap_err_with(|| { format!( "failed ot spawn shared library operator for {}", - operator_config.id + operator_definition.id ) })?; } OperatorSource::Python(path) => { python::spawn(path, events_tx, operator_rx).wrap_err_with(|| { - format!("failed ot spawn Python operator for {}", operator_config.id) + format!( + "failed ot spawn Python operator for {}", + operator_definition.id + ) })?; } OperatorSource::Wasm(path) => { @@ -39,7 +42,7 @@ impl Operator { } Ok(Self { operator_task, - config: operator_config, + definition: operator_definition, }) } @@ -52,10 +55,10 @@ impl Operator { }) } - /// Get a reference to the operator's config. + /// Get a reference to the operator's definition. #[must_use] - pub fn config(&self) -> &OperatorConfig { - &self.config + pub fn definition(&self) -> &OperatorDefinition { + &self.definition } } diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 2d2240ec..e4a56171 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -2,6 +2,7 @@ use dora_node_api::config::{ CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId, }; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::fmt; use std::{ collections::{BTreeMap, BTreeSet}, @@ -18,14 +19,68 @@ pub struct Descriptor { } impl Descriptor { + pub fn resolve_aliases(&self) -> Vec { + let default_op_id = OperatorId::from("op".to_string()); + + let single_operator_nodes: HashMap<_, _> = self + .nodes + .iter() + .filter_map(|n| match &n.kind { + NodeKind::Operator(op) => Some((&n.id, op.id.as_ref().unwrap_or(&default_op_id))), + _ => None, + }) + .collect(); + + let mut resolved = vec![]; + for mut node in self.nodes.clone() { + // adjust input mappings + let input_mappings: Vec<_> = match &mut node.kind { + NodeKind::Runtime(node) => node + .operators + .iter_mut() + .flat_map(|op| op.config.inputs.values_mut()) + .collect(), + NodeKind::Custom(node) => node.run_config.inputs.values_mut().collect(), + NodeKind::Operator(operator) => operator.config.inputs.values_mut().collect(), + }; + for mapping in input_mappings { + if let Some(op_name) = single_operator_nodes.get(&mapping.source).copied() { + if mapping.operator.is_none() { + mapping.operator = Some(op_name.to_owned()); + } + } + } + + // resolve nodes + let kind = match node.kind { + NodeKind::Custom(node) => CoreNodeKind::Custom(node), + NodeKind::Runtime(node) => CoreNodeKind::Runtime(node), + NodeKind::Operator(op) => CoreNodeKind::Runtime(RuntimeNode { + operators: vec![OperatorDefinition { + id: op.id.unwrap_or_else(|| default_op_id.clone()), + config: op.config, + }], + }), + }; + resolved.push(ResolvedNode { + id: node.id, + name: node.name, + description: node.description, + kind, + }); + } + resolved + } + pub fn visualize_as_mermaid(&self) -> eyre::Result { - let flowchart = visualize::visualize_nodes(&self.nodes); + let resolved = self.resolve_aliases(); + let flowchart = visualize::visualize_nodes(&resolved); Ok(flowchart) } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Node { pub id: NodeId, pub name: Option, @@ -35,24 +90,58 @@ pub struct Node { pub kind: NodeKind, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum NodeKind { /// Dora runtime node #[serde(rename = "operators")] Runtime(RuntimeNode), Custom(CustomNode), + Operator(SingleOperatorDefinition), +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ResolvedNode { + pub id: NodeId, + pub name: Option, + pub description: Option, + + #[serde(flatten)] + pub kind: CoreNodeKind, } #[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum CoreNodeKind { + /// Dora runtime node + #[serde(rename = "operators")] + Runtime(RuntimeNode), + Custom(CustomNode), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(transparent)] pub struct RuntimeNode { - pub operators: Vec, + pub operators: Vec, } #[derive(Debug, Serialize, Deserialize, Clone)] -pub struct OperatorConfig { +pub struct OperatorDefinition { pub id: OperatorId, + #[serde(flatten)] + pub config: OperatorConfig, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct SingleOperatorDefinition { + /// ID is optional if there is only a single operator. + pub id: Option, + #[serde(flatten)] + pub config: OperatorConfig, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct OperatorConfig { pub name: Option, pub description: Option, @@ -73,7 +162,16 @@ pub enum OperatorSource { Wasm(PathBuf), } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct PythonOperatorConfig { + pub path: PathBuf, + #[serde(default)] + pub inputs: BTreeMap, + #[serde(default)] + pub outputs: BTreeSet, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct CustomNode { pub run: String, pub env: Option>, @@ -83,7 +181,7 @@ pub struct CustomNode { pub run_config: NodeRunConfig, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] pub enum EnvValue { Bool(bool), diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index 0603dd24..6f256651 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -1,11 +1,11 @@ -use super::{CustomNode, Node, NodeKind, OperatorConfig, RuntimeNode}; +use super::{CoreNodeKind, CustomNode, OperatorDefinition, ResolvedNode, RuntimeNode}; use dora_node_api::config::{DataId, InputMapping, NodeId}; use std::{ collections::{BTreeMap, HashMap}, fmt::Write as _, }; -pub fn visualize_nodes(nodes: &[Node]) -> String { +pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String { let mut flowchart = "flowchart TB\n".to_owned(); let mut all_nodes = HashMap::new(); @@ -21,11 +21,11 @@ pub fn visualize_nodes(nodes: &[Node]) -> String { flowchart } -fn visualize_node(node: &Node, flowchart: &mut String) { +fn visualize_node(node: &ResolvedNode, flowchart: &mut String) { let node_id = &node.id; match &node.kind { - NodeKind::Custom(node) => visualize_custom_node(node_id, node, flowchart), - NodeKind::Runtime(RuntimeNode { operators }) => { + CoreNodeKind::Custom(node) => visualize_custom_node(node_id, node, flowchart), + CoreNodeKind::Runtime(RuntimeNode { operators }) => { visualize_runtime_node(node_id, operators, flowchart) } } @@ -44,14 +44,18 @@ fn visualize_custom_node(node_id: &NodeId, node: &CustomNode, flowchart: &mut St } } -fn visualize_runtime_node(node_id: &NodeId, operators: &[OperatorConfig], flowchart: &mut String) { +fn visualize_runtime_node( + node_id: &NodeId, + operators: &[OperatorDefinition], + flowchart: &mut String, +) { writeln!(flowchart, "subgraph {node_id}").unwrap(); for operator in operators { let operator_id = &operator.id; - if operator.inputs.is_empty() { + if operator.config.inputs.is_empty() { // source operator writeln!(flowchart, " {node_id}/{operator_id}[\\{operator_id}/]").unwrap(); - } else if operator.outputs.is_empty() { + } else if operator.config.outputs.is_empty() { // sink operator writeln!(flowchart, " {node_id}/{operator_id}[/{operator_id}\\]").unwrap(); } else { @@ -63,20 +67,24 @@ fn visualize_runtime_node(node_id: &NodeId, operators: &[OperatorConfig], flowch flowchart.push_str("end\n"); } -fn visualize_node_inputs(node: &Node, flowchart: &mut String, nodes: &HashMap<&NodeId, &Node>) { +fn visualize_node_inputs( + node: &ResolvedNode, + flowchart: &mut String, + nodes: &HashMap<&NodeId, &ResolvedNode>, +) { let node_id = &node.id; match &node.kind { - NodeKind::Custom(node) => visualize_inputs( + CoreNodeKind::Custom(node) => visualize_inputs( &node_id.to_string(), &node.run_config.inputs, flowchart, nodes, ), - NodeKind::Runtime(RuntimeNode { operators }) => { + CoreNodeKind::Runtime(RuntimeNode { operators }) => { for operator in operators { visualize_inputs( &format!("{node_id}/{}", operator.id), - &operator.inputs, + &operator.config.inputs, flowchart, nodes, ) @@ -89,7 +97,7 @@ fn visualize_inputs( target: &str, inputs: &BTreeMap, flowchart: &mut String, - nodes: &HashMap<&NodeId, &Node>, + nodes: &HashMap<&NodeId, &ResolvedNode>, ) { for (input_id, mapping) in inputs { let InputMapping { @@ -101,7 +109,7 @@ fn visualize_inputs( let mut source_found = false; if let Some(source_node) = nodes.get(source) { match (&source_node.kind, operator) { - (NodeKind::Custom(custom_node), None) => { + (CoreNodeKind::Custom(custom_node), None) => { if custom_node.run_config.outputs.contains(output) { let data = if output == input_id { format!("{output}") @@ -112,9 +120,9 @@ fn visualize_inputs( source_found = true; } } - (NodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => { + (CoreNodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => { if let Some(operator) = operators.iter().find(|o| &o.id == operator_id) { - if operator.outputs.contains(output) { + if operator.config.outputs.contains(output) { let data = if output == input_id { format!("{output}") } else { @@ -126,7 +134,7 @@ fn visualize_inputs( } } } - (NodeKind::Custom(_), Some(_)) | (NodeKind::Runtime(_), None) => {} + (CoreNodeKind::Custom(_), Some(_)) | (CoreNodeKind::Runtime(_), None) => {} } }