Browse Source

Make `send_stdout_as` fail if there is more than one entry for a runtime node

tags/v0.3.3-rc1
haixuanTao 1 year ago
parent
commit
b32a7e4924
3 changed files with 15 additions and 7 deletions
  1. +3
    -1
      binaries/daemon/src/spawn.rs
  2. +6
    -6
      libraries/core/src/descriptor/mod.rs
  3. +6
    -0
      libraries/core/src/descriptor/validate.rs

+ 3
- 1
binaries/daemon/src/spawn.rs View File

@@ -58,7 +58,9 @@ pub async fn spawn_node(
clock.clone(),
)
.await?;
let send_stdout_to = node.send_stdout_as();
let send_stdout_to = node
.send_stdout_as()
.context("Could not resolve `send_stdout_as` configuration")?;

let mut child = match node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {


+ 6
- 6
libraries/core/src/descriptor/mod.rs View File

@@ -1,7 +1,7 @@
use crate::config::{
CommunicationConfig, DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId,
};
use eyre::{bail, Context, Result};
use eyre::{bail, eyre, Context, Result};
use serde::{Deserialize, Serialize};
use serde_with_expand_env::with_expand_envs;
use std::{
@@ -166,7 +166,7 @@ pub struct ResolvedNode {
}

impl ResolvedNode {
pub fn send_stdout_as(&self) -> Option<String> {
pub fn send_stdout_as(&self) -> Result<Option<String>> {
match &self.kind {
// TODO: Split stdout between operators
CoreNodeKind::Runtime(n) => {
@@ -178,16 +178,16 @@ impl ResolvedNode {
if count == 1 && n.operators.len() > 1 {
warn!("All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator.")
} else if count > 1 {
warn!("More than one `send_stdout_as` operators for a runtime node. Selecting the first stdout operator.")
return Err(eyre!("More than one `send_stdout_as` entries for a runtime node. Please only use one `send_stdout_as` per runtime."));
}
n.operators.iter().find_map(|op| {
Ok(n.operators.iter().find_map(|op| {
op.config
.send_stdout_as
.clone()
.map(|stdout| format!("{}/{}", op.id, stdout))
})
}))
}
CoreNodeKind::Custom(n) => n.send_stdout_as.clone(),
CoreNodeKind::Custom(n) => Ok(n.send_stdout_as.clone()),
}
}
}


+ 6
- 0
libraries/core/src/descriptor/validate.rs View File

@@ -93,6 +93,12 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result
};
}

// Check that nodes can resolve `send_stdout_as`
for node in &nodes {
node.send_stdout_as()
.context("Could not resolve `send_stdout_as` configuration")?;
}

if has_python_operator {
check_python_runtime()?;
}


Loading…
Cancel
Save