| @@ -12,6 +12,8 @@ use eyre::WrapErr; | |||
| use std::{env::consts::EXE_EXTENSION, path::Path, process::Stdio}; | |||
| use tokio::sync::mpsc; | |||
| const SHELL_SOURCE: &str = "shell"; | |||
| pub async fn spawn_node( | |||
| dataflow_id: DataflowId, | |||
| working_dir: &Path, | |||
| @@ -35,22 +37,43 @@ pub async fn spawn_node( | |||
| let mut child = match node.kind { | |||
| dora_core::descriptor::CoreNodeKind::Custom(n) => { | |||
| let resolved_path = if source_is_url(&n.source) { | |||
| // try to download the shared library | |||
| let target_path = Path::new("build") | |||
| .join(node_id.to_string()) | |||
| .with_extension(EXE_EXTENSION); | |||
| download_file(&n.source, &target_path) | |||
| .await | |||
| .wrap_err("failed to download custom node")?; | |||
| target_path.clone() | |||
| } else { | |||
| resolve_path(&n.source, working_dir) | |||
| .wrap_err_with(|| format!("failed to resolve node source `{}`", n.source))? | |||
| let mut command = match n.source.as_str() { | |||
| SHELL_SOURCE => { | |||
| if cfg!(target_os = "windows") { | |||
| let mut cmd = tokio::process::Command::new("cmd"); | |||
| cmd.args(["/C", &n.args.clone().unwrap_or_default()]); | |||
| cmd | |||
| } else { | |||
| let mut cmd = tokio::process::Command::new("sh"); | |||
| cmd.args(["-c", &n.args.clone().unwrap_or_default()]); | |||
| cmd | |||
| } | |||
| } | |||
| source => { | |||
| let resolved_path = if source_is_url(source) { | |||
| // try to download the shared library | |||
| let target_path = Path::new("build") | |||
| .join(node_id.to_string()) | |||
| .with_extension(EXE_EXTENSION); | |||
| download_file(source, &target_path) | |||
| .await | |||
| .wrap_err("failed to download custom node")?; | |||
| target_path.clone() | |||
| } else { | |||
| resolve_path(source, working_dir).wrap_err_with(|| { | |||
| format!("failed to resolve node source `{}`", source) | |||
| })? | |||
| }; | |||
| tracing::info!("spawning {}", resolved_path.display()); | |||
| let mut cmd = tokio::process::Command::new(&resolved_path); | |||
| if let Some(args) = &n.args { | |||
| cmd.args(args.split_ascii_whitespace()); | |||
| } | |||
| cmd | |||
| } | |||
| }; | |||
| tracing::info!("spawning {}", resolved_path.display()); | |||
| let mut command = tokio::process::Command::new(&resolved_path); | |||
| command.current_dir(working_dir); | |||
| command.stdin(Stdio::null()); | |||
| let node_config = NodeConfig { | |||
| @@ -59,9 +82,7 @@ pub async fn spawn_node( | |||
| run_config: n.run_config.clone(), | |||
| daemon_communication, | |||
| }; | |||
| if let Some(args) = &n.args { | |||
| command.args(args.split_ascii_whitespace()); | |||
| } | |||
| command.env( | |||
| "DORA_NODE_CONFIG", | |||
| serde_yaml::to_string(&node_config).wrap_err("failed to serialize node config")?, | |||
| @@ -75,8 +96,8 @@ pub async fn spawn_node( | |||
| } | |||
| command.spawn().wrap_err_with(move || { | |||
| format!( | |||
| "failed to run source path: `{}` with args `{}`", | |||
| resolved_path.display(), | |||
| "failed to run `{}` with args `{}`", | |||
| n.source, | |||
| n.args.as_deref().unwrap_or_default() | |||
| ) | |||
| })? | |||