From f307196df3fc6087c182f13bde947e5544439b0f Mon Sep 17 00:00:00 2001 From: drindr Date: Mon, 4 Aug 2025 18:44:50 +0800 Subject: [PATCH 1/2] feat: more flexible exprs for the `build` and `path` field in yaml - allow quoted arguments in the `build` field, like `bash -c "..."` - allow specifying path of the node according to the environment variable --- binaries/daemon/src/spawn.rs | 32 +++++++++++++++++++++-- libraries/core/src/build/build_command.rs | 17 ++++++++++-- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 5c3d9a02..7a4f403c 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -22,6 +22,7 @@ use dora_message::{ daemon_to_node::{NodeConfig, RuntimeConfig}, id::NodeId, DataflowId, + descriptor::EnvValue }; use dora_node_api::{ arrow::array::ArrayData, @@ -34,6 +35,7 @@ use std::{ path::{Path, PathBuf}, process::Stdio, sync::Arc, + collections::BTreeMap }; use tokio::{ fs::File, @@ -126,7 +128,7 @@ impl Spawner { let (command, error_msg) = match &node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { let mut command = - path_spawn_command(&node_working_dir, self.uv, logger, n, true).await?; + path_spawn_command(&node_working_dir, self.uv, logger, n, &node.env, true).await?; if let Some(command) = &mut command { command.current_dir(&node_working_dir); @@ -608,6 +610,7 @@ async fn path_spawn_command( uv: bool, logger: &mut NodeLogger<'_>, node: &dora_core::descriptor::CustomNode, + env: &Option>, permit_url: bool, ) -> eyre::Result> { let cmd = match node.path.as_str() { @@ -634,7 +637,32 @@ async fn path_spawn_command( .await .wrap_err("failed to download custom node")? } else { - resolve_path(source, working_dir) + let replacements: Vec> = source.find('$').map(|start| { + let end = source[start..].find('/').unwrap_or(source.len()); + let var = &source[start + 1..start + end]; + if let Some(envs) = env { + if let Some(val) = envs.get(var) { + Ok((var.to_string(), val.to_string())) + } else { + eyre::bail!("environment variable `{}` for node `{}` not found", var, source) + } + } else { + eyre::bail!("environment variable `{}` for node `{}` not found", var, source) + } + }).into_iter().collect(); + let mut source = String::from(source); + for kv in replacements.into_iter() { + match kv { + Ok((var, value)) => { + source = source.replace(&format!("${}", var), &value); + } + Err(err) => { + return Err(err); + } + } + } + + resolve_path(&source, working_dir) .wrap_err_with(|| format!("failed to resolve node source `{source}`"))? }; diff --git a/libraries/core/src/build/build_command.rs b/libraries/core/src/build/build_command.rs index a43c0a53..d26097b1 100644 --- a/libraries/core/src/build/build_command.rs +++ b/libraries/core/src/build/build_command.rs @@ -19,12 +19,25 @@ pub fn run_build_command( let lines = build.lines().collect::>(); for build_line in lines { - let mut split = build_line.split_whitespace(); + let quote: Vec<&str> = build_line.split('"').collect(); + if quote.len() % 2 == 0 { + return Err(eyre!("build command `{build_line}`. quote(s) are not in pair")); + } + let mut split_vec: Vec<&str> = vec![]; + quote.iter().enumerate().for_each(|(i, part)| { + if i % 2 == 0 { + let mut split_with_white: Vec<&str> = part.split_whitespace().collect(); + split_vec.append(&mut split_with_white); + } else { + split_vec.push(part); + } + }); + let mut split = split_vec.iter(); let program = split .next() .ok_or_else(|| eyre!("build command is empty"))?; - let mut cmd = if uv && (program == "pip" || program == "pip3") { + let mut cmd = if uv && (*program == "pip" || *program == "pip3") { let mut cmd = Command::new("uv"); cmd.arg("pip"); cmd From ad634bf49ccbc8857628227d864423199cbb4cc8 Mon Sep 17 00:00:00 2001 From: drindr Date: Wed, 6 Aug 2025 23:50:54 +0800 Subject: [PATCH 2/2] fmt code --- binaries/daemon/src/spawn.rs | 41 +++++++++++++++-------- libraries/core/src/build/build_command.rs | 4 ++- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index edf3ae2b..d192c3ec 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -22,8 +22,8 @@ use dora_message::{ common::{LogLevel, LogMessage}, daemon_to_coordinator::{DataMessage, NodeExitStatus, Timestamped}, daemon_to_node::{NodeConfig, RuntimeConfig}, + descriptor::EnvValue, id::NodeId, - descriptor::EnvValue }; use dora_node_api::{ Metadata, @@ -32,11 +32,11 @@ use dora_node_api::{ }; use eyre::{ContextCompat, WrapErr, bail}; use std::{ + collections::BTreeMap, future::Future, path::{Path, PathBuf}, process::Stdio, sync::Arc, - collections::BTreeMap }; use tokio::{ fs::File, @@ -129,7 +129,8 @@ impl Spawner { let (command, error_msg) = match &node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { let mut command = - path_spawn_command(&node_working_dir, self.uv, logger, n, &node.env, true).await?; + path_spawn_command(&node_working_dir, self.uv, logger, n, &node.env, true) + .await?; if let Some(command) = &mut command { command.current_dir(&node_working_dir); @@ -655,19 +656,31 @@ async fn path_spawn_command( .await .wrap_err("failed to download custom node")? } else { - let replacements: Vec> = source.find('$').map(|start| { - let end = source[start..].find('/').unwrap_or(source.len()); - let var = &source[start + 1..start + end]; - if let Some(envs) = env { - if let Some(val) = envs.get(var) { - Ok((var.to_string(), val.to_string())) + let replacements: Vec> = source + .find('$') + .map(|start| { + let end = source[start..].find('/').unwrap_or(source.len()); + let var = &source[start + 1..start + end]; + if let Some(envs) = env { + if let Some(val) = envs.get(var) { + Ok((var.to_string(), val.to_string())) + } else { + eyre::bail!( + "environment variable `{}` for node `{}` not found", + var, + source + ) + } } else { - eyre::bail!("environment variable `{}` for node `{}` not found", var, source) + eyre::bail!( + "environment variable `{}` for node `{}` not found", + var, + source + ) } - } else { - eyre::bail!("environment variable `{}` for node `{}` not found", var, source) - } - }).into_iter().collect(); + }) + .into_iter() + .collect(); let mut source = String::from(source); for kv in replacements.into_iter() { match kv { diff --git a/libraries/core/src/build/build_command.rs b/libraries/core/src/build/build_command.rs index 3902ce1a..79231611 100644 --- a/libraries/core/src/build/build_command.rs +++ b/libraries/core/src/build/build_command.rs @@ -21,7 +21,9 @@ pub fn run_build_command( for build_line in lines { let quote: Vec<&str> = build_line.split('"').collect(); if quote.len() % 2 == 0 { - return Err(eyre!("build command `{build_line}`. quote(s) are not in pair")); + return Err(eyre!( + "build command `{build_line}`. quote(s) are not in pair" + )); } let mut split_vec: Vec<&str> = vec![]; quote.iter().enumerate().for_each(|(i, part)| {