diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 86d05a62..89642c7b 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1117,8 +1117,19 @@ impl Daemon { let mut stopped = Vec::new(); - let node_working_dirs = build_id - .and_then(|build_id| self.builds.get(&build_id)) + let build_info = build_id.and_then(|build_id| self.builds.get(&build_id)); + let node_with_git_source = nodes.values().find(|n| n.has_git_source()); + if let Some(git_node) = node_with_git_source { + if build_info.is_none() { + eyre::bail!( + "node {} has git source, but no `dora build` was run yet\n\n\ + nodes with a `git` field must be built using `dora build` before starting the \ + dataflow", + git_node.id + ) + } + } + let node_working_dirs = build_info .map(|info| info.node_working_dirs.clone()) .unwrap_or_default(); @@ -1186,12 +1197,16 @@ impl Daemon { .entry(node.id.clone()) .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES))) .clone(); - logger - .log(LogLevel::Info, Some("daemon".into()), "spawning") - .await; - let node_working_dir = node_working_dirs - .get(&node_id) - .cloned() + + let configured_node_working_dir = node_working_dirs.get(&node_id).cloned(); + if configured_node_working_dir.is_none() && node.has_git_source() { + eyre::bail!( + "node {} has git source, but no git clone directory was found for it\n\n\ + try running `dora build` again", + node.id + ) + } + let node_working_dir = configured_node_working_dir .or_else(|| { node.deploy .as_ref() diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 5c3d9a02..a47c3638 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -357,7 +357,24 @@ impl PreparedNode { pub async fn spawn(mut self, logger: &mut NodeLogger<'_>) -> eyre::Result { let mut child = match &mut self.command { - Some(command) => command.spawn().wrap_err(self.spawn_error_msg)?, + Some(command) => { + let std_command = command.as_std(); + logger + .log( + LogLevel::Info, + Some("spawner".into()), + format!( + "spawning `{}` in `{}`", + std_command.get_program().to_string_lossy(), + std_command + .get_current_dir() + .unwrap_or(Path::new("")) + .display(), + ), + ) + .await; + command.spawn().wrap_err(self.spawn_error_msg)? + } None => { return Ok(RunningNode { pid: None, @@ -672,13 +689,6 @@ async fn path_spawn_command( cmd } _ => { - logger - .log( - LogLevel::Info, - Some("spawner".into()), - format!("spawning: {}", resolved_path.display()), - ) - .await; if uv { let mut cmd = tokio::process::Command::new("uv"); cmd.arg("run"); diff --git a/libraries/message/src/descriptor.rs b/libraries/message/src/descriptor.rs index d7ae034e..e7f4811c 100644 --- a/libraries/message/src/descriptor.rs +++ b/libraries/message/src/descriptor.rs @@ -482,6 +482,15 @@ pub struct ResolvedNode { pub kind: CoreNodeKind, } +impl ResolvedNode { + pub fn has_git_source(&self) -> bool { + self.kind + .as_custom() + .map(|n| n.source.is_git()) + .unwrap_or_default() + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] #[allow(clippy::large_enum_variant)] @@ -492,6 +501,15 @@ pub enum CoreNodeKind { Custom(CustomNode), } +impl CoreNodeKind { + pub fn as_custom(&self) -> Option<&CustomNode> { + match self { + CoreNodeKind::Runtime(_) => None, + CoreNodeKind::Custom(custom_node) => Some(custom_node), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] #[serde(transparent)] pub struct RuntimeNode { @@ -640,6 +658,12 @@ pub enum NodeSource { }, } +impl NodeSource { + pub fn is_git(&self) -> bool { + matches!(self, Self::GitBranch { .. }) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] pub enum ResolvedNodeSource { Local,