Browse Source

Merge branch 'main' of https://github.com/dora-rs/dora into update_dora_new

tags/v0.3.5-rc0
XxChang 1 year ago
parent
commit
351929ca81
6 changed files with 76 additions and 19 deletions
  1. +2
    -2
      .github/workflows/ci.yml
  2. +9
    -9
      Cargo.lock
  3. +7
    -3
      binaries/cli/src/main.rs
  4. +11
    -1
      binaries/coordinator/src/run/mod.rs
  5. +17
    -1
      libraries/core/src/descriptor/mod.rs
  6. +30
    -3
      libraries/core/src/descriptor/validate.rs

+ 2
- 2
.github/workflows/ci.yml View File

@@ -87,7 +87,7 @@ jobs:
with:
# this might remove tools that are actually needed,
# if set to "true" but frees about 6 GB
tool-cache: false
tool-cache: true

# all of these default to true, but feel free to set to
# "false" if necessary for your workflow
@@ -96,7 +96,7 @@ jobs:
haskell: true
large-packages: false
docker-images: true
swap-storage: false
swap-storage: true
- name: Free disk Space (Windows)
if: runner.os == 'Windows'
run: |


+ 9
- 9
Cargo.lock View File

@@ -1060,7 +1060,7 @@ dependencies = [
"bincode",
"bugreport",
"bytesize",
"clap 4.5.6",
"clap 4.5.7",
"clircle",
"console",
"content_inspector",
@@ -1547,9 +1547,9 @@ dependencies = [

[[package]]
name = "clap"
version = "4.5.6"
version = "4.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9689a29b593160de5bc4aacab7b5d54fb52231de70122626c178e6a368994c7"
checksum = "5db83dced34638ad474f39f250d7fea9598bdd239eaced1bdf45d597da0f433f"
dependencies = [
"clap_builder",
"clap_derive 4.5.5",
@@ -1557,9 +1557,9 @@ dependencies = [

[[package]]
name = "clap_builder"
version = "4.5.6"
version = "4.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e5387378c84f6faa26890ebf9f0a92989f8873d4d380467bcd0d8d8620424df"
checksum = "f7e204572485eb3fbf28f871612191521df159bc3e15a9f5064c66dba3a8c05f"
dependencies = [
"anstream",
"anstyle",
@@ -2250,7 +2250,7 @@ name = "dora-cli"
version = "0.3.4"
dependencies = [
"bat",
"clap 4.5.6",
"clap 4.5.7",
"communication-layer-request-reply",
"ctrlc",
"dora-coordinator",
@@ -4578,7 +4578,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19"
dependencies = [
"cfg-if 1.0.0",
"windows-targets 0.48.5",
"windows-targets 0.52.5",
]

[[package]]
@@ -7702,7 +7702,7 @@ version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9d11fe92ba3e159b2c4fe3640973d00cf8143ffb8d010078068a16b58a0c48b"
dependencies = [
"clap 4.5.6",
"clap 4.5.7",
"document-features",
"futures-util",
"hyper 0.14.28",
@@ -8017,7 +8017,7 @@ dependencies = [
"bytes",
"cdr-encoding-size",
"chrono",
"clap 4.5.6",
"clap 4.5.7",
"futures",
"itertools 0.11.0",
"lazy_static",


+ 7
- 3
binaries/cli/src/main.rs View File

@@ -332,9 +332,13 @@ fn run() -> eyre::Result<()> {
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
if !coordinator_addr.is_loopback() {
dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?;
} else {
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
}

let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;


+ 11
- 1
binaries/coordinator/src/run/mod.rs View File

@@ -24,7 +24,17 @@ pub(super) async fn spawn_dataflow(
daemon_connections: &mut HashMap<String, DaemonConnection>,
clock: &HLC,
) -> eyre::Result<SpawnedDataflow> {
dataflow.check(&working_dir)?;
let remote_machine_id: Vec<_> = daemon_connections
.iter()
.filter_map(|(id, c)| {
if !c.listen_socket.ip().is_loopback() {
Some(id.as_str())
} else {
None
}
})
.collect();
dataflow.check_in_daemon(&working_dir, &remote_machine_id, false)?;

let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let uuid = Uuid::new_v7(Timestamp::now(NoContext));


+ 17
- 1
libraries/core/src/descriptor/mod.rs View File

@@ -133,7 +133,23 @@ impl Descriptor {
}

pub fn check(&self, working_dir: &Path) -> eyre::Result<()> {
validate::check_dataflow(self, working_dir).wrap_err("Dataflow could not be validated.")
validate::check_dataflow(self, working_dir, None, false)
.wrap_err("Dataflow could not be validated.")
}

pub fn check_in_daemon(
&self,
working_dir: &Path,
remote_machine_id: &[&str],
coordinator_is_remote: bool,
) -> eyre::Result<()> {
validate::check_dataflow(
self,
working_dir,
Some(remote_machine_id),
coordinator_is_remote,
)
.wrap_err("Dataflow could not be validated.")
}
}



+ 30
- 3
libraries/core/src/descriptor/validate.rs View File

@@ -1,7 +1,7 @@
use crate::{
adjust_shared_library_path,
config::{DataId, Input, InputMapping, OperatorId, UserInputMapping},
descriptor::{self, source_is_url, CoreNodeKind, OperatorSource},
descriptor::{self, source_is_url, CoreNodeKind, OperatorSource, EXE_EXTENSION},
get_python_path,
};

@@ -12,18 +12,45 @@ use tracing::info;
use super::{resolve_path, Descriptor, SHELL_SOURCE};
const VERSION: &str = env!("CARGO_PKG_VERSION");

pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> {
pub fn check_dataflow(
dataflow: &Descriptor,
working_dir: &Path,
remote_daemon_id: Option<&[&str]>,
coordinator_is_remote: bool,
) -> eyre::Result<()> {
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let mut has_python_operator = false;

// check that nodes and operators exist
for node in &nodes {
match &node.kind {
descriptor::CoreNodeKind::Custom(node) => match node.source.as_str() {
descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() {
SHELL_SOURCE => (),
source => {
if source_is_url(source) {
info!("{source} is a URL."); // TODO: Implement url check.
} else if let Some(remote_daemon_id) = remote_daemon_id {
if remote_daemon_id.contains(&node.deploy.machine.as_str())
|| coordinator_is_remote
{
let path = Path::new(&source);
let path = if path.extension().is_none() {
path.with_extension(EXE_EXTENSION)
} else {
path.to_owned()
};
if path.is_relative() {
eyre::bail!(
"paths of remote nodes must be absolute (node `{}`)",
node.id
);
}
info!("skipping path check for remote node `{}`", node.id);
} else {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("Could not find source path `{}`", source)
})?;
}
} else {
resolve_path(source, working_dir)
.wrap_err_with(|| format!("Could not find source path `{}`", source))?;


Loading…
Cancel
Save