Philipp Oppermann 7 months ago
parent
commit
8781c8a8c5
Failed to extract signature
3 changed files with 57 additions and 226 deletions
  1. +3
    -7
      binaries/daemon/src/build/git.rs
  2. +20
    -219
      binaries/daemon/src/build/mod.rs
  3. +34
    -0
      binaries/daemon/src/log.rs

+ 3
- 7
binaries/daemon/src/build/git.rs View File

@@ -1,4 +1,4 @@
use crate::log::NodeLogger;
use crate::log::NodeBuildLogger;
use dora_message::{common::LogLevel, descriptor::GitRepoRev, BuildId, DataflowId};
use eyre::{ContextCompat, WrapErr};
use git2::FetchOptions;
@@ -139,7 +139,7 @@ pub struct GitFolder {
}

impl GitFolder {
pub async fn prepare(self, logger: &mut NodeLogger<'_>) -> eyre::Result<PathBuf> {
pub async fn prepare(self, logger: &mut NodeBuildLogger<'_>) -> eyre::Result<PathBuf> {
let GitFolder { reuse } = self;

let clone_dir = match reuse {
@@ -164,7 +164,6 @@ impl GitFolder {
logger
.log(
LogLevel::Info,
None,
format!("fetching changes after copying {}", from.display()),
)
.await;
@@ -185,7 +184,6 @@ impl GitFolder {
logger
.log(
LogLevel::Info,
None,
format!("fetching changes after renaming {}", from.display()),
)
.await;
@@ -198,7 +196,6 @@ impl GitFolder {
logger
.log(
LogLevel::Info,
None,
format!("reusing up-to-date {}", dir.display()),
)
.await;
@@ -255,7 +252,7 @@ fn rev_str(rev: &Option<GitRepoRev>) -> String {
async fn clone_into(
repo_addr: Url,
clone_dir: &Path,
logger: &mut NodeLogger<'_>,
logger: &mut NodeBuildLogger<'_>,
) -> eyre::Result<git2::Repository> {
if let Some(parent) = clone_dir.parent() {
tokio::fs::create_dir_all(parent)
@@ -266,7 +263,6 @@ async fn clone_into(
logger
.log(
LogLevel::Info,
None,
format!("cloning {repo_addr} into {}", clone_dir.display()),
)
.await;


+ 20
- 219
binaries/daemon/src/build/mod.rs View File

@@ -1,30 +1,23 @@
pub use git::GitManager;
use url::Url;

use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
path::PathBuf,
sync::Arc,
};
use std::{collections::BTreeMap, future::Future, path::PathBuf, sync::Arc};

use dora_core::{
build::run_build_command,
descriptor::{CustomNode, Descriptor, ResolvedNode},
descriptor::{Descriptor, ResolvedNode},
uhlc::HLC,
};
use dora_message::{
common::{LogLevel, Timestamped},
coordinator_to_daemon::GitSource,
daemon_to_node::NodeConfig,
descriptor::{EnvValue, GitRepoRev, ResolvedNodeSource},
id::NodeId,
BuildId, DataflowId,
descriptor::EnvValue,
BuildId,
};
use eyre::Context;
use tokio::sync::mpsc;

use crate::{build::git::GitFolder, log::DaemonLogger, Event};
use crate::{build::git::GitFolder, log::NodeBuildLogger, Event};

mod git;

@@ -45,18 +38,10 @@ impl Builder {
self,
node: ResolvedNode,
git: Option<GitSource>,
logger: &mut DaemonLogger,
logger: &mut NodeBuildLogger<'_>,
git_manager: &mut GitManager,
) -> eyre::Result<impl Future<Output = eyre::Result<PreparedNode>>> {
let build_id = self.build_id;
logger
.log_build(
build_id,
LogLevel::Debug,
Some(node.id.clone()),
"building node",
)
.await;
logger.log(LogLevel::Debug, "building node").await;

let prepared_git = if let Some(GitSource { repo, commit_hash }) = git {
let repo_url = Url::parse(&repo).context("failed to parse git repository URL")?;
@@ -78,214 +63,44 @@ impl Builder {
.await
.wrap_err("failed to clone logger")?;
let task = async move {
self.prepare_node_inner(node, &mut logger, build_id, prepared_git)
self.prepare_node_inner(node, &mut logger, prepared_git)
.await
};
Ok(task)
}

async fn prepare_node_inner(
mut self,
self,
node: ResolvedNode,
logger: &mut DaemonLogger,
build_id: uuid::Uuid,
logger: &mut NodeBuildLogger<'_>,
git_folder: Option<GitFolder>,
) -> eyre::Result<PreparedNode> {
let (command, error_msg) = match &node.kind {
let node_working_dir = match &node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
let build_dir = match git_folder {
let node_working_dir = match git_folder {
Some(git_folder) => git_folder.prepare(logger).await?,
None => self.working_dir.clone(),
};

if let Some(build) = &n.build {
self.build_node(logger, &node.env, build_dir.clone(), build)
.await?;
build_node(logger, &node.env, node_working_dir.clone(), build, self.uv).await?;
}
let mut command = if self.build_only {
None
} else {
path_spawn_command(&build_dir, self.uv, logger, n, true).await?
};

if let Some(command) = &mut command {
command.current_dir(&self.working_dir);
command.stdin(Stdio::null());

command.env(
"DORA_NODE_CONFIG",
serde_yaml::to_string(&node_config.clone())
.wrap_err("failed to serialize node config")?,
);
// Injecting the env variable defined in the `yaml` into
// the node runtime.
if let Some(envs) = &node.env {
for (key, value) in envs {
command.env(key, value.to_string());
}
}
if let Some(envs) = &n.envs {
// node has some inner env variables -> add them too
for (key, value) in envs {
command.env(key, value.to_string());
}
}

// Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C
#[cfg(unix)]
command.process_group(0);

command.env("PYTHONUNBUFFERED", "1");
command
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
};

let error_msg = format!(
"failed to run `{}` with args `{}`",
n.path,
n.args.as_deref().unwrap_or_default(),
);
(command, error_msg)
node_working_dir
}
dora_core::descriptor::CoreNodeKind::Runtime(n) => {
// run build commands
for operator in &n.operators {
if let Some(build) = &operator.config.build {
self.build_node(logger, &node.env, self.working_dir.clone(), build)
build_node(logger, &node.env, self.working_dir.clone(), build, self.uv)
.await?;
}
}

let python_operators: Vec<&OperatorDefinition> = n
.operators
.iter()
.filter(|x| matches!(x.config.source, OperatorSource::Python { .. }))
.collect();

let other_operators = n
.operators
.iter()
.any(|x| !matches!(x.config.source, OperatorSource::Python { .. }));

let mut command = if self.build_only {
None
} else if !python_operators.is_empty() && !other_operators {
// Use python to spawn runtime if there is a python operator

// TODO: Handle multi-operator runtime once sub-interpreter is supported
if python_operators.len() > 2 {
eyre::bail!(
"Runtime currently only support one Python Operator.
This is because pyo4 sub-interpreter is not yet available.
See: https://github.com/PyO4/pyo3/issues/576"
);
}

let python_operator = python_operators
.first()
.context("Runtime had no operators definition.")?;

if let OperatorSource::Python(PythonSource {
source: _,
conda_env: Some(conda_env),
}) = &python_operator.config.source
{
let conda = which::which("conda").context(
"failed to find `conda`, yet a `conda_env` was defined. Make sure that `conda` is available.",
)?;
let mut command = tokio::process::Command::new(conda);
command.args([
"run",
"-n",
conda_env,
"python",
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
Some(command)
} else {
let mut cmd = if self.uv {
let mut cmd = tokio::process::Command::new("uv");
cmd.arg("run");
cmd.arg("python");
tracing::info!(
"spawning: uv run python -uc import dora; dora.start_runtime() # {}",
node.id
);
cmd
} else {
let python = get_python_path()
.wrap_err("Could not find python path when spawning custom node")?;
tracing::info!(
"spawning: python -uc import dora; dora.start_runtime() # {}",
node.id
);

tokio::process::Command::new(python)
};
// Force python to always flush stdout/stderr buffer
cmd.args([
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
Some(cmd)
}
} else if python_operators.is_empty() && other_operators {
let mut cmd = tokio::process::Command::new(
std::env::current_exe()
.wrap_err("failed to get current executable path")?,
);
cmd.arg("runtime");
Some(cmd)
} else {
eyre::bail!("Runtime can not mix Python Operator with other type of operator.");
};

let runtime_config = RuntimeConfig {
node: node_config.clone(),
operators: n.operators.clone(),
};

if let Some(command) = &mut command {
command.current_dir(&self.working_dir);

command.env(
"DORA_RUNTIME_CONFIG",
serde_yaml::to_string(&runtime_config)
.wrap_err("failed to serialize runtime config")?,
);
// Injecting the env variable defined in the `yaml` into
// the node runtime.
if let Some(envs) = &node.env {
for (key, value) in envs {
command.env(key, value.to_string());
}
}
// Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C
#[cfg(unix)]
command.process_group(0);

command
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
};
let error_msg = format!(
"failed to run runtime {}/{}",
runtime_config.node.dataflow_id, runtime_config.node.node_id
);
(command, error_msg)
self.working_dir.clone()
}
};
Ok(PreparedNode {
command,
spawn_error_msg: error_msg,
working_dir: self.working_dir,
dataflow_id,
node_working_dir,
node,
node_config,
clock: self.clock,
daemon_tx: self.daemon_tx,
})
@@ -293,21 +108,14 @@ impl Builder {
}

pub async fn build_node(
node_id: NodeId,
logger: &mut DaemonLogger<'_>,
logger: &mut NodeBuildLogger<'_>,
node_env: &Option<BTreeMap<String, EnvValue>>,
working_dir: PathBuf,
build: &String,
uv: bool,
) -> eyre::Result<()> {
logger
.log(
LogLevel::Info,
None,
Some(node_id),
Some("build".to_owned()),
format!("running build command: `{build}"),
)
.log(LogLevel::Info, format!("running build command: `{build}"))
.await;
let build = build.to_owned();
let node_env = node_env.clone();
@@ -322,9 +130,6 @@ pub async fn build_node(
logger
.log(
LogLevel::Info,
None,
Some(node_id),
Some("build command".into()),
line.unwrap_or_else(|err| format!("io err: {}", err.kind())),
)
.await;
@@ -335,12 +140,8 @@ pub async fn build_node(
}

pub struct PreparedNode {
command: Option<tokio::process::Command>,
spawn_error_msg: String,
working_dir: PathBuf,
dataflow_id: DataflowId,
node_working_dir: PathBuf,
node: ResolvedNode,
node_config: NodeConfig,
clock: Arc<HLC>,
daemon_tx: mpsc::Sender<Timestamped<Event>>,
}

+ 34
- 0
binaries/daemon/src/log.rs View File

@@ -94,6 +94,32 @@ impl<'a> DataflowLogger<'a> {
}
}

/// Logger scoped to a single node's build process.
///
/// Wraps a [`DaemonLogger`] (borrowed or owned via `CowMut`) and tags every
/// emitted message with the build id and node id it belongs to.
pub struct NodeBuildLogger<'a> {
    build_id: BuildId,
    node_id: NodeId,
    logger: CowMut<'a, DaemonLogger>,
}

impl NodeBuildLogger<'_> {
    /// Returns a shared reference to the wrapped [`DaemonLogger`].
    pub fn inner(&self) -> &DaemonLogger {
        &self.logger
    }

    /// Emits a build-scoped log message at the given level, automatically
    /// tagged with this logger's build id and node id.
    pub async fn log(&mut self, level: LogLevel, message: impl Into<String>) {
        let node = Some(self.node_id.clone());
        self.logger
            .log_build(self.build_id, level, node, message)
            .await
    }

    /// Creates an owning copy of this logger with a `'static` lifetime, e.g.
    /// so it can be moved into a spawned task.
    pub async fn try_clone(&self) -> eyre::Result<NodeBuildLogger<'static>> {
        let cloned = self.logger.try_clone().await?;
        Ok(NodeBuildLogger {
            build_id: self.build_id,
            node_id: self.node_id.clone(),
            logger: CowMut::Owned(cloned),
        })
    }
}

pub struct DaemonLogger {
daemon_id: DaemonId,
logger: Logger,
@@ -107,6 +133,14 @@ impl DaemonLogger {
}
}

/// Creates a [`NodeBuildLogger`] that borrows this daemon logger and tags
/// all messages with the given build id and node id.
pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger {
    let logger = CowMut::Borrowed(self);
    NodeBuildLogger {
        build_id,
        node_id,
        logger,
    }
}

pub fn inner(&self) -> &Logger {
&self.logger
}


Loading…
Cancel
Save