From e6b70fb2a51bbcc52ee31a113eae408f3e562bc5 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 23 Jul 2025 17:37:49 +0200 Subject: [PATCH 1/2] Error if node with `git` field was not built before dataflow start The working directory of nodes with git fields is only decided on `dora build` because that's where we translate the branch/tag name into the commit hash. Before, we fell back to using the base working directory if no build was done. However, this is confusing since the reason for the different working directory is not communicated in any. This commit throws an error if no build happened before starting a dataflow with git nodes. --- binaries/daemon/src/lib.rs | 31 +++++++++++++++++++++-------- libraries/message/src/descriptor.rs | 24 ++++++++++++++++++++++ 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 86d05a62..89642c7b 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1117,8 +1117,19 @@ impl Daemon { let mut stopped = Vec::new(); - let node_working_dirs = build_id - .and_then(|build_id| self.builds.get(&build_id)) + let build_info = build_id.and_then(|build_id| self.builds.get(&build_id)); + let node_with_git_source = nodes.values().find(|n| n.has_git_source()); + if let Some(git_node) = node_with_git_source { + if build_info.is_none() { + eyre::bail!( + "node {} has git source, but no `dora build` was run yet\n\n\ + nodes with a `git` field must be built using `dora build` before starting the \ + dataflow", + git_node.id + ) + } + } + let node_working_dirs = build_info .map(|info| info.node_working_dirs.clone()) .unwrap_or_default(); @@ -1186,12 +1197,16 @@ impl Daemon { .entry(node.id.clone()) .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES))) .clone(); - logger - .log(LogLevel::Info, Some("daemon".into()), "spawning") - .await; - let node_working_dir = node_working_dirs - .get(&node_id) - .cloned() + + let configured_node_working_dir = node_working_dirs.get(&node_id).cloned(); + if configured_node_working_dir.is_none() && node.has_git_source() { + eyre::bail!( + "node {} has git source, but no git clone directory was found for it\n\n\ + try running `dora build` again", + node.id + ) + } + let node_working_dir = configured_node_working_dir .or_else(|| { node.deploy .as_ref() diff --git a/libraries/message/src/descriptor.rs b/libraries/message/src/descriptor.rs index d7ae034e..e7f4811c 100644 --- a/libraries/message/src/descriptor.rs +++ b/libraries/message/src/descriptor.rs @@ -482,6 +482,15 @@ pub struct ResolvedNode { pub kind: CoreNodeKind, } +impl ResolvedNode { + pub fn has_git_source(&self) -> bool { + self.kind + .as_custom() + .map(|n| n.source.is_git()) + .unwrap_or_default() + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] #[allow(clippy::large_enum_variant)] @@ -492,6 +501,15 @@ pub enum CoreNodeKind { Custom(CustomNode), } +impl CoreNodeKind { + pub fn as_custom(&self) -> Option<&CustomNode> { + match self { + CoreNodeKind::Runtime(_) => None, + CoreNodeKind::Custom(custom_node) => Some(custom_node), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] #[serde(transparent)] pub struct RuntimeNode { @@ -640,6 +658,12 @@ pub enum NodeSource { }, } +impl NodeSource { + pub fn is_git(&self) -> bool { + matches!(self, Self::GitBranch { .. }) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] pub enum ResolvedNodeSource { Local, From b0d6440446765fedcdc81301cec48d23440b3077 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 23 Jul 2025 17:38:46 +0200 Subject: [PATCH 2/2] Include working directory in log message To make it more obvious what's going on --- binaries/daemon/src/spawn.rs | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 5c3d9a02..a47c3638 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -357,7 +357,24 @@ impl PreparedNode { pub async fn spawn(mut self, logger: &mut NodeLogger<'_>) -> eyre::Result { let mut child = match &mut self.command { - Some(command) => command.spawn().wrap_err(self.spawn_error_msg)?, + Some(command) => { + let std_command = command.as_std(); + logger + .log( + LogLevel::Info, + Some("spawner".into()), + format!( + "spawning `{}` in `{}`", + std_command.get_program().to_string_lossy(), + std_command + .get_current_dir() + .unwrap_or(Path::new("")) + .display(), + ), + ) + .await; + command.spawn().wrap_err(self.spawn_error_msg)? + } None => { return Ok(RunningNode { pid: None, @@ -672,13 +689,6 @@ async fn path_spawn_command( cmd } _ => { - logger - .log( - LogLevel::Info, - Some("spawner".into()), - format!("spawning: {}", resolved_path.display()), - ) - .await; if uv { let mut cmd = tokio::process::Command::new("uv"); cmd.arg("run");