
Wait until all nodes on the daemon have been built before spawning
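The daemon used to start each node's process as soon as that node's spawn future resolved, so an early node could already be running while a later node was still compiling, and a failed build left part of the dataflow alive. `Spawner::spawn_node` is therefore split into a `prepare_node` phase (clone sources, run build commands, assemble the full command) and a `PreparedNode::spawn` phase that only starts the process; `Daemon::spawn_prepared_nodes` awaits all preparations before spawning anything and converts a preparation failure into cascading errors for the remaining nodes. A minimal, self-contained sketch of the two-phase pattern (the `Prepared`/`prepare` names are illustrative, not the daemon's actual API):

use std::process::{Child, Command};

// Phase 1 output: all fallible preparation (builds, clones) already done.
// `command` is `None` when we only wanted to build, not run.
struct Prepared {
    command: Option<Command>,
}

impl Prepared {
    // Phase 2: starting the process is the only remaining step that can fail.
    fn spawn(mut self) -> std::io::Result<Option<Child>> {
        self.command.as_mut().map(Command::spawn).transpose()
    }
}

fn prepare(path: &str, build_only: bool) -> std::io::Result<Prepared> {
    // ...run build commands, resolve env vars, etc....
    Ok(Prepared {
        command: (!build_only).then(|| Command::new(path)),
    })
}

fn main() -> std::io::Result<()> {
    // prepare *all* nodes before spawning *any* of them, so a failed
    // build never leaves half of the dataflow running
    let prepared: Vec<Prepared> = ["true", "true"]
        .iter()
        .map(|path| prepare(path, false))
        .collect::<std::io::Result<_>>()?;
    for node in prepared {
        node.spawn()?;
    }
    Ok(())
}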

tags/v0.3.12-rc0
Philipp Oppermann 9 months ago
commit 17f75c04d5
2 changed files with 452 additions and 308 deletions
  binaries/daemon/src/lib.rs    +120 −41
  binaries/daemon/src/spawn.rs  +332 −267

binaries/daemon/src/lib.rs (+120 −41)

@@ -772,7 +772,12 @@ impl Daemon {
uv: bool,
build_only: bool,
) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
- let mut logger = self.logger.for_dataflow(dataflow_id);
+ let mut logger = self
+     .logger
+     .for_dataflow(dataflow_id)
+     .try_clone()
+     .await
+     .context("failed to clone logger")?;
let dataflow =
RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
let dataflow = match self.running.entry(dataflow_id) {
@@ -857,7 +862,7 @@ impl Daemon {
.await;
match spawner
.clone()
- .spawn_node(
+ .prepare_node(
node,
node_stderr_most_recent,
&mut logger,
@@ -867,39 +872,7 @@ impl Daemon {
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
{
Ok(result) => {
- let events_tx = self.events_tx.clone();
- let clock = self.clock.clone();
- tasks.push(async move {
-     let (node_spawn_result, success) = match result.await {
-         Ok(node) => (Ok(node), Ok(())),
-         Err(err) => {
-             let node_err = NodeError {
-                 timestamp: clock.new_timestamp(),
-                 cause: NodeErrorCause::Other {
-                     stderr: format!("spawn failed: {err:?}"),
-                 },
-                 exit_status: NodeExitStatus::Unknown,
-             };
-             (Err(node_err), Err(err))
-         }
-     };
-     let send_result = events_tx
-         .send(Timestamped {
-             inner: Event::SpawnNodeResult {
-                 dataflow_id,
-                 node_id,
-                 result: node_spawn_result,
-             },
-             timestamp: clock.new_timestamp(),
-         })
-         .await;
-     if send_result.is_err() {
-         tracing::error!(
-             "failed to send SpawnNodeResult to main daemon task"
-         )
-     }
-     success
- });
+ tasks.push((node_id, result));
}
Err(err) => {
logger
@@ -980,16 +953,122 @@ impl Daemon {
self.handle_node_stop(dataflow_id, &node_id).await?;
}

- let spawn_result = async move {
-     for task in tasks {
-         task.await?;
-     }
-     Ok(())
- };
+ let spawn_result = Self::spawn_prepared_nodes(
+     dataflow_id,
+     logger,
+     tasks,
+     self.events_tx.clone(),
+     self.clock.clone(),
+ );

Ok(spawn_result)
}

async fn spawn_prepared_nodes(
dataflow_id: Uuid,
mut logger: DataflowLogger<'_>,
tasks: Vec<(
NodeId,
impl Future<Output = std::result::Result<spawn::PreparedNode, eyre::Error>>,
)>,
events_tx: mpsc::Sender<Timestamped<Event>>,
clock: Arc<HLC>,
) -> eyre::Result<()> {
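// helper: wrap a per-node result into a timestamped `SpawnNodeResult` event for the main daemon task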
let node_result = |node_id, result| Timestamped {
inner: Event::SpawnNodeResult {
dataflow_id,
node_id,
result,
},
timestamp: clock.new_timestamp(),
};
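// the first node whose preparation failed; every successfully prepared node is then failed as cascading instead of spawned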
let mut failed_to_prepare = None;
let mut prepared_nodes = Vec::new();
for (node_id, task) in tasks {
match task.await {
Ok(node) => prepared_nodes.push(node),
Err(err) => {
if failed_to_prepare.is_none() {
failed_to_prepare = Some(node_id.clone());
}
let node_err: NodeError = NodeError {
timestamp: clock.new_timestamp(),
cause: NodeErrorCause::Other {
stderr: format!("preparing for spawn failed: {err:?}"),
},
exit_status: NodeExitStatus::Unknown,
};
let send_result = events_tx.send(node_result(node_id, Err(node_err))).await;
if send_result.is_err() {
tracing::error!("failed to send SpawnNodeResult to main daemon task")
}
}
}
}

// once all nodes are prepared, do the actual spawning
if let Some(failed_node) = failed_to_prepare {
// don't spawn any nodes when an error occurred before
for node in prepared_nodes {
let err = NodeError {
timestamp: clock.new_timestamp(),
cause: NodeErrorCause::Cascading {
caused_by_node: failed_node.clone(),
},
exit_status: NodeExitStatus::Unknown,
};
let send_result = events_tx
.send(node_result(node.node_id().clone(), Err(err)))
.await;
if send_result.is_err() {
tracing::error!("failed to send SpawnNodeResult to main daemon task")
}
}
Err(eyre!("failed to prepare node {failed_node}"))
} else {
let mut spawn_result = Ok(());

logger
.log(
LogLevel::Info,
None,
Some("dora daemon".into()),
"finished building nodes, spawning...",
)
.await;

// spawn the nodes
for node in prepared_nodes {
let node_id = node.node_id().clone();
let mut logger = logger.reborrow().for_node(node_id.clone());
let result = node.spawn(&mut logger).await;
let node_spawn_result = match result {
Ok(node) => Ok(node),
Err(err) => {
let node_err = NodeError {
timestamp: clock.new_timestamp(),
cause: NodeErrorCause::Other {
stderr: format!("spawn failed: {err:?}"),
},
exit_status: NodeExitStatus::Unknown,
};
if spawn_result.is_ok() {
spawn_result = Err(err.wrap_err(format!("failed to spawn {node_id}")));
}
Err(node_err)
}
};
let send_result = events_tx
.send(node_result(node_id, node_spawn_result))
.await;
if send_result.is_err() {
tracing::error!("failed to send SpawnNodeResult to main daemon task")
}
}
spawn_result
}
}

async fn handle_dynamic_node_event(
&mut self,
event: DynamicNodeEventWrapper,

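Condensed, the bookkeeping in `spawn_prepared_nodes` above: preparation results are drained first, the first failure is remembered, and every node that did prepare successfully is then reported as a cascading failure instead of being spawned. A self-contained sketch (plain `String`s stand in for `NodeId`, and the returned list stands in for the daemon's `SpawnNodeResult` events; not the actual API):

#[derive(Debug)]
enum Cause {
    PrepareFailed(String),
    Cascading { caused_by_node: String },
}

fn prepare_all(results: Vec<(String, Result<(), String>)>) -> Vec<(String, Result<(), Cause>)> {
    let mut first_failure: Option<String> = None;
    let mut prepared = Vec::new();
    let mut report = Vec::new();
    for (node, result) in results {
        match result {
            Ok(()) => prepared.push(node),
            Err(err) => {
                // remember the *first* failing node; cascading errors point at it
                first_failure.get_or_insert_with(|| node.clone());
                report.push((node, Err(Cause::PrepareFailed(err))));
            }
        }
    }
    match first_failure {
        // don't spawn anything: mark every prepared node as a cascading failure
        Some(failed) => report.extend(prepared.into_iter().map(|node| {
            (
                node,
                Err(Cause::Cascading {
                    caused_by_node: failed.clone(),
                }),
            )
        })),
        None => report.extend(prepared.into_iter().map(|node| (node, Ok(())))),
    }
    report
}

fn main() {
    let report = prepare_all(vec![
        ("camera".into(), Ok(())),
        ("detector".into(), Err("build failed".into())),
        ("plotter".into(), Ok(())),
    ]);
    for (node, result) in report {
        println!("{node}: {result:?}");
    }
}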

binaries/daemon/src/spawn.rs (+332 −267)

@@ -22,6 +22,7 @@ use dora_message::{
daemon_to_coordinator::{DataMessage, NodeExitStatus, Timestamped},
daemon_to_node::{NodeConfig, RuntimeConfig},
descriptor::{EnvValue, GitRepoRev},
+ id::NodeId,
DataflowId,
};
use dora_node_api::{
@@ -60,13 +61,13 @@ pub struct Spawner {
}

impl Spawner {
- pub async fn spawn_node(
+ pub async fn prepare_node(
mut self,
node: ResolvedNode,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
logger: &mut NodeLogger<'_>,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
- ) -> eyre::Result<impl Future<Output = eyre::Result<RunningNode>>> {
+ ) -> eyre::Result<impl Future<Output = eyre::Result<PreparedNode>>> {
let dataflow_id = self.dataflow_id;
let node_id = node.id.clone();
logger
@@ -115,7 +116,7 @@ impl Spawner {
.await
.wrap_err("failed to clone logger")?;
let task = async move {
- self.spawn_node_inner(
+ self.prepare_node_inner(
node,
&mut logger,
dataflow_id,
@@ -128,22 +129,18 @@ impl Spawner {
Ok(task)
}

- async fn spawn_node_inner(
-     &mut self,
+ async fn prepare_node_inner(
+     mut self,
node: ResolvedNode,
logger: &mut NodeLogger<'_>,
dataflow_id: uuid::Uuid,
node_config: NodeConfig,
prepared_git: Option<PreparedGit>,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
- ) -> Result<RunningNode, eyre::Error> {
-     let send_stdout_to = node
-         .send_stdout_as()
-         .context("Could not resolve `send_stdout_as` configuration")?;
-
-     let mut child = match node.kind {
+ ) -> eyre::Result<PreparedNode> {
+     let (command, error_msg) = match &node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
- let command = match &n.source {
+ let mut command = match &n.source {
dora_message::descriptor::NodeSource::Local => {
if let Some(build) = &n.build {
self.build_node(logger, &node.env, self.working_dir.clone(), build)
@@ -152,63 +149,71 @@ impl Spawner {
if self.build_only {
None
} else {
- spawn_command_from_path(&self.working_dir, self.uv, logger, &n, true)
-     .await?
+ path_spawn_command(&self.working_dir, self.uv, logger, &n, true).await?
}
}
dora_message::descriptor::NodeSource::GitBranch { repo, rev } => {
- self.spawn_git_node(&n, repo, rev, logger, &node.env, prepared_git.unwrap())
-     .await?
+ self.git_node_spawn_command(
+     &n,
+     repo,
+     rev,
+     logger,
+     &node.env,
+     prepared_git.unwrap(),
+ )
+ .await?
}
};
- let Some(mut command) = command else {
-     return Ok(RunningNode {
-         pid: None,
-         node_config,
-     });
- };
-
- command.current_dir(&self.working_dir);
- command.stdin(Stdio::null());
-
- command.env(
-     "DORA_NODE_CONFIG",
-     serde_yaml::to_string(&node_config.clone())
-         .wrap_err("failed to serialize node config")?,
- );
- // Injecting the env variable defined in the `yaml` into
- // the node runtime.
- if let Some(envs) = node.env {
-     for (key, value) in envs {
-         command.env(key, value.to_string());
-     }
- }
- if let Some(envs) = n.envs {
-     // node has some inner env variables -> add them too
-     for (key, value) in envs {
-         command.env(key, value.to_string());
-     }
- }
-
- // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C
- #[cfg(unix)]
- command.process_group(0);
-
- command.env("PYTHONUNBUFFERED", "1");
- command
-     .stdin(Stdio::null())
-     .stdout(Stdio::piped())
-     .stderr(Stdio::piped())
-     .spawn()
-     .wrap_err_with(move || {
-         format!(
-             "failed to run `{}` with args `{}`",
-             n.path,
-             n.args.as_deref().unwrap_or_default(),
-         )
-     })?
- }
- dora_core::descriptor::CoreNodeKind::Runtime(n) => {
+ if let Some(command) = &mut command {
+     command.current_dir(&self.working_dir);
+     command.stdin(Stdio::null());
+
+     command.env(
+         "DORA_NODE_CONFIG",
+         serde_yaml::to_string(&node_config.clone())
+             .wrap_err("failed to serialize node config")?,
+     );
+     // Injecting the env variable defined in the `yaml` into
+     // the node runtime.
+     if let Some(envs) = &node.env {
+         for (key, value) in envs {
+             command.env(key, value.to_string());
+         }
+     }
+     if let Some(envs) = &n.envs {
+         // node has some inner env variables -> add them too
+         for (key, value) in envs {
+             command.env(key, value.to_string());
+         }
+     }
+
+     // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C
+     #[cfg(unix)]
+     command.process_group(0);
+
+     command.env("PYTHONUNBUFFERED", "1");
+     command
+         .stdin(Stdio::null())
+         .stdout(Stdio::piped())
+         .stderr(Stdio::piped());
+ };
+
+ let error_msg = format!(
+     "failed to run `{}` with args `{}`",
+     n.path,
+     n.args.as_deref().unwrap_or_default(),
+ );
+ (command, error_msg)
+ }
+ dora_core::descriptor::CoreNodeKind::Runtime(n) => {
+     // run build commands
+     for operator in &n.operators {
+         if let Some(build) = &operator.config.build {
+             self.build_node(logger, &node.env, self.working_dir.clone(), build)
+                 .await?;
+         }
+     }
let python_operators: Vec<&OperatorDefinition> = n
.operators
.iter()
@@ -220,7 +225,9 @@ impl Spawner {
.iter()
.any(|x| !matches!(x.config.source, OperatorSource::Python { .. }));

- let mut command = if !python_operators.is_empty() && !other_operators {
+ let mut command = if self.build_only {
+     None
+ } else if !python_operators.is_empty() && !other_operators {
// Use python to spawn runtime if there is a python operator

// TODO: Handle multi-operator runtime once sub-interpreter is supported
@@ -253,7 +260,7 @@ impl Spawner {
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
- command
+ Some(command)
} else {
let mut cmd = if self.uv {
let mut cmd = tokio::process::Command::new("uv");
@@ -279,7 +286,7 @@ impl Spawner {
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
- cmd
+ Some(cmd)
}
} else if python_operators.is_empty() && other_operators {
let mut cmd = tokio::process::Command::new(
@@ -287,41 +294,256 @@ impl Spawner {
.wrap_err("failed to get current executable path")?,
);
cmd.arg("runtime");
- cmd
+ Some(cmd)
} else {
eyre::bail!("Runtime can not mix Python Operator with other type of operator.");
};
- command.current_dir(&self.working_dir);
-
let runtime_config = RuntimeConfig {
node: node_config.clone(),
- operators: n.operators,
+ operators: n.operators.clone(),
};
- command.env(
-     "DORA_RUNTIME_CONFIG",
-     serde_yaml::to_string(&runtime_config)
-         .wrap_err("failed to serialize runtime config")?,
- );
- // Injecting the env variable defined in the `yaml` into
- // the node runtime.
- if let Some(envs) = node.env {
-     for (key, value) in envs {
-         command.env(key, value.to_string());
-     }
- }
-
- // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C
- #[cfg(unix)]
- command.process_group(0);
-
- command
-     .stdin(Stdio::null())
-     .stdout(Stdio::piped())
-     .stderr(Stdio::piped())
-     .spawn()
-     .wrap_err(format!(
-         "failed to run runtime {}/{}",
-         runtime_config.node.dataflow_id, runtime_config.node.node_id
-     ))?
+
+ if let Some(command) = &mut command {
+     command.current_dir(&self.working_dir);
+
+     command.env(
+         "DORA_RUNTIME_CONFIG",
+         serde_yaml::to_string(&runtime_config)
+             .wrap_err("failed to serialize runtime config")?,
+     );
+     // Injecting the env variable defined in the `yaml` into
+     // the node runtime.
+     if let Some(envs) = &node.env {
+         for (key, value) in envs {
+             command.env(key, value.to_string());
+         }
+     }
+     // Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C
+     #[cfg(unix)]
+     command.process_group(0);
+
+     command
+         .stdin(Stdio::null())
+         .stdout(Stdio::piped())
+         .stderr(Stdio::piped());
+ };
+ let error_msg = format!(
+     "failed to run runtime {}/{}",
+     runtime_config.node.dataflow_id, runtime_config.node.node_id
+ );
+ (command, error_msg)
}
};
+ Ok(PreparedNode {
+     command,
+     spawn_error_msg: error_msg,
+     working_dir: self.working_dir,
+     dataflow_id,
+     node,
+     node_config,
+     clock: self.clock,
+     daemon_tx: self.daemon_tx,
+     node_stderr_most_recent,
+ })
}

async fn prepare_git_node(
&mut self,
repo_addr: &String,
rev: &Option<GitRepoRev>,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> eyre::Result<PreparedGit> {
let dataflow_id = self.dataflow_id;
let repo_url = Url::parse(repo_addr).context("failed to parse git repository URL")?;
let target_dir = self.working_dir.join("build");

let clone_dir_base = {
let base = {
let mut path =
target_dir.join(repo_url.host_str().context("git URL has no hostname")?);

path.extend(repo_url.path_segments().context("no path in git URL")?);
path
};
match rev {
None => base,
Some(rev) => match rev {
GitRepoRev::Branch(branch) => base.join("branch").join(branch),
GitRepoRev::Tag(tag) => base.join("tag").join(tag),
GitRepoRev::Rev(rev) => base.join("rev").join(rev),
},
}
};
let clone_dir = if clone_dir_exists(&clone_dir_base, repos_in_use) {
let used_by_other_dataflow =
self.used_by_other_dataflow(dataflow_id, &clone_dir_base, repos_in_use);
if used_by_other_dataflow {
// don't reuse, choose new directory
// (TODO reuse if still up to date)

let dir_name = clone_dir_base.file_name().unwrap().to_str().unwrap();
let mut i = 1;
loop {
let new_path = clone_dir_base.with_file_name(format!("{dir_name}-{i}"));
if clone_dir_exists(&new_path, repos_in_use)
&& self.used_by_other_dataflow(dataflow_id, &new_path, repos_in_use)
{
i += 1;
} else {
break new_path;
}
}
} else {
clone_dir_base
}
} else {
clone_dir_base
};
let clone_dir = dunce::simplified(&clone_dir).to_owned();

let (reuse, checkout) = if clone_dir_exists(&clone_dir, repos_in_use) {
let empty = BTreeSet::new();
let in_use = repos_in_use.get(&clone_dir).unwrap_or(&empty);
let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id);
if used_by_other_dataflow {
// TODO allow if still up to date
eyre::bail!("clone_dir is already in use by other dataflow")
} else if in_use.is_empty() {
(true, true)
} else {
(true, false)
}
} else {
(false, true)
};
repos_in_use
.entry(clone_dir.clone())
.or_default()
.insert(dataflow_id);

Ok(PreparedGit {
clone_dir,
reuse,
checkout,
})
}

async fn git_node_spawn_command(
&mut self,
node: &dora_core::descriptor::CustomNode,
repo_addr: &String,
rev: &Option<GitRepoRev>,
logger: &mut NodeLogger<'_>,
node_env: &Option<BTreeMap<String, EnvValue>>,
prepared: PreparedGit,
) -> Result<Option<tokio::process::Command>, eyre::Error> {
let PreparedGit {
clone_dir,
reuse,
checkout,
} = prepared;

let rev_str = rev_str(rev);
let refname = rev.clone().map(|rev| match rev {
GitRepoRev::Branch(branch) => format!("refs/remotes/origin/{branch}"),
GitRepoRev::Tag(tag) => format!("refs/tags/{tag}"),
GitRepoRev::Rev(rev) => rev,
});

if reuse {
logger
.log(
LogLevel::Info,
None,
format!("reusing {repo_addr}{rev_str}"),
)
.await;
let refname_cloned = refname.clone();
let clone_dir = clone_dir.clone();
let repository = fetch_changes(clone_dir, refname_cloned).await?;
if checkout {
checkout_tree(&repository, refname)?;
}
} else {
let repository = clone_into(repo_addr, rev, &clone_dir, logger).await?;
if checkout {
checkout_tree(&repository, refname)?;
}
};
if let Some(build) = &node.build {
self.build_node(logger, node_env, clone_dir.clone(), build)
.await?;
}
if self.build_only {
Ok(None)
} else {
path_spawn_command(&clone_dir, self.uv, logger, node, true).await
}
}

fn used_by_other_dataflow(
&mut self,
dataflow_id: uuid::Uuid,
clone_dir_base: &PathBuf,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> bool {
let empty = BTreeSet::new();
let in_use = repos_in_use.get(clone_dir_base).unwrap_or(&empty);
let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id);
used_by_other_dataflow
}

async fn build_node(
&mut self,
logger: &mut NodeLogger<'_>,
node_env: &Option<BTreeMap<String, EnvValue>>,
working_dir: PathBuf,
build: &String,
) -> Result<(), eyre::Error> {
logger
.log(
LogLevel::Info,
None,
format!("running build command: `{build}"),
)
.await;
let build = build.to_owned();
let uv = self.uv;
let node_env = node_env.clone();
let task = tokio::task::spawn_blocking(move || {
run_build_command(&build, &working_dir, uv, &node_env).context("build command failed")
});
task.await??;
Ok(())
}
}

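/// A node whose spawn command has been fully prepared (sources fetched, build
/// commands run, env assembled) but whose process has not been started yet.
/// `command` is `None` in build-only mode, in which case `spawn` returns a
/// `RunningNode` without a pid.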
pub struct PreparedNode {
command: Option<tokio::process::Command>,
spawn_error_msg: String,
working_dir: PathBuf,
dataflow_id: DataflowId,
node: ResolvedNode,
node_config: NodeConfig,
clock: Arc<HLC>,
daemon_tx: mpsc::Sender<Timestamped<Event>>,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
}

impl PreparedNode {
pub fn node_id(&self) -> &NodeId {
&self.node.id
}

pub async fn spawn(mut self, logger: &mut NodeLogger<'_>) -> eyre::Result<RunningNode> {
let mut child = match &mut self.command {
Some(command) => command.spawn().wrap_err(self.spawn_error_msg)?,
None => {
return Ok(RunningNode {
pid: None,
node_config: self.node_config,
})
}
};

@@ -336,22 +558,29 @@ impl Spawner {
)
.await;

let dataflow_dir: PathBuf = self.working_dir.join("out").join(dataflow_id.to_string());
let dataflow_dir: PathBuf = self
.working_dir
.join("out")
.join(self.dataflow_id.to_string());
if !dataflow_dir.exists() {
std::fs::create_dir_all(&dataflow_dir).context("could not create dataflow_dir")?;
}
let (tx, mut rx) = mpsc::channel(10);
- let mut file = File::create(log::log_path(&self.working_dir, &dataflow_id, &node.id))
-     .await
-     .expect("Failed to create log file");
+ let mut file = File::create(log::log_path(
+     &self.working_dir,
+     &self.dataflow_id,
+     &self.node.id,
+ ))
+ .await
+ .expect("Failed to create log file");
let mut child_stdout =
tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"));
let running_node = RunningNode {
pid: Some(pid),
- node_config,
+ node_config: self.node_config,
};
let stdout_tx = tx.clone();
- let node_id = node.id.clone();
+ let node_id = self.node.id.clone();
// Stdout listener stream
tokio::spawn(async move {
let mut buffer = String::new();
@@ -411,7 +640,7 @@ impl Spawner {

// Stderr listener stream
let stderr_tx = tx.clone();
- let node_id = node.id.clone();
+ let node_id = self.node.id.clone();
let uhlc = self.clock.clone();
let daemon_tx_log = self.daemon_tx.clone();
tokio::spawn(async move {
@@ -447,7 +676,7 @@ impl Spawner {

buffer.push_str(&new);

- node_stderr_most_recent.force_push(new);
+ self.node_stderr_most_recent.force_push(new);

// send the buffered lines
let lines = std::mem::take(&mut buffer);
@@ -458,10 +687,11 @@ impl Spawner {
}
});

- let node_id = node.id.clone();
+ let node_id = self.node.id.clone();
let (log_finish_tx, log_finish_rx) = oneshot::channel();
let clock = self.clock.clone();
let daemon_tx = self.daemon_tx.clone();
+ let dataflow_id = self.dataflow_id;
tokio::spawn(async move {
let exit_status = NodeExitStatus::from(child.wait().await);
let _ = log_finish_rx.await;
@@ -478,7 +708,7 @@ impl Spawner {
let _ = daemon_tx.send(event).await;
});

- let node_id = node.id.clone();
+ let node_id = self.node.id.clone();
let daemon_id = logger.inner().inner().daemon_id().clone();
let mut cloned_logger = logger
.inner()
@@ -488,6 +718,11 @@ impl Spawner {
.await
.context("failed to clone logger")?;

+ let send_stdout_to = self
+     .node
+     .send_stdout_as()
+     .context("Could not resolve `send_stdout_as` configuration")?;

// Log to file stream.
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
@@ -557,176 +792,6 @@ impl Spawner {
});
Ok(running_node)
}

async fn prepare_git_node(
&mut self,
repo_addr: &String,
rev: &Option<GitRepoRev>,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> eyre::Result<PreparedGit> {
let dataflow_id = self.dataflow_id;
let repo_url = Url::parse(repo_addr).context("failed to parse git repository URL")?;
let target_dir = self.working_dir.join("build");

let clone_dir_base = {
let base = {
let mut path =
target_dir.join(repo_url.host_str().context("git URL has no hostname")?);

path.extend(repo_url.path_segments().context("no path in git URL")?);
path
};
match rev {
None => base,
Some(rev) => match rev {
GitRepoRev::Branch(branch) => base.join("branch").join(branch),
GitRepoRev::Tag(tag) => base.join("tag").join(tag),
GitRepoRev::Rev(rev) => base.join("rev").join(rev),
},
}
};
let clone_dir = if clone_dir_exists(&clone_dir_base, repos_in_use) {
let used_by_other_dataflow =
self.used_by_other_dataflow(dataflow_id, &clone_dir_base, repos_in_use);
if used_by_other_dataflow {
// don't reuse, choose new directory
// (TODO reuse if still up to date)

let dir_name = clone_dir_base.file_name().unwrap().to_str().unwrap();
let mut i = 1;
loop {
let new_path = clone_dir_base.with_file_name(format!("{dir_name}-{i}"));
if clone_dir_exists(&new_path, repos_in_use)
&& self.used_by_other_dataflow(dataflow_id, &new_path, repos_in_use)
{
i += 1;
} else {
break new_path;
}
}
} else {
clone_dir_base
}
} else {
clone_dir_base
};
let clone_dir = dunce::simplified(&clone_dir).to_owned();

let (reuse, checkout) = if clone_dir_exists(&clone_dir, repos_in_use) {
let empty = BTreeSet::new();
let in_use = repos_in_use.get(&clone_dir).unwrap_or(&empty);
let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id);
if used_by_other_dataflow {
// TODO allow if still up to date
eyre::bail!("clone_dir is already in use by other dataflow")
} else if in_use.is_empty() {
(true, true)
} else {
(true, false)
}
} else {
(false, true)
};
repos_in_use
.entry(clone_dir.clone())
.or_default()
.insert(dataflow_id);

Ok(PreparedGit {
clone_dir,
reuse,
checkout,
})
}

async fn spawn_git_node(
&mut self,
node: &dora_core::descriptor::CustomNode,
repo_addr: &String,
rev: &Option<GitRepoRev>,
logger: &mut NodeLogger<'_>,
node_env: &Option<BTreeMap<String, EnvValue>>,
prepared: PreparedGit,
) -> Result<Option<tokio::process::Command>, eyre::Error> {
let PreparedGit {
clone_dir,
reuse,
checkout,
} = prepared;

let rev_str = rev_str(rev);
let refname = rev.clone().map(|rev| match rev {
GitRepoRev::Branch(branch) => format!("refs/remotes/origin/{branch}"),
GitRepoRev::Tag(tag) => format!("refs/tags/{tag}"),
GitRepoRev::Rev(rev) => rev,
});

if reuse {
logger
.log(
LogLevel::Info,
None,
format!("reusing {repo_addr}{rev_str}"),
)
.await;
let refname_cloned = refname.clone();
let clone_dir = clone_dir.clone();
let repository = fetch_changes(clone_dir, refname_cloned).await?;
if checkout {
checkout_tree(&repository, refname)?;
}
} else {
let repository = clone_into(repo_addr, rev, &clone_dir, logger).await?;
if checkout {
checkout_tree(&repository, refname)?;
}
};
if let Some(build) = &node.build {
self.build_node(logger, node_env, clone_dir.clone(), build)
.await?;
}
if self.build_only {
Ok(None)
} else {
spawn_command_from_path(&clone_dir, self.uv, logger, node, true).await
}
}

async fn build_node(
&mut self,
logger: &mut NodeLogger<'_>,
node_env: &Option<BTreeMap<String, EnvValue>>,
working_dir: PathBuf,
build: &String,
) -> Result<(), eyre::Error> {
logger
.log(
LogLevel::Info,
None,
format!("running build command: `{build}"),
)
.await;
let build = build.to_owned();
let uv = self.uv;
let node_env = node_env.clone();
let task = tokio::task::spawn_blocking(move || {
run_build_command(&build, &working_dir, uv, &node_env).context("build command failed")
});
task.await??;
Ok(())
}

fn used_by_other_dataflow(
&mut self,
dataflow_id: uuid::Uuid,
clone_dir_base: &PathBuf,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> bool {
let empty = BTreeSet::new();
let in_use = repos_in_use.get(clone_dir_base).unwrap_or(&empty);
let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id);
used_by_other_dataflow
}
}

fn rev_str(rev: &Option<GitRepoRev>) -> String {
@@ -832,7 +897,7 @@ fn checkout_tree(repository: &git2::Repository, refname: Option<String>) -> eyre
Ok(())
}

- async fn spawn_command_from_path(
+ async fn path_spawn_command(
working_dir: &Path,
uv: bool,
logger: &mut NodeLogger<'_>,


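For git-sourced nodes, `prepare_git_node` above reuses an existing checkout only if no other dataflow currently holds it; otherwise it probes `<dir>-1`, `<dir>-2`, … until it finds a directory that is free or does not exist yet. Roughly (a sketch with `u32` standing in for the dataflow `Uuid` and directory existence folded into the in-use map; not the daemon's exact signatures):

use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;

fn choose_clone_dir(
    base: PathBuf,
    dataflow_id: u32,
    in_use: &BTreeMap<PathBuf, BTreeSet<u32>>,
) -> PathBuf {
    // a directory is blocked if some *other* dataflow has it in use
    let used_by_other = |path: &PathBuf| {
        in_use
            .get(path)
            .map_or(false, |ids| ids.iter().any(|&id| id != dataflow_id))
    };
    if !used_by_other(&base) {
        return base;
    }
    let dir_name = base.file_name().unwrap().to_str().unwrap().to_owned();
    let mut i = 1;
    loop {
        // probe suffixed siblings of the base directory: repo-1, repo-2, ...
        let candidate = base.with_file_name(format!("{dir_name}-{i}"));
        if used_by_other(&candidate) {
            i += 1;
        } else {
            break candidate;
        }
    }
}

fn main() {
    let mut in_use = BTreeMap::new();
    in_use.insert(PathBuf::from("build/example.org/repo"), BTreeSet::from([7]));
    // dataflow 9 cannot reuse the checkout held by dataflow 7:
    let dir = choose_clone_dir(PathBuf::from("build/example.org/repo"), 9, &in_use);
    println!("{}", dir.display()); // build/example.org/repo-1
}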