The working directory of nodes with git fields is only decided on `dora build` because that's where we translate the branch/tag name into the commit hash. Before, we fell back to using the base working directory if no build was done. However, this is confusing since the reason for the different working directory is not communicated in any. This commit throws an error if no build happened before starting a dataflow with git nodes.pull/1036/merge
| @@ -1117,8 +1117,19 @@ impl Daemon { | |||||
| let mut stopped = Vec::new(); | let mut stopped = Vec::new(); | ||||
| let node_working_dirs = build_id | |||||
| .and_then(|build_id| self.builds.get(&build_id)) | |||||
| let build_info = build_id.and_then(|build_id| self.builds.get(&build_id)); | |||||
| let node_with_git_source = nodes.values().find(|n| n.has_git_source()); | |||||
| if let Some(git_node) = node_with_git_source { | |||||
| if build_info.is_none() { | |||||
| eyre::bail!( | |||||
| "node {} has git source, but no `dora build` was run yet\n\n\ | |||||
| nodes with a `git` field must be built using `dora build` before starting the \ | |||||
| dataflow", | |||||
| git_node.id | |||||
| ) | |||||
| } | |||||
| } | |||||
| let node_working_dirs = build_info | |||||
| .map(|info| info.node_working_dirs.clone()) | .map(|info| info.node_working_dirs.clone()) | ||||
| .unwrap_or_default(); | .unwrap_or_default(); | ||||
| @@ -1186,12 +1197,16 @@ impl Daemon { | |||||
| .entry(node.id.clone()) | .entry(node.id.clone()) | ||||
| .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES))) | .or_insert_with(|| Arc::new(ArrayQueue::new(STDERR_LOG_LINES))) | ||||
| .clone(); | .clone(); | ||||
| logger | |||||
| .log(LogLevel::Info, Some("daemon".into()), "spawning") | |||||
| .await; | |||||
| let node_working_dir = node_working_dirs | |||||
| .get(&node_id) | |||||
| .cloned() | |||||
| let configured_node_working_dir = node_working_dirs.get(&node_id).cloned(); | |||||
| if configured_node_working_dir.is_none() && node.has_git_source() { | |||||
| eyre::bail!( | |||||
| "node {} has git source, but no git clone directory was found for it\n\n\ | |||||
| try running `dora build` again", | |||||
| node.id | |||||
| ) | |||||
| } | |||||
| let node_working_dir = configured_node_working_dir | |||||
| .or_else(|| { | .or_else(|| { | ||||
| node.deploy | node.deploy | ||||
| .as_ref() | .as_ref() | ||||
| @@ -357,7 +357,24 @@ impl PreparedNode { | |||||
| pub async fn spawn(mut self, logger: &mut NodeLogger<'_>) -> eyre::Result<RunningNode> { | pub async fn spawn(mut self, logger: &mut NodeLogger<'_>) -> eyre::Result<RunningNode> { | ||||
| let mut child = match &mut self.command { | let mut child = match &mut self.command { | ||||
| Some(command) => command.spawn().wrap_err(self.spawn_error_msg)?, | |||||
| Some(command) => { | |||||
| let std_command = command.as_std(); | |||||
| logger | |||||
| .log( | |||||
| LogLevel::Info, | |||||
| Some("spawner".into()), | |||||
| format!( | |||||
| "spawning `{}` in `{}`", | |||||
| std_command.get_program().to_string_lossy(), | |||||
| std_command | |||||
| .get_current_dir() | |||||
| .unwrap_or(Path::new("<unknown>")) | |||||
| .display(), | |||||
| ), | |||||
| ) | |||||
| .await; | |||||
| command.spawn().wrap_err(self.spawn_error_msg)? | |||||
| } | |||||
| None => { | None => { | ||||
| return Ok(RunningNode { | return Ok(RunningNode { | ||||
| pid: None, | pid: None, | ||||
| @@ -672,13 +689,6 @@ async fn path_spawn_command( | |||||
| cmd | cmd | ||||
| } | } | ||||
| _ => { | _ => { | ||||
| logger | |||||
| .log( | |||||
| LogLevel::Info, | |||||
| Some("spawner".into()), | |||||
| format!("spawning: {}", resolved_path.display()), | |||||
| ) | |||||
| .await; | |||||
| if uv { | if uv { | ||||
| let mut cmd = tokio::process::Command::new("uv"); | let mut cmd = tokio::process::Command::new("uv"); | ||||
| cmd.arg("run"); | cmd.arg("run"); | ||||
| @@ -482,6 +482,15 @@ pub struct ResolvedNode { | |||||
| pub kind: CoreNodeKind, | pub kind: CoreNodeKind, | ||||
| } | } | ||||
| impl ResolvedNode { | |||||
| pub fn has_git_source(&self) -> bool { | |||||
| self.kind | |||||
| .as_custom() | |||||
| .map(|n| n.source.is_git()) | |||||
| .unwrap_or_default() | |||||
| } | |||||
| } | |||||
| #[derive(Debug, Clone, Serialize, Deserialize)] | #[derive(Debug, Clone, Serialize, Deserialize)] | ||||
| #[serde(rename_all = "lowercase")] | #[serde(rename_all = "lowercase")] | ||||
| #[allow(clippy::large_enum_variant)] | #[allow(clippy::large_enum_variant)] | ||||
| @@ -492,6 +501,15 @@ pub enum CoreNodeKind { | |||||
| Custom(CustomNode), | Custom(CustomNode), | ||||
| } | } | ||||
| impl CoreNodeKind { | |||||
| pub fn as_custom(&self) -> Option<&CustomNode> { | |||||
| match self { | |||||
| CoreNodeKind::Runtime(_) => None, | |||||
| CoreNodeKind::Custom(custom_node) => Some(custom_node), | |||||
| } | |||||
| } | |||||
| } | |||||
| #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] | #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] | ||||
| #[serde(transparent)] | #[serde(transparent)] | ||||
| pub struct RuntimeNode { | pub struct RuntimeNode { | ||||
| @@ -640,6 +658,12 @@ pub enum NodeSource { | |||||
| }, | }, | ||||
| } | } | ||||
| impl NodeSource { | |||||
| pub fn is_git(&self) -> bool { | |||||
| matches!(self, Self::GitBranch { .. }) | |||||
| } | |||||
| } | |||||
| #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] | #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] | ||||
| pub enum ResolvedNodeSource { | pub enum ResolvedNodeSource { | ||||
| Local, | Local, | ||||