Browse Source

Merge pull request #50 from dora-rs/python-operator-alias

Add support for single operator nodes in YAML dataflow specification
tags/v0.0.0-test.4
Philipp Oppermann GitHub 3 years ago
parent
commit
e98fc0725d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 172 additions and 50 deletions
  1. +1
    -1
      apis/rust/node/src/config.rs
  2. +9
    -0
      binaries/coordinator/examples/mini-dataflow.yml
  3. +12
    -8
      binaries/coordinator/src/main.rs
  4. +7
    -7
      binaries/runtime/src/main.rs
  5. +13
    -10
      binaries/runtime/src/operator/mod.rs
  6. +105
    -7
      libraries/core/src/descriptor/mod.rs
  7. +25
    -17
      libraries/core/src/descriptor/visualize.rs

+ 1
- 1
apis/rust/node/src/config.rs View File

@@ -6,7 +6,7 @@ use std::{
str::FromStr,
};

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct NodeRunConfig {
#[serde(default)]


+ 9
- 0
binaries/coordinator/examples/mini-dataflow.yml View File

@@ -54,5 +54,14 @@ nodes:
python: ../runtime/examples/python-operator/op.py
inputs:
time: timer/time
test: python-operator/counter
outputs:
- counter

- id: python-operator
operator:
python: ../runtime/examples/python-operator/op.py
inputs:
time: timer/time
outputs:
- counter

+ 12
- 8
binaries/coordinator/src/main.rs View File

@@ -1,4 +1,4 @@
use dora_core::descriptor::{self, Descriptor, NodeKind};
use dora_core::descriptor::{self, CoreNodeKind, Descriptor};
use dora_node_api::config::NodeId;
use eyre::{bail, eyre, WrapErr};
use futures::{stream::FuturesUnordered, StreamExt};
@@ -49,17 +49,21 @@ async fn main() -> eyre::Result<()> {
}

async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> {
let Descriptor {
mut communication,
nodes,
} = read_descriptor(&dataflow_path).await.wrap_err_with(|| {
let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| {
format!(
"failed to read dataflow descriptor at {}",
dataflow_path.display()
)
})?;

if nodes.iter().any(|n| matches!(n.kind, NodeKind::Runtime(_))) && !runtime.is_file() {
let nodes = descriptor.resolve_aliases();
let mut communication = descriptor.communication;

if nodes
.iter()
.any(|n| matches!(n.kind, CoreNodeKind::Runtime(_)))
&& !runtime.is_file()
{
bail!(
"There is no runtime at {}, or it is not a file",
runtime.display()
@@ -74,12 +78,12 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()
let node_id = node.id.clone();

match node.kind {
descriptor::NodeKind::Custom(node) => {
descriptor::CoreNodeKind::Custom(node) => {
let result = spawn_custom_node(node_id.clone(), &node, &communication)
.wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?;
tasks.push(result);
}
descriptor::NodeKind::Runtime(node) => {
descriptor::CoreNodeKind::Runtime(node) => {
if !node.operators.is_empty() {
let result =
spawn_runtime_node(runtime, node_id.clone(), &node, &communication)


+ 7
- 7
binaries/runtime/src/main.rs View File

@@ -1,6 +1,6 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_core::descriptor::OperatorConfig;
use dora_core::descriptor::OperatorDefinition;
use dora_node_api::{
self,
communication::{self, CommunicationLayer},
@@ -37,7 +37,7 @@ async fn main() -> eyre::Result<()> {
.wrap_err("env variable DORA_COMMUNICATION_CONFIG must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize communication config")?
};
let operators: Vec<OperatorConfig> = {
let operators: Vec<OperatorDefinition> = {
let raw =
std::env::var("DORA_OPERATORS").wrap_err("env variable DORA_OPERATORS must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
@@ -107,7 +107,7 @@ async fn main() -> eyre::Result<()> {
.ok_or_else(|| eyre!("received event from unknown operator {id}"))?;
match event {
OperatorEvent::Output { id: data_id, value } => {
if !operator.config().outputs.contains(&data_id) {
if !operator.definition().config.outputs.contains(&data_id) {
eyre::bail!("unknown output {data_id} for operator {id}");
}
publish(&node_id, id, data_id, &value, communication.as_ref())
@@ -154,7 +154,7 @@ async fn main() -> eyre::Result<()> {
}

async fn subscribe<'a>(
operators: &'a [OperatorConfig],
operators: &'a [OperatorDefinition],
communication: &'a dyn CommunicationLayer,
) -> eyre::Result<impl futures::Stream<Item = SubscribeEvent> + 'a> {
let mut streams = Vec::new();
@@ -168,11 +168,11 @@ async fn subscribe<'a>(
}

async fn subscribe_operator<'a>(
operator: &'a OperatorConfig,
operator: &'a OperatorDefinition,
communication: &'a dyn CommunicationLayer,
) -> Result<impl futures::Stream<Item = SubscribeEvent> + 'a, eyre::Error> {
let stop_messages = FuturesUnordered::new();
for input in operator.inputs.values() {
for input in operator.config.inputs.values() {
let InputMapping {
source, operator, ..
} = input;
@@ -189,7 +189,7 @@ async fn subscribe_operator<'a>(
let finished = Box::pin(stop_messages.all(|_| async { true }).shared());

let mut streams = Vec::new();
for (input, mapping) in &operator.inputs {
for (input, mapping) in &operator.config.inputs {
let InputMapping {
source,
operator: source_operator,


+ 13
- 10
binaries/runtime/src/operator/mod.rs View File

@@ -1,4 +1,4 @@
use dora_core::descriptor::{OperatorConfig, OperatorSource};
use dora_core::descriptor::{OperatorDefinition, OperatorSource};
use dora_node_api::config::DataId;
use eyre::{eyre, Context};
use std::any::Any;
@@ -9,28 +9,31 @@ mod shared_lib;

pub struct Operator {
operator_task: Sender<OperatorInput>,
config: OperatorConfig,
definition: OperatorDefinition,
}

impl Operator {
pub async fn init(
operator_config: OperatorConfig,
operator_definition: OperatorDefinition,
events_tx: Sender<OperatorEvent>,
) -> eyre::Result<Self> {
let (operator_task, operator_rx) = mpsc::channel(10);

match &operator_config.source {
match &operator_definition.config.source {
OperatorSource::SharedLibrary(path) => {
shared_lib::spawn(path, events_tx, operator_rx).wrap_err_with(|| {
format!(
"failed ot spawn shared library operator for {}",
operator_config.id
operator_definition.id
)
})?;
}
OperatorSource::Python(path) => {
python::spawn(path, events_tx, operator_rx).wrap_err_with(|| {
format!("failed ot spawn Python operator for {}", operator_config.id)
format!(
"failed ot spawn Python operator for {}",
operator_definition.id
)
})?;
}
OperatorSource::Wasm(path) => {
@@ -39,7 +42,7 @@ impl Operator {
}
Ok(Self {
operator_task,
config: operator_config,
definition: operator_definition,
})
}

@@ -52,10 +55,10 @@ impl Operator {
})
}

/// Get a reference to the operator's config.
/// Get a reference to the operator's definition.
#[must_use]
pub fn config(&self) -> &OperatorConfig {
&self.config
pub fn definition(&self) -> &OperatorDefinition {
&self.definition
}
}



+ 105
- 7
libraries/core/src/descriptor/mod.rs View File

@@ -2,6 +2,7 @@ use dora_node_api::config::{
CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::{
collections::{BTreeMap, BTreeSet},
@@ -18,14 +19,68 @@ pub struct Descriptor {
}

impl Descriptor {
pub fn resolve_aliases(&self) -> Vec<ResolvedNode> {
let default_op_id = OperatorId::from("op".to_string());

let single_operator_nodes: HashMap<_, _> = self
.nodes
.iter()
.filter_map(|n| match &n.kind {
NodeKind::Operator(op) => Some((&n.id, op.id.as_ref().unwrap_or(&default_op_id))),
_ => None,
})
.collect();

let mut resolved = vec![];
for mut node in self.nodes.clone() {
// adjust input mappings
let input_mappings: Vec<_> = match &mut node.kind {
NodeKind::Runtime(node) => node
.operators
.iter_mut()
.flat_map(|op| op.config.inputs.values_mut())
.collect(),
NodeKind::Custom(node) => node.run_config.inputs.values_mut().collect(),
NodeKind::Operator(operator) => operator.config.inputs.values_mut().collect(),
};
for mapping in input_mappings {
if let Some(op_name) = single_operator_nodes.get(&mapping.source).copied() {
if mapping.operator.is_none() {
mapping.operator = Some(op_name.to_owned());
}
}
}

// resolve nodes
let kind = match node.kind {
NodeKind::Custom(node) => CoreNodeKind::Custom(node),
NodeKind::Runtime(node) => CoreNodeKind::Runtime(node),
NodeKind::Operator(op) => CoreNodeKind::Runtime(RuntimeNode {
operators: vec![OperatorDefinition {
id: op.id.unwrap_or_else(|| default_op_id.clone()),
config: op.config,
}],
}),
};
resolved.push(ResolvedNode {
id: node.id,
name: node.name,
description: node.description,
kind,
});
}
resolved
}

pub fn visualize_as_mermaid(&self) -> eyre::Result<String> {
let flowchart = visualize::visualize_nodes(&self.nodes);
let resolved = self.resolve_aliases();
let flowchart = visualize::visualize_nodes(&resolved);

Ok(flowchart)
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
pub id: NodeId,
pub name: Option<String>,
@@ -35,24 +90,58 @@ pub struct Node {
pub kind: NodeKind,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeKind {
/// Dora runtime node
#[serde(rename = "operators")]
Runtime(RuntimeNode),
Custom(CustomNode),
Operator(SingleOperatorDefinition),
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ResolvedNode {
pub id: NodeId,
pub name: Option<String>,
pub description: Option<String>,

#[serde(flatten)]
pub kind: CoreNodeKind,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CoreNodeKind {
/// Dora runtime node
#[serde(rename = "operators")]
Runtime(RuntimeNode),
Custom(CustomNode),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct RuntimeNode {
pub operators: Vec<OperatorConfig>,
pub operators: Vec<OperatorDefinition>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OperatorConfig {
pub struct OperatorDefinition {
pub id: OperatorId,
#[serde(flatten)]
pub config: OperatorConfig,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SingleOperatorDefinition {
/// ID is optional if there is only a single operator.
pub id: Option<OperatorId>,
#[serde(flatten)]
pub config: OperatorConfig,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OperatorConfig {
pub name: Option<String>,
pub description: Option<String>,

@@ -73,7 +162,16 @@ pub enum OperatorSource {
Wasm(PathBuf),
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PythonOperatorConfig {
pub path: PathBuf,
#[serde(default)]
pub inputs: BTreeMap<DataId, InputMapping>,
#[serde(default)]
pub outputs: BTreeSet<DataId>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomNode {
pub run: String,
pub env: Option<BTreeMap<String, EnvValue>>,
@@ -83,7 +181,7 @@ pub struct CustomNode {
pub run_config: NodeRunConfig,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum EnvValue {
Bool(bool),


+ 25
- 17
libraries/core/src/descriptor/visualize.rs View File

@@ -1,11 +1,11 @@
use super::{CustomNode, Node, NodeKind, OperatorConfig, RuntimeNode};
use super::{CoreNodeKind, CustomNode, OperatorDefinition, ResolvedNode, RuntimeNode};
use dora_node_api::config::{DataId, InputMapping, NodeId};
use std::{
collections::{BTreeMap, HashMap},
fmt::Write as _,
};

pub fn visualize_nodes(nodes: &[Node]) -> String {
pub fn visualize_nodes(nodes: &[ResolvedNode]) -> String {
let mut flowchart = "flowchart TB\n".to_owned();
let mut all_nodes = HashMap::new();

@@ -21,11 +21,11 @@ pub fn visualize_nodes(nodes: &[Node]) -> String {
flowchart
}

fn visualize_node(node: &Node, flowchart: &mut String) {
fn visualize_node(node: &ResolvedNode, flowchart: &mut String) {
let node_id = &node.id;
match &node.kind {
NodeKind::Custom(node) => visualize_custom_node(node_id, node, flowchart),
NodeKind::Runtime(RuntimeNode { operators }) => {
CoreNodeKind::Custom(node) => visualize_custom_node(node_id, node, flowchart),
CoreNodeKind::Runtime(RuntimeNode { operators }) => {
visualize_runtime_node(node_id, operators, flowchart)
}
}
@@ -44,14 +44,18 @@ fn visualize_custom_node(node_id: &NodeId, node: &CustomNode, flowchart: &mut St
}
}

fn visualize_runtime_node(node_id: &NodeId, operators: &[OperatorConfig], flowchart: &mut String) {
fn visualize_runtime_node(
node_id: &NodeId,
operators: &[OperatorDefinition],
flowchart: &mut String,
) {
writeln!(flowchart, "subgraph {node_id}").unwrap();
for operator in operators {
let operator_id = &operator.id;
if operator.inputs.is_empty() {
if operator.config.inputs.is_empty() {
// source operator
writeln!(flowchart, " {node_id}/{operator_id}[\\{operator_id}/]").unwrap();
} else if operator.outputs.is_empty() {
} else if operator.config.outputs.is_empty() {
// sink operator
writeln!(flowchart, " {node_id}/{operator_id}[/{operator_id}\\]").unwrap();
} else {
@@ -63,20 +67,24 @@ fn visualize_runtime_node(node_id: &NodeId, operators: &[OperatorConfig], flowch
flowchart.push_str("end\n");
}

fn visualize_node_inputs(node: &Node, flowchart: &mut String, nodes: &HashMap<&NodeId, &Node>) {
fn visualize_node_inputs(
node: &ResolvedNode,
flowchart: &mut String,
nodes: &HashMap<&NodeId, &ResolvedNode>,
) {
let node_id = &node.id;
match &node.kind {
NodeKind::Custom(node) => visualize_inputs(
CoreNodeKind::Custom(node) => visualize_inputs(
&node_id.to_string(),
&node.run_config.inputs,
flowchart,
nodes,
),
NodeKind::Runtime(RuntimeNode { operators }) => {
CoreNodeKind::Runtime(RuntimeNode { operators }) => {
for operator in operators {
visualize_inputs(
&format!("{node_id}/{}", operator.id),
&operator.inputs,
&operator.config.inputs,
flowchart,
nodes,
)
@@ -89,7 +97,7 @@ fn visualize_inputs(
target: &str,
inputs: &BTreeMap<DataId, InputMapping>,
flowchart: &mut String,
nodes: &HashMap<&NodeId, &Node>,
nodes: &HashMap<&NodeId, &ResolvedNode>,
) {
for (input_id, mapping) in inputs {
let InputMapping {
@@ -101,7 +109,7 @@ fn visualize_inputs(
let mut source_found = false;
if let Some(source_node) = nodes.get(source) {
match (&source_node.kind, operator) {
(NodeKind::Custom(custom_node), None) => {
(CoreNodeKind::Custom(custom_node), None) => {
if custom_node.run_config.outputs.contains(output) {
let data = if output == input_id {
format!("{output}")
@@ -112,9 +120,9 @@ fn visualize_inputs(
source_found = true;
}
}
(NodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => {
(CoreNodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => {
if let Some(operator) = operators.iter().find(|o| &o.id == operator_id) {
if operator.outputs.contains(output) {
if operator.config.outputs.contains(output) {
let data = if output == input_id {
format!("{output}")
} else {
@@ -126,7 +134,7 @@ fn visualize_inputs(
}
}
}
(NodeKind::Custom(_), Some(_)) | (NodeKind::Runtime(_), None) => {}
(CoreNodeKind::Custom(_), Some(_)) | (CoreNodeKind::Runtime(_), None) => {}
}
}



Loading…
Cancel
Save