Browse Source

Add support for git repo sources for nodes

tags/test-git
Philipp Oppermann 10 months ago
parent
commit
c9b2d92ac4
Failed to extract signature
7 changed files with 262 additions and 120 deletions
  1. +31
    -0
      Cargo.lock
  2. +1
    -0
      binaries/daemon/Cargo.toml
  3. +4
    -1
      binaries/daemon/src/lib.rs
  4. +140
    -93
      binaries/daemon/src/spawn.rs
  5. +36
    -9
      libraries/core/src/descriptor/mod.rs
  6. +23
    -16
      libraries/core/src/descriptor/validate.rs
  7. +27
    -1
      libraries/message/src/descriptor.rs

+ 31
- 0
Cargo.lock View File

@@ -2729,6 +2729,7 @@ dependencies = [
"flume 0.10.14",
"futures",
"futures-concurrency",
"git2",
"serde_json",
"serde_yaml 0.8.26",
"shared-memory-server",
@@ -4120,6 +4121,8 @@ dependencies = [
"libc",
"libgit2-sys",
"log",
"openssl-probe",
"openssl-sys",
"url",
]

@@ -5371,7 +5374,9 @@ checksum = "ee4126d8b4ee5c9d9ea891dd875cfdc1e9d0950437179104b183d7d8a74d24e8"
dependencies = [
"cc",
"libc",
"libssh2-sys",
"libz-sys",
"openssl-sys",
"pkg-config",
]

@@ -5422,6 +5427,20 @@ dependencies = [
"libc",
]

[[package]]
name = "libssh2-sys"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dc8a030b787e2119a731f1951d6a773e2280c660f8ec4b0f5e1505a386e71ee"
dependencies = [
"cc",
"libc",
"libz-sys",
"openssl-sys",
"pkg-config",
"vcpkg",
]

[[package]]
name = "libz-sys"
version = "1.1.18"
@@ -6586,6 +6605,18 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"

[[package]]
name = "openssl-sys"
version = "0.9.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb61ea9811cc39e3c2069f40b8b8e2e70d8569b361f879786cc7ed48b777cdd"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]

[[package]]
name = "opentelemetry"
version = "0.18.0"


+ 1
- 0
binaries/daemon/Cargo.toml View File

@@ -44,3 +44,4 @@ sysinfo = "0.30.11"
crossbeam = "0.8.4"
crossbeam-skiplist = "0.1.3"
zenoh = "1.1.1"
git2 = "0.18.0"

+ 4
- 1
binaries/daemon/src/lib.rs View File

@@ -21,6 +21,7 @@ use dora_message::{
},
daemon_to_daemon::InterDaemonEvent,
daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent},
descriptor::NodeSource,
metadata::{self, ArrowTypeInfo},
node_to_daemon::{DynamicNodeEvent, Timestamped},
DataflowId,
@@ -2176,7 +2177,9 @@ impl CoreNodeKindExt for CoreNodeKind {
fn dynamic(&self) -> bool {
match self {
CoreNodeKind::Runtime(_n) => false,
CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE,
CoreNodeKind::Custom(n) => {
matches!(&n.source, NodeSource::Local) && n.path == DYNAMIC_SOURCE
}
}
}
}

+ 140
- 93
binaries/daemon/src/spawn.rs View File

@@ -20,6 +20,7 @@ use dora_message::{
common::{LogLevel, LogMessage},
daemon_to_coordinator::{DataMessage, NodeExitStatus, Timestamped},
daemon_to_node::{NodeConfig, RuntimeConfig},
descriptor::GitRepoRev,
DataflowId,
};
use dora_node_api::{
@@ -89,103 +90,52 @@ pub async fn spawn_node(

let mut child = match node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
let mut command = match n.source.as_str() {
DYNAMIC_SOURCE => {
return Ok(RunningNode {
pid: None,
node_config,
});
}
SHELL_SOURCE => {
if cfg!(target_os = "windows") {
let mut cmd = tokio::process::Command::new("cmd");
cmd.args(["/C", &n.args.clone().unwrap_or_default()]);
cmd
} else {
let mut cmd = tokio::process::Command::new("sh");
cmd.args(["-c", &n.args.clone().unwrap_or_default()]);
cmd
}
let command = match &n.source {
dora_message::descriptor::NodeSource::Local => {
spawn_command_from_path(working_dir, uv, logger, &n, true).await?
}
source => {
let resolved_path = if source_is_url(source) {
// try to download the shared library
let target_dir = Path::new("build");
download_file(source, target_dir)
.await
.wrap_err("failed to download custom node")?
} else {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("failed to resolve node source `{}`", source)
})?
};

// If extension is .py, use python to run the script
let mut cmd = match resolved_path.extension().map(|ext| ext.to_str()) {
Some(Some("py")) => {
let mut cmd = if uv {
let mut cmd = tokio::process::Command::new("uv");
cmd.arg("run");
cmd.arg("python");
logger
.log(
LogLevel::Info,
Some("spawner".into()),
format!(
"spawning: uv run python -u {}",
resolved_path.display()
),
)
.await;
cmd
} else {
let python = get_python_path().wrap_err(
"Could not find python path when spawning custom node",
)?;
logger
.log(
LogLevel::Info,
Some("spawner".into()),
format!(
"spawning: {:?} -u {}",
&python,
resolved_path.display()
),
)
.await;

tokio::process::Command::new(python)
dora_message::descriptor::NodeSource::GitBranch { repo, rev } => {
let target_dir = Path::new("build");

let repo = repo.clone();
let rev = rev.clone();
let task = tokio::task::spawn_blocking(move || {
let repo =
git2::Repository::clone(&repo, target_dir.join(node_id.as_ref()))
.context("failed to clone repo")?;
if let Some(rev) = rev {
let refname = match rev {
GitRepoRev::Branch(branch) => branch,
GitRepoRev::Tag(tag) => tag,
GitRepoRev::Rev(rev) => rev,
};
// Force python to always flush stdout/stderr buffer
cmd.arg("-u");
cmd.arg(&resolved_path);
cmd
}
_ => {
logger
.log(
LogLevel::Info,
Some("spawner".into()),
format!("spawning: {}", resolved_path.display()),
)
.await;
if uv {
let mut cmd = tokio::process::Command::new("uv");
cmd.arg("run");
cmd.arg(&resolved_path);
cmd
} else {
tokio::process::Command::new(&resolved_path)
let (object, reference) =
repo.revparse_ext(&refname).context("failed to parse ref")?;
repo.checkout_tree(&object, None)
.context("failed to checkout ref")?;
match reference {
Some(reference) => repo
.set_head(
reference.name().context("failed to get reference_name")?,
)
.context("failed to set head")?,
None => repo
.set_head_detached(object.id())
.context("failed to set detached head")?,
}
}
};

if let Some(args) = &n.args {
cmd.args(args.split_ascii_whitespace());
}
cmd
Result::<_, eyre::Error>::Ok(())
});
task.await?;
spawn_command_from_path(working_dir, uv, logger, &n, true).await?
}
};
let Some(mut command) = command else {
return Ok(RunningNode {
pid: None,
node_config,
});
};

command.current_dir(working_dir);
command.stdin(Stdio::null());
@@ -222,7 +172,7 @@ pub async fn spawn_node(
.wrap_err_with(move || {
format!(
"failed to run `{}` with args `{}`",
n.source,
n.path,
n.args.as_deref().unwrap_or_default(),
)
})?
@@ -359,7 +309,7 @@ pub async fn spawn_node(
std::fs::create_dir_all(&dataflow_dir).context("could not create dataflow_dir")?;
}
let (tx, mut rx) = mpsc::channel(10);
let mut file = File::create(log::log_path(working_dir, &dataflow_id, &node_id))
let mut file = File::create(log::log_path(working_dir, &dataflow_id, &node.id))
.await
.expect("Failed to create log file");
let mut child_stdout =
@@ -569,3 +519,100 @@ pub async fn spawn_node(
});
Ok(running_node)
}

async fn spawn_command_from_path(
working_dir: &Path,
uv: bool,
logger: &mut NodeLogger<'_>,
node: &dora_core::descriptor::CustomNode,
permit_url: bool,
) -> eyre::Result<Option<tokio::process::Command>> {
let cmd = match node.path.as_str() {
DYNAMIC_SOURCE => return Ok(None),
SHELL_SOURCE => {
if cfg!(target_os = "windows") {
let mut cmd = tokio::process::Command::new("cmd");
cmd.args(["/C", &node.args.clone().unwrap_or_default()]);
cmd
} else {
let mut cmd = tokio::process::Command::new("sh");
cmd.args(["-c", &node.args.clone().unwrap_or_default()]);
cmd
}
}
source => {
let resolved_path = if source_is_url(source) {
if !permit_url {
eyre::bail!("URL paths are not supported in this case");
}
// try to download the shared library
let target_dir = Path::new("build");
download_file(source, target_dir)
.await
.wrap_err("failed to download custom node")?
} else {
resolve_path(source, working_dir)
.wrap_err_with(|| format!("failed to resolve node source `{}`", source))?
};

// If extension is .py, use python to run the script
let mut cmd = match resolved_path.extension().map(|ext| ext.to_str()) {
Some(Some("py")) => {
let mut cmd = if uv {
let mut cmd = tokio::process::Command::new("uv");
cmd.arg("run");
cmd.arg("python");
logger
.log(
LogLevel::Info,
Some("spawner".into()),
format!("spawning: uv run python -u {}", resolved_path.display()),
)
.await;
cmd
} else {
let python = get_python_path()
.wrap_err("Could not find python path when spawning custom node")?;
logger
.log(
LogLevel::Info,
Some("spawner".into()),
format!("spawning: {:?} -u {}", &python, resolved_path.display()),
)
.await;

tokio::process::Command::new(python)
};
// Force python to always flush stdout/stderr buffer
cmd.arg("-u");
cmd.arg(&resolved_path);
cmd
}
_ => {
logger
.log(
LogLevel::Info,
Some("spawner".into()),
format!("spawning: {}", resolved_path.display()),
)
.await;
if uv {
let mut cmd = tokio::process::Command::new("uv");
cmd.arg("run");
cmd.arg(&resolved_path);
cmd
} else {
tokio::process::Command::new(&resolved_path)
}
}
};

if let Some(args) = &node.args {
cmd.args(args.split_ascii_whitespace());
}
cmd
}
};

Ok(Some(cmd))
}

+ 36
- 9
libraries/core/src/descriptor/mod.rs View File

@@ -1,5 +1,6 @@
use dora_message::{
config::{Input, InputMapping, NodeRunConfig},
descriptor::{GitRepoRev, NodeSource},
id::{DataId, NodeId, OperatorId},
};
use eyre::{bail, Context, OptionExt, Result};
@@ -53,7 +54,7 @@ impl DescriptorExt for Descriptor {
// adjust input mappings
let mut node_kind = node_kind_mut(&mut node)?;
let input_mappings: Vec<_> = match &mut node_kind {
NodeKindMut::Standard { path: _, inputs } => inputs.values_mut().collect(),
NodeKindMut::Standard { inputs, .. } => inputs.values_mut().collect(),
NodeKindMut::Runtime(node) => node
.operators
.iter_mut()
@@ -76,8 +77,13 @@ impl DescriptorExt for Descriptor {

// resolve nodes
let kind = match node_kind {
NodeKindMut::Standard { path, inputs: _ } => CoreNodeKind::Custom(CustomNode {
source: path.clone(),
NodeKindMut::Standard {
path,
source,
inputs: _,
} => CoreNodeKind::Custom(CustomNode {
path: path.clone(),
source,
args: node.args,
build: node.build,
send_stdout_as: node.send_stdout_as,
@@ -149,14 +155,34 @@ pub async fn read_as_descriptor(path: &Path) -> eyre::Result<Descriptor> {

fn node_kind_mut(node: &mut Node) -> eyre::Result<NodeKindMut> {
match node.kind()? {
NodeKind::Standard(_) => node
.path
.as_ref()
.map(|path| NodeKindMut::Standard {
path,
NodeKind::Standard(_) => {
let source = match (&node.git, &node.branch, &node.tag, &node.rev) {
(None, None, None, None) => NodeSource::Local,
(Some(repo), branch, tag, rev) => {
let rev = match (branch, tag, rev) {
(Some(branch), None, None) => Some(GitRepoRev::Branch(branch.clone())),
(None, Some(tag), None) => Some(GitRepoRev::Tag(tag.clone())),
(None, None, Some(rev)) => Some(GitRepoRev::Rev(rev.clone())),
(_, _, _) => {
eyre::bail!("only one of `branch`, `tag`, and `rev` are allowed")
}
};
NodeSource::GitBranch {
repo: repo.clone(),
rev,
}
}
(None, _, _, _) => {
eyre::bail!("`git` source required when using branch, tag, or rev")
}
};

Ok(NodeKindMut::Standard {
path: node.path.as_ref().ok_or_eyre("missing `path` attribute")?,
source,
inputs: &mut node.inputs,
})
.ok_or_eyre("no path"),
}
NodeKind::Runtime(_) => node
.operators
.as_mut()
@@ -249,6 +275,7 @@ pub enum NodeKind<'a> {
enum NodeKindMut<'a> {
Standard {
path: &'a String,
source: NodeSource,
inputs: &'a mut BTreeMap<DataId, Input>,
},
/// Dora runtime node


+ 23
- 16
libraries/core/src/descriptor/validate.rs View File

@@ -28,23 +28,30 @@ pub fn check_dataflow(
// check that nodes and operators exist
for node in nodes.values() {
match &node.kind {
descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() {
SHELL_SOURCE => (),
DYNAMIC_SOURCE => (),
source => {
if source_is_url(source) {
info!("{source} is a URL."); // TODO: Implement url check.
} else if let Some(remote_daemon_id) = remote_daemon_id {
if let Some(machine) = &node.deploy.machine {
if remote_daemon_id.contains(&machine.as_str()) || coordinator_is_remote
{
info!("skipping path check for remote node `{}`", node.id);
descriptor::CoreNodeKind::Custom(custom) => match &custom.source {
dora_message::descriptor::NodeSource::Local => match custom.path.as_str() {
SHELL_SOURCE => (),
DYNAMIC_SOURCE => (),
source => {
if source_is_url(source) {
info!("{source} is a URL."); // TODO: Implement url check.
} else if let Some(remote_daemon_id) = remote_daemon_id {
if let Some(machine) = &node.deploy.machine {
if remote_daemon_id.contains(&machine.as_str())
|| coordinator_is_remote
{
info!("skipping path check for remote node `{}`", node.id);
}
}
}
} else {
resolve_path(source, working_dir)
.wrap_err_with(|| format!("Could not find source path `{}`", source))?;
};
} else {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("Could not find source path `{}`", source)
})?;
};
}
},
dora_message::descriptor::NodeSource::GitBranch { repo, rev } => {
// TODO: implement git repo check
}
},
descriptor::CoreNodeKind::Runtime(node) => {


+ 27
- 1
libraries/message/src/descriptor.rs View File

@@ -70,6 +70,15 @@ pub struct Node {

#[serde(default, skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub git: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub branch: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tag: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub rev: Option<String>,

#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -216,7 +225,8 @@ pub struct CustomNode {
/// args: some_node.py
///
/// Source can match any executable in PATH.
pub source: String,
pub path: String,
pub source: NodeSource,
/// Args for the executable.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
@@ -234,6 +244,22 @@ pub struct CustomNode {
pub run_config: NodeRunConfig,
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum NodeSource {
Local,
GitBranch {
repo: String,
rev: Option<GitRepoRev>,
},
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum GitRepoRev {
Branch(String),
Tag(String),
Rev(String),
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum EnvValue {


Loading…
Cancel
Save