Browse Source

Report spawn result asynchronously to avoid long blocking

tags/v0.3.12-rc0
Philipp Oppermann 9 months ago
parent
commit
e67472f994
Failed to extract signature
2 changed files with 182 additions and 49 deletions
  1. +84
    -10
      binaries/daemon/src/lib.rs
  2. +98
    -39
      binaries/daemon/src/spawn.rs

+ 84
- 10
binaries/daemon/src/lib.rs View File

@@ -38,6 +38,7 @@ use socket_stream_utils::socket_stream_send;
use spawn::Spawner;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
future::Future,
net::SocketAddr,
path::{Path, PathBuf},
pin::pin,
@@ -54,6 +55,7 @@ use tokio::{
mpsc::{self, UnboundedSender},
oneshot::{self, Sender},
},
task::JoinSet,
};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::{error, warn};
@@ -384,6 +386,24 @@ impl Daemon {
Event::DaemonError(err) => {
tracing::error!("Daemon error: {err:?}");
}
Event::SpawnNodeResult {
dataflow_id,
node_id,
result,
} => match result {
Ok(running_node) => {
if let Some(dataflow) = self.running.get_mut(&dataflow_id) {
dataflow.running_nodes.insert(node_id, running_node);
}
}
Err(error) => {
self.dataflow_node_results
.entry(dataflow_id)
.or_default()
.insert(node_id.clone(), Err(error));
self.handle_node_stop(dataflow_id, &node_id).await?;
}
},
}
}

@@ -441,10 +461,17 @@ impl Daemon {
if let Err(err) = &result {
tracing::error!("{err:?}");
}
let reply =
DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}")));
let _ = reply_tx.send(Some(reply)).map_err(|_| {
error!("could not send `SpawnResult` reply from daemon to coordinator")
tokio::spawn(async move {
let result = match result {
Err(err) => Err(err),
Ok(task) => task.await,
};
let reply = DaemonCoordinatorReply::SpawnResult(
result.map_err(|err| format!("{err:?}")),
);
let _ = reply_tx.send(Some(reply)).map_err(|_| {
error!("could not send `SpawnResult` reply from daemon to coordinator")
});
});
RunStatus::Continue
}
@@ -685,7 +712,7 @@ impl Daemon {
dataflow_descriptor: Descriptor,
spawn_nodes: BTreeSet<NodeId>,
uv: bool,
) -> eyre::Result<()> {
) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
let mut logger = self.logger.for_dataflow(dataflow_id);
let dataflow =
RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
@@ -737,7 +764,7 @@ impl Daemon {
}
}

let mut spawner = Spawner {
let spawner = Spawner {
dataflow_id,
working_dir,
daemon_tx: self.events_tx.clone(),
@@ -746,6 +773,8 @@ impl Daemon {
uv,
};

let mut tasks = JoinSet::new();

// spawn nodes and set up subscriptions
for node in nodes.into_values() {
let mut logger = logger.reborrow().for_node(node.id.clone());
@@ -767,6 +796,7 @@ impl Daemon {
.log(LogLevel::Info, Some("daemon".into()), "spawning")
.await;
match spawner
.clone()
.spawn_node(
node,
node_stderr_most_recent,
@@ -776,8 +806,43 @@ impl Daemon {
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
{
Ok(running_node) => {
dataflow.running_nodes.insert(node_id, running_node);
Ok(spawn_task) => {
let events_tx = self.events_tx.clone();
let clock = self.clock.clone();
tasks.spawn(async move {
let result = spawn_task.await.unwrap_or_else(|err| {
Err(eyre!("failed to join spawn task: {err}"))
});
let (node_spawn_result, success) = match result {
Ok(node) => (Ok(node), Ok(())),
Err(err) => {
let node_err = NodeError {
timestamp: clock.new_timestamp(),
cause: NodeErrorCause::Other {
stderr: format!("spawn failed: {err:?}"),
},
exit_status: NodeExitStatus::Unknown,
};
(Err(node_err), Err(err))
}
};
let send_result = events_tx
.send(Timestamped {
inner: Event::SpawnNodeResult {
dataflow_id,
node_id,
result: node_spawn_result,
},
timestamp: clock.new_timestamp(),
})
.await;
if send_result.is_err() {
tracing::error!(
"failed to send SpawnNodeResult to main daemon task"
)
}
success
});
}
Err(err) => {
logger
@@ -858,7 +923,11 @@ impl Daemon {
self.handle_node_stop(dataflow_id, &node_id).await?;
}

Ok(())
let spawn_result = async move {
let result: eyre::Result<()> = tasks.join_all().await.into_iter().collect();
result
};
Ok(spawn_result)
}

async fn handle_dynamic_node_event(
@@ -1713,7 +1782,7 @@ fn close_input(
}

#[derive(Debug)]
// Made `pub` in this commit so it can be carried in the new
// `Event::SpawnNodeResult { result: Result<RunningNode, NodeError>, .. }` variant.
pub struct RunningNode {
    // OS process id of the spawned node, if known.
    // NOTE(review): `Option` presumably covers dynamic nodes with no child process — confirm.
    pid: Option<ProcessId>,
    // Configuration the node was spawned with.
    node_config: NodeConfig,
}
@@ -2017,6 +2086,11 @@ pub enum Event {
CtrlC,
SecondCtrlC,
DaemonError(eyre::Report),
SpawnNodeResult {
dataflow_id: DataflowId,
node_id: NodeId,
result: Result<RunningNode, NodeError>,
},
}

impl From<DoraEvent> for Event {


+ 98
- 39
binaries/daemon/src/spawn.rs View File

@@ -10,8 +10,8 @@ use dora_core::{
build::run_build_command,
config::DataId,
descriptor::{
resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource,
ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, SHELL_SOURCE,
resolve_path, source_is_url, CustomNode, Descriptor, OperatorDefinition, OperatorSource,
PythonSource, ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, SHELL_SOURCE,
},
get_python_path,
uhlc::HLC,
@@ -58,12 +58,12 @@ pub struct Spawner {

impl Spawner {
pub async fn spawn_node(
&mut self,
mut self,
node: ResolvedNode,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
logger: &mut NodeLogger<'_>,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> eyre::Result<RunningNode> {
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<RunningNode>>> {
let dataflow_id = self.dataflow_id;
let node_id = node.id.clone();
logger
@@ -87,9 +87,6 @@ impl Spawner {
self.clock.clone(),
)
.await?;
let send_stdout_to = node
.send_stdout_as()
.context("Could not resolve `send_stdout_as` configuration")?;

let node_config = NodeConfig {
dataflow_id,
@@ -100,6 +97,47 @@ impl Spawner {
dynamic: node.kind.dynamic(),
};

let prepared_git = if let dora_core::descriptor::CoreNodeKind::Custom(CustomNode {
source: dora_message::descriptor::NodeSource::GitBranch { repo, rev },
..
}) = &node.kind
{
Some(self.prepare_git_node(repo, rev, repos_in_use).await?)
} else {
None
};

let mut logger = logger
.try_clone()
.await
.wrap_err("failed to clone logger")?;
let task = async move {
self.spawn_node_inner(
node,
&mut logger,
dataflow_id,
node_config,
prepared_git,
node_stderr_most_recent,
)
.await
};
Ok(tokio::spawn(task))
}

async fn spawn_node_inner(
&mut self,
node: ResolvedNode,
logger: &mut NodeLogger<'_>,
dataflow_id: uuid::Uuid,
node_config: NodeConfig,
prepared_git: Option<PreparedGit>,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
) -> Result<RunningNode, eyre::Error> {
let send_stdout_to = node
.send_stdout_as()
.context("Could not resolve `send_stdout_as` configuration")?;

let mut child = match node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
let command = match &n.source {
@@ -112,7 +150,7 @@ impl Spawner {
.await?
}
dora_message::descriptor::NodeSource::GitBranch { repo, rev } => {
self.spawn_git_node(&n, repo, rev, logger, &node.env, repos_in_use)
self.spawn_git_node(&n, repo, rev, logger, &node.env, prepared_git.unwrap())
.await?
}
};
@@ -442,6 +480,7 @@ impl Spawner {
.try_clone()
.await
.context("failed to clone logger")?;

// Log to file stream.
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
@@ -512,24 +551,16 @@ impl Spawner {
Ok(running_node)
}

async fn spawn_git_node(
async fn prepare_git_node(
&mut self,
node: &dora_core::descriptor::CustomNode,
repo_addr: &String,
rev: &Option<GitRepoRev>,
logger: &mut NodeLogger<'_>,
node_env: &Option<BTreeMap<String, EnvValue>>,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> Result<Option<tokio::process::Command>, eyre::Error> {
) -> eyre::Result<PreparedGit> {
let dataflow_id = self.dataflow_id;
let repo_url = Url::parse(repo_addr).context("failed to parse git repository URL")?;
let target_dir = self.working_dir.join("build");
let rev_str = rev_str(rev);
let refname = rev.clone().map(|rev| match rev {
GitRepoRev::Branch(branch) => format!("refs/remotes/origin/{branch}"),
GitRepoRev::Tag(tag) => format!("refs/tags/{tag}"),
GitRepoRev::Rev(rev) => rev,
});

let clone_dir_base = {
let base = {
let mut path =
@@ -574,7 +605,7 @@ impl Spawner {
};
let clone_dir = dunce::simplified(&clone_dir).to_owned();

if clone_dir.exists() {
let reuse = if clone_dir.exists() {
let empty = BTreeSet::new();
let in_use = repos_in_use.get(&clone_dir).unwrap_or(&empty);
let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id);
@@ -582,27 +613,50 @@ impl Spawner {
// TODO allow if still up to date
eyre::bail!("clone_dir is already in use by other dataflow")
} else {
repos_in_use
.entry(clone_dir.clone())
.or_default()
.insert(dataflow_id);
logger
.log(
LogLevel::Info,
None,
format!("reusing {repo_addr}{rev_str}"),
)
.await;
let refname_cloned = refname.clone();
let clone_dir = clone_dir.clone();
let repository = fetch_changes(clone_dir, refname_cloned).await?;
checkout_tree(&repository, refname)?;
true
}
} else {
repos_in_use
.entry(clone_dir.clone())
.or_default()
.insert(dataflow_id);
false
};
repos_in_use
.entry(clone_dir.clone())
.or_default()
.insert(dataflow_id);

Ok(PreparedGit { clone_dir, reuse })
}

async fn spawn_git_node(
&mut self,
node: &dora_core::descriptor::CustomNode,
repo_addr: &String,
rev: &Option<GitRepoRev>,
logger: &mut NodeLogger<'_>,
node_env: &Option<BTreeMap<String, EnvValue>>,
prepared: PreparedGit,
) -> Result<Option<tokio::process::Command>, eyre::Error> {
let PreparedGit { clone_dir, reuse } = prepared;

let rev_str = rev_str(rev);
let refname = rev.clone().map(|rev| match rev {
GitRepoRev::Branch(branch) => format!("refs/remotes/origin/{branch}"),
GitRepoRev::Tag(tag) => format!("refs/tags/{tag}"),
GitRepoRev::Rev(rev) => rev,
});

if reuse {
logger
.log(
LogLevel::Info,
None,
format!("reusing {repo_addr}{rev_str}"),
)
.await;
let refname_cloned = refname.clone();
let clone_dir = clone_dir.clone();
let repository = fetch_changes(clone_dir, refname_cloned).await?;
checkout_tree(&repository, refname)?;
} else {
let repository = clone_into(repo_addr, rev, &clone_dir, logger).await?;
checkout_tree(&repository, refname)?;
};
@@ -849,3 +903,8 @@ async fn spawn_command_from_path(

Ok(Some(cmd))
}

// Result of `Spawner::prepare_git_node`: the synchronous preparation step
// (URL parsing, clone-dir resolution, `repos_in_use` bookkeeping) split off in
// this commit so the slow clone/fetch work can run in the spawned task.
// Consumed by `spawn_git_node`, which fetches+checks out when `reuse` is true
// and clones fresh otherwise.
struct PreparedGit {
    // Local directory the repository is (or will be) cloned into.
    clone_dir: PathBuf,
    // True if `clone_dir` already exists and is not in use by another
    // dataflow, so the existing checkout can be updated instead of re-cloned.
    reuse: bool,
}

Loading…
Cancel
Save