diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0ba144e7..11e36754 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -87,7 +87,7 @@ jobs: with: # this might remove tools that are actually needed, # if set to "true" but frees about 6 GB - tool-cache: false + tool-cache: true # all of these default to true, but feel free to set to # "false" if necessary for your workflow @@ -96,7 +96,7 @@ jobs: haskell: true large-packages: false docker-images: true - swap-storage: false + swap-storage: true - name: Free disk Space (Windows) if: runner.os == 'Windows' run: | diff --git a/Cargo.lock b/Cargo.lock index 20f9639e..396bce54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1060,7 +1060,7 @@ dependencies = [ "bincode", "bugreport", "bytesize", - "clap 4.5.6", + "clap 4.5.7", "clircle", "console", "content_inspector", @@ -1547,9 +1547,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.6" +version = "4.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9689a29b593160de5bc4aacab7b5d54fb52231de70122626c178e6a368994c7" +checksum = "5db83dced34638ad474f39f250d7fea9598bdd239eaced1bdf45d597da0f433f" dependencies = [ "clap_builder", "clap_derive 4.5.5", @@ -1557,9 +1557,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.6" +version = "4.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e5387378c84f6faa26890ebf9f0a92989f8873d4d380467bcd0d8d8620424df" +checksum = "f7e204572485eb3fbf28f871612191521df159bc3e15a9f5064c66dba3a8c05f" dependencies = [ "anstream", "anstyle", @@ -2250,7 +2250,7 @@ name = "dora-cli" version = "0.3.4" dependencies = [ "bat", - "clap 4.5.6", + "clap 4.5.7", "communication-layer-request-reply", "ctrlc", "dora-coordinator", @@ -4578,7 +4578,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if 1.0.0", - "windows-targets 0.48.5", + "windows-targets 0.52.5", ] [[package]] @@ -7702,7 +7702,7 @@ version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d11fe92ba3e159b2c4fe3640973d00cf8143ffb8d010078068a16b58a0c48b" dependencies = [ - "clap 4.5.6", + "clap 4.5.7", "document-features", "futures-util", "hyper 0.14.28", @@ -8017,7 +8017,7 @@ dependencies = [ "bytes", "cdr-encoding-size", "chrono", - "clap 4.5.6", + "clap 4.5.7", "futures", "itertools 0.11.0", "lazy_static", diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index b19d1e55..aea14ad3 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -332,9 +332,13 @@ fn run() -> eyre::Result<()> { .parent() .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); - dataflow_descriptor - .check(&working_dir) - .wrap_err("Could not validate yaml")?; + if !coordinator_addr.is_loopback() { + dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?; + } else { + dataflow_descriptor + .check(&working_dir) + .wrap_err("Could not validate yaml")?; + } let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index f3761358..534b857b 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -24,7 +24,17 @@ pub(super) async fn spawn_dataflow( daemon_connections: &mut HashMap, clock: &HLC, ) -> eyre::Result { - dataflow.check(&working_dir)?; + let remote_machine_id: Vec<_> = daemon_connections + .iter() + .filter_map(|(id, c)| { + if !c.listen_socket.ip().is_loopback() { + Some(id.as_str()) + } else { + None + } + }) + .collect(); + dataflow.check_in_daemon(&working_dir, &remote_machine_id, false)?; let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index fc836412..5300eeec 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -133,7 +133,23 @@ impl Descriptor { } pub fn check(&self, working_dir: &Path) -> eyre::Result<()> { - validate::check_dataflow(self, working_dir).wrap_err("Dataflow could not be validated.") + validate::check_dataflow(self, working_dir, None, false) + .wrap_err("Dataflow could not be validated.") + } + + pub fn check_in_daemon( + &self, + working_dir: &Path, + remote_machine_id: &[&str], + coordinator_is_remote: bool, + ) -> eyre::Result<()> { + validate::check_dataflow( + self, + working_dir, + Some(remote_machine_id), + coordinator_is_remote, + ) + .wrap_err("Dataflow could not be validated.") } } diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index fe558096..9c4e89dd 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -1,7 +1,7 @@ use crate::{ adjust_shared_library_path, config::{DataId, Input, InputMapping, OperatorId, UserInputMapping}, - descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, + descriptor::{self, source_is_url, CoreNodeKind, OperatorSource, EXE_EXTENSION}, get_python_path, }; @@ -12,18 +12,45 @@ use tracing::info; use super::{resolve_path, Descriptor, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> { +pub fn check_dataflow( + dataflow: &Descriptor, + working_dir: &Path, + remote_daemon_id: Option<&[&str]>, + coordinator_is_remote: bool, +) -> eyre::Result<()> { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut has_python_operator = false; // check that nodes and operators exist for node in &nodes { match &node.kind { - descriptor::CoreNodeKind::Custom(node) => match node.source.as_str() { + descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() { SHELL_SOURCE => (), source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. + } else if let Some(remote_daemon_id) = remote_daemon_id { + if remote_daemon_id.contains(&node.deploy.machine.as_str()) + || coordinator_is_remote + { + let path = Path::new(&source); + let path = if path.extension().is_none() { + path.with_extension(EXE_EXTENSION) + } else { + path.to_owned() + }; + if path.is_relative() { + eyre::bail!( + "paths of remote nodes must be absolute (node `{}`)", + node.id + ); + } + info!("skipping path check for remote node `{}`", node.id); + } else { + resolve_path(source, working_dir).wrap_err_with(|| { + format!("Could not find source path `{}`", source) + })?; + } } else { resolve_path(source, working_dir) .wrap_err_with(|| format!("Could not find source path `{}`", source))?;