Browse Source

Move `repos_in_use` to parameter list to make `Spawner` clonable

tags/v0.3.12-rc0
Philipp Oppermann 9 months ago
parent
commit
dd2ce5ed87
Failed to extract signature
2 changed files with 22 additions and 12 deletions
  1. +6
    -2
      binaries/daemon/src/lib.rs
  2. +16
    -10
      binaries/daemon/src/spawn.rs

+ 6
- 2
binaries/daemon/src/lib.rs View File

@@ -744,7 +744,6 @@ impl Daemon {
dataflow_descriptor,
clock: self.clock.clone(),
uv,
repos_in_use: &mut self.repos_in_use,
};

// spawn nodes and set up subscriptions
@@ -768,7 +767,12 @@ impl Daemon {
.log(LogLevel::Info, Some("daemon".into()), "spawning")
.await;
match spawner
.spawn_node(node, node_stderr_most_recent, &mut logger)
.spawn_node(
node,
node_stderr_most_recent,
&mut logger,
&mut self.repos_in_use,
)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
{


+ 16
- 10
binaries/daemon/src/spawn.rs View File

@@ -45,7 +45,8 @@ use tokio::{
use tracing::error;
use url::Url;

pub struct Spawner<'a> {
#[derive(Clone)]
pub struct Spawner {
pub dataflow_id: DataflowId,
pub working_dir: PathBuf,
pub daemon_tx: mpsc::Sender<Timestamped<Event>>,
@@ -53,15 +54,15 @@ pub struct Spawner<'a> {
/// clock is required for generating timestamps when dropping messages early because queue is full
pub clock: Arc<HLC>,
pub uv: bool,
pub repos_in_use: &'a mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
}

impl Spawner<'_> {
impl Spawner {
pub async fn spawn_node(
&mut self,
node: ResolvedNode,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
logger: &mut NodeLogger<'_>,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> eyre::Result<RunningNode> {
let dataflow_id = self.dataflow_id;
let node_id = node.id.clone();
@@ -111,7 +112,7 @@ impl Spawner<'_> {
.await?
}
dora_message::descriptor::NodeSource::GitBranch { repo, rev } => {
self.spawn_git_node(&n, repo, rev, logger, &node.env)
self.spawn_git_node(&n, repo, rev, logger, &node.env, repos_in_use)
.await?
}
};
@@ -518,6 +519,7 @@ impl Spawner<'_> {
rev: &Option<GitRepoRev>,
logger: &mut NodeLogger<'_>,
node_env: &Option<BTreeMap<String, EnvValue>>,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> Result<Option<tokio::process::Command>, eyre::Error> {
let dataflow_id = self.dataflow_id;
let repo_url = Url::parse(repo_addr).context("failed to parse git repository URL")?;
@@ -546,7 +548,8 @@ impl Spawner<'_> {
}
};
let clone_dir = if clone_dir_base.exists() {
let used_by_other_dataflow = self.used_by_other_dataflow(dataflow_id, &clone_dir_base);
let used_by_other_dataflow =
self.used_by_other_dataflow(dataflow_id, &clone_dir_base, repos_in_use);
if used_by_other_dataflow {
// don't reuse, choose new directory
// (TODO reuse if still up to date)
@@ -555,7 +558,9 @@ impl Spawner<'_> {
let mut i = 1;
loop {
let new_path = clone_dir_base.with_file_name(format!("{dir_name}-{i}"));
if new_path.exists() && self.used_by_other_dataflow(dataflow_id, &new_path) {
if new_path.exists()
&& self.used_by_other_dataflow(dataflow_id, &new_path, repos_in_use)
{
i += 1;
} else {
break new_path;
@@ -571,13 +576,13 @@ impl Spawner<'_> {

if clone_dir.exists() {
let empty = BTreeSet::new();
let in_use = self.repos_in_use.get(&clone_dir).unwrap_or(&empty);
let in_use = repos_in_use.get(&clone_dir).unwrap_or(&empty);
let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id);
if used_by_other_dataflow {
// TODO allow if still up to date
eyre::bail!("clone_dir is already in use by other dataflow")
} else {
self.repos_in_use
repos_in_use
.entry(clone_dir.clone())
.or_default()
.insert(dataflow_id);
@@ -594,7 +599,7 @@ impl Spawner<'_> {
checkout_tree(&repository, refname)?;
}
} else {
self.repos_in_use
repos_in_use
.entry(clone_dir.clone())
.or_default()
.insert(dataflow_id);
@@ -636,9 +641,10 @@ impl Spawner<'_> {
&mut self,
dataflow_id: uuid::Uuid,
clone_dir_base: &PathBuf,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> bool {
let empty = BTreeSet::new();
let in_use = self.repos_in_use.get(clone_dir_base).unwrap_or(&empty);
let in_use = repos_in_use.get(clone_dir_base).unwrap_or(&empty);
let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id);
used_by_other_dataflow
}


Loading…
Cancel
Save