Browse Source

Update coordinator to pass runtime arguments as env variables

tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
8832e433fc
3 changed files with 73 additions and 20 deletions
  1. +9
    -2
      common/src/descriptor/mod.rs
  2. +7
    -5
      common/src/descriptor/visualize.rs
  3. +57
    -13
      coordinator/src/main.rs

+ 9
- 2
common/src/descriptor/mod.rs View File

@@ -38,10 +38,17 @@ pub struct Node {
#[serde(rename_all = "lowercase")]
pub enum NodeKind {
/// Dora runtime node
Operators(Vec<OperatorConfig>),
#[serde(rename = "operators")]
Runtime(RuntimeNode),
Custom(CustomNode),
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct RuntimeNode {
pub operators: Vec<OperatorConfig>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OperatorConfig {
pub id: OperatorId,
@@ -58,7 +65,7 @@ pub struct OperatorConfig {
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
#[serde(rename_all = "kebab-case")]
pub enum OperatorSource {
SharedLibrary(PathBuf),
Python(PathBuf),


+ 7
- 5
common/src/descriptor/visualize.rs View File

@@ -1,6 +1,6 @@
use dora_api::config::{DataId, InputMapping, NodeId};

use super::{CustomNode, Node, NodeKind, OperatorConfig};
use super::{CustomNode, Node, NodeKind, OperatorConfig, RuntimeNode};
use std::collections::{BTreeMap, HashMap};

pub fn visualize_nodes(nodes: &[Node]) -> String {
@@ -23,7 +23,9 @@ fn visualize_node(node: &Node, flowchart: &mut String) {
let node_id = &node.id;
match &node.kind {
NodeKind::Custom(node) => visualize_custom_node(node_id, &node, flowchart),
NodeKind::Operators(operators) => visualize_runtime_node(node_id, operators, flowchart),
NodeKind::Runtime(RuntimeNode { operators }) => {
visualize_runtime_node(node_id, operators, flowchart)
}
}
}

@@ -68,7 +70,7 @@ fn visualize_node_inputs(node: &Node, flowchart: &mut String, nodes: &HashMap<&N
flowchart,
nodes,
),
NodeKind::Operators(operators) => {
NodeKind::Runtime(RuntimeNode { operators }) => {
for operator in operators {
visualize_inputs(
&format!("{node_id}/{}", operator.id),
@@ -108,7 +110,7 @@ fn visualize_inputs(
source_found = true;
}
}
(NodeKind::Operators(operators), Some(operator_id)) => {
(NodeKind::Runtime(RuntimeNode { operators }), Some(operator_id)) => {
if let Some(operator) = operators.into_iter().find(|o| &o.id == operator_id) {
if operator.outputs.contains(output) {
let data = if output == input_id {
@@ -123,7 +125,7 @@ fn visualize_inputs(
}
}
}
(NodeKind::Custom(_), Some(_)) | (NodeKind::Operators(_), None) => {}
(NodeKind::Custom(_), Some(_)) | (NodeKind::Runtime(_), None) => {}
}
}



+ 57
- 13
coordinator/src/main.rs View File

@@ -53,13 +53,17 @@ async fn run_dataflow(file: PathBuf) -> eyre::Result<()> {

match node.kind {
descriptor::NodeKind::Custom(node) => {
let result = spawn_custom_node(node_id.clone(), node, &communication)
let result = spawn_custom_node(node_id.clone(), &node, &communication)
.wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?;
tasks.push(result);
}
descriptor::NodeKind::Operators(_) => {
let todo = "todo";
bail!("runtime nodes are not supported yet")
descriptor::NodeKind::Runtime(node) => {
if !node.operators.is_empty() {
let result =
spawn_runtime_node(&runtime, node_id.clone(), &node, &communication)
.wrap_err_with(|| format!("failed to spawn runtime node {node_id}"))?;
tasks.push(result);
}
}
}
}
@@ -75,7 +79,7 @@ async fn run_dataflow(file: PathBuf) -> eyre::Result<()> {

fn spawn_custom_node(
node_id: NodeId,
node: descriptor::CustomNode,
node: &descriptor::CustomNode,
communication: &dora_api::config::CommunicationConfig,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut args = node.run.split_ascii_whitespace();
@@ -84,23 +88,46 @@ fn spawn_custom_node(
.ok_or_else(|| eyre!("`run` field must not be empty"))?;
let mut command = tokio::process::Command::new(cmd);
command.args(args);
command.env(
"DORA_NODE_ID",
serde_yaml::to_string(&node_id).wrap_err("failed to serialize custom node ID")?,
);
command_init_common_env(&mut command, &node_id, communication)?;
command.env(
"DORA_NODE_RUN_CONFIG",
serde_yaml::to_string(&node.run_config)
.wrap_err("failed to serialize custom node run config")?,
);
let mut child = command
.spawn()
.wrap_err_with(|| format!("failed to run command `{}`", &node.run))?;
let result = tokio::spawn(async move {
let status = child.wait().await.context("child process failed")?;
if status.success() {
println!("operator {node_id} finished");
Ok(())
} else if let Some(code) = status.code() {
Err(eyre!("operator {node_id} failed with exit code: {code}"))
} else {
Err(eyre!("operator {node_id} failed (unknown exit code)"))
}
});
Ok(result)
}

fn spawn_runtime_node(
runtime: &Path,
node_id: NodeId,
node: &descriptor::RuntimeNode,
communication: &dora_api::config::CommunicationConfig,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut command = tokio::process::Command::new(runtime);
command_init_common_env(&mut command, &node_id, communication)?;
command.env(
"DORA_COMMUNICATION_CONFIG",
serde_yaml::to_string(communication)
.wrap_err("failed to serialize communication config")?,
"DORA_OPERATORS",
serde_yaml::to_string(&node.operators)
.wrap_err("failed to serialize custom node run config")?,
);

let mut child = command
.spawn()
.wrap_err_with(|| format!("failed to run command `{}`", &node.run))?;
.wrap_err_with(|| format!("failed to run runtime at `{}`", runtime.display()))?;
let result = tokio::spawn(async move {
let status = child.wait().await.context("child process failed")?;
if status.success() {
@@ -115,6 +142,23 @@ fn spawn_custom_node(
Ok(result)
}

fn command_init_common_env(
command: &mut tokio::process::Command,
node_id: &NodeId,
communication: &dora_api::config::CommunicationConfig,
) -> Result<(), eyre::Error> {
command.env(
"DORA_NODE_ID",
serde_yaml::to_string(&node_id).wrap_err("failed to serialize custom node ID")?,
);
command.env(
"DORA_COMMUNICATION_CONFIG",
serde_yaml::to_string(communication)
.wrap_err("failed to serialize communication config")?,
);
Ok(())
}

async fn read_descriptor(file: &Path) -> Result<Descriptor, eyre::Error> {
let descriptor_file = tokio::fs::read(file)
.await


Loading…
Cancel
Save