| @@ -5,6 +5,7 @@ communication: | |||
| nodes: | |||
| - id: rust-node | |||
| custom: | |||
| build: cargo build -p rust-dataflow-example-node | |||
| run: ../../target/debug/rust-dataflow-example-node | |||
| inputs: | |||
| tick: dora/timer/millis/300 | |||
| @@ -13,6 +14,7 @@ nodes: | |||
| - id: runtime-node | |||
| operators: | |||
| - id: rust-operator | |||
| build: cargo build -p rust-dataflow-example-operator | |||
| shared-library: ../../target/debug/rust_dataflow_example_operator | |||
| inputs: | |||
| tick: dora/timer/millis/100 | |||
| @@ -21,6 +23,7 @@ nodes: | |||
| - status | |||
| - id: rust-sink | |||
| custom: | |||
| build: cargo build -p rust-dataflow-example-sink | |||
| run: ../../target/debug/rust-dataflow-example-sink | |||
| inputs: | |||
| message: runtime-node/rust-operator/status | |||
| @@ -7,13 +7,12 @@ async fn main() -> eyre::Result<()> { | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| build_package("rust-dataflow-example-node").await?; | |||
| build_package("rust-dataflow-example-operator").await?; | |||
| build_package("rust-dataflow-example-sink").await?; | |||
| let dataflow = Path::new("dataflow.yml"); | |||
| build_dataflow(dataflow).await?; | |||
| build_package("dora-runtime").await?; | |||
| dora_coordinator::run(dora_coordinator::Command::Run { | |||
| dataflow: Path::new("dataflow.yml").to_owned(), | |||
| dataflow: dataflow.to_owned(), | |||
| runtime: Some(root.join("target").join("debug").join("dora-runtime")), | |||
| }) | |||
| .await?; | |||
| @@ -21,6 +20,18 @@ async fn main() -> eyre::Result<()> { | |||
| Ok(()) | |||
| } | |||
| async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--").arg("build").arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build dataflow"); | |||
| }; | |||
| Ok(()) | |||
| } | |||
| async fn build_package(package: &str) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| @@ -1,6 +1,5 @@ | |||
| use dora_node_api::config::{ | |||
| CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId, | |||
| }; | |||
| pub use dora_node_api::config::OperatorId; | |||
| use dora_node_api::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig}; | |||
| use serde::{Deserialize, Serialize}; | |||
| use std::collections::HashMap; | |||
| use std::fmt; | |||
| @@ -20,10 +19,11 @@ pub struct Descriptor { | |||
| pub communication: CommunicationConfig, | |||
| pub nodes: Vec<Node>, | |||
| } | |||
| pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op"; | |||
| impl Descriptor { | |||
| pub fn resolve_aliases(&self) -> Vec<ResolvedNode> { | |||
| let default_op_id = OperatorId::from("op".to_string()); | |||
| let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string()); | |||
| let single_operator_nodes: HashMap<_, _> = self | |||
| .nodes | |||
| @@ -158,6 +158,9 @@ pub struct OperatorConfig { | |||
| #[serde(flatten)] | |||
| pub source: OperatorSource, | |||
| #[serde(default, skip_serializing_if = "Option::is_none")] | |||
| pub build: Option<String>, | |||
| } | |||
| #[derive(Debug, Serialize, Deserialize, Clone)] | |||
| @@ -194,6 +197,8 @@ pub struct CustomNode { | |||
| pub run: String, | |||
| pub env: Option<BTreeMap<String, EnvValue>>, | |||
| pub working_directory: Option<BTreeMap<String, EnvValue>>, | |||
| #[serde(default, skip_serializing_if = "Option::is_none")] | |||
| pub build: Option<String>, | |||
| #[serde(flatten)] | |||
| pub run_config: NodeRunConfig, | |||