Browse Source

Rework and refactor for two-step build

tags/v0.3.12-rc0
Philipp Oppermann 7 months ago
parent
commit
51ed0194c7
Failed to extract signature
54 changed files with 2437 additions and 884 deletions
  1. +2
    -2
      .gitignore
  2. +133
    -101
      Cargo.lock
  3. +3
    -0
      Cargo.toml
  4. +1
    -1
      apis/python/node/Cargo.toml
  5. +1
    -14
      apis/python/node/src/lib.rs
  6. +3
    -2
      binaries/cli/Cargo.toml
  7. +107
    -0
      binaries/cli/src/command/build/distributed.rs
  8. +45
    -0
      binaries/cli/src/command/build/git.rs
  9. +101
    -0
      binaries/cli/src/command/build/local.rs
  10. +162
    -0
      binaries/cli/src/command/build/mod.rs
  11. +0
    -0
      binaries/cli/src/command/check.rs
  12. +0
    -0
      binaries/cli/src/command/logs.rs
  13. +60
    -0
      binaries/cli/src/command/mod.rs
  14. +23
    -0
      binaries/cli/src/command/run.rs
  15. +1
    -37
      binaries/cli/src/command/start/attach.rs
  16. +167
    -0
      binaries/cli/src/command/start/mod.rs
  17. +1
    -1
      binaries/cli/src/command/up.rs
  18. +38
    -196
      binaries/cli/src/lib.rs
  19. +48
    -0
      binaries/cli/src/output.rs
  20. +73
    -0
      binaries/cli/src/session.rs
  21. +28
    -2
      binaries/coordinator/src/control.rs
  22. +282
    -45
      binaries/coordinator/src/lib.rs
  23. +10
    -0
      binaries/coordinator/src/listener.rs
  24. +10
    -16
      binaries/coordinator/src/run/mod.rs
  25. +2
    -1
      binaries/daemon/Cargo.toml
  26. +280
    -46
      binaries/daemon/src/lib.rs
  27. +81
    -7
      binaries/daemon/src/log.rs
  28. +0
    -286
      binaries/daemon/src/spawn/git.rs
  29. +18
    -103
      binaries/daemon/src/spawn/mod.rs
  30. +14
    -0
      examples/benchmark/run.rs
  31. +1
    -0
      examples/c++-dataflow/.gitignore
  32. +1
    -0
      examples/c++-ros2-dataflow/.gitignore
  33. +20
    -2
      examples/multiple-daemons/run.rs
  34. +9
    -0
      examples/python-ros2-dataflow/run.rs
  35. +2
    -0
      examples/rust-dataflow-git/.gitignore
  36. +3
    -3
      examples/rust-dataflow-git/dataflow.yml
  37. +13
    -0
      examples/rust-dataflow-git/run.rs
  38. +1
    -0
      examples/rust-dataflow-url/.gitignore
  39. +13
    -0
      examples/rust-dataflow-url/run.rs
  40. +13
    -0
      examples/rust-dataflow/run.rs
  41. +4
    -1
      libraries/core/Cargo.toml
  42. +0
    -0
      libraries/core/src/build/build_command.rs
  43. +353
    -0
      libraries/core/src/build/git.rs
  44. +15
    -0
      libraries/core/src/build/logger.rs
  45. +139
    -0
      libraries/core/src/build/mod.rs
  46. +0
    -0
      libraries/core/src/git.rs
  47. +1
    -0
      libraries/core/src/lib.rs
  48. +36
    -5
      libraries/message/src/cli_to_coordinator.rs
  49. +9
    -2
      libraries/message/src/common.rs
  50. +29
    -6
      libraries/message/src/coordinator_to_cli.rs
  51. +32
    -4
      libraries/message/src/coordinator_to_daemon.rs
  52. +8
    -1
      libraries/message/src/daemon_to_coordinator.rs
  53. +6
    -0
      libraries/message/src/descriptor.rs
  54. +35
    -0
      libraries/message/src/lib.rs

+ 2
- 2
.gitignore View File

@@ -34,7 +34,7 @@ __pycache__/

# Distribution / packaging
.Python
build/
/build/
develop-eggs/
dist/
downloads/
@@ -179,4 +179,4 @@ out/
#Miscellaneous
yolo.yml

~*
~*

+ 133
- 101
Cargo.lock View File

@@ -851,7 +851,7 @@ dependencies = [
"num-traits",
"rusticata-macros",
"thiserror 1.0.69",
"time 0.3.41",
"time",
]

[[package]]
@@ -1415,7 +1415,7 @@ dependencies = [
"path_abs",
"plist",
"regex",
"semver 1.0.26",
"semver",
"serde",
"serde_yaml 0.9.34+deprecated",
"shell-words",
@@ -1823,7 +1823,7 @@ checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
dependencies = [
"camino",
"cargo-platform",
"semver 1.0.26",
"semver",
"serde",
"serde_json",
]
@@ -1836,7 +1836,7 @@ checksum = "2d886547e41f740c616ae73108f6eb70afe6d940c7bc697cb30f13daec073037"
dependencies = [
"camino",
"cargo-platform",
"semver 1.0.26",
"semver",
"serde",
"serde_json",
"thiserror 1.0.69",
@@ -2521,6 +2521,33 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991"

[[package]]
name = "curve25519-dalek"
version = "4.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
dependencies = [
"cfg-if 1.0.0",
"cpufeatures",
"curve25519-dalek-derive",
"digest",
"fiat-crypto",
"rustc_version",
"subtle",
"zeroize",
]

[[package]]
name = "curve25519-dalek-derive"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]

[[package]]
name = "cxx"
version = "1.0.149"
@@ -2955,10 +2982,12 @@ dependencies = [
"dora-operator-api-c",
"dora-runtime",
"dora-tracing",
"dunce",
"duration-str",
"env_logger 0.11.7",
"eyre",
"futures",
"git2",
"inquire",
"log",
"notify 5.2.0",
@@ -3004,6 +3033,8 @@ dependencies = [
"dora-message",
"dunce",
"eyre",
"git2",
"itertools 0.14.0",
"log",
"once_cell",
"schemars",
@@ -3013,6 +3044,7 @@ dependencies = [
"serde_yaml 0.9.34+deprecated",
"tokio",
"tracing",
"url",
"uuid 1.16.0",
"which",
]
@@ -3039,6 +3071,7 @@ dependencies = [
"futures",
"futures-concurrency",
"git2",
"itertools 0.14.0",
"serde_json",
"serde_yaml 0.8.26",
"shared-memory-server",
@@ -3067,6 +3100,7 @@ dependencies = [
name = "dora-examples"
version = "0.0.0"
dependencies = [
"dora-cli",
"dora-coordinator",
"dora-core",
"dora-download",
@@ -3108,7 +3142,7 @@ dependencies = [
"log",
"once_cell",
"schemars",
"semver 1.0.26",
"semver",
"serde",
"serde-with-expand-env",
"serde_yaml 0.9.34+deprecated",
@@ -3196,7 +3230,7 @@ name = "dora-node-api-python"
version = "0.3.10"
dependencies = [
"arrow 54.2.1",
"dora-daemon",
"dora-cli",
"dora-download",
"dora-node-api",
"dora-operator-api-python",
@@ -3455,7 +3489,7 @@ dependencies = [
"rust_decimal",
"serde",
"thiserror 1.0.69",
"time 0.3.41",
"time",
]

[[package]]
@@ -3492,6 +3526,31 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18aade80d5e09429040243ce1143ddc08a92d7a22820ac512610410a4dd5214f"

[[package]]
name = "ed25519"
version = "2.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53"
dependencies = [
"pkcs8 0.10.2",
"signature 2.2.0",
]

[[package]]
name = "ed25519-dalek"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871"
dependencies = [
"curve25519-dalek",
"ed25519",
"serde",
"sha2",
"signature 2.2.0",
"subtle",
"zeroize",
]

[[package]]
name = "eframe"
version = "0.31.1"
@@ -4099,6 +4158,12 @@ dependencies = [
"anyhow",
]

[[package]]
name = "fiat-crypto"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d"

[[package]]
name = "filetime"
version = "0.2.25"
@@ -5073,7 +5138,7 @@ dependencies = [
"dirs 5.0.1",
"futures",
"http 1.3.1",
"indicatif 0.17.11",
"indicatif",
"libc",
"log",
"num_cpus",
@@ -5204,7 +5269,7 @@ dependencies = [
"http 0.2.12",
"http-serde",
"serde",
"time 0.3.41",
"time",
]

[[package]]
@@ -5602,18 +5667,6 @@ dependencies = [
"serde",
]

[[package]]
name = "indicatif"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7baab56125e25686df467fe470785512329883aab42696d661247aca2a2896e4"
dependencies = [
"console",
"lazy_static",
"number_prefix 0.3.0",
"regex",
]

[[package]]
name = "indicatif"
version = "0.17.11"
@@ -5621,7 +5674,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235"
dependencies = [
"console",
"number_prefix 0.4.0",
"number_prefix",
"portable-atomic",
"rayon",
"unicode-width 0.2.0",
@@ -6730,7 +6783,7 @@ dependencies = [
"hf-hub",
"image",
"indexmap 2.8.0",
"indicatif 0.17.11",
"indicatif",
"interprocess",
"itertools 0.13.0",
"llguidance",
@@ -7367,12 +7420,6 @@ dependencies = [
"libc",
]

[[package]]
name = "number_prefix"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a"

[[package]]
name = "number_prefix"
version = "0.4.0"
@@ -8382,7 +8429,7 @@ dependencies = [
"indexmap 2.8.0",
"quick-xml 0.32.0",
"serde",
"time 0.3.41",
"time",
]

[[package]]
@@ -8880,15 +8927,6 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"

[[package]]
name = "quick-xml"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26aab6b48e2590e4a64d1ed808749ba06257882b461d01ca71baeb747074a6dd"
dependencies = [
"memchr",
]

[[package]]
name = "quick-xml"
version = "0.30.0"
@@ -9238,7 +9276,7 @@ dependencies = [
"serde_json",
"sha2",
"thiserror 1.0.69",
"time 0.3.41",
"time",
"url",
"uuid 1.16.0",
"web-sys",
@@ -9325,7 +9363,7 @@ dependencies = [
"cargo_metadata 0.18.1",
"glob",
"sha2",
"time 0.3.41",
"time",
"unindent",
"walkdir",
]
@@ -9760,7 +9798,7 @@ dependencies = [
"serde_bytes",
"static_assertions",
"thiserror 1.0.69",
"time 0.3.41",
"time",
"typenum",
"uuid 1.16.0",
"web-time",
@@ -10241,7 +10279,7 @@ dependencies = [
"strum 0.26.3",
"strum_macros 0.26.4",
"sublime_fuzzy",
"time 0.3.41",
"time",
"url",
]

@@ -11327,7 +11365,7 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
dependencies = [
"semver 1.0.26",
"semver",
]

[[package]]
@@ -11743,34 +11781,39 @@ dependencies = [
"libc",
]

[[package]]
name = "self-replace"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03ec815b5eab420ab893f63393878d89c90fdd94c0bcc44c07abb8ad95552fb7"
dependencies = [
"fastrand 2.3.0",
"tempfile",
"windows-sys 0.52.0",
]

[[package]]
name = "self_update"
version = "0.27.0"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb85f1802f7b987237b8525c0fde86ea86f31c957c1875467c727d5b921179c"
checksum = "d832c086ece0dacc29fb2947bb4219b8f6e12fe9e40b7108f9e57c4224e47b5c"
dependencies = [
"either",
"flate2",
"hyper 0.14.32",
"indicatif 0.15.0",
"hyper 1.6.0",
"indicatif",
"log",
"quick-xml 0.20.0",
"quick-xml 0.37.2",
"regex",
"reqwest 0.11.27",
"semver 0.11.0",
"reqwest 0.12.15",
"self-replace",
"semver",
"serde_json",
"tar",
"tempfile",
"zip 0.5.13",
]

[[package]]
name = "semver"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6"
dependencies = [
"semver-parser",
"urlencoding",
"zip 2.4.2",
"zipsign-api",
]

[[package]]
@@ -11782,15 +11825,6 @@ dependencies = [
"serde",
]

[[package]]
name = "semver-parser"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9900206b54a3527fdc7b8a938bffd94a568bac4f4aa8113b209df75a09c0dec2"
dependencies = [
"pest",
]

[[package]]
name = "seq-macro"
version = "0.3.6"
@@ -11957,7 +11991,7 @@ dependencies = [
"serde_derive",
"serde_json",
"serde_with_macros",
"time 0.3.41",
"time",
]

[[package]]
@@ -13015,17 +13049,6 @@ dependencies = [
"weezl",
]

[[package]]
name = "time"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi 0.3.9",
]

[[package]]
name = "time"
version = "0.3.41"
@@ -13188,7 +13211,7 @@ dependencies = [
"derive_builder",
"esaxx-rs",
"getrandom 0.2.15",
"indicatif 0.17.11",
"indicatif",
"itertools 0.13.0",
"lazy_static",
"log",
@@ -14074,12 +14097,6 @@ dependencies = [
"try-lock",
]

[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"

[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
@@ -15367,7 +15384,7 @@ dependencies = [
"oid-registry",
"rusticata-macros",
"thiserror 1.0.69",
"time 0.3.41",
"time",
]

[[package]]
@@ -15979,7 +15996,7 @@ dependencies = [
"rustls 0.23.25",
"rustls-webpki 0.102.8",
"serde",
"time 0.3.41",
"time",
"tokio",
"tokio-util",
"tracing",
@@ -16030,7 +16047,7 @@ dependencies = [
"rustls-pki-types",
"rustls-webpki 0.102.8",
"secrecy",
"time 0.3.41",
"time",
"tokio",
"tokio-util",
"tracing",
@@ -16115,7 +16132,7 @@ dependencies = [
"rustls-webpki 0.102.8",
"secrecy",
"socket2 0.5.8",
"time 0.3.41",
"time",
"tls-listener",
"tokio",
"tokio-rustls 0.26.2",
@@ -16601,29 +16618,44 @@ dependencies = [

[[package]]
name = "zip"
version = "0.5.13"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93ab48844d61251bb3835145c521d88aa4031d7139e8485990f60ca911fa0815"
checksum = "9cc23c04387f4da0374be4533ad1208cbb091d5c11d070dfef13676ad6497164"
dependencies = [
"byteorder",
"arbitrary",
"crc32fast",
"crossbeam-utils",
"displaydoc",
"indexmap 2.8.0",
"num_enum",
"thiserror 1.0.69",
"time 0.1.45",
]

[[package]]
name = "zip"
version = "1.1.4"
version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cc23c04387f4da0374be4533ad1208cbb091d5c11d070dfef13676ad6497164"
checksum = "fabe6324e908f85a1c52063ce7aa26b68dcb7eb6dbc83a2d148403c9bc3eba50"
dependencies = [
"arbitrary",
"crc32fast",
"crossbeam-utils",
"displaydoc",
"indexmap 2.8.0",
"num_enum",
"thiserror 1.0.69",
"memchr",
"thiserror 2.0.12",
"time",
]

[[package]]
name = "zipsign-api"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dba6063ff82cdbd9a765add16d369abe81e520f836054e997c2db217ceca40c0"
dependencies = [
"base64 0.22.1",
"ed25519-dalek",
"thiserror 2.0.12",
]

[[package]]


+ 3
- 0
Cargo.toml View File

@@ -69,6 +69,7 @@ dora-metrics = { version = "0.3.10", path = "libraries/extensions/telemetry/metr
dora-download = { version = "0.3.10", path = "libraries/extensions/download" }
shared-memory-server = { version = "0.3.10", path = "libraries/shared-memory-server" }
communication-layer-request-reply = { version = "0.3.10", path = "libraries/communication-layer/request-reply" }
dora-cli = { version = "0.3.10", path = "binaries/cli" }
dora-runtime = { version = "0.3.10", path = "binaries/runtime" }
dora-daemon = { version = "0.3.10", path = "binaries/daemon" }
dora-coordinator = { version = "0.3.10", path = "binaries/coordinator" }
@@ -88,6 +89,7 @@ pyo3 = { version = "0.23", features = [
"multiple-pymethods",
] }
pythonize = "0.23"
git2 = { version = "0.18.0", features = ["vendored-openssl"] }

[package]
name = "dora-examples"
@@ -104,6 +106,7 @@ ros2-examples = []
[dev-dependencies]
eyre = "0.6.8"
tokio = "1.24.2"
dora-cli = { workspace = true }
dora-coordinator = { workspace = true }
dora-core = { workspace = true }
dora-message = { workspace = true }


+ 1
- 1
apis/python/node/Cargo.toml View File

@@ -24,7 +24,7 @@ eyre = "0.6"
serde_yaml = "0.8.23"
flume = "0.10.14"
dora-runtime = { workspace = true, features = ["tracing", "metrics", "python"] }
dora-daemon = { workspace = true }
dora-cli = { workspace = true }
dora-download = { workspace = true }
arrow = { workspace = true, features = ["pyarrow"] }
pythonize = { workspace = true }


+ 1
- 14
apis/python/node/src/lib.rs View File

@@ -6,7 +6,6 @@ use std::sync::Arc;
use std::time::Duration;

use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_daemon::Daemon;
use dora_download::download_file;
use dora_node_api::dora_core::config::NodeId;
use dora_node_api::dora_core::descriptor::source_is_url;
@@ -382,19 +381,7 @@ pub fn resolve_dataflow(dataflow: String) -> eyre::Result<PathBuf> {
#[pyfunction]
#[pyo3(signature = (dataflow_path, uv=None))]
pub fn run(dataflow_path: String, uv: Option<bool>) -> eyre::Result<()> {
let dataflow_path = resolve_dataflow(dataflow_path).context("could not resolve dataflow")?;
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv.unwrap_or_default()))?;
match result.is_ok() {
true => Ok(()),
false => Err(eyre::eyre!(
"Dataflow failed to run with error: {:?}",
result.node_results
)),
}
dora_cli::command::run(dataflow_path, uv.unwrap_or_default())
}

#[pymodule]


+ 3
- 2
binaries/cli/Cargo.toml View File

@@ -50,7 +50,7 @@ tabwriter = "1.4.0"
log = { version = "0.4.21", features = ["serde"] }
colored = "2.1.0"
env_logger = "0.11.3"
self_update = { version = "0.27.0", features = [
self_update = { version = "0.42.0", features = [
"rustls",
"archive-zip",
"archive-tar",
@@ -60,7 +60,8 @@ pyo3 = { workspace = true, features = [
"extension-module",
"abi3",
], optional = true }

dunce = "1.0.5"
git2 = { workspace = true }

[lib]
name = "dora_cli"


+ 107
- 0
binaries/cli/src/command/build/distributed.rs View File

@@ -0,0 +1,107 @@
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::descriptor::Descriptor;
use dora_message::{
cli_to_coordinator::ControlRequest,
common::{GitSource, LogMessage},
coordinator_to_cli::ControlRequestReply,
id::NodeId,
BuildId,
};
use eyre::{bail, Context};
use std::{
collections::BTreeMap,
net::{SocketAddr, TcpStream},
};

use crate::{output::print_log_message, session::DataflowSession};

/// Sends a `Build` request for the given dataflow to the coordinator.
///
/// The coordinator triggers the build asynchronously and replies with the
/// [`BuildId`] that identifies this build in subsequent `WaitForBuild` and
/// `BuildLogSubscribe` requests.
///
/// Both the current and the previous git sources of the session are sent so
/// the coordinator can reuse existing checkouts where possible.
pub fn build_distributed_dataflow(
    session: &mut TcpRequestReplyConnection,
    dataflow: Descriptor,
    git_sources: &BTreeMap<NodeId, GitSource>,
    dataflow_session: &DataflowSession,
    local_working_dir: Option<std::path::PathBuf>,
    uv: bool,
) -> eyre::Result<BuildId> {
    let reply_raw = session
        .request(
            &serde_json::to_vec(&ControlRequest::Build {
                session_id: dataflow_session.session_id,
                dataflow,
                git_sources: git_sources.clone(),
                prev_git_sources: dataflow_session.git_sources.clone(),
                local_working_dir,
                uv,
            })
            .unwrap(),
        )
        // fixed: previous message said "start dataflow", but this is the build request
        .wrap_err("failed to send dataflow build request")?;

    let result: ControlRequestReply =
        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
    match result {
        ControlRequestReply::DataflowBuildTriggered { build_id } => {
            eprintln!("dataflow build triggered: {build_id}");
            Ok(build_id)
        }
        ControlRequestReply::Error(err) => bail!("{err}"),
        other => bail!("unexpected dataflow build reply: {other:?}"),
    }
}

/// Blocks until the coordinator reports that the given build finished.
///
/// Opens a second TCP connection to the coordinator that subscribes to build
/// log messages at `log_level`; a background thread prints those messages
/// while the main `session` connection blocks on `WaitForBuild`.
///
/// Returns the build id on success; fails if the build failed or the
/// coordinator sent an unexpected reply.
pub fn wait_until_dataflow_built(
    build_id: BuildId,
    session: &mut TcpRequestReplyConnection,
    coordinator_socket: SocketAddr,
    log_level: log::LevelFilter,
) -> eyre::Result<BuildId> {
    // subscribe to log messages on a dedicated connection
    let mut log_session = TcpConnection {
        stream: TcpStream::connect(coordinator_socket)
            .wrap_err("failed to connect to dora coordinator")?,
    };
    log_session
        .send(
            &serde_json::to_vec(&ControlRequest::BuildLogSubscribe {
                build_id,
                level: log_level,
            })
            .wrap_err("failed to serialize message")?,
        )
        .wrap_err("failed to send build log subscribe request to coordinator")?;
    // print incoming log messages until the coordinator closes the connection;
    // the thread is detached on purpose — it ends when `receive` errors out
    std::thread::spawn(move || {
        while let Ok(raw) = log_session.receive() {
            let parsed: eyre::Result<LogMessage> =
                serde_json::from_slice(&raw).context("failed to parse log message");
            match parsed {
                Ok(log_message) => {
                    print_log_message(log_message);
                }
                Err(err) => {
                    tracing::warn!("failed to parse log message: {err:?}")
                }
            }
        }
    });

    let reply_raw = session
        .request(&serde_json::to_vec(&ControlRequest::WaitForBuild { build_id }).unwrap())
        .wrap_err("failed to send WaitForBuild message")?;

    let result: ControlRequestReply =
        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
    match result {
        ControlRequestReply::DataflowBuildFinished { build_id, result } => match result {
            Ok(()) => {
                eprintln!("dataflow build finished successfully");
                Ok(build_id)
            }
            Err(err) => bail!("{err}"),
        },
        ControlRequestReply::Error(err) => bail!("{err}"),
        // fixed: previous message said "start dataflow", but this is the wait-for-build reply
        other => bail!("unexpected wait-for-build reply: {other:?}"),
    }
}

+ 45
- 0
binaries/cli/src/command/build/git.rs View File

@@ -0,0 +1,45 @@
use dora_message::{common::GitSource, descriptor::GitRepoRev};
use eyre::Context;

/// Resolves a git revision (branch, tag, arbitrary rev, or `HEAD` when
/// `rev` is `None`) of a remote repository to a concrete commit hash by
/// listing the remote's advertised references.
///
/// If `rev` matches no advertised reference but looks like a plain commit
/// hash (ASCII alphanumeric only), it is passed through unchanged.
///
/// # Errors
/// Fails if the remote cannot be contacted or no matching commit is found.
pub fn fetch_commit_hash(repo_url: String, rev: Option<GitRepoRev>) -> eyre::Result<GitSource> {
    let mut remote = git2::Remote::create_detached(repo_url.as_bytes())
        .with_context(|| format!("failed to create git remote for {repo_url}"))?;
    let connection = remote
        .connect_auth(git2::Direction::Fetch, None, None)
        .with_context(|| format!("failed to open connection to {repo_url}"))?;
    let references = connection
        .list()
        .with_context(|| format!("failed to list git references of {repo_url}"))?;

    // map the requested revision to the fully-qualified reference name
    // that the remote advertises
    let expected_name = match &rev {
        Some(GitRepoRev::Branch(branch)) => format!("refs/heads/{branch}"),
        Some(GitRepoRev::Tag(tag)) => format!("refs/tags/{tag}"),
        Some(GitRepoRev::Rev(rev)) => rev.clone(),
        None => "HEAD".into(),
    };

    let mut commit_hash = references
        .iter()
        .find(|head| head.name() == expected_name)
        .map(|head| head.oid().to_string());

    if commit_hash.is_none() {
        if let Some(GitRepoRev::Rev(rev)) = &rev {
            // rev might be a commit hash instead of a reference
            // (no separate `is_ascii` check needed: `is_ascii_alphanumeric`
            // on the raw bytes already implies ASCII)
            if rev.bytes().all(|b| b.is_ascii_alphanumeric()) {
                commit_hash = Some(rev.clone());
            }
        }
    }

    match commit_hash {
        Some(commit_hash) => Ok(GitSource {
            repo: repo_url,
            commit_hash,
        }),
        None => eyre::bail!("no matching commit for `{rev:?}`"),
    }
}

+ 101
- 0
binaries/cli/src/command/build/local.rs View File

@@ -0,0 +1,101 @@
use std::{collections::BTreeMap, path::PathBuf};

use dora_core::{
build::{BuildInfo, BuildLogger, Builder, GitManager},
descriptor::{Descriptor, DescriptorExt},
};
use dora_message::{common::GitSource, id::NodeId};
use eyre::Context;

use crate::session::DataflowSession;

/// Builds all nodes of the given dataflow on this machine, blocking the
/// calling thread until every node build has completed.
///
/// The async build logic needs a tokio runtime, so a temporary one is
/// created for the duration of the build.
pub fn build_dataflow_locally(
    dataflow: Descriptor,
    git_sources: &BTreeMap<NodeId, GitSource>,
    dataflow_session: &DataflowSession,
    working_dir: PathBuf,
    uv: bool,
) -> eyre::Result<BuildInfo> {
    let rt = tokio::runtime::Runtime::new()?;
    let build_future = build_dataflow(dataflow, git_sources, dataflow_session, working_dir, uv);
    rt.block_on(build_future)
}

/// Resolves the dataflow's nodes and builds each of them, returning the
/// per-node working directories in a [`BuildInfo`].
///
/// The build runs in two phases: first each node's build is started (this
/// must happen sequentially because it needs mutable access to the shared
/// `git_manager`), then the returned tasks are awaited to completion.
async fn build_dataflow(
    dataflow: Descriptor,
    git_sources: &BTreeMap<NodeId, GitSource>,
    dataflow_session: &DataflowSession,
    base_working_dir: PathBuf,
    uv: bool,
) -> eyre::Result<BuildInfo> {
    let builder = Builder {
        session_id: dataflow_session.session_id,
        base_working_dir,
        uv,
    };
    let nodes = dataflow.resolve_aliases_and_set_defaults()?;

    // NOTE(review): GitManager presumably deduplicates clones/checkouts
    // across nodes; prev sources allow reusing checkouts from an earlier
    // build of this session — confirm against dora_core::build.
    let mut git_manager = GitManager::default();
    let prev_git_sources = &dataflow_session.git_sources;

    let mut tasks = Vec::new();

    // build nodes
    // Phase 1: start each node's build. `build_node` is awaited inside the
    // loop because it borrows `git_manager` mutably; it returns a task that
    // finishes the build, which is awaited in phase 2 below.
    for node in nodes.into_values() {
        let node_id = node.id.clone();
        let git_source = git_sources.get(&node_id).cloned();
        let prev_git_source = prev_git_sources.get(&node_id).cloned();

        let task = builder
            .clone()
            .build_node(
                node,
                git_source,
                prev_git_source,
                LocalBuildLogger {
                    node_id: node_id.clone(),
                },
                &mut git_manager,
            )
            .await
            .wrap_err_with(|| format!("failed to build node `{node_id}`"))?;
        tasks.push((node_id, task));
    }

    // Phase 2: wait for all started builds and collect the working
    // directory of each node.
    let mut info = BuildInfo {
        node_working_dirs: Default::default(),
    };
    for (node_id, task) in tasks {
        let node = task
            .await
            .with_context(|| format!("failed to build node `{node_id}`"))?;
        info.node_working_dirs
            .insert(node_id, node.node_working_dir);
    }
    Ok(info)
}

/// Build logger for local builds that writes every message to stdout,
/// prefixed with the node id it belongs to.
struct LocalBuildLogger {
    // id of the node this logger reports for
    node_id: NodeId,
}

impl BuildLogger for LocalBuildLogger {
    type Clone = Self;

    /// Prints the message to stdout together with the node id and log level.
    async fn log_message(&mut self, level: log::Level, message: impl Into<String> + Send) {
        let message: String = message.into();
        println!("{}: \t{level}: \t{message}", self.node_id);
    }

    /// Creates an independent logger for the same node.
    async fn try_clone(&self) -> eyre::Result<Self::Clone> {
        Ok(LocalBuildLogger {
            node_id: self.node_id.clone(),
        })
    }
}

+ 162
- 0
binaries/cli/src/command/build/mod.rs View File

@@ -0,0 +1,162 @@
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::{
descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt},
topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST},
};
use dora_message::{descriptor::NodeSource, BuildId};
use eyre::Context;
use std::collections::BTreeMap;

use crate::{connect_to_coordinator, resolve_dataflow, session::DataflowSession};

use distributed::{build_distributed_dataflow, wait_until_dataflow_built};
use local::build_dataflow_locally;

mod distributed;
mod git;
mod local;

/// Builds the given dataflow, either locally or through a running
/// coordinator.
///
/// The build mode is chosen as follows: `force_local` always builds locally;
/// an explicit coordinator address/port requires a reachable coordinator;
/// otherwise a coordinator at the default address is used if one is
/// reachable, falling back to a local build.
///
/// On success the dataflow session file next to the dataflow YAML is updated
/// with the resolved git sources and the new build id, so that a later
/// `start` can reuse this build.
pub fn build(
    dataflow: String,
    coordinator_addr: Option<std::net::IpAddr>,
    coordinator_port: Option<u16>,
    uv: bool,
    force_local: bool,
) -> eyre::Result<()> {
    let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
    let dataflow_descriptor =
        Descriptor::blocking_read(&dataflow_path).wrap_err("Failed to read yaml dataflow")?;
    let mut dataflow_session =
        DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;

    // resolve the `GitBranch` source of every node to a pinned commit hash,
    // so all machines build the exact same revision
    let mut git_sources = BTreeMap::new();
    let resolved_nodes = dataflow_descriptor
        .resolve_aliases_and_set_defaults()
        .context("failed to resolve nodes")?;
    for (node_id, node) in resolved_nodes {
        if let CoreNodeKind::Custom(CustomNode {
            source: NodeSource::GitBranch { repo, rev },
            ..
        }) = node.kind
        {
            let source = git::fetch_commit_hash(repo, rev)
                .with_context(|| format!("failed to find commit hash for `{node_id}`"))?;
            git_sources.insert(node_id, source);
        }
    }

    let session = connect_to_coordinator_with_defaults(coordinator_addr, coordinator_port);

    let build_kind = if force_local {
        // user explicitly requested a local build
        BuildKind::Local
    } else if coordinator_addr.is_some() || coordinator_port.is_some() {
        // explicit coordinator address or port set -> there should be a coordinator running
        BuildKind::ThroughCoordinator {
            coordinator_session: session.context("failed to connect to coordinator")?,
        }
    } else {
        match session {
            Ok(coordinator_session) => {
                // we found a local coordinator instance at default port -> use it for building
                BuildKind::ThroughCoordinator {
                    coordinator_session,
                }
            }
            Err(_) => {
                // no coordinator instance found -> do a local build
                BuildKind::Local
            }
        }
    };

    match build_kind {
        BuildKind::Local => {
            println!("running local build");
            // use dataflow dir as base working dir
            let local_working_dir = dunce::canonicalize(&dataflow_path)
                .context("failed to canonicalize dataflow path")?
                .parent()
                .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
                .to_owned();
            let build_info = build_dataflow_locally(
                dataflow_descriptor,
                &git_sources,
                &dataflow_session,
                local_working_dir,
                uv,
            )?;

            dataflow_session.git_sources = git_sources;
            // generate a random BuildId and store the associated build info
            dataflow_session.build_id = Some(BuildId::generate());
            dataflow_session.local_build = Some(build_info);
            dataflow_session
                .write_out_for_dataflow(&dataflow_path)
                .context("failed to write out dataflow session file")?;
        }
        BuildKind::ThroughCoordinator {
            mut coordinator_session,
        } => {
            let local_working_dir = super::local_working_dir(
                &dataflow_path,
                &dataflow_descriptor,
                &mut *coordinator_session,
            )?;
            let build_id = build_distributed_dataflow(
                &mut *coordinator_session,
                dataflow_descriptor,
                &git_sources,
                &dataflow_session,
                local_working_dir,
                uv,
            )?;

            // persist the resolved git sources right away, before waiting on
            // the (possibly failing) build
            dataflow_session.git_sources = git_sources;
            dataflow_session
                .write_out_for_dataflow(&dataflow_path)
                .context("failed to write out dataflow session file")?;

            // wait until dataflow build is finished

            wait_until_dataflow_built(
                build_id,
                &mut *coordinator_session,
                coordinator_socket(coordinator_addr, coordinator_port),
                log::LevelFilter::Info,
            )?;

            // the build succeeded -> record its id; `local_build` is cleared
            // because this build lives on the coordinator side
            dataflow_session.build_id = Some(build_id);
            dataflow_session.local_build = None;
            dataflow_session
                .write_out_for_dataflow(&dataflow_path)
                .context("failed to write out dataflow session file")?;
        }
    };

    Ok(())
}

/// How the `build` command performs the dataflow build.
enum BuildKind {
    /// Build all nodes on this machine, without a coordinator.
    Local,
    /// Delegate the build to a running coordinator instance.
    ThroughCoordinator {
        coordinator_session: Box<TcpRequestReplyConnection>,
    },
}

/// Connects to the coordinator control port, substituting the default
/// address and/or port for any unspecified parts.
fn connect_to_coordinator_with_defaults(
    coordinator_addr: Option<std::net::IpAddr>,
    coordinator_port: Option<u16>,
) -> std::io::Result<Box<TcpRequestReplyConnection>> {
    connect_to_coordinator(coordinator_socket(coordinator_addr, coordinator_port))
}

/// Builds the coordinator control socket address, falling back to localhost
/// and the default control port for missing parts.
fn coordinator_socket(
    coordinator_addr: Option<std::net::IpAddr>,
    coordinator_port: Option<u16>,
) -> std::net::SocketAddr {
    std::net::SocketAddr::from((
        coordinator_addr.unwrap_or(LOCALHOST),
        coordinator_port.unwrap_or(DORA_COORDINATOR_PORT_CONTROL_DEFAULT),
    ))
}

binaries/cli/src/check.rs → binaries/cli/src/command/check.rs View File


binaries/cli/src/logs.rs → binaries/cli/src/command/logs.rs View File


+ 60
- 0
binaries/cli/src/command/mod.rs View File

@@ -0,0 +1,60 @@
pub use build::build;
pub use logs::logs;
pub use run::run;
pub use start::start;

use std::path::{Path, PathBuf};

use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::descriptor::Descriptor;
use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
use eyre::{bail, Context, ContextCompat};

mod build;
pub mod check;
mod logs;
mod run;
mod start;
pub mod up;

/// Determines the local working directory to send to the coordinator.
///
/// Returns the canonicalized parent directory of the dataflow file when no
/// node is pinned to a specific machine AND the CLI runs on the same machine
/// as the default daemon; otherwise returns `None`.
fn local_working_dir(
    dataflow_path: &Path,
    dataflow_descriptor: &Descriptor,
    coordinator_session: &mut TcpRequestReplyConnection,
) -> eyre::Result<Option<PathBuf>> {
    let no_pinned_machines = dataflow_descriptor
        .nodes
        .iter()
        .all(|n| n.deploy.machine.is_none());
    if !no_pinned_machines || !cli_and_daemon_on_same_machine(coordinator_session)? {
        return Ok(None);
    }

    let canonical_path = dunce::canonicalize(dataflow_path)
        .context("failed to canonicalize dataflow file path")?;
    let parent_dir = canonical_path
        .parent()
        .context("dataflow path has no parent dir")?;
    Ok(Some(parent_dir.to_owned()))
}

/// Asks the coordinator whether this CLI process runs on the same machine as
/// the default daemon, by comparing the peer IP addresses the coordinator
/// observes for both connections.
fn cli_and_daemon_on_same_machine(session: &mut TcpRequestReplyConnection) -> eyre::Result<bool> {
    let reply_raw = session
        .request(&serde_json::to_vec(&ControlRequest::CliAndDefaultDaemonOnSameMachine).unwrap())
        // fixed: previous message said "start dataflow", but this sends the
        // same-machine query
        .wrap_err("failed to send CliAndDefaultDaemonOnSameMachine message")?;

    let result: ControlRequestReply =
        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
    match result {
        ControlRequestReply::CliAndDefaultDaemonIps {
            default_daemon,
            cli,
        } => Ok(default_daemon.is_some() && default_daemon == cli),
        ControlRequestReply::Error(err) => bail!("{err}"),
        other => bail!("unexpected CliAndDefaultDaemonOnSameMachine reply: {other:?}"),
    }
}

+ 23
- 0
binaries/cli/src/command/run.rs View File

@@ -0,0 +1,23 @@
use dora_daemon::Daemon;
use eyre::Context;
use tokio::runtime::Builder;

use crate::{handle_dataflow_result, resolve_dataflow, session::DataflowSession};

/// Runs the given dataflow locally via a temporary daemon, blocking the
/// calling thread until the dataflow finishes.
///
/// Reuses the build recorded in the dataflow session file (if any) and
/// reports per-node results through `handle_dataflow_result`.
pub fn run(dataflow: String, uv: bool) -> Result<(), eyre::Error> {
    let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
    let dataflow_session =
        DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;

    let runtime = Builder::new_multi_thread()
        .enable_all()
        .build()
        .context("tokio runtime failed")?;
    let dataflow_future = Daemon::run_dataflow(
        &dataflow_path,
        dataflow_session.build_id,
        dataflow_session.local_build,
        dataflow_session.session_id,
        uv,
    );
    let result = runtime.block_on(dataflow_future)?;
    handle_dataflow_result(result, None)
}

binaries/cli/src/attach.rs → binaries/cli/src/command/start/attach.rs View File

@@ -1,4 +1,3 @@
use colored::Colorize;
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor, DescriptorExt};
use dora_message::cli_to_coordinator::ControlRequest;
@@ -16,6 +15,7 @@ use tracing::{error, info};
use uuid::Uuid;

use crate::handle_dataflow_result;
use crate::output::print_log_message;

pub fn attach_dataflow(
dataflow: Descriptor,
@@ -183,42 +183,6 @@ pub fn attach_dataflow(
}
}

/// Pretty-prints a single coordinator log message to stdout, with a colored,
/// fixed-width level tag followed by dataflow, daemon, node, and target
/// segments (segments are omitted when the corresponding field is absent).
pub fn print_log_message(log_message: LogMessage) {
    let LogMessage {
        dataflow_id,
        node_id,
        daemon_id,
        level,
        target,
        module_path: _,
        file: _,
        line: _,
        message,
    } = log_message;

    // Fixed-width (5 chars), colored level tag.
    let level = match level {
        log::Level::Error => "ERROR".red(),
        log::Level::Warn => "WARN ".yellow(),
        log::Level::Info => "INFO ".green(),
        other => format!("{other:5}").normal(),
    };

    let dataflow = format!(" dataflow `{dataflow_id}`").cyan();
    let daemon = daemon_id
        .map(|id| format!(" on daemon `{id}`"))
        .unwrap_or_else(|| " on default daemon".to_string())
        .bright_black();
    let node = node_id
        .map(|node_id| format!(" {node_id}").bold())
        .unwrap_or_else(|| "".normal());
    let target = target
        .map(|target| format!(" {target}").dimmed())
        .unwrap_or_else(|| "".normal());

    println!("{level}{dataflow}{daemon}{node}{target}: {message}");
}

enum AttachEvent {
Control(ControlRequest),
Log(eyre::Result<LogMessage>),

+ 167
- 0
binaries/cli/src/command/start/mod.rs View File

@@ -0,0 +1,167 @@
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::descriptor::{Descriptor, DescriptorExt};
use dora_message::{
cli_to_coordinator::ControlRequest, common::LogMessage, coordinator_to_cli::ControlRequestReply,
};
use eyre::{bail, Context};
use std::{
net::{SocketAddr, TcpStream},
path::PathBuf,
};
use uuid::Uuid;

use crate::{
connect_to_coordinator, output::print_log_message, resolve_dataflow, session::DataflowSession,
};
use attach::attach_dataflow;

mod attach;

/// Starts a dataflow via the coordinator and, unless `--detach` was given,
/// attaches to it (streaming its logs and waiting for it to finish).
///
/// `attach` and `detach` are mutually exclusive; when neither is set the
/// default is to attach.
pub fn start(
    dataflow: String,
    name: Option<String>,
    coordinator_socket: SocketAddr,
    attach: bool,
    detach: bool,
    hot_reload: bool,
    uv: bool,
) -> eyre::Result<()> {
    let (dataflow, dataflow_descriptor, mut session, dataflow_id) =
        start_dataflow(dataflow, name, coordinator_socket, uv)?;

    // Resolve the attach/detach flags; explicit flags win, default is attach.
    if attach && detach {
        eyre::bail!("both `--attach` and `--detach` are given");
    }
    let should_attach = if attach {
        true
    } else if detach {
        false
    } else {
        println!("attaching to dataflow (use `--detach` to run in background)");
        true
    };

    if should_attach {
        // Derive the log filter level from the environment (defaults to info).
        let log_level = env_logger::Builder::new()
            .filter_level(log::LevelFilter::Info)
            .parse_default_env()
            .build()
            .filter();

        attach_dataflow(
            dataflow_descriptor,
            dataflow,
            dataflow_id,
            &mut *session,
            hot_reload,
            coordinator_socket,
            log_level,
        )
    } else {
        // Detached: only block until the coordinator reports the spawn.
        wait_until_dataflow_started(
            dataflow_id,
            &mut session,
            coordinator_socket,
            log::LevelFilter::Info,
        )
    }
}

/// Resolves the dataflow, connects to the coordinator, and sends a `Start`
/// request carrying the persisted build/session IDs.
///
/// Returns the resolved dataflow path, its parsed descriptor, the open
/// coordinator session (so callers can keep using it), and the UUID the
/// coordinator assigned to the triggered dataflow.
fn start_dataflow(
    dataflow: String,
    name: Option<String>,
    coordinator_socket: SocketAddr,
    uv: bool,
) -> Result<(PathBuf, Descriptor, Box<TcpRequestReplyConnection>, Uuid), eyre::Error> {
    let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
    let dataflow_descriptor =
        Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
    // Session state (build id, session id) persisted next to the dataflow file.
    let dataflow_session =
        DataflowSession::read_session(&dataflow).context("failed to read DataflowSession")?;

    let mut session = connect_to_coordinator(coordinator_socket)
        .wrap_err("failed to connect to dora coordinator")?;

    // Working-dir resolution is delegated to the parent module.
    let local_working_dir =
        super::local_working_dir(&dataflow, &dataflow_descriptor, &mut *session)?;

    let dataflow_id = {
        let dataflow = dataflow_descriptor.clone();
        // Re-borrow the boxed connection as a plain &mut for the request call.
        let session: &mut TcpRequestReplyConnection = &mut *session;
        let reply_raw = session
            .request(
                &serde_json::to_vec(&ControlRequest::Start {
                    build_id: dataflow_session.build_id,
                    session_id: dataflow_session.session_id,
                    dataflow,
                    name,
                    local_working_dir,
                    uv,
                })
                .unwrap(),
            )
            .wrap_err("failed to send start dataflow message")?;

        let result: ControlRequestReply =
            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
        match result {
            // The coordinator only confirms that the start was *triggered*;
            // spawning completes asynchronously (see `wait_until_dataflow_started`).
            ControlRequestReply::DataflowStartTriggered { uuid } => {
                eprintln!("dataflow start triggered: {uuid}");
                uuid
            }
            ControlRequestReply::Error(err) => bail!("{err}"),
            other => bail!("unexpected start dataflow reply: {other:?}"),
        }
    };
    Ok((dataflow, dataflow_descriptor, session, dataflow_id))
}

/// Blocks until the coordinator reports that the dataflow has been spawned.
///
/// While waiting, subscribes to the dataflow's log stream on a separate TCP
/// connection and prints incoming log messages from a background thread.
///
/// # Errors
/// Fails if connecting/subscribing fails, the `WaitForSpawn` request cannot
/// be sent, the reply cannot be parsed, or the coordinator reports an error.
fn wait_until_dataflow_started(
    dataflow_id: Uuid,
    session: &mut Box<TcpRequestReplyConnection>,
    coordinator_addr: SocketAddr,
    log_level: log::LevelFilter,
) -> eyre::Result<()> {
    // subscribe to log messages
    let mut log_session = TcpConnection {
        stream: TcpStream::connect(coordinator_addr)
            .wrap_err("failed to connect to dora coordinator")?,
    };
    log_session
        .send(
            &serde_json::to_vec(&ControlRequest::LogSubscribe {
                dataflow_id,
                level: log_level,
            })
            .wrap_err("failed to serialize message")?,
        )
        .wrap_err("failed to send log subscribe request to coordinator")?;
    // Print incoming log messages in the background until the stream closes.
    std::thread::spawn(move || {
        while let Ok(raw) = log_session.receive() {
            let parsed: eyre::Result<LogMessage> =
                serde_json::from_slice(&raw).context("failed to parse log message");
            match parsed {
                Ok(log_message) => {
                    print_log_message(log_message);
                }
                Err(err) => {
                    tracing::warn!("failed to parse log message: {err:?}")
                }
            }
        }
    });

    let reply_raw = session
        .request(&serde_json::to_vec(&ControlRequest::WaitForSpawn { dataflow_id }).unwrap())
        .wrap_err("failed to send WaitForSpawn message")?;

    let result: ControlRequestReply =
        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
    match result {
        ControlRequestReply::DataflowSpawned { uuid } => {
            eprintln!("dataflow started: {uuid}");
        }
        ControlRequestReply::Error(err) => bail!("{err}"),
        other => bail!("unexpected WaitForSpawn reply: {other:?}"),
    }
    Ok(())
}

binaries/cli/src/up.rs → binaries/cli/src/command/up.rs View File

@@ -1,4 +1,4 @@
use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST};
use crate::{command::check::daemon_running, connect_to_coordinator, LOCALHOST};
use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT;
use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
use eyre::{bail, Context, ContextCompat};

+ 38
- 196
binaries/cli/src/lib.rs View File

@@ -1,8 +1,5 @@
use attach::{attach_dataflow, print_log_message};
use colored::Colorize;
use communication_layer_request_reply::{
RequestReplyLayer, TcpConnection, TcpLayer, TcpRequestReplyConnection,
};
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_coordinator::Event;
use dora_core::{
descriptor::{source_is_url, Descriptor, DescriptorExt},
@@ -15,7 +12,6 @@ use dora_daemon::Daemon;
use dora_download::download_file;
use dora_message::{
cli_to_coordinator::ControlRequest,
common::LogMessage,
coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus},
};
#[cfg(feature = "tracing")]
@@ -24,11 +20,7 @@ use dora_tracing::{set_up_tracing_opts, FileLogging};
use duration_str::parse;
use eyre::{bail, Context};
use formatting::FormatDataflowError;
use std::{
env::current_dir,
io::Write,
net::{SocketAddr, TcpStream},
};
use std::{env::current_dir, io::Write, net::SocketAddr};
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
@@ -39,13 +31,12 @@ use tokio::runtime::Builder;
use tracing::level_filters::LevelFilter;
use uuid::Uuid;

mod attach;
mod check;
pub mod command;
mod formatting;
mod graph;
mod logs;
pub mod output;
pub mod session;
mod template;
mod up;

const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
@@ -90,14 +81,17 @@ enum Command {
#[clap(value_name = "PATH")]
dataflow: String,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
#[clap(long, value_name = "IP")]
coordinator_addr: Option<IpAddr>,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
#[clap(long, value_name = "PORT")]
coordinator_port: Option<u16>,
// Use UV to build nodes.
#[clap(long, action)]
uv: bool,
// Run build on local machine
#[clap(long, action)]
local: bool,
},
/// Generate a new project or node. Choose the language between Rust, Python, C or C++.
New {
@@ -298,14 +292,14 @@ enum Lang {
}

pub fn lib_main(args: Args) {
if let Err(err) = run(args) {
if let Err(err) = run_cli(args) {
eprintln!("\n\n{}", "[ERROR]".bold().red());
eprintln!("{err:?}");
std::process::exit(1);
}
}

fn run(args: Args) -> eyre::Result<()> {
fn run_cli(args: Args) -> eyre::Result<()> {
#[cfg(feature = "tracing")]
match &args.command {
Command::Daemon {
@@ -347,12 +341,6 @@ fn run(args: Args) -> eyre::Result<()> {
}
};

let log_level = env_logger::Builder::new()
.filter_level(log::LevelFilter::Info)
.parse_default_env()
.build()
.filter();

match args.command {
Command::Check {
dataflow,
@@ -367,9 +355,9 @@ fn run(args: Args) -> eyre::Result<()> {
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
check::check_environment((coordinator_addr, coordinator_port).into())?
command::check::check_environment((coordinator_addr, coordinator_port).into())?
}
None => check::check_environment((coordinator_addr, coordinator_port).into())?,
None => command::check::check_environment((coordinator_addr, coordinator_port).into())?,
},
Command::Graph {
dataflow,
@@ -383,34 +371,15 @@ fn run(args: Args) -> eyre::Result<()> {
coordinator_addr,
coordinator_port,
uv,
} => {
let coordinator_socket = (coordinator_addr, coordinator_port).into();
let (_, _, mut session, uuid) =
start_dataflow(dataflow, None, coordinator_socket, uv, true)?;
// wait until build is finished
wait_until_dataflow_started(
uuid,
&mut session,
true,
coordinator_socket,
log::LevelFilter::Info,
)?;
}
local,
} => command::build(dataflow, coordinator_addr, coordinator_port, uv, local)?,
Command::New {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Run { dataflow, uv } => {
let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv))?;
handle_dataflow_result(result, None)?
}
Command::Run { dataflow, uv } => command::run(dataflow, uv)?,
Command::Up { config } => {
up::up(config.as_deref())?;
command::up::up(config.as_deref())?;
}
Command::Logs {
dataflow,
@@ -425,7 +394,7 @@ fn run(args: Args) -> eyre::Result<()> {
if let Some(dataflow) = dataflow {
let uuid = Uuid::parse_str(&dataflow).ok();
let name = if uuid.is_some() { None } else { Some(dataflow) };
logs::logs(&mut *session, uuid, name, node)?
command::logs(&mut *session, uuid, name, node)?
} else {
let active: Vec<dora_message::coordinator_to_cli::DataflowIdAndName> =
list.get_active();
@@ -434,7 +403,7 @@ fn run(args: Args) -> eyre::Result<()> {
[uuid] => uuid.clone(),
_ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?,
};
logs::logs(&mut *session, Some(uuid.uuid), None, node)?
command::logs(&mut *session, Some(uuid.uuid), None, node)?
}
}
Command::Start {
@@ -448,39 +417,15 @@ fn run(args: Args) -> eyre::Result<()> {
uv,
} => {
let coordinator_socket = (coordinator_addr, coordinator_port).into();
let (dataflow, dataflow_descriptor, mut session, dataflow_id) =
start_dataflow(dataflow, name, coordinator_socket, uv, false)?;

let attach = match (attach, detach) {
(true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
(true, false) => true,
(false, true) => false,
(false, false) => {
println!("attaching to dataflow (use `--detach` to run in background)");
true
}
};

if attach {
attach_dataflow(
dataflow_descriptor,
dataflow,
dataflow_id,
&mut *session,
hot_reload,
coordinator_socket,
log_level,
)?
} else {
// wait until dataflow is started
wait_until_dataflow_started(
dataflow_id,
&mut session,
false,
coordinator_socket,
log::LevelFilter::Info,
)?;
}
command::start(
dataflow,
name,
coordinator_socket,
attach,
detach,
hot_reload,
uv,
)?
}
Command::List {
coordinator_addr,
@@ -510,7 +455,7 @@ fn run(args: Args) -> eyre::Result<()> {
config,
coordinator_addr,
coordinator_port,
} => up::destroy(
} => command::up::destroy(
config.as_deref(),
(coordinator_addr, coordinator_port).into(),
)?,
@@ -560,8 +505,11 @@ fn run(args: Args) -> eyre::Result<()> {
coordinator_addr
);
}
let dataflow_session =
DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;

let result = Daemon::run_dataflow(&dataflow_path, false).await?;
let result = Daemon::run_dataflow(&dataflow_path,
dataflow_session.build_id, dataflow_session.local_build, dataflow_session.session_id, false).await?;
handle_dataflow_result(result, None)
}
None => {
@@ -632,114 +580,6 @@ fn run(args: Args) -> eyre::Result<()> {
Ok(())
}

fn start_dataflow(
dataflow: String,
name: Option<String>,
coordinator_socket: SocketAddr,
uv: bool,
build_only: bool,
) -> Result<(PathBuf, Descriptor, Box<TcpRequestReplyConnection>, Uuid), eyre::Error> {
let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let dataflow_descriptor =
Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
let working_dir = dataflow
.canonicalize()
.context("failed to canonicalize dataflow path")?
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
let mut session = connect_to_coordinator(coordinator_socket)
.wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = {
let dataflow = dataflow_descriptor.clone();
let session: &mut TcpRequestReplyConnection = &mut *session;
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Start {
dataflow,
name,
local_working_dir: working_dir,
uv,
build_only,
})
.unwrap(),
)
.wrap_err("failed to send start dataflow message")?;

let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowStartTriggered { uuid } => {
if build_only {
eprintln!("dataflow build triggered");
} else {
eprintln!("dataflow start triggered: {uuid}");
}
uuid
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
}
};
Ok((dataflow, dataflow_descriptor, session, dataflow_id))
}

fn wait_until_dataflow_started(
dataflow_id: Uuid,
session: &mut Box<TcpRequestReplyConnection>,
build_only: bool,
coordinator_addr: SocketAddr,
log_level: log::LevelFilter,
) -> eyre::Result<()> {
// subscribe to log messages
let mut log_session = TcpConnection {
stream: TcpStream::connect(coordinator_addr)
.wrap_err("failed to connect to dora coordinator")?,
};
log_session
.send(
&serde_json::to_vec(&ControlRequest::LogSubscribe {
dataflow_id,
level: log_level,
})
.wrap_err("failed to serialize message")?,
)
.wrap_err("failed to send log subscribe request to coordinator")?;
std::thread::spawn(move || {
while let Ok(raw) = log_session.receive() {
let parsed: eyre::Result<LogMessage> =
serde_json::from_slice(&raw).context("failed to parse log message");
match parsed {
Ok(log_message) => {
print_log_message(log_message);
}
Err(err) => {
tracing::warn!("failed to parse log message: {err:?}")
}
}
}
});

let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::WaitForSpawn { dataflow_id }).unwrap())
.wrap_err("failed to send start dataflow message")?;

let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowSpawned { uuid } => {
if build_only {
eprintln!("dataflow build finished");
} else {
eprintln!("dataflow started: {uuid}");
}
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
}
Ok(())
}

fn stop_dataflow_interactive(
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
@@ -890,6 +730,8 @@ use pyo3::{
wrap_pyfunction, Bound, PyResult, Python,
};

use crate::session::DataflowSession;

#[cfg(feature = "python")]
#[pyfunction]
fn py_main(_py: Python) -> PyResult<()> {


+ 48
- 0
binaries/cli/src/output.rs View File

@@ -0,0 +1,48 @@
use colored::Colorize;
use dora_message::common::LogMessage;

pub fn print_log_message(log_message: LogMessage) {
let LogMessage {
build_id,
dataflow_id,
node_id,
daemon_id,
level,
target,
module_path: _,
file: _,
line: _,
message,
} = log_message;
let level = match level {
log::Level::Error => "ERROR".red(),
log::Level::Warn => "WARN ".yellow(),
log::Level::Info => "INFO ".green(),
other => format!("{other:5}").normal(),
};
let dataflow = if let Some(dataflow_id) = dataflow_id {
format!(" dataflow `{dataflow_id}`").cyan()
} else {
String::new().cyan()
};
let build = if let Some(build_id) = build_id {
format!(" build `{build_id}`").cyan()
} else {
String::new().cyan()
};
let daemon = match daemon_id {
Some(id) => format!(" on daemon `{id}`"),
None => " on default daemon".to_string(),
}
.bright_black();
let node = match node_id {
Some(node_id) => format!(" {node_id}").bold(),
None => "".normal(),
};
let target = match target {
Some(target) => format!(" {target}").dimmed(),
None => "".normal(),
};

println!("{level}{build}{dataflow}{daemon}{node}{target}: {message}");
}

+ 73
- 0
binaries/cli/src/session.rs View File

@@ -0,0 +1,73 @@
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
};

use dora_core::build::BuildInfo;
use dora_message::{common::GitSource, id::NodeId, BuildId, SessionId};
use eyre::{Context, ContextCompat};

/// Per-dataflow state persisted as YAML next to the dataflow file
/// (`<stem>.dora-session.yaml`, see `session_file_path`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DataflowSession {
    // ID of the associated (distributed) build, if one has been run.
    pub build_id: Option<BuildId>,
    // Identifier of this session; generated fresh for a default session.
    pub session_id: SessionId,
    // Git source per node — presumably recorded by `dora build`; TODO confirm.
    pub git_sources: BTreeMap<NodeId, GitSource>,
    // Build info of a local build, if one has been run.
    pub local_build: Option<BuildInfo>,
}

impl Default for DataflowSession {
    // Manual impl (not derived) because a fresh session must get a newly
    // generated session ID rather than a default one.
    fn default() -> Self {
        Self {
            build_id: None,
            session_id: SessionId::generate(),
            git_sources: Default::default(),
            local_build: Default::default(),
        }
    }
}

impl DataflowSession {
    /// Loads the session file associated with `dataflow_path`.
    ///
    /// If the file is missing or cannot be parsed, a fresh default session is
    /// created, written out, and returned (a parse failure is logged, since
    /// it likely means a `dora build` needs to be re-run).
    pub fn read_session(dataflow_path: &Path) -> eyre::Result<Self> {
        let session_file = session_file_path(dataflow_path)?;
        if session_file.exists() {
            match deserialize(&session_file) {
                Ok(parsed) => return Ok(parsed),
                Err(err) => {
                    // Don't fail hard on a corrupt session file, but surface the
                    // parse error instead of silently discarding it.
                    tracing::warn!("failed to read dataflow session file, regenerating (you might need to run `dora build` again): {err:?}");
                }
            }
        }

        let default_session = DataflowSession::default();
        default_session.write_out_for_dataflow(dataflow_path)?;
        Ok(default_session)
    }

    /// Serializes this session and writes it to the session file belonging to
    /// `dataflow_path`, overwriting any existing file.
    pub fn write_out_for_dataflow(&self, dataflow_path: &Path) -> eyre::Result<()> {
        let session_file = session_file_path(dataflow_path)?;
        std::fs::write(session_file, self.serialize()?)
            .context("failed to write dataflow session file")?;
        Ok(())
    }

    // YAML-encodes the session for on-disk storage.
    fn serialize(&self) -> eyre::Result<String> {
        serde_yaml::to_string(&self).context("failed to serialize dataflow session file")
    }
}

/// Reads and parses a `DataflowSession` from the YAML file at `session_file`.
fn deserialize(session_file: &Path) -> eyre::Result<DataflowSession> {
    // `session_file` is already a reference — no extra borrow needed.
    std::fs::read_to_string(session_file)
        .context("failed to read DataflowSession file")
        .and_then(|contents| {
            serde_yaml::from_str(&contents).context("failed to deserialize DataflowSession file")
        })
}

/// Returns the session-file path belonging to `dataflow_path`:
/// the sibling file `<file_stem>.dora-session.yaml`.
fn session_file_path(dataflow_path: &Path) -> eyre::Result<PathBuf> {
    let stem = dataflow_path
        .file_stem()
        .wrap_err("dataflow path has no file stem")?;
    let stem = stem
        .to_str()
        .wrap_err("dataflow file stem is not valid utf-8")?;
    Ok(dataflow_path.with_file_name(format!("{stem}.dora-session.yaml")))
}

+ 28
- 2
binaries/coordinator/src/control.rs View File

@@ -2,7 +2,9 @@ use crate::{
tcp_utils::{tcp_receive, tcp_send},
Event,
};
use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
use dora_message::{
cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply, BuildId,
};
use eyre::{eyre, Context};
use futures::{
future::{self, Either},
@@ -79,6 +81,7 @@ async fn handle_requests(
tx: mpsc::Sender<ControlEvent>,
_finish_tx: mpsc::Sender<()>,
) {
let peer_addr = connection.peer_addr().ok();
loop {
let next_request = tcp_receive(&mut connection).map(Either::Left);
let coordinator_stopped = tx.closed().map(Either::Right);
@@ -114,11 +117,29 @@ async fn handle_requests(
break;
}

let result = match request {
if let Ok(ControlRequest::BuildLogSubscribe { build_id, level }) = request {
let _ = tx
.send(ControlEvent::BuildLogSubscribe {
build_id,
level,
connection,
})
.await;
break;
}

let mut result = match request {
Ok(request) => handle_request(request, &tx).await,
Err(err) => Err(err),
};

if let Ok(ControlRequestReply::CliAndDefaultDaemonIps { cli, .. }) = &mut result {
if cli.is_none() {
// fill cli IP address in reply
*cli = peer_addr.map(|s| s.ip());
}
}

let reply = result.unwrap_or_else(|err| ControlRequestReply::Error(format!("{err:?}")));
let serialized: Vec<u8> =
match serde_json::to_vec(&reply).wrap_err("failed to serialize ControlRequestReply") {
@@ -179,6 +200,11 @@ pub enum ControlEvent {
level: log::LevelFilter,
connection: TcpStream,
},
BuildLogSubscribe {
build_id: BuildId,
level: log::LevelFilter,
connection: TcpStream,
},
Error(eyre::Report),
}



+ 282
- 45
binaries/coordinator/src/lib.rs View File

@@ -5,22 +5,27 @@ use crate::{
pub use control::ControlEvent;
use dora_core::{
config::{NodeId, OperatorId},
descriptor::DescriptorExt,
uhlc::{self, HLC},
};
use dora_message::{
cli_to_coordinator::ControlRequest,
common::DaemonId,
common::{DaemonId, GitSource},
coordinator_to_cli::{
ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
DataflowStatus, LogLevel, LogMessage,
},
coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped},
coordinator_to_daemon::{
BuildDataflowNodes, DaemonCoordinatorEvent, RegisterResult, Timestamped,
},
daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
descriptor::{Descriptor, ResolvedNode},
BuildId, DataflowId, SessionId,
};
use eyre::{bail, eyre, ContextCompat, Result, WrapErr};
use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt};
use futures_concurrency::stream::Merge;
use itertools::Itertools;
use log_subscriber::LogSubscriber;
use run::SpawnedDataflow;
use std::{
@@ -139,6 +144,10 @@ impl DaemonConnections {
}
}

fn get(&self, id: &DaemonId) -> Option<&DaemonConnection> {
self.daemons.get(id)
}

fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
self.daemons.get_mut(id)
}
@@ -194,10 +203,12 @@ async fn start_inner(

let mut events = (abortable_events, daemon_events).merge();

let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
let mut dataflow_results: HashMap<Uuid, BTreeMap<DaemonId, DataflowDaemonResult>> =
let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();

let mut running_dataflows: HashMap<DataflowId, RunningDataflow> = HashMap::new();
let mut dataflow_results: HashMap<DataflowId, BTreeMap<DaemonId, DataflowDaemonResult>> =
HashMap::new();
let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
let mut archived_dataflows: HashMap<DataflowId, ArchivedDataflow> = HashMap::new();
let mut daemon_connections = DaemonConnections::default();

while let Some(event) = events.next().await {
@@ -351,9 +362,10 @@ async fn start_inner(
let mut finished_dataflow = entry.remove();
let dataflow_id = finished_dataflow.uuid;
send_log_message(
&mut finished_dataflow,
&mut finished_dataflow.log_subscribers,
&LogMessage {
dataflow_id,
build_id: None,
dataflow_id: Some(dataflow_id),
node_id: None,
daemon_id: None,
level: LogLevel::Info,
@@ -380,7 +392,7 @@ async fn start_inner(
}
if !matches!(
finished_dataflow.spawn_result,
SpawnResult::Spawned { .. }
CachedResult::Cached { .. }
) {
log::error!("pending spawn result on dataflow finish");
}
@@ -399,12 +411,56 @@ async fn start_inner(
reply_sender,
} => {
match request {
ControlRequest::Build {
session_id,
dataflow,
git_sources,
prev_git_sources,
local_working_dir,
uv,
} => {
// assign a random build id
let build_id = BuildId::generate();

let result = build_dataflow(
build_id,
session_id,
dataflow,
git_sources,
prev_git_sources,
local_working_dir,
&clock,
uv,
&mut daemon_connections,
)
.await;
match result {
Ok(build) => {
running_builds.insert(build_id, build);
let _ = reply_sender.send(Ok(
ControlRequestReply::DataflowBuildTriggered { build_id },
));
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
}
}
ControlRequest::WaitForBuild { build_id } => {
if let Some(build) = running_builds.get_mut(&build_id) {
build.build_result.register(reply_sender);
} else {
let _ =
reply_sender.send(Err(eyre!("unknown build id {build_id}")));
}
}
ControlRequest::Start {
build_id,
session_id,
dataflow,
name,
local_working_dir,
uv,
build_only,
} => {
let name = name.or_else(|| names::Generator::default().next());

@@ -419,13 +475,14 @@ async fn start_inner(
}
}
let dataflow = start_dataflow(
build_id,
session_id,
dataflow,
local_working_dir,
name,
&mut daemon_connections,
&clock,
uv,
build_only,
)
.await?;
Ok(dataflow)
@@ -652,6 +709,27 @@ async fn start_inner(
"LogSubscribe request should be handled separately"
)));
}
ControlRequest::BuildLogSubscribe { .. } => {
let _ = reply_sender.send(Err(eyre::eyre!(
"BuildLogSubscribe request should be handled separately"
)));
}
ControlRequest::CliAndDefaultDaemonOnSameMachine => {
let mut default_daemon_ip = None;
if let Some(default_id) = daemon_connections.unnamed().next() {
if let Some(connection) = daemon_connections.get(default_id) {
if let Ok(addr) = connection.stream.peer_addr() {
default_daemon_ip = Some(addr.ip());
}
}
}
let _ = reply_sender.send(Ok(
ControlRequestReply::CliAndDefaultDaemonIps {
default_daemon: default_daemon_ip,
cli: None, // filled later
},
));
}
}
}
ControlEvent::Error(err) => tracing::error!("{err:?}"),
@@ -666,6 +744,17 @@ async fn start_inner(
.push(LogSubscriber::new(level, connection));
}
}
ControlEvent::BuildLogSubscribe {
build_id,
level,
connection,
} => {
if let Some(build) = running_builds.get_mut(&build_id) {
build
.log_subscribers
.push(LogSubscriber::new(level, connection));
}
}
},
Event::DaemonHeartbeatInterval => {
let mut disconnected = BTreeSet::new();
@@ -721,14 +810,52 @@ async fn start_inner(
}
}
Event::Log(message) => {
if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) {
send_log_message(dataflow, &message).await;
if let Some(dataflow_id) = &message.dataflow_id {
if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
send_log_message(&mut dataflow.log_subscribers, &message).await;
}
}
if let Some(build_id) = message.build_id {
if let Some(build) = running_builds.get_mut(&build_id) {
send_log_message(&mut build.log_subscribers, &message).await;
}
}
}
Event::DaemonExit { daemon_id } => {
tracing::info!("Daemon `{daemon_id}` exited");
daemon_connections.remove(&daemon_id);
}
Event::DataflowBuildResult {
build_id,
daemon_id,
result,
} => match running_builds.get_mut(&build_id) {
Some(build) => {
build.pending_build_results.remove(&daemon_id);
match result {
Ok(()) => {}
Err(err) => {
build.errors.push(format!("{err:?}"));
}
};
if build.pending_build_results.is_empty() {
tracing::info!("dataflow build finished: `{build_id}`");
let mut build = running_builds.remove(&build_id).unwrap();
let result = if build.errors.is_empty() {
Ok(())
} else {
Err(format!("build failed: {}", build.errors.join("\n\n")))
};

build.build_result.set_result(Ok(
ControlRequestReply::DataflowBuildFinished { build_id, result },
));
}
}
None => {
tracing::warn!("received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map");
}
},
Event::DataflowSpawnResult {
dataflow_id,
daemon_id,
@@ -739,21 +866,10 @@ async fn start_inner(
match result {
Ok(()) => {
if dataflow.pending_spawn_results.is_empty() {
tracing::info!(
"successfully {} dataflow `{dataflow_id}`",
if dataflow.build_only {
"built"
} else {
"spawned"
}
);
tracing::info!("successfully spawned dataflow `{dataflow_id}`",);
dataflow.spawn_result.set_result(Ok(
ControlRequestReply::DataflowSpawned { uuid: dataflow_id },
));

if dataflow.build_only {
running_dataflows.remove(&dataflow_id);
}
}
}
Err(err) => {
@@ -783,8 +899,8 @@ async fn start_inner(
Ok(())
}

async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage) {
for subscriber in &mut dataflow.log_subscribers {
async fn send_log_message(log_subscribers: &mut Vec<LogSubscriber>, message: &LogMessage) {
for subscriber in log_subscribers.iter_mut() {
let send_result =
tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));

@@ -792,7 +908,7 @@ async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage)
subscriber.close();
}
}
dataflow.log_subscribers.retain(|s| !s.is_closed());
log_subscribers.retain(|s| !s.is_closed());
}

fn dataflow_result(
@@ -859,6 +975,15 @@ async fn send_heartbeat_message(
.wrap_err("failed to send heartbeat message to daemon")
}

/// Bookkeeping for a dataflow build that is currently in progress.
struct RunningBuild {
    // Error strings collected from daemons whose build step failed.
    errors: Vec<String>,
    // Overall build result; resolved once all daemons have reported back.
    build_result: CachedResult,

    // Subscribers receiving build log messages for this build.
    log_subscribers: Vec<LogSubscriber>,

    // Daemons that have not yet reported a build result.
    pending_build_results: BTreeSet<DaemonId>,
}

struct RunningDataflow {
name: Option<String>,
uuid: Uuid,
@@ -869,26 +994,24 @@ struct RunningDataflow {
exited_before_subscribe: Vec<NodeId>,
nodes: BTreeMap<NodeId, ResolvedNode>,

spawn_result: SpawnResult,
spawn_result: CachedResult,
stop_reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,

log_subscribers: Vec<LogSubscriber>,

pending_spawn_results: BTreeSet<DaemonId>,

build_only: bool,
}

pub enum SpawnResult {
pub enum CachedResult {
Pending {
result_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
},
Spawned {
Cached {
result: eyre::Result<ControlRequestReply>,
},
}

impl Default for SpawnResult {
impl Default for CachedResult {
fn default() -> Self {
Self::Pending {
result_senders: Vec::new(),
@@ -896,14 +1019,14 @@ impl Default for SpawnResult {
}
}

impl SpawnResult {
impl CachedResult {
fn register(
&mut self,
reply_sender: tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>,
) {
match self {
SpawnResult::Pending { result_senders } => result_senders.push(reply_sender),
SpawnResult::Spawned { result } => {
CachedResult::Pending { result_senders } => result_senders.push(reply_sender),
CachedResult::Cached { result } => {
Self::send_result_to(result, reply_sender);
}
}
@@ -911,13 +1034,13 @@ impl SpawnResult {

fn set_result(&mut self, result: eyre::Result<ControlRequestReply>) {
match self {
SpawnResult::Pending { result_senders } => {
CachedResult::Pending { result_senders } => {
for sender in result_senders.drain(..) {
Self::send_result_to(&result, sender);
}
*self = SpawnResult::Spawned { result };
*self = CachedResult::Cached { result };
}
SpawnResult::Spawned { .. } => {}
CachedResult::Cached { .. } => {}
}
}

@@ -1123,26 +1246,135 @@ async fn retrieve_logs(
reply_logs.map_err(|err| eyre!(err))
}

/// Triggers a distributed build of the given dataflow: resolves its nodes,
/// groups them by target machine, and sends one `Build` command per daemon.
///
/// Returns a `RunningBuild` tracking which daemons still owe a build result;
/// the build completes asynchronously via `Event::DataflowBuildResult`.
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(daemon_connections, clock))]
async fn build_dataflow(
    build_id: BuildId,
    session_id: SessionId,
    dataflow: Descriptor,
    git_sources: BTreeMap<NodeId, GitSource>,
    prev_git_sources: BTreeMap<NodeId, GitSource>,
    local_working_dir: Option<PathBuf>,
    clock: &HLC,
    uv: bool,
    daemon_connections: &mut DaemonConnections,
) -> eyre::Result<RunningBuild> {
    let nodes = dataflow.resolve_aliases_and_set_defaults()?;

    // Group the (current and previous) git sources by the machine their node
    // is deployed on, so each daemon only receives the sources it needs.
    let mut git_sources_by_daemon = git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| nodes.get(id).and_then(|n| n.deploy.machine.as_ref()))
        .collect();
    let mut prev_git_sources_by_daemon = prev_git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| nodes.get(id).and_then(|n| n.deploy.machine.as_ref()))
        .collect();

    let nodes_by_daemon = nodes.values().into_group_map_by(|n| &n.deploy.machine);

    // Send one Build command per machine and remember which daemons we expect
    // a build result from.
    let mut daemons = BTreeSet::new();
    for (machine, nodes_on_machine) in &nodes_by_daemon {
        let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
        tracing::debug!(
            "Running dataflow build `{session_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})"
        );

        let build_command = BuildDataflowNodes {
            build_id,
            session_id,
            local_working_dir: local_working_dir.clone(),
            git_sources: git_sources_by_daemon
                .remove(&machine.as_ref())
                .unwrap_or_default(),
            prev_git_sources: prev_git_sources_by_daemon
                .remove(&machine.as_ref())
                .unwrap_or_default(),
            dataflow_descriptor: dataflow.clone(),
            nodes_on_machine,
            uv,
        };
        let message = serde_json::to_vec(&Timestamped {
            inner: DaemonCoordinatorEvent::Build(build_command),
            timestamp: clock.new_timestamp(),
        })?;

        let daemon_id = build_dataflow_on_machine(daemon_connections, machine.as_deref(), &message)
            .await
            .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
        daemons.insert(daemon_id);
    }

    tracing::info!("successfully triggered dataflow build `{session_id}`",);

    Ok(RunningBuild {
        errors: Vec::new(),
        build_result: CachedResult::default(),
        log_subscribers: Vec::new(),
        pending_build_results: daemons,
    })
}

async fn build_dataflow_on_machine(
daemon_connections: &mut DaemonConnections,
machine: Option<&str>,
message: &[u8],
) -> Result<DaemonId, eyre::ErrReport> {
let daemon_id = match machine {
Some(machine) => daemon_connections
.get_matching_daemon_id(machine)
.wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
.clone(),
None => daemon_connections
.unnamed()
.next()
.wrap_err("no unnamed daemon connections")?
.clone(),
};

let daemon_connection = daemon_connections
.get_mut(&daemon_id)
.wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?;
tcp_send(&mut daemon_connection.stream, message)
.await
.wrap_err("failed to send build message to daemon")?;

let reply_raw = tcp_receive(&mut daemon_connection.stream)
.await
.wrap_err("failed to receive build reply from daemon")?;
match serde_json::from_slice(&reply_raw)
.wrap_err("failed to deserialize build reply from daemon")?
{
DaemonCoordinatorReply::TriggerBuildResult(result) => result
.map_err(|e| eyre!(e))
.wrap_err("daemon returned an error")?,
_ => bail!("unexpected reply"),
}
Ok(daemon_id)
}

#[allow(clippy::too_many_arguments)]
async fn start_dataflow(
build_id: Option<BuildId>,
session_id: SessionId,
dataflow: Descriptor,
working_dir: PathBuf,
local_working_dir: Option<PathBuf>,
name: Option<String>,
daemon_connections: &mut DaemonConnections,
clock: &HLC,
uv: bool,
build_only: bool,
) -> eyre::Result<RunningDataflow> {
let SpawnedDataflow {
uuid,
daemons,
nodes,
} = spawn_dataflow(
build_id,
session_id,
dataflow,
working_dir,
local_working_dir,
daemon_connections,
clock,
uv,
build_only,
)
.await?;
Ok(RunningDataflow {
@@ -1156,11 +1388,10 @@ async fn start_dataflow(
exited_before_subscribe: Default::default(),
daemons: daemons.clone(),
nodes,
spawn_result: SpawnResult::default(),
spawn_result: CachedResult::default(),
stop_reply_senders: Vec::new(),
log_subscribers: Vec::new(),
pending_spawn_results: daemons,
build_only,
})
}

@@ -1235,6 +1466,11 @@ pub enum Event {
DaemonExit {
daemon_id: dora_message::common::DaemonId,
},
DataflowBuildResult {
build_id: BuildId,
daemon_id: DaemonId,
result: eyre::Result<()>,
},
DataflowSpawnResult {
dataflow_id: uuid::Uuid,
daemon_id: DaemonId,
@@ -1264,6 +1500,7 @@ impl Event {
Event::CtrlC => "CtrlC",
Event::Log(_) => "Log",
Event::DaemonExit { .. } => "DaemonExit",
Event::DataflowBuildResult { .. } => "DataflowBuildResult",
Event::DataflowSpawnResult { .. } => "DataflowSpawnResult",
}
}


+ 10
- 0
binaries/coordinator/src/listener.rs View File

@@ -112,6 +112,16 @@ pub async fn handle_connection(
break;
}
}
DaemonEvent::BuildResult { build_id, result } => {
let event = Event::DataflowBuildResult {
build_id,
daemon_id,
result: result.map_err(|err| eyre::eyre!(err)),
};
if events_tx.send(event).await.is_err() {
break;
}
}
DaemonEvent::SpawnResult {
dataflow_id,
result,


+ 10
- 16
binaries/coordinator/src/run/mod.rs View File

@@ -10,6 +10,7 @@ use dora_message::{
daemon_to_coordinator::DaemonCoordinatorReply,
descriptor::{Descriptor, ResolvedNode},
id::NodeId,
BuildId, SessionId,
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use itertools::Itertools;
@@ -21,12 +22,13 @@ use uuid::{NoContext, Timestamp, Uuid};

#[tracing::instrument(skip(daemon_connections, clock))]
pub(super) async fn spawn_dataflow(
build_id: Option<BuildId>,
session_id: SessionId,
dataflow: Descriptor,
working_dir: PathBuf,
local_working_dir: Option<PathBuf>,
daemon_connections: &mut DaemonConnections,
clock: &HLC,
uv: bool,
build_only: bool,
) -> eyre::Result<SpawnedDataflow> {
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let uuid = Uuid::new_v7(Timestamp::now(NoContext));
@@ -37,18 +39,18 @@ pub(super) async fn spawn_dataflow(
for (machine, nodes_on_machine) in &nodes_by_daemon {
let spawn_nodes = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
tracing::debug!(
"{} dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})",
if build_only { "Building" } else { "Spawning" }
"Spawning dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})"
);

let spawn_command = SpawnDataflowNodes {
build_id,
session_id,
dataflow_id: uuid,
working_dir: working_dir.clone(),
local_working_dir: local_working_dir.clone(),
nodes: nodes.clone(),
dataflow_descriptor: dataflow.clone(),
spawn_nodes,
uv,
build_only,
};
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Spawn(spawn_command),
@@ -57,19 +59,11 @@ pub(super) async fn spawn_dataflow(

let daemon_id = spawn_dataflow_on_machine(daemon_connections, machine.as_deref(), &message)
.await
.wrap_err_with(|| {
format!(
"failed to {} dataflow on machine `{machine:?}`",
if build_only { "build" } else { "spawn" }
)
})?;
.wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine:?}`"))?;
daemons.insert(daemon_id);
}

tracing::info!(
"successfully triggered dataflow {} `{uuid}`",
if build_only { "build" } else { "spawn" }
);
tracing::info!("successfully triggered dataflow spawn `{uuid}`",);

Ok(SpawnedDataflow {
uuid,


+ 2
- 1
binaries/daemon/Cargo.toml View File

@@ -45,5 +45,6 @@ crossbeam = "0.8.4"
crossbeam-skiplist = "0.1.3"
zenoh = "1.1.1"
url = "2.5.4"
git2 = { version = "0.18.0", features = ["vendored-openssl"] }
git2 = { workspace = true }
dunce = "1.0.5"
itertools = "0.14"

+ 280
- 46
binaries/daemon/src/lib.rs View File

@@ -2,6 +2,7 @@ use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::{
build::{self, BuildInfo, GitManager},
config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
descriptor::{
read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
@@ -12,10 +13,11 @@ use dora_core::{
};
use dora_message::{
common::{
DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause,
NodeExitStatus,
},
coordinator_to_cli::DataflowResult,
coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes},
daemon_to_coordinator::{
CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
},
@@ -24,7 +26,7 @@ use dora_message::{
descriptor::NodeSource,
metadata::{self, ArrowTypeInfo},
node_to_daemon::{DynamicNodeEvent, Timestamped},
DataflowId,
BuildId, DataflowId, SessionId,
};
use dora_node_api::{arrow::datatypes::DataType, Parameter};
use eyre::{bail, eyre, Context, ContextCompat, Result};
@@ -38,6 +40,7 @@ use socket_stream_utils::socket_stream_send;
use spawn::Spawner;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
env::current_dir,
future::Future,
net::SocketAddr,
path::{Path, PathBuf},
@@ -101,12 +104,14 @@ pub struct Daemon {

logger: DaemonLogger,

repos_in_use: BTreeMap<PathBuf, BTreeSet<DataflowId>>,
sessions: BTreeMap<SessionId, BuildId>,
builds: BTreeMap<BuildId, BuildInfo>,
git_manager: GitManager,
}

type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;

struct NodePrepareTask<F> {
struct NodeBuildTask<F> {
node_id: NodeId,
dynamic_node: bool,
task: F,
@@ -148,12 +153,19 @@ impl Daemon {
None,
clock,
Some(remote_daemon_events_tx),
Default::default(),
)
.await
.map(|_| ())
}

pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> {
pub async fn run_dataflow(
dataflow_path: &Path,
build_id: Option<BuildId>,
local_build: Option<BuildInfo>,
session_id: SessionId,
uv: bool,
) -> eyre::Result<DataflowResult> {
let working_dir = dataflow_path
.canonicalize()
.context("failed to canonicalize dataflow path")?
@@ -167,13 +179,14 @@ impl Daemon {

let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
let spawn_command = SpawnDataflowNodes {
build_id,
session_id,
dataflow_id,
working_dir,
local_working_dir: Some(working_dir),
spawn_nodes: nodes.keys().cloned().collect(),
nodes,
dataflow_descriptor: descriptor,
uv,
build_only: false,
};

let clock = Arc::new(HLC::default());
@@ -204,6 +217,16 @@ impl Daemon {
Some(exit_when_done),
clock.clone(),
None,
if let Some(local_build) = local_build {
let Some(build_id) = build_id else {
bail!("no build_id, but local_build set")
};
let mut builds = BTreeMap::new();
builds.insert(build_id, local_build);
builds
} else {
Default::default()
},
);

let spawn_result = reply_rx
@@ -235,6 +258,7 @@ impl Daemon {
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
clock: Arc<HLC>,
remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
builds: BTreeMap<BuildId, BuildInfo>,
) -> eyre::Result<DaemonRunResult> {
let coordinator_connection = match coordinator_addr {
Some(addr) => {
@@ -298,7 +322,9 @@ impl Daemon {
clock,
zenoh_session,
remote_daemon_events_tx,
repos_in_use: Default::default(),
git_manager: Default::default(),
builds,
sessions: Default::default(),
};

let dora_events = ReceiverStream::new(dora_events_rx);
@@ -418,14 +444,41 @@ impl Daemon {
.await?;
}
},
Event::BuildDataflowResult {
build_id,
session_id,
result,
} => {
let (build_info, result) = match result {
Ok(build_info) => (Some(build_info), Ok(())),
Err(err) => (None, Err(err)),
};
if let Some(build_info) = build_info {
self.builds.insert(build_id, build_info);
if let Some(old_build_id) = self.sessions.insert(session_id, build_id) {
self.builds.remove(&old_build_id);
}
}
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
daemon_id: self.daemon_id.clone(),
event: DaemonEvent::BuildResult {
build_id,
result: result.map_err(|err| format!("{err:?}")),
},
},
timestamp: self.clock.new_timestamp(),
})?;
socket_stream_send(connection, &msg).await.wrap_err(
"failed to send BuildDataflowResult message to dora-coordinator",
)?;
}
}
Event::SpawnDataflowResult {
dataflow_id,
result,
build_only,
} => {
if build_only {
self.running.remove(&dataflow_id);
}
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
@@ -437,9 +490,9 @@ impl Daemon {
},
timestamp: self.clock.new_timestamp(),
})?;
socket_stream_send(connection, &msg)
.await
.wrap_err("failed to send Exit message to dora-coordinator")?;
socket_stream_send(connection, &msg).await.wrap_err(
"failed to send SpawnDataflowResult message to dora-coordinator",
)?;
}
}
}
@@ -476,35 +529,93 @@ impl Daemon {
reply_tx: Sender<Option<DaemonCoordinatorReply>>,
) -> eyre::Result<RunStatus> {
let status = match event {
DaemonCoordinatorEvent::Build(BuildDataflowNodes {
build_id,
session_id,
local_working_dir,
git_sources,
prev_git_sources,
dataflow_descriptor,
nodes_on_machine,
uv,
}) => {
match dataflow_descriptor.communication.remote {
dora_core::config::RemoteCommunicationConfig::Tcp => {}
}

let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

let result = self
.build_dataflow(
build_id,
session_id,
base_working_dir,
git_sources,
prev_git_sources,
dataflow_descriptor,
nodes_on_machine,
uv,
)
.await;
let (trigger_result, result_task) = match result {
Ok(result_task) => (Ok(()), Some(result_task)),
Err(err) => (Err(format!("{err:?}")), None),
};
let reply = DaemonCoordinatorReply::TriggerBuildResult(trigger_result);
let _ = reply_tx.send(Some(reply)).map_err(|_| {
error!("could not send `TriggerBuildResult` reply from daemon to coordinator")
});

let result_tx = self.events_tx.clone();
let clock = self.clock.clone();
if let Some(result_task) = result_task {
tokio::spawn(async move {
let message = Timestamped {
inner: Event::BuildDataflowResult {
build_id,
session_id,
result: result_task.await,
},
timestamp: clock.new_timestamp(),
};
let _ = result_tx
.send(message)
.map_err(|_| {
error!(
"could not send `BuildResult` reply from daemon to coordinator"
)
})
.await;
});
}

RunStatus::Continue
}
DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
build_id,
session_id,
dataflow_id,
working_dir,
local_working_dir,
nodes,
dataflow_descriptor,
spawn_nodes,
uv,
build_only,
}) => {
match dataflow_descriptor.communication.remote {
dora_core::config::RemoteCommunicationConfig::Tcp => {}
}

// Use the working directory if it exists, otherwise use the working directory where the daemon is spawned
let working_dir = if working_dir.exists() {
working_dir
} else {
std::env::current_dir().wrap_err("failed to get current working dir")?
};
let base_working_dir = self.base_working_dir(local_working_dir, session_id)?;

let result = self
.spawn_dataflow(
build_id,
dataflow_id,
working_dir,
base_working_dir,
nodes,
dataflow_descriptor,
spawn_nodes,
uv,
build_only,
)
.await;
let (trigger_result, result_task) = match result {
@@ -524,7 +635,6 @@ impl Daemon {
inner: Event::SpawnDataflowResult {
dataflow_id,
result: result_task.await,
build_only,
},
timestamp: clock.new_timestamp(),
};
@@ -770,15 +880,100 @@ impl Daemon {
}
}

#[allow(clippy::too_many_arguments)]
async fn build_dataflow(
&mut self,
build_id: BuildId,
session_id: SessionId,
base_working_dir: PathBuf,
git_sources: BTreeMap<NodeId, GitSource>,
prev_git_sources: BTreeMap<NodeId, GitSource>,
dataflow_descriptor: Descriptor,
local_nodes: BTreeSet<NodeId>,
uv: bool,
) -> eyre::Result<impl Future<Output = eyre::Result<BuildInfo>>> {
let builder = build::Builder {
session_id,
base_working_dir,
uv,
};
let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;

let mut tasks = Vec::new();

// build nodes
for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
let dynamic_node = node.kind.dynamic();

let node_id = node.id.clone();
let mut logger = self.logger.for_node_build(build_id, node_id.clone());
logger.log(LogLevel::Info, "building").await;
let git_source = git_sources.get(&node_id).cloned();
let prev_git_source = prev_git_sources.get(&node_id).cloned();

let logger_cloned = logger
.try_clone_impl()
.await
.wrap_err("failed to clone logger")?;

match builder
.clone()
.build_node(
node,
git_source,
prev_git_source,
logger_cloned,
&mut self.git_manager,
)
.await
.wrap_err_with(|| format!("failed to build node `{node_id}`"))
{
Ok(result) => {
tasks.push(NodeBuildTask {
node_id,
task: result,
dynamic_node,
});
}
Err(err) => {
logger.log(LogLevel::Error, format!("{err:?}")).await;
return Err(err);
}
}
}

let task = async move {
let mut info = BuildInfo {
node_working_dirs: Default::default(),
};
for task in tasks {
let NodeBuildTask {
node_id,
dynamic_node,
task,
} = task;
let node = task
.await
.with_context(|| format!("failed to build node `{node_id}`"))?;
info.node_working_dirs
.insert(node_id, node.node_working_dir);
}
Ok(info)
};

Ok(task)
}

#[allow(clippy::too_many_arguments)]
async fn spawn_dataflow(
&mut self,
dataflow_id: uuid::Uuid,
working_dir: PathBuf,
build_id: Option<BuildId>,
dataflow_id: DataflowId,
base_working_dir: PathBuf,
nodes: BTreeMap<NodeId, ResolvedNode>,
dataflow_descriptor: Descriptor,
spawn_nodes: BTreeSet<NodeId>,
uv: bool,
build_only: bool,
) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
let mut logger = self
.logger
@@ -790,7 +985,8 @@ impl Daemon {
RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor);
let dataflow = match self.running.entry(dataflow_id) {
std::collections::hash_map::Entry::Vacant(entry) => {
self.working_dir.insert(dataflow_id, working_dir.clone());
self.working_dir
.insert(dataflow_id, base_working_dir.clone());
entry.insert(dataflow)
}
std::collections::hash_map::Entry::Occupied(_) => {
@@ -800,6 +996,11 @@ impl Daemon {

let mut stopped = Vec::new();

let node_working_dirs = build_id
.and_then(|build_id| self.builds.get(&build_id))
.map(|info| info.node_working_dirs.clone())
.unwrap_or_default();

// calculate info about mappings
for node in nodes.values() {
let local = spawn_nodes.contains(&node.id);
@@ -838,12 +1039,10 @@ impl Daemon {

let spawner = Spawner {
dataflow_id,
working_dir,
daemon_tx: self.events_tx.clone(),
dataflow_descriptor,
clock: self.clock.clone(),
uv,
build_only,
};

let mut tasks = Vec::new();
@@ -869,19 +1068,18 @@ impl Daemon {
logger
.log(LogLevel::Info, Some("daemon".into()), "spawning")
.await;
let node_working_dir = node_working_dirs
.get(&node_id)
.unwrap_or(&base_working_dir)
.clone();
match spawner
.clone()
.prepare_node(
node,
node_stderr_most_recent,
&mut logger,
&mut self.repos_in_use,
)
.spawn_node(node, node_working_dir, node_stderr_most_recent, &mut logger)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
{
Ok(result) => {
tasks.push(NodePrepareTask {
tasks.push(NodeBuildTask {
node_id,
task: result,
dynamic_node,
@@ -979,7 +1177,7 @@ impl Daemon {
async fn spawn_prepared_nodes(
dataflow_id: Uuid,
mut logger: DataflowLogger<'_>,
tasks: Vec<NodePrepareTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
tasks: Vec<NodeBuildTask<impl Future<Output = eyre::Result<spawn::PreparedNode>>>>,
events_tx: mpsc::Sender<Timestamped<Event>>,
clock: Arc<HLC>,
) -> eyre::Result<()> {
@@ -995,7 +1193,7 @@ impl Daemon {
let mut failed_to_prepare = None;
let mut prepared_nodes = Vec::new();
for task in tasks {
let NodePrepareTask {
let NodeBuildTask {
node_id,
dynamic_node,
task,
@@ -1567,9 +1765,12 @@ impl Daemon {
.clone(),
};

self.repos_in_use.values_mut().for_each(|dataflows| {
dataflows.remove(&dataflow_id);
});
self.git_manager
.clones_in_use
.values_mut()
.for_each(|dataflows| {
dataflows.remove(&dataflow_id);
});

logger
.log(
@@ -1799,6 +2000,34 @@ impl Daemon {
}
Ok(RunStatus::Continue)
}

fn base_working_dir(
&self,
local_working_dir: Option<PathBuf>,
session_id: SessionId,
) -> eyre::Result<PathBuf> {
match local_working_dir {
Some(working_dir) => {
// check that working directory exists
if working_dir.exists() {
Ok(working_dir)
} else {
bail!(
"working directory does not exist: {}",
working_dir.display(),
)
}
}
None => {
// use subfolder of daemon working dir
let daemon_working_dir =
current_dir().context("failed to get daemon working dir")?;
Ok(daemon_working_dir
.join("_work")
.join(session_id.to_string()))
}
}
}
}

async fn set_up_event_stream(
@@ -2272,10 +2501,14 @@ pub enum Event {
dynamic_node: bool,
result: Result<RunningNode, NodeError>,
},
BuildDataflowResult {
build_id: BuildId,
session_id: SessionId,
result: eyre::Result<BuildInfo>,
},
SpawnDataflowResult {
dataflow_id: Uuid,
result: eyre::Result<()>,
build_only: bool,
},
}

@@ -2298,6 +2531,7 @@ impl Event {
Event::SecondCtrlC => "SecondCtrlC",
Event::DaemonError(_) => "DaemonError",
Event::SpawnNodeResult { .. } => "SpawnNodeResult",
Event::BuildDataflowResult { .. } => "BuildDataflowResult",
Event::SpawnDataflowResult { .. } => "SpawnDataflowResult",
}
}


+ 81
- 7
binaries/daemon/src/log.rs View File

@@ -4,10 +4,11 @@ use std::{
sync::Arc,
};

use dora_core::{config::NodeId, uhlc};
use dora_core::{build::BuildLogger, config::NodeId, uhlc};
use dora_message::{
common::{DaemonId, LogLevel, LogMessage, Timestamped},
daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
BuildId,
};
use eyre::Context;
use tokio::net::TcpStream;
@@ -81,7 +82,7 @@ impl<'a> DataflowLogger<'a> {
message: impl Into<String>,
) {
self.logger
.log(level, self.dataflow_id, node_id, target, message)
.log(level, Some(self.dataflow_id), node_id, target, message)
.await
}

@@ -93,6 +94,44 @@ impl<'a> DataflowLogger<'a> {
}
}

pub struct NodeBuildLogger<'a> {
build_id: BuildId,
node_id: NodeId,
logger: CowMut<'a, DaemonLogger>,
}

impl NodeBuildLogger<'_> {
pub async fn log(&mut self, level: LogLevel, message: impl Into<String>) {
self.logger
.log_build(self.build_id, level, Some(self.node_id.clone()), message)
.await
}

pub async fn try_clone_impl(&self) -> eyre::Result<NodeBuildLogger<'static>> {
Ok(NodeBuildLogger {
build_id: self.build_id,
node_id: self.node_id.clone(),
logger: CowMut::Owned(self.logger.try_clone().await?),
})
}
}

impl BuildLogger for NodeBuildLogger<'_> {
type Clone = NodeBuildLogger<'static>;

fn log_message(
&mut self,
level: LogLevel,
message: impl Into<String> + Send,
) -> impl std::future::Future<Output = ()> + Send {
self.log(level, message)
}

fn try_clone(&self) -> impl std::future::Future<Output = eyre::Result<Self::Clone>> + Send {
self.try_clone_impl()
}
}

pub struct DaemonLogger {
daemon_id: DaemonId,
logger: Logger,
@@ -106,6 +145,14 @@ impl DaemonLogger {
}
}

pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger {
NodeBuildLogger {
build_id,
node_id,
logger: CowMut::Borrowed(self),
}
}

pub fn inner(&self) -> &Logger {
&self.logger
}
@@ -113,12 +160,13 @@ impl DaemonLogger {
pub async fn log(
&mut self,
level: LogLevel,
dataflow_id: Uuid,
dataflow_id: Option<Uuid>,
node_id: Option<NodeId>,
target: Option<String>,
message: impl Into<String>,
) {
let message = LogMessage {
build_id: None,
daemon_id: Some(self.daemon_id.clone()),
dataflow_id,
node_id,
@@ -132,6 +180,28 @@ impl DaemonLogger {
self.logger.log(message).await
}

pub async fn log_build(
&mut self,
build_id: BuildId,
level: LogLevel,
node_id: Option<NodeId>,
message: impl Into<String>,
) {
let message = LogMessage {
build_id: Some(build_id),
daemon_id: Some(self.daemon_id.clone()),
dataflow_id: None,
node_id,
level,
target: Some("build".into()),
module_path: None,
file: None,
line: None,
message: message.into(),
};
self.logger.log(message).await
}

pub(crate) fn daemon_id(&self) -> &DaemonId {
&self.daemon_id
}
@@ -181,7 +251,8 @@ impl Logger {
match message.level {
LogLevel::Error => {
tracing::error!(
dataflow_id = message.dataflow_id.to_string(),
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
@@ -193,7 +264,8 @@ impl Logger {
}
LogLevel::Warn => {
tracing::warn!(
dataflow_id = message.dataflow_id.to_string(),
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
@@ -205,7 +277,8 @@ impl Logger {
}
LogLevel::Info => {
tracing::info!(
dataflow_id = message.dataflow_id.to_string(),
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
@@ -217,7 +290,8 @@ impl Logger {
}
LogLevel::Debug => {
tracing::debug!(
dataflow_id = message.dataflow_id.to_string(),
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,


+ 0
- 286
binaries/daemon/src/spawn/git.rs View File

@@ -1,286 +0,0 @@
use crate::log::NodeLogger;
use dora_message::{common::LogLevel, descriptor::GitRepoRev, DataflowId};
use eyre::{ContextCompat, WrapErr};
use git2::FetchOptions;
use std::{
collections::{BTreeMap, BTreeSet},
path::{Path, PathBuf},
};
use url::Url;
use uuid::Uuid;

pub struct GitFolder {
/// The URL of the git repository.
repo_addr: String,
/// The branch, tag, or git revision to checkout.
rev: Option<GitRepoRev>,
/// The directory that should contain the checked-out repository.
clone_dir: PathBuf,
/// Specifies whether an existing repo should be reused.
reuse: ReuseOptions,
}

impl GitFolder {
pub fn choose_clone_dir(
dataflow_id: uuid::Uuid,
repo_addr: String,
rev: Option<GitRepoRev>,
target_dir: &Path,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> eyre::Result<Self> {
let repo_url = Url::parse(&repo_addr).context("failed to parse git repository URL")?;

let base_dir = {
let base = {
let mut path =
target_dir.join(repo_url.host_str().context("git URL has no hostname")?);

path.extend(repo_url.path_segments().context("no path in git URL")?);
path
};
match &rev {
None => base,
Some(rev) => match rev {
GitRepoRev::Branch(branch) => base.join("branch").join(branch),
GitRepoRev::Tag(tag) => base.join("tag").join(tag),
GitRepoRev::Rev(rev) => base.join("rev").join(rev),
},
}
};
let clone_dir = if clone_dir_exists(&base_dir, repos_in_use) {
let used_by_other = used_by_other_dataflow(dataflow_id, &base_dir, repos_in_use);
if used_by_other {
// don't reuse, choose new directory
// (TODO reuse if still up to date)

let dir_name = base_dir.file_name().unwrap().to_str().unwrap();
let mut i = 1;
loop {
let new_path = base_dir.with_file_name(format!("{dir_name}-{i}"));
if clone_dir_exists(&new_path, repos_in_use)
&& used_by_other_dataflow(dataflow_id, &new_path, repos_in_use)
{
i += 1;
} else {
break new_path;
}
}
} else {
base_dir
}
} else {
base_dir
};
let clone_dir = dunce::simplified(&clone_dir).to_owned();

let reuse = if clone_dir_exists(&clone_dir, repos_in_use) {
let empty = BTreeSet::new();
let in_use = repos_in_use.get(&clone_dir).unwrap_or(&empty);
let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id);
if used_by_other_dataflow {
// The directory is currently in use by another dataflow. We currently don't
// support reusing the same clone across multiple dataflow runs. Above, we
// choose a new directory if we detect such a case. So this `if` branch
// should never be reached.
eyre::bail!("clone_dir is already in use by other dataflow")
} else if in_use.is_empty() {
// The cloned repo is not used by any dataflow, so we can safely reuse it. However,
// the clone might be still on an older commit, so we need to do a `git fetch`
// before we reuse it.
ReuseOptions::ReuseAfterFetch
} else {
// This clone is already used for another node of this dataflow. We will do a
// `git fetch` operation for the first node of this dataflow, so we don't need
// to do it again for other nodes of the dataflow. So we can simply reuse the
// directory without doing any additional git operations.
ReuseOptions::Reuse
}
} else {
ReuseOptions::NewClone
};
repos_in_use
.entry(clone_dir.clone())
.or_default()
.insert(dataflow_id);

Ok(GitFolder {
clone_dir,
reuse,
repo_addr,
rev,
})
}

pub async fn prepare(self, logger: &mut NodeLogger<'_>) -> eyre::Result<PathBuf> {
let GitFolder {
clone_dir,
reuse,
repo_addr,
rev,
} = self;

let rev_str = rev_str(&rev);
let refname = rev.clone().map(|rev| match rev {
GitRepoRev::Branch(branch) => format!("refs/remotes/origin/{branch}"),
GitRepoRev::Tag(tag) => format!("refs/tags/{tag}"),
GitRepoRev::Rev(rev) => rev,
});

match reuse {
ReuseOptions::NewClone => {
let repository = clone_into(&repo_addr, &rev, &clone_dir, logger).await?;
checkout_tree(&repository, refname)?;
}
ReuseOptions::ReuseAfterFetch => {
logger
.log(
LogLevel::Info,
None,
format!("fetching changes and reusing {repo_addr}{rev_str}"),
)
.await;
let refname_cloned = refname.clone();
let clone_dir = clone_dir.clone();
let repository = fetch_changes(clone_dir, refname_cloned).await?;
checkout_tree(&repository, refname)?;
}
ReuseOptions::Reuse => {
logger
.log(
LogLevel::Info,
None,
format!("reusing up-to-date {repo_addr}{rev_str}"),
)
.await;
}
};
Ok(clone_dir)
}
}

fn used_by_other_dataflow(
dataflow_id: uuid::Uuid,
clone_dir_base: &PathBuf,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> bool {
let empty = BTreeSet::new();
let in_use = repos_in_use.get(clone_dir_base).unwrap_or(&empty);
let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id);
used_by_other_dataflow
}

enum ReuseOptions {
/// Create a new clone of the repository.
NewClone,
/// Reuse an existing up-to-date clone of the repository.
Reuse,
/// Update an older clone of the repository, then reuse it.
ReuseAfterFetch,
}

fn rev_str(rev: &Option<GitRepoRev>) -> String {
match rev {
Some(GitRepoRev::Branch(branch)) => format!(" (branch {branch})"),
Some(GitRepoRev::Tag(tag)) => format!(" (tag {tag})"),
Some(GitRepoRev::Rev(rev)) => format!(" (rev {rev})"),
None => String::new(),
}
}

async fn clone_into(
repo_addr: &String,
rev: &Option<GitRepoRev>,
clone_dir: &Path,
logger: &mut NodeLogger<'_>,
) -> eyre::Result<git2::Repository> {
if let Some(parent) = clone_dir.parent() {
tokio::fs::create_dir_all(parent)
.await
.context("failed to create parent directory for git clone")?;
}

let rev_str = rev_str(rev);
logger
.log(
LogLevel::Info,
None,
format!("cloning {repo_addr}{rev_str} into {}", clone_dir.display()),
)
.await;
let rev: Option<GitRepoRev> = rev.clone();
let clone_into = clone_dir.to_owned();
let repo_addr = repo_addr.clone();
let task = tokio::task::spawn_blocking(move || {
let mut builder = git2::build::RepoBuilder::new();
let mut fetch_options = git2::FetchOptions::new();
fetch_options.download_tags(git2::AutotagOption::All);
builder.fetch_options(fetch_options);
if let Some(GitRepoRev::Branch(branch)) = &rev {
builder.branch(branch);
}
builder
.clone(&repo_addr, &clone_into)
.context("failed to clone repo")
});
let repo = task.await??;
Ok(repo)
}

async fn fetch_changes(
repo_dir: PathBuf,
refname: Option<String>,
) -> Result<git2::Repository, eyre::Error> {
let fetch_changes = tokio::task::spawn_blocking(move || {
let repository = git2::Repository::open(&repo_dir).context("failed to open git repo")?;

{
let mut remote = repository
.find_remote("origin")
.context("failed to find remote `origin` in repo")?;
remote
.connect(git2::Direction::Fetch)
.context("failed to connect to remote")?;
let default_branch = remote
.default_branch()
.context("failed to get default branch for remote")?;
let fetch = match &refname {
Some(refname) => refname,
None => default_branch
.as_str()
.context("failed to read default branch as string")?,
};
let mut fetch_options = FetchOptions::new();
fetch_options.download_tags(git2::AutotagOption::All);
remote
.fetch(&[&fetch], Some(&mut fetch_options), None)
.context("failed to fetch from git repo")?;
}
Result::<_, eyre::Error>::Ok(repository)
});
let repository = fetch_changes.await??;
Ok(repository)
}

fn checkout_tree(repository: &git2::Repository, refname: Option<String>) -> eyre::Result<()> {
if let Some(refname) = refname {
let (object, reference) = repository
.revparse_ext(&refname)
.context("failed to parse ref")?;
repository
.checkout_tree(&object, None)
.context("failed to checkout ref")?;
match reference {
Some(reference) => repository
.set_head(reference.name().context("failed to get reference_name")?)
.context("failed to set head")?,
None => repository
.set_head_detached(object.id())
.context("failed to set detached head")?,
}
}
Ok(())
}

fn clone_dir_exists(dir: &PathBuf, repos_in_use: &BTreeMap<PathBuf, BTreeSet<Uuid>>) -> bool {
repos_in_use.contains_key(dir) || dir.exists()
}

+ 18
- 103
binaries/daemon/src/spawn/mod.rs View File

@@ -7,11 +7,10 @@ use aligned_vec::{AVec, ConstAlign};
use crossbeam::queue::ArrayQueue;
use dora_arrow_convert::IntoArrow;
use dora_core::{
build::run_build_command,
config::DataId,
descriptor::{
resolve_path, source_is_url, CustomNode, Descriptor, OperatorDefinition, OperatorSource,
PythonSource, ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, SHELL_SOURCE,
resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource,
ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, SHELL_SOURCE,
},
get_python_path,
uhlc::HLC,
@@ -21,7 +20,6 @@ use dora_message::{
common::{LogLevel, LogMessage},
daemon_to_coordinator::{DataMessage, NodeExitStatus, Timestamped},
daemon_to_node::{NodeConfig, RuntimeConfig},
descriptor::EnvValue,
id::NodeId,
DataflowId,
};
@@ -31,9 +29,7 @@ use dora_node_api::{
Metadata,
};
use eyre::{ContextCompat, WrapErr};
use git::GitFolder;
use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
path::{Path, PathBuf},
process::Stdio,
@@ -46,27 +42,23 @@ use tokio::{
};
use tracing::error;

mod git;

#[derive(Clone)]
pub struct Spawner {
pub dataflow_id: DataflowId,
pub working_dir: PathBuf,
pub daemon_tx: mpsc::Sender<Timestamped<Event>>,
pub dataflow_descriptor: Descriptor,
/// clock is required for generating timestamps when dropping messages early because queue is full
pub clock: Arc<HLC>,
pub uv: bool,
pub build_only: bool,
}

impl Spawner {
pub async fn prepare_node(
pub async fn spawn_node(
self,
node: ResolvedNode,
node_working_dir: PathBuf,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
logger: &mut NodeLogger<'_>,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> eyre::Result<impl Future<Output = eyre::Result<PreparedNode>>> {
let dataflow_id = self.dataflow_id;
let node_id = node.id.clone();
@@ -101,24 +93,6 @@ impl Spawner {
dynamic: node.kind.dynamic(),
};

let prepared_git = if let dora_core::descriptor::CoreNodeKind::Custom(CustomNode {
source: dora_message::descriptor::NodeSource::GitBranch { repo, rev },
..
}) = &node.kind
{
let target_dir = self.working_dir.join("build");
let git_folder = GitFolder::choose_clone_dir(
self.dataflow_id,
repo.clone(),
rev.clone(),
&target_dir,
repos_in_use,
)?;
Some(git_folder)
} else {
None
};

let mut logger = logger
.try_clone()
.await
@@ -126,10 +100,10 @@ impl Spawner {
let task = async move {
self.prepare_node_inner(
node,
node_working_dir,
&mut logger,
dataflow_id,
node_config,
prepared_git,
node_stderr_most_recent,
)
.await
@@ -138,33 +112,21 @@ impl Spawner {
}

async fn prepare_node_inner(
mut self,
self,
node: ResolvedNode,
node_working_dir: PathBuf,
logger: &mut NodeLogger<'_>,
dataflow_id: uuid::Uuid,
node_config: NodeConfig,
git_folder: Option<GitFolder>,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
) -> eyre::Result<PreparedNode> {
let (command, error_msg) = match &node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
let build_dir = match git_folder {
Some(git_folder) => git_folder.prepare(logger).await?,
None => self.working_dir.clone(),
};

if let Some(build) = &n.build {
self.build_node(logger, &node.env, build_dir.clone(), build)
.await?;
}
let mut command = if self.build_only {
None
} else {
path_spawn_command(&build_dir, self.uv, logger, n, true).await?
};
let mut command =
path_spawn_command(&node_working_dir, self.uv, logger, n, true).await?;

if let Some(command) = &mut command {
command.current_dir(&self.working_dir);
command.current_dir(&node_working_dir);
command.stdin(Stdio::null());

command.env(
@@ -205,14 +167,6 @@ impl Spawner {
(command, error_msg)
}
dora_core::descriptor::CoreNodeKind::Runtime(n) => {
// run build commands
for operator in &n.operators {
if let Some(build) = &operator.config.build {
self.build_node(logger, &node.env, self.working_dir.clone(), build)
.await?;
}
}

let python_operators: Vec<&OperatorDefinition> = n
.operators
.iter()
@@ -224,9 +178,7 @@ impl Spawner {
.iter()
.any(|x| !matches!(x.config.source, OperatorSource::Python { .. }));

let mut command = if self.build_only {
None
} else if !python_operators.is_empty() && !other_operators {
let mut command = if !python_operators.is_empty() && !other_operators {
// Use python to spawn runtime if there is a python operator

// TODO: Handle multi-operator runtime once sub-interpreter is supported
@@ -304,7 +256,7 @@ impl Spawner {
};

if let Some(command) = &mut command {
command.current_dir(&self.working_dir);
command.current_dir(&node_working_dir);

command.env(
"DORA_RUNTIME_CONFIG",
@@ -337,7 +289,7 @@ impl Spawner {
Ok(PreparedNode {
command,
spawn_error_msg: error_msg,
working_dir: self.working_dir,
node_working_dir,
dataflow_id,
node,
node_config,
@@ -346,50 +298,12 @@ impl Spawner {
node_stderr_most_recent,
})
}

async fn build_node(
&mut self,
logger: &mut NodeLogger<'_>,
node_env: &Option<BTreeMap<String, EnvValue>>,
working_dir: PathBuf,
build: &String,
) -> Result<(), eyre::Error> {
logger
.log(
LogLevel::Info,
None,
format!("running build command: `{build}"),
)
.await;
let build = build.to_owned();
let uv = self.uv;
let node_env = node_env.clone();
let mut logger = logger.try_clone().await.context("failed to clone logger")?;
let (stdout_tx, mut stdout) = tokio::sync::mpsc::channel(10);
let task = tokio::task::spawn_blocking(move || {
run_build_command(&build, &working_dir, uv, &node_env, stdout_tx)
.context("build command failed")
});
tokio::spawn(async move {
while let Some(line) = stdout.recv().await {
logger
.log(
LogLevel::Info,
Some("build command".into()),
line.unwrap_or_else(|err| format!("io err: {}", err.kind())),
)
.await;
}
});
task.await??;
Ok(())
}
}

pub struct PreparedNode {
command: Option<tokio::process::Command>,
spawn_error_msg: String,
working_dir: PathBuf,
node_working_dir: PathBuf,
dataflow_id: DataflowId,
node: ResolvedNode,
node_config: NodeConfig,
@@ -430,7 +344,7 @@ impl PreparedNode {
.await;

let dataflow_dir: PathBuf = self
.working_dir
.node_working_dir
.join("out")
.join(self.dataflow_id.to_string());
if !dataflow_dir.exists() {
@@ -438,7 +352,7 @@ impl PreparedNode {
}
let (tx, mut rx) = mpsc::channel(10);
let mut file = File::create(log::log_path(
&self.working_dir,
&self.node_working_dir,
&self.dataflow_id,
&self.node.id,
))
@@ -642,7 +556,8 @@ impl PreparedNode {
cloned_logger
.log(LogMessage {
daemon_id: Some(daemon_id.clone()),
dataflow_id,
dataflow_id: Some(dataflow_id),
build_id: None,
level: LogLevel::Info,
node_id: Some(node_id.clone()),
target: Some("stdout".into()),


+ 14
- 0
examples/benchmark/run.rs View File

@@ -11,12 +11,26 @@ async fn main() -> eyre::Result<()> {
.wrap_err("failed to set working dir")?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

run_dataflow(dataflow).await?;

Ok(())
}

/// Build the given dataflow by running `dora build <dataflow>` through a
/// release build of the `dora-cli` crate.
async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
    let cargo = std::env::var("CARGO").unwrap();
    let mut cmd = tokio::process::Command::new(&cargo);
    cmd.args(["run", "--package", "dora-cli", "--release", "--", "build"])
        .arg(dataflow);
    let status = cmd.status().await?;
    if !status.success() {
        bail!("failed to build dataflow");
    }
    Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);


+ 1
- 0
examples/c++-dataflow/.gitignore View File

@@ -1 +1,2 @@
*.o
/build

+ 1
- 0
examples/c++-ros2-dataflow/.gitignore View File

@@ -1 +1,2 @@
*.o
/build

+ 20
- 2
examples/multiple-daemons/run.rs View File

@@ -1,3 +1,4 @@
use dora_cli::session::DataflowSession;
use dora_coordinator::{ControlEvent, Event};
use dora_core::{
descriptor::{read_as_descriptor, DescriptorExt},
@@ -37,6 +38,7 @@ async fn main() -> eyre::Result<()> {
.wrap_err("failed to set working dir")?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1);
let coordinator_bind = SocketAddr::new(
@@ -138,15 +140,19 @@ async fn start_dataflow(
.check(&working_dir)
.wrap_err("could not validate yaml")?;

let dataflow_session =
DataflowSession::read_session(dataflow).context("failed to read DataflowSession")?;

let (reply_sender, reply) = oneshot::channel();
coordinator_events_tx
.send(Event::Control(ControlEvent::IncomingRequest {
request: ControlRequest::Start {
build_id: dataflow_session.build_id,
session_id: dataflow_session.session_id,
dataflow: dataflow_descriptor,
local_working_dir: working_dir,
local_working_dir: Some(working_dir),
name: None,
uv: false,
build_only: false,
},
reply_sender,
}))
@@ -228,6 +234,18 @@ async fn destroy(coordinator_events_tx: &Sender<Event>) -> eyre::Result<()> {
}
}

/// Run `dora build <dataflow>` via `cargo run --package dora-cli`.
async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
    let cargo = std::env::var("CARGO").unwrap();
    let mut command = tokio::process::Command::new(&cargo);
    for arg in ["run", "--package", "dora-cli", "--", "build"] {
        command.arg(arg);
    }
    command.arg(dataflow);
    match command.status().await?.success() {
        true => Ok(()),
        false => bail!("failed to build dataflow"),
    }
}

async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);


+ 9
- 0
examples/python-ros2-dataflow/run.rs View File

@@ -40,6 +40,15 @@ async fn main() -> eyre::Result<()> {
async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();

// First build the dataflow (install requirements)
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};

let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");


+ 2
- 0
examples/rust-dataflow-git/.gitignore View File

@@ -0,0 +1,2 @@
/build
/git

+ 3
- 3
examples/rust-dataflow-git/dataflow.yml View File

@@ -1,7 +1,7 @@
nodes:
- id: rust-node
git: https://github.com/dora-rs/dora.git
rev: e31b2a34 # pinned commit, update this when changing the message crate
rev: 64a2dc9c # pinned commit, update this when changing the message crate
build: cargo build -p rust-dataflow-example-node
path: target/debug/rust-dataflow-example-node
inputs:
@@ -11,7 +11,7 @@ nodes:

- id: rust-status-node
git: https://github.com/dora-rs/dora.git
rev: e31b2a34 # pinned commit, update this when changing the message crate
rev: 64a2dc9c # pinned commit, update this when changing the message crate
build: cargo build -p rust-dataflow-example-status-node
path: target/debug/rust-dataflow-example-status-node
inputs:
@@ -22,7 +22,7 @@ nodes:

- id: rust-sink
git: https://github.com/dora-rs/dora.git
rev: e31b2a34 # pinned commit, update this when changing the message crate
rev: 64a2dc9c # pinned commit, update this when changing the message crate
build: cargo build -p rust-dataflow-example-sink
path: target/debug/rust-dataflow-example-sink
inputs:


+ 13
- 0
examples/rust-dataflow-git/run.rs View File

@@ -16,12 +16,25 @@ async fn main() -> eyre::Result<()> {
} else {
Path::new("dataflow.yml")
};
build_dataflow(dataflow).await?;

run_dataflow(dataflow).await?;

Ok(())
}

/// Invoke the dora CLI (via cargo) to build the given dataflow descriptor.
async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
    let cargo = std::env::var("CARGO").unwrap();
    let mut cmd = tokio::process::Command::new(&cargo);
    cmd.arg("run")
        .arg("--package")
        .arg("dora-cli")
        .arg("--")
        .arg("build")
        .arg(dataflow);
    let success = cmd.status().await?.success();
    if success {
        Ok(())
    } else {
        bail!("failed to build dataflow")
    }
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);


+ 1
- 0
examples/rust-dataflow-url/.gitignore View File

@@ -0,0 +1 @@
/build

+ 13
- 0
examples/rust-dataflow-url/run.rs View File

@@ -11,12 +11,25 @@ async fn main() -> eyre::Result<()> {
.wrap_err("failed to set working dir")?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

run_dataflow(dataflow).await?;

Ok(())
}

/// Build the dataflow by shelling out to `cargo run -p dora-cli -- build`.
async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
    let cargo = std::env::var("CARGO").unwrap();
    let base_args = ["run", "--package", "dora-cli", "--", "build"];
    let mut cmd = tokio::process::Command::new(&cargo);
    cmd.args(base_args).arg(dataflow);
    if cmd.status().await?.success() {
        Ok(())
    } else {
        bail!("failed to build dataflow")
    }
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);


+ 13
- 0
examples/rust-dataflow/run.rs View File

@@ -16,12 +16,25 @@ async fn main() -> eyre::Result<()> {
} else {
Path::new("dataflow.yml")
};
build_dataflow(dataflow).await?;

run_dataflow(dataflow).await?;

Ok(())
}

/// Run the dora CLI build step for `dataflow` before starting it.
async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
    let cargo = std::env::var("CARGO").unwrap();
    let mut cmd = tokio::process::Command::new(&cargo);
    cmd.args(["run", "--package", "dora-cli", "--", "build"]);
    cmd.arg(dataflow);
    let exit = cmd.status().await?;
    if !exit.success() {
        bail!("failed to build dataflow");
    }
    Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);


+ 4
- 1
libraries/core/Cargo.toml View File

@@ -19,8 +19,11 @@ which = "5.0.0"
uuid = { version = "1.7", features = ["serde", "v7"] }
tracing = "0.1"
serde-with-expand-env = "1.1.0"
tokio = { version = "1.24.1", features = ["fs", "process", "sync"] }
tokio = { version = "1.24.1", features = ["fs", "process", "sync", "rt"] }
schemars = "0.8.19"
serde_json = "1.0.117"
log = { version = "0.4.21", features = ["serde"] }
dunce = "1.0.5"
url = "2.5.4"
git2 = { workspace = true }
itertools = "0.14"

libraries/core/src/build.rs → libraries/core/src/build/build_command.rs View File


+ 353
- 0
libraries/core/src/build/git.rs View File

@@ -0,0 +1,353 @@
use crate::build::BuildLogger;
use dora_message::{common::LogLevel, descriptor::GitRepoRev, DataflowId, SessionId};
use eyre::{bail, ContextCompat, WrapErr};
use git2::FetchOptions;
use itertools::Itertools;
use std::{
collections::{BTreeMap, BTreeSet},
path::{Path, PathBuf},
};
use url::Url;

/// Tracks git clone directories across dataflows and build sessions so that
/// clones can be safely reused, updated, or replaced.
#[derive(Default)]
pub struct GitManager {
    /// Directories that are currently in use by running dataflows.
    pub clones_in_use: BTreeMap<PathBuf, BTreeSet<DataflowId>>,
    /// Builds that are prepared, but not done yet.
    prepared_builds: BTreeMap<SessionId, PreparedBuild>,
    // NOTE(review): not referenced anywhere in this file's visible code —
    // confirm whether this mapping is still needed or dead.
    reuse_for: BTreeMap<PathBuf, PathBuf>,
}

/// Bookkeeping for a build session that has been planned but not finished.
#[derive(Default)]
struct PreparedBuild {
    /// Clone dirs that will be created during the build process.
    ///
    /// This allows subsequent nodes to reuse the dirs.
    planned_clone_dirs: BTreeSet<PathBuf>,
}

impl GitManager {
    /// Decide how the clone directory for `repo_url` at `commit_hash` should
    /// be prepared and reserve it for this build session.
    ///
    /// Fails if the target directory is still used by a running dataflow.
    /// Otherwise picks the cheapest preparation: reuse an already-planned or
    /// existing checkout, copy or rename a previous clone (located via
    /// `prev_commit_hash`) and fetch updates, or fall back to a fresh clone.
    pub fn choose_clone_dir(
        &mut self,
        session_id: SessionId,
        repo_url: Url,
        commit_hash: String,
        prev_commit_hash: Option<String>,
        target_dir: &Path,
    ) -> eyre::Result<GitFolder> {
        let clone_dir = Self::clone_dir_path(&target_dir, &repo_url, &commit_hash)?;

        if let Some(using) = self.clones_in_use.get(&clone_dir) {
            if !using.is_empty() {
                // The directory is currently in use by another dataflow. Rebuilding
                // while a dataflow is running could lead to unintended behavior.
                eyre::bail!(
                    "the build directory is still in use by the following \
                    dataflows, please stop them before rebuilding: {}",
                    using.iter().join(", ")
                )
            }
        }

        let reuse = if self.clone_dir_ready(session_id, &clone_dir) {
            // The directory already contains a checkout of the commit we're interested in.
            // So we can simply reuse the directory without doing any additional git
            // operations.
            ReuseOptions::Reuse {
                dir: clone_dir.clone(),
            }
        } else if let Some(previous_commit_hash) = prev_commit_hash {
            // we might be able to update a previous clone
            let prev_clone_dir =
                Self::clone_dir_path(&target_dir, &repo_url, &previous_commit_hash)?;

            if self
                .clones_in_use
                .get(&prev_clone_dir)
                .map(|ids| !ids.is_empty())
                .unwrap_or(false)
            {
                // previous clone is still in use -> we cannot rename it, but we can copy it
                ReuseOptions::CopyAndFetch {
                    from: prev_clone_dir,
                    target_dir: clone_dir.clone(),
                    commit_hash,
                }
            } else if prev_clone_dir.exists() {
                // there is an unused previous clone that is not in use -> rename it
                ReuseOptions::RenameAndFetch {
                    from: prev_clone_dir,
                    target_dir: clone_dir.clone(),
                    commit_hash,
                }
            } else {
                // no existing clone associated with previous build id
                ReuseOptions::NewClone {
                    target_dir: clone_dir.clone(),
                    repo_url,
                    commit_hash,
                }
            }
        } else {
            // no previous build that we can reuse
            ReuseOptions::NewClone {
                target_dir: clone_dir.clone(),
                repo_url,
                commit_hash,
            }
        };
        // Mark the dir as planned so later nodes in the same session reuse it
        // instead of preparing it again (insert is idempotent).
        self.register_ready_clone_dir(session_id, clone_dir);

        Ok(GitFolder { reuse })
    }

    /// Returns whether `dir` is currently used by at least one running dataflow.
    pub fn in_use(&self, dir: &Path) -> bool {
        self.clones_in_use
            .get(dir)
            .map(|ids| !ids.is_empty())
            .unwrap_or(false)
    }

    /// Returns whether `dir` is either planned for creation in this session
    /// or already exists on disk.
    pub fn clone_dir_ready(&self, session_id: SessionId, dir: &Path) -> bool {
        self.prepared_builds
            .get(&session_id)
            .map(|p| p.planned_clone_dirs.contains(dir))
            .unwrap_or(false)
            || dir.exists()
    }

    /// Record `dir` as planned for `session_id`; returns `true` if it was not
    /// registered before (see `BTreeSet::insert`).
    pub fn register_ready_clone_dir(&mut self, session_id: SessionId, dir: PathBuf) -> bool {
        self.prepared_builds
            .entry(session_id)
            .or_default()
            .planned_clone_dirs
            .insert(dir)
    }

    /// Compute the canonical clone directory for a repo/commit pair:
    /// `<base_dir>/<host>/<repo path segments...>/<commit_hash>`.
    fn clone_dir_path(
        base_dir: &Path,
        repo_url: &Url,
        commit_hash: &String,
    ) -> eyre::Result<PathBuf> {
        let mut path = base_dir.join(repo_url.host_str().context("git URL has no hostname")?);
        path.extend(repo_url.path_segments().context("no path in git URL")?);
        let path = path.join(commit_hash);
        // `dunce` normalizes Windows UNC/verbatim paths to simple form.
        Ok(dunce::simplified(&path).to_owned())
    }
}

pub struct GitFolder {
/// Specifies whether an existing repo should be reused.
reuse: ReuseOptions,
}

impl GitFolder {
pub async fn prepare(self, logger: &mut impl BuildLogger) -> eyre::Result<PathBuf> {
let GitFolder { reuse } = self;

eprintln!("reuse: {reuse:?}");
let clone_dir = match reuse {
ReuseOptions::NewClone {
target_dir,
repo_url,
commit_hash,
} => {
logger
.log_message(
LogLevel::Info,
format!(
"cloning {repo_url}#{commit_hash} into {}",
target_dir.display()
),
)
.await;
let clone_target = target_dir.clone();
let checkout_result = tokio::task::spawn_blocking(move || {
let repository = clone_into(repo_url.clone(), &clone_target)
.with_context(|| format!("failed to clone git repo from `{repo_url}`"))?;
checkout_tree(&repository, &commit_hash)
.with_context(|| format!("failed to checkout commit `{commit_hash}`"))
})
.await
.unwrap();

match checkout_result {
Ok(()) => target_dir,
Err(err) => {
logger
.log_message(LogLevel::Error, format!("{err:?}"))
.await;
// remove erroneous clone again
if let Err(err) = std::fs::remove_dir_all(target_dir) {
logger
.log_message(
LogLevel::Error,
format!(
"failed to remove clone dir after clone/checkout error: {}",
err.kind()
),
)
.await;
}
bail!(err)
}
}
}
ReuseOptions::CopyAndFetch {
from,
target_dir,
commit_hash,
} => {
tokio::fs::copy(&from, &target_dir)
.await
.context("failed to copy repo clone")?;

logger
.log_message(
LogLevel::Info,
format!("fetching changes after copying {}", from.display()),
)
.await;

let repository = fetch_changes(&target_dir, None).await?;
checkout_tree(&repository, &commit_hash)?;
target_dir
}
ReuseOptions::RenameAndFetch {
from,
target_dir,
commit_hash,
} => {
tokio::fs::rename(&from, &target_dir)
.await
.context("failed to rename repo clone")?;

logger
.log_message(
LogLevel::Info,
format!("fetching changes after renaming {}", from.display()),
)
.await;

let repository = fetch_changes(&target_dir, None).await?;
checkout_tree(&repository, &commit_hash)?;
target_dir
}
ReuseOptions::Reuse { dir } => {
logger
.log_message(
LogLevel::Info,
format!("reusing up-to-date {}", dir.display()),
)
.await;
dir
}
};
Ok(clone_dir)
}
}

/// How the build should obtain the git checkout directory for a node.
#[derive(Debug)]
enum ReuseOptions {
    /// Create a new clone of the repository.
    NewClone {
        /// Directory to clone into.
        target_dir: PathBuf,
        repo_url: Url,
        commit_hash: String,
    },
    /// Reuse an existing up-to-date clone of the repository.
    Reuse { dir: PathBuf },
    /// Copy an older clone of the repository and fetch changes, then reuse it.
    CopyAndFetch {
        /// Existing clone (still in use elsewhere, so it must not be moved).
        from: PathBuf,
        target_dir: PathBuf,
        commit_hash: String,
    },
    /// Rename an older clone of the repository and fetch changes, then reuse it.
    RenameAndFetch {
        /// Existing clone no longer in use, so it can be moved.
        from: PathBuf,
        target_dir: PathBuf,
        commit_hash: String,
    },
}

/// Render an optional git rev as a human-readable suffix,
/// e.g. `" (branch main)"`; returns an empty string for `None`.
fn rev_str(rev: &Option<GitRepoRev>) -> String {
    let Some(rev) = rev else {
        return String::new();
    };
    match rev {
        GitRepoRev::Branch(branch) => format!(" (branch {branch})"),
        GitRepoRev::Tag(tag) => format!(" (tag {tag})"),
        GitRepoRev::Rev(rev) => format!(" (rev {rev})"),
    }
}

/// Clone the repository at `repo_addr` into `clone_dir`, downloading all
/// tags, and return the opened repository.
///
/// Parent directories of `clone_dir` are created as needed.
fn clone_into(repo_addr: Url, clone_dir: &Path) -> eyre::Result<git2::Repository> {
    if let Some(parent) = clone_dir.parent() {
        std::fs::create_dir_all(parent)
            .context("failed to create parent directory for git clone")?;
    }

    let mut builder = git2::build::RepoBuilder::new();
    let mut fetch_options = git2::FetchOptions::new();
    fetch_options.download_tags(git2::AutotagOption::All);
    builder.fetch_options(fetch_options);
    // `RepoBuilder::clone` takes `&Path` directly — no need to allocate an
    // owned `PathBuf` first (the previous `to_owned` was a needless clone).
    builder
        .clone(repo_addr.as_str(), clone_dir)
        .context("failed to clone repo")
}

/// Open the repository at `repo_dir` and fetch `refname` (or the remote's
/// default branch when `None`) from the `origin` remote, downloading all tags.
///
/// The blocking git2 work runs on a dedicated thread via `spawn_blocking`;
/// the opened repository is returned for further use (e.g. checkout).
async fn fetch_changes(
    repo_dir: &Path,
    refname: Option<String>,
) -> Result<git2::Repository, eyre::Error> {
    let repo_dir = repo_dir.to_owned();
    let fetch_changes = tokio::task::spawn_blocking(move || {
        let repository = git2::Repository::open(&repo_dir).context("failed to open git repo")?;

        // scope: `remote` borrows `repository`, so drop it before returning
        {
            let mut remote = repository
                .find_remote("origin")
                .context("failed to find remote `origin` in repo")?;
            remote
                .connect(git2::Direction::Fetch)
                .context("failed to connect to remote")?;
            let default_branch = remote
                .default_branch()
                .context("failed to get default branch for remote")?;
            let fetch = match &refname {
                Some(refname) => refname,
                None => default_branch
                    .as_str()
                    .context("failed to read default branch as string")?,
            };
            let mut fetch_options = FetchOptions::new();
            fetch_options.download_tags(git2::AutotagOption::All);
            remote
                .fetch(&[&fetch], Some(&mut fetch_options), None)
                .context("failed to fetch from git repo")?;
        }
        Result::<_, eyre::Error>::Ok(repository)
    });
    // first `?`: task join error (panic); second `?`: git error
    let repository = fetch_changes.await??;
    Ok(repository)
}

/// Check out `commit_hash` in `repository` and point HEAD at it.
///
/// If the rev resolves to a named reference, HEAD is set to that reference;
/// otherwise HEAD is detached at the resolved object id.
fn checkout_tree(repository: &git2::Repository, commit_hash: &str) -> eyre::Result<()> {
    let (object, reference) = repository
        .revparse_ext(commit_hash)
        .context("failed to parse ref")?;
    repository
        .checkout_tree(&object, None)
        .context("failed to checkout ref")?;

    if let Some(reference) = reference {
        let name = reference.name().context("failed to get reference_name")?;
        repository.set_head(name).context("failed to set head")?;
    } else {
        repository
            .set_head_detached(object.id())
            .context("failed to set detached head")?;
    }

    Ok(())
}

+ 15
- 0
libraries/core/src/build/logger.rs View File

@@ -0,0 +1,15 @@
use std::future::Future;

use dora_message::common::LogLevel;

/// Sink for log output produced while building dataflow nodes.
pub trait BuildLogger: Send {
    /// Concrete logger type produced by [`Self::try_clone`].
    type Clone: BuildLogger + 'static;

    /// Emit a single build log message at the given level.
    fn log_message(
        &mut self,
        level: LogLevel,
        message: impl Into<String> + Send,
    ) -> impl Future<Output = ()> + Send;

    /// Create an independent logger handle, e.g. for moving into a spawned task.
    fn try_clone(&self) -> impl Future<Output = eyre::Result<Self::Clone>> + Send;
}

+ 139
- 0
libraries/core/src/build/mod.rs View File

@@ -0,0 +1,139 @@
pub use git::GitManager;
pub use logger::BuildLogger;

use url::Url;

use std::{collections::BTreeMap, future::Future, path::PathBuf};

use crate::descriptor::ResolvedNode;
use dora_message::{
common::{GitSource, LogLevel},
descriptor::{CoreNodeKind, EnvValue},
id::NodeId,
SessionId,
};
use eyre::Context;

use build_command::run_build_command;
use git::GitFolder;

mod build_command;
mod git;
mod logger;

/// Builds the nodes of a dataflow for one build session.
#[derive(Clone)]
pub struct Builder {
    /// Session this build belongs to; used to share prepared git clones.
    pub session_id: SessionId,
    /// Base directory; node working dirs and the `git` checkout dir live under it.
    pub base_working_dir: PathBuf,
    // Forwarded to `run_build_command`; presumably selects the `uv` Python
    // tooling for build commands — confirm against run_build_command.
    pub uv: bool,
}

impl Builder {
    /// Plan the build of a single node and return a future that performs it.
    ///
    /// If the node has a git source, a clone directory is chosen (and
    /// reserved) synchronously via `git_manager` — this must happen before
    /// the returned future runs so concurrent node builds in the same
    /// session can reuse the planned clone dirs. A previous git source, if
    /// given and for the same repo, enables clone reuse.
    pub async fn build_node(
        self,
        node: ResolvedNode,
        git: Option<GitSource>,
        prev_git: Option<GitSource>,
        mut logger: impl BuildLogger,
        git_manager: &mut GitManager,
    ) -> eyre::Result<impl Future<Output = eyre::Result<BuiltNode>>> {
        let prepared_git = if let Some(GitSource { repo, commit_hash }) = git {
            let repo_url = Url::parse(&repo).context("failed to parse git repository URL")?;
            let target_dir = self.base_working_dir.join("git");
            // only reuse a previous clone if it is of the same repository
            let prev_hash = prev_git.filter(|p| p.repo == repo).map(|p| p.commit_hash);
            let git_folder = git_manager.choose_clone_dir(
                self.session_id,
                repo_url,
                commit_hash,
                prev_hash,
                &target_dir,
            )?;
            Some(git_folder)
        } else {
            None
        };

        let task = async move { self.build_node_inner(node, &mut logger, prepared_git).await };
        Ok(task)
    }

    /// Perform the actual build: prepare the git checkout (if any) and run
    /// the node's build command(s), returning the node's working directory.
    async fn build_node_inner(
        self,
        node: ResolvedNode,
        logger: &mut impl BuildLogger,
        git_folder: Option<GitFolder>,
    ) -> eyre::Result<BuiltNode> {
        logger.log_message(LogLevel::Debug, "building node").await;
        let node_working_dir = match &node.kind {
            CoreNodeKind::Custom(n) => {
                // git-sourced nodes build inside their checkout; others in the base dir
                let node_working_dir = match git_folder {
                    Some(git_folder) => git_folder.prepare(logger).await?,
                    None => self.base_working_dir,
                };

                if let Some(build) = &n.build {
                    build_node(logger, &node.env, node_working_dir.clone(), build, self.uv).await?;
                }
                node_working_dir
            }
            CoreNodeKind::Runtime(n) => {
                // run build commands
                for operator in &n.operators {
                    if let Some(build) = &operator.config.build {
                        build_node(
                            logger,
                            &node.env,
                            self.base_working_dir.clone(),
                            build,
                            self.uv,
                        )
                        .await?;
                    }
                }
                self.base_working_dir.clone()
            }
        };
        Ok(BuiltNode { node_working_dir })
    }
}

/// Run a node's `build` command in `working_dir`, streaming its output lines
/// to `logger`.
///
/// The command itself runs on a blocking thread via `run_build_command`; a
/// separate task forwards its stdout/stderr lines to the logger.
async fn build_node(
    logger: &mut impl BuildLogger,
    node_env: &Option<BTreeMap<String, EnvValue>>,
    working_dir: PathBuf,
    build: &str,
    uv: bool,
) -> eyre::Result<()> {
    logger
        // fixed: the message was previously missing the closing backtick
        .log_message(LogLevel::Info, format!("running build command: `{build}`"))
        .await;
    let build = build.to_owned();
    let node_env = node_env.clone();
    let mut logger = logger.try_clone().await.context("failed to clone logger")?;
    let (stdout_tx, mut stdout) = tokio::sync::mpsc::channel(10);
    // run the (blocking) build command on a dedicated thread
    let task = tokio::task::spawn_blocking(move || {
        run_build_command(&build, &working_dir, uv, &node_env, stdout_tx)
            .context("build command failed")
    });
    // concurrently forward build output to the logger until the channel closes
    tokio::spawn(async move {
        while let Some(line) = stdout.recv().await {
            logger
                .log_message(
                    LogLevel::Info,
                    line.unwrap_or_else(|err| format!("io err: {}", err.kind())),
                )
                .await;
        }
    });
    // first `?`: task panic; second `?`: build command failure
    task.await??;
    Ok(())
}

/// Result of building a single node.
pub struct BuiltNode {
    /// Directory the node should later be spawned in (git checkout dir for
    /// git-sourced nodes, otherwise the base working dir).
    pub node_working_dir: PathBuf,
}

/// Per-dataflow build output: the working directory chosen for each node.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BuildInfo {
    pub node_working_dirs: BTreeMap<NodeId, PathBuf>,
}

+ 0
- 0
libraries/core/src/git.rs View File


+ 1
- 0
libraries/core/src/lib.rs View File

@@ -9,6 +9,7 @@ pub use dora_message::{config, uhlc};

pub mod build;
pub mod descriptor;
pub mod git;
pub mod metadata;
pub mod topics;



+ 36
- 5
libraries/message/src/cli_to_coordinator.rs View File

@@ -1,22 +1,48 @@
use std::{path::PathBuf, time::Duration};
use std::{collections::BTreeMap, path::PathBuf, time::Duration};

use uuid::Uuid;

use crate::{
common::GitSource,
descriptor::Descriptor,
id::{NodeId, OperatorId},
BuildId, SessionId,
};

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequest {
Build {
session_id: SessionId,
dataflow: Descriptor,
git_sources: BTreeMap<NodeId, GitSource>,
prev_git_sources: BTreeMap<NodeId, GitSource>,
/// Allows overwriting the base working dir when CLI and daemon are
/// running on the same machine.
///
/// Must not be used for multi-machine dataflows.
///
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
local_working_dir: Option<PathBuf>,
uv: bool,
},
WaitForBuild {
build_id: BuildId,
},
Start {
build_id: Option<BuildId>,
session_id: SessionId,
dataflow: Descriptor,
name: Option<String>,
// TODO: remove this once we figure out deploying of node/operator
// binaries from CLI to coordinator/daemon
local_working_dir: PathBuf,
/// Allows overwriting the base working dir when CLI and daemon are
/// running on the same machine.
///
/// Must not be used for multi-machine dataflows.
///
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
local_working_dir: Option<PathBuf>,
uv: bool,
build_only: bool,
},
WaitForSpawn {
dataflow_id: Uuid,
@@ -50,4 +76,9 @@ pub enum ControlRequest {
dataflow_id: Uuid,
level: log::LevelFilter,
},
BuildLogSubscribe {
build_id: BuildId,
level: log::LevelFilter,
},
CliAndDefaultDaemonOnSameMachine,
}

+ 9
- 2
libraries/message/src/common.rs View File

@@ -5,14 +5,15 @@ use aligned_vec::{AVec, ConstAlign};
use eyre::Context as _;
use uuid::Uuid;

use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, DataflowId};
use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId};

pub use log::Level as LogLevel;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[must_use]
pub struct LogMessage {
pub dataflow_id: DataflowId,
pub build_id: Option<BuildId>,
pub dataflow_id: Option<DataflowId>,
pub node_id: Option<NodeId>,
pub daemon_id: Option<DaemonId>,
pub level: LogLevel,
@@ -239,3 +240,9 @@ impl std::fmt::Display for DaemonId {
write!(f, "{}", self.uuid)
}
}

/// A pinned git source for a node: repository URL plus the exact commit to build.
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct GitSource {
    /// Repository URL.
    pub repo: String,
    /// Commit hash the node is pinned to.
    pub commit_hash: String,
}

+ 29
- 6
libraries/message/src/coordinator_to_cli.rs View File

@@ -1,23 +1,46 @@
use std::collections::{BTreeMap, BTreeSet};
use std::{
collections::{BTreeMap, BTreeSet},
net::IpAddr,
};

use uuid::Uuid;

pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
use crate::{common::DaemonId, id::NodeId};
use crate::{common::DaemonId, id::NodeId, BuildId};

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequestReply {
Error(String),
CoordinatorStopped,
DataflowStartTriggered { uuid: Uuid },
DataflowSpawned { uuid: Uuid },
DataflowReloaded { uuid: Uuid },
DataflowStopped { uuid: Uuid, result: DataflowResult },
DataflowBuildTriggered {
build_id: BuildId,
},
DataflowBuildFinished {
build_id: BuildId,
result: Result<(), String>,
},
DataflowStartTriggered {
uuid: Uuid,
},
DataflowSpawned {
uuid: Uuid,
},
DataflowReloaded {
uuid: Uuid,
},
DataflowStopped {
uuid: Uuid,
result: DataflowResult,
},
DataflowList(DataflowList),
DestroyOk,
DaemonConnected(bool),
ConnectedDaemons(BTreeSet<DaemonId>),
Logs(Vec<u8>),
CliAndDefaultDaemonIps {
default_daemon: Option<IpAddr>,
cli: Option<IpAddr>,
},
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]


+ 32
- 4
libraries/message/src/coordinator_to_daemon.rs View File

@@ -5,10 +5,10 @@ use std::{
};

use crate::{
common::DaemonId,
common::{DaemonId, GitSource},
descriptor::{Descriptor, ResolvedNode},
id::{NodeId, OperatorId},
DataflowId,
BuildId, DataflowId, SessionId,
};

pub use crate::common::Timestamped;
@@ -33,6 +33,7 @@ impl RegisterResult {

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorEvent {
Build(BuildDataflowNodes),
Spawn(SpawnDataflowNodes),
AllNodesReady {
dataflow_id: DataflowId,
@@ -55,13 +56,40 @@ pub enum DaemonCoordinatorEvent {
Heartbeat,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct BuildDataflowNodes {
pub build_id: BuildId,
pub session_id: SessionId,
/// Allows overwriting the base working dir when CLI and daemon are
/// running on the same machine.
///
/// Must not be used for multi-machine dataflows.
///
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
pub local_working_dir: Option<PathBuf>,
pub git_sources: BTreeMap<NodeId, GitSource>,
pub prev_git_sources: BTreeMap<NodeId, GitSource>,
pub dataflow_descriptor: Descriptor,
pub nodes_on_machine: BTreeSet<NodeId>,
pub uv: bool,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct SpawnDataflowNodes {
pub build_id: Option<BuildId>,
pub session_id: SessionId,
pub dataflow_id: DataflowId,
pub working_dir: PathBuf,
/// Allows overwriting the base working dir when CLI and daemon are
/// running on the same machine.
///
/// Must not be used for multi-machine dataflows.
///
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
pub local_working_dir: Option<PathBuf>,
pub nodes: BTreeMap<NodeId, ResolvedNode>,
pub dataflow_descriptor: Descriptor,
pub spawn_nodes: BTreeSet<NodeId>,
pub uv: bool,
pub build_only: bool,
}

+ 8
- 1
libraries/message/src/daemon_to_coordinator.rs View File

@@ -3,7 +3,9 @@ use std::collections::BTreeMap;
pub use crate::common::{
DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped,
};
use crate::{common::DaemonId, current_crate_version, id::NodeId, versions_compatible, DataflowId};
use crate::{
common::DaemonId, current_crate_version, id::NodeId, versions_compatible, BuildId, DataflowId,
};

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
@@ -46,6 +48,10 @@ impl DaemonRegisterRequest {

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum DaemonEvent {
BuildResult {
build_id: BuildId,
result: Result<(), String>,
},
SpawnResult {
dataflow_id: DataflowId,
result: Result<(), String>,
@@ -77,6 +83,7 @@ impl DataflowDaemonResult {

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorReply {
TriggerBuildResult(Result<(), String>),
TriggerSpawnResult(Result<(), String>),
ReloadResult(Result<(), String>),
StopResult(Result<(), String>),


+ 6
- 0
libraries/message/src/descriptor.rs View File

@@ -253,6 +253,12 @@ pub enum NodeSource {
},
}

/// Where a node's resolved source code comes from.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum ResolvedNodeSource {
    /// Sources live in the local working directory.
    Local,
    /// Sources are checked out from a git repository at a pinned commit.
    GitCommit { repo: String, commit_hash: String },
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum GitRepoRev {
Branch(String),


+ 35
- 0
libraries/message/src/lib.rs View File

@@ -24,9 +24,44 @@ pub mod coordinator_to_cli;

pub use arrow_data;
pub use arrow_schema;
use uuid::Uuid;

pub type DataflowId = uuid::Uuid;

/// Identifies one build/run session of a dataflow.
///
/// Random UUID; the `Start` request carries both a session id and an
/// optional build id, presumably so prepared artifacts can be shared
/// between the build and start steps — confirm against the coordinator.
#[derive(
    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
)]
pub struct SessionId(uuid::Uuid);

impl SessionId {
    /// Create a fresh random session id.
    pub fn generate() -> Self {
        Self(Uuid::new_v4())
    }
}

impl std::fmt::Display for SessionId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "SessionId({})", self.0)
    }
}

/// Identifies a single dataflow build invocation.
#[derive(
    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
)]
pub struct BuildId(uuid::Uuid);

impl BuildId {
    /// Create a fresh random build id.
    pub fn generate() -> Self {
        Self(Uuid::new_v4())
    }
}

impl std::fmt::Display for BuildId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "BuildId({})", self.0)
    }
}

fn current_crate_version() -> semver::Version {
let crate_version_raw = env!("CARGO_PKG_VERSION");



Loading…
Cancel
Save