diff --git a/Cargo.lock b/Cargo.lock index 41f76174..9eca5102 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2729,6 +2729,7 @@ dependencies = [ "flume 0.10.14", "futures", "futures-concurrency", + "git2", "serde_json", "serde_yaml 0.8.26", "shared-memory-server", @@ -4120,6 +4121,8 @@ dependencies = [ "libc", "libgit2-sys", "log", + "openssl-probe", + "openssl-sys", "url", ] @@ -5371,7 +5374,9 @@ checksum = "ee4126d8b4ee5c9d9ea891dd875cfdc1e9d0950437179104b183d7d8a74d24e8" dependencies = [ "cc", "libc", + "libssh2-sys", "libz-sys", + "openssl-sys", "pkg-config", ] @@ -5422,6 +5427,20 @@ dependencies = [ "libc", ] +[[package]] +name = "libssh2-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dc8a030b787e2119a731f1951d6a773e2280c660f8ec4b0f5e1505a386e71ee" +dependencies = [ + "cc", + "libc", + "libz-sys", + "openssl-sys", + "pkg-config", + "vcpkg", +] + [[package]] name = "libz-sys" version = "1.1.18" @@ -6586,6 +6605,18 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb61ea9811cc39e3c2069f40b8b8e2e70d8569b361f879786cc7ed48b777cdd" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.18.0" diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index ca29d9b5..debca5c5 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -44,3 +44,4 @@ sysinfo = "0.30.11" crossbeam = "0.8.4" crossbeam-skiplist = "0.1.3" zenoh = "1.1.1" +git2 = "0.18.0" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index de531fd0..b0f3a6c4 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -21,6 +21,7 @@ use dora_message::{ }, daemon_to_daemon::InterDaemonEvent, daemon_to_node::{DaemonReply, NodeConfig, NodeDropEvent, NodeEvent}, + descriptor::NodeSource, metadata::{self, ArrowTypeInfo}, node_to_daemon::{DynamicNodeEvent, Timestamped}, DataflowId, @@ -2176,7 +2177,9 @@ impl CoreNodeKindExt for CoreNodeKind { fn dynamic(&self) -> bool { match self { CoreNodeKind::Runtime(_n) => false, - CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE, + CoreNodeKind::Custom(n) => { + matches!(&n.source, NodeSource::Local) && n.path == DYNAMIC_SOURCE + } } } } diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index db7c7bbd..3b9b0f59 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -20,6 +20,7 @@ use dora_message::{ common::{LogLevel, LogMessage}, daemon_to_coordinator::{DataMessage, NodeExitStatus, Timestamped}, daemon_to_node::{NodeConfig, RuntimeConfig}, + descriptor::GitRepoRev, DataflowId, }; use dora_node_api::{ @@ -89,103 +90,52 @@ pub async fn spawn_node( let mut child = match node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { - let mut command = match n.source.as_str() { - DYNAMIC_SOURCE => { - return Ok(RunningNode { - pid: None, - node_config, - }); - } - SHELL_SOURCE => { - if cfg!(target_os = "windows") { - let mut cmd = tokio::process::Command::new("cmd"); - cmd.args(["/C", &n.args.clone().unwrap_or_default()]); - cmd - } else { - let mut cmd = tokio::process::Command::new("sh"); - cmd.args(["-c", &n.args.clone().unwrap_or_default()]); - cmd - } + let command = match &n.source { + dora_message::descriptor::NodeSource::Local => { + spawn_command_from_path(working_dir, uv, logger, &n, true).await? } - source => { - let resolved_path = if source_is_url(source) { - // try to download the shared library - let target_dir = Path::new("build"); - download_file(source, target_dir) - .await - .wrap_err("failed to download custom node")? - } else { - resolve_path(source, working_dir).wrap_err_with(|| { - format!("failed to resolve node source `{}`", source) - })? - }; - - // If extension is .py, use python to run the script - let mut cmd = match resolved_path.extension().map(|ext| ext.to_str()) { - Some(Some("py")) => { - let mut cmd = if uv { - let mut cmd = tokio::process::Command::new("uv"); - cmd.arg("run"); - cmd.arg("python"); - logger - .log( - LogLevel::Info, - Some("spawner".into()), - format!( - "spawning: uv run python -u {}", - resolved_path.display() - ), - ) - .await; - cmd - } else { - let python = get_python_path().wrap_err( - "Could not find python path when spawning custom node", - )?; - logger - .log( - LogLevel::Info, - Some("spawner".into()), - format!( - "spawning: {:?} -u {}", - &python, - resolved_path.display() - ), - ) - .await; - - tokio::process::Command::new(python) + dora_message::descriptor::NodeSource::GitBranch { repo, rev } => { + let target_dir = Path::new("build"); + + let repo = repo.clone(); + let rev = rev.clone(); + let task = tokio::task::spawn_blocking(move || { + let repo = + git2::Repository::clone(&repo, target_dir.join(node_id.as_ref())) + .context("failed to clone repo")?; + if let Some(rev) = rev { + let refname = match rev { + GitRepoRev::Branch(branch) => branch, + GitRepoRev::Tag(tag) => tag, + GitRepoRev::Rev(rev) => rev, }; - // Force python to always flush stdout/stderr buffer - cmd.arg("-u"); - cmd.arg(&resolved_path); - cmd - } - _ => { - logger - .log( - LogLevel::Info, - Some("spawner".into()), - format!("spawning: {}", resolved_path.display()), - ) - .await; - if uv { - let mut cmd = tokio::process::Command::new("uv"); - cmd.arg("run"); - cmd.arg(&resolved_path); - cmd - } else { - tokio::process::Command::new(&resolved_path) + let (object, reference) = + repo.revparse_ext(&refname).context("failed to parse ref")?; + repo.checkout_tree(&object, None) + .context("failed to checkout ref")?; + match reference { + Some(reference) => repo + .set_head( + reference.name().context("failed to get reference_name")?, + ) + .context("failed to set head")?, + None => repo + .set_head_detached(object.id()) + .context("failed to set detached head")?, } } - }; - - if let Some(args) = &n.args { - cmd.args(args.split_ascii_whitespace()); - } - cmd + Result::<_, eyre::Error>::Ok(()) + }); + task.await?; + spawn_command_from_path(working_dir, uv, logger, &n, true).await? } }; + let Some(mut command) = command else { + return Ok(RunningNode { + pid: None, + node_config, + }); + }; command.current_dir(working_dir); command.stdin(Stdio::null()); @@ -222,7 +172,7 @@ pub async fn spawn_node( .wrap_err_with(move || { format!( "failed to run `{}` with args `{}`", - n.source, + n.path, n.args.as_deref().unwrap_or_default(), ) })? @@ -359,7 +309,7 @@ pub async fn spawn_node( std::fs::create_dir_all(&dataflow_dir).context("could not create dataflow_dir")?; } let (tx, mut rx) = mpsc::channel(10); - let mut file = File::create(log::log_path(working_dir, &dataflow_id, &node_id)) + let mut file = File::create(log::log_path(working_dir, &dataflow_id, &node.id)) .await .expect("Failed to create log file"); let mut child_stdout = @@ -569,3 +519,100 @@ pub async fn spawn_node( }); Ok(running_node) } + +async fn spawn_command_from_path( + working_dir: &Path, + uv: bool, + logger: &mut NodeLogger<'_>, + node: &dora_core::descriptor::CustomNode, + permit_url: bool, +) -> eyre::Result> { + let cmd = match node.path.as_str() { + DYNAMIC_SOURCE => return Ok(None), + SHELL_SOURCE => { + if cfg!(target_os = "windows") { + let mut cmd = tokio::process::Command::new("cmd"); + cmd.args(["/C", &node.args.clone().unwrap_or_default()]); + cmd + } else { + let mut cmd = tokio::process::Command::new("sh"); + cmd.args(["-c", &node.args.clone().unwrap_or_default()]); + cmd + } + } + source => { + let resolved_path = if source_is_url(source) { + if !permit_url { + eyre::bail!("URL paths are not supported in this case"); + } + // try to download the shared library + let target_dir = Path::new("build"); + download_file(source, target_dir) + .await + .wrap_err("failed to download custom node")? + } else { + resolve_path(source, working_dir) + .wrap_err_with(|| format!("failed to resolve node source `{}`", source))? + }; + + // If extension is .py, use python to run the script + let mut cmd = match resolved_path.extension().map(|ext| ext.to_str()) { + Some(Some("py")) => { + let mut cmd = if uv { + let mut cmd = tokio::process::Command::new("uv"); + cmd.arg("run"); + cmd.arg("python"); + logger + .log( + LogLevel::Info, + Some("spawner".into()), + format!("spawning: uv run python -u {}", resolved_path.display()), + ) + .await; + cmd + } else { + let python = get_python_path() + .wrap_err("Could not find python path when spawning custom node")?; + logger + .log( + LogLevel::Info, + Some("spawner".into()), + format!("spawning: {:?} -u {}", &python, resolved_path.display()), + ) + .await; + + tokio::process::Command::new(python) + }; + // Force python to always flush stdout/stderr buffer + cmd.arg("-u"); + cmd.arg(&resolved_path); + cmd + } + _ => { + logger + .log( + LogLevel::Info, + Some("spawner".into()), + format!("spawning: {}", resolved_path.display()), + ) + .await; + if uv { + let mut cmd = tokio::process::Command::new("uv"); + cmd.arg("run"); + cmd.arg(&resolved_path); + cmd + } else { + tokio::process::Command::new(&resolved_path) + } + } + }; + + if let Some(args) = &node.args { + cmd.args(args.split_ascii_whitespace()); + } + cmd + } + }; + + Ok(Some(cmd)) +} diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index cb8860fa..59588406 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,5 +1,6 @@ use dora_message::{ config::{Input, InputMapping, NodeRunConfig}, + descriptor::{GitRepoRev, NodeSource}, id::{DataId, NodeId, OperatorId}, }; use eyre::{bail, Context, OptionExt, Result}; @@ -53,7 +54,7 @@ impl DescriptorExt for Descriptor { // adjust input mappings let mut node_kind = node_kind_mut(&mut node)?; let input_mappings: Vec<_> = match &mut node_kind { - NodeKindMut::Standard { path: _, inputs } => inputs.values_mut().collect(), + NodeKindMut::Standard { inputs, .. } => inputs.values_mut().collect(), NodeKindMut::Runtime(node) => node .operators .iter_mut() @@ -76,8 +77,13 @@ impl DescriptorExt for Descriptor { // resolve nodes let kind = match node_kind { - NodeKindMut::Standard { path, inputs: _ } => CoreNodeKind::Custom(CustomNode { - source: path.clone(), + NodeKindMut::Standard { + path, + source, + inputs: _, + } => CoreNodeKind::Custom(CustomNode { + path: path.clone(), + source, args: node.args, build: node.build, send_stdout_as: node.send_stdout_as, @@ -149,14 +155,34 @@ pub async fn read_as_descriptor(path: &Path) -> eyre::Result { fn node_kind_mut(node: &mut Node) -> eyre::Result { match node.kind()? { - NodeKind::Standard(_) => node - .path - .as_ref() - .map(|path| NodeKindMut::Standard { - path, + NodeKind::Standard(_) => { + let source = match (&node.git, &node.branch, &node.tag, &node.rev) { + (None, None, None, None) => NodeSource::Local, + (Some(repo), branch, tag, rev) => { + let rev = match (branch, tag, rev) { + (Some(branch), None, None) => Some(GitRepoRev::Branch(branch.clone())), + (None, Some(tag), None) => Some(GitRepoRev::Tag(tag.clone())), + (None, None, Some(rev)) => Some(GitRepoRev::Rev(rev.clone())), + (_, _, _) => { + eyre::bail!("only one of `branch`, `tag`, and `rev` are allowed") + } + }; + NodeSource::GitBranch { + repo: repo.clone(), + rev, + } + } + (None, _, _, _) => { + eyre::bail!("`git` source required when using branch, tag, or rev") + } + }; + + Ok(NodeKindMut::Standard { + path: node.path.as_ref().ok_or_eyre("missing `path` attribute")?, + source, inputs: &mut node.inputs, }) - .ok_or_eyre("no path"), + } NodeKind::Runtime(_) => node .operators .as_mut() @@ -249,6 +275,7 @@ pub enum NodeKind<'a> { enum NodeKindMut<'a> { Standard { path: &'a String, + source: NodeSource, inputs: &'a mut BTreeMap, }, /// Dora runtime node diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index c28bd451..526fedff 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -28,23 +28,30 @@ pub fn check_dataflow( // check that nodes and operators exist for node in nodes.values() { match &node.kind { - descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() { - SHELL_SOURCE => (), - DYNAMIC_SOURCE => (), - source => { - if source_is_url(source) { - info!("{source} is a URL."); // TODO: Implement url check. - } else if let Some(remote_daemon_id) = remote_daemon_id { - if let Some(machine) = &node.deploy.machine { - if remote_daemon_id.contains(&machine.as_str()) || coordinator_is_remote - { - info!("skipping path check for remote node `{}`", node.id); + descriptor::CoreNodeKind::Custom(custom) => match &custom.source { + dora_message::descriptor::NodeSource::Local => match custom.path.as_str() { + SHELL_SOURCE => (), + DYNAMIC_SOURCE => (), + source => { + if source_is_url(source) { + info!("{source} is a URL."); // TODO: Implement url check. + } else if let Some(remote_daemon_id) = remote_daemon_id { + if let Some(machine) = &node.deploy.machine { + if remote_daemon_id.contains(&machine.as_str()) + || coordinator_is_remote + { + info!("skipping path check for remote node `{}`", node.id); + } } - } - } else { - resolve_path(source, working_dir) - .wrap_err_with(|| format!("Could not find source path `{}`", source))?; - }; + } else { + resolve_path(source, working_dir).wrap_err_with(|| { + format!("Could not find source path `{}`", source) + })?; + }; + } + }, + dora_message::descriptor::NodeSource::GitBranch { repo, rev } => { + // TODO: implement git repo check } }, descriptor::CoreNodeKind::Runtime(node) => { diff --git a/libraries/message/src/descriptor.rs b/libraries/message/src/descriptor.rs index 2fe68760..02f660d4 100644 --- a/libraries/message/src/descriptor.rs +++ b/libraries/message/src/descriptor.rs @@ -70,6 +70,15 @@ pub struct Node { #[serde(default, skip_serializing_if = "Option::is_none")] pub path: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub git: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub branch: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub tag: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub rev: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] pub args: Option, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -216,7 +225,8 @@ pub struct CustomNode { /// args: some_node.py /// /// Source can match any executable in PATH. - pub source: String, + pub path: String, + pub source: NodeSource, /// Args for the executable. #[serde(default, skip_serializing_if = "Option::is_none")] pub args: Option, @@ -234,6 +244,22 @@ pub struct CustomNode { pub run_config: NodeRunConfig, } +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub enum NodeSource { + Local, + GitBranch { + repo: String, + rev: Option, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub enum GitRepoRev { + Branch(String), + Tag(String), + Rev(String), +} + #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] #[serde(untagged)] pub enum EnvValue {