diff --git a/.gitignore b/.gitignore index 2bab6ed3..ade7dc46 100644 --- a/.gitignore +++ b/.gitignore @@ -34,7 +34,7 @@ __pycache__/ # Distribution / packaging .Python -build/ +/build/ develop-eggs/ dist/ downloads/ @@ -179,4 +179,4 @@ out/ #Miscellaneous yolo.yml -~* \ No newline at end of file +~* diff --git a/Cargo.lock b/Cargo.lock index 919ab7e4..9067383e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -851,7 +851,7 @@ dependencies = [ "num-traits", "rusticata-macros", "thiserror 1.0.69", - "time 0.3.41", + "time", ] [[package]] @@ -1415,7 +1415,7 @@ dependencies = [ "path_abs", "plist", "regex", - "semver 1.0.26", + "semver", "serde", "serde_yaml 0.9.34+deprecated", "shell-words", @@ -1823,7 +1823,7 @@ checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" dependencies = [ "camino", "cargo-platform", - "semver 1.0.26", + "semver", "serde", "serde_json", ] @@ -1836,7 +1836,7 @@ checksum = "2d886547e41f740c616ae73108f6eb70afe6d940c7bc697cb30f13daec073037" dependencies = [ "camino", "cargo-platform", - "semver 1.0.26", + "semver", "serde", "serde_json", "thiserror 1.0.69", @@ -2521,6 +2521,33 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96a6ac251f4a2aca6b3f91340350eab87ae57c3f127ffeb585e92bd336717991" +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "rustc_version", + "subtle", + "zeroize", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "cxx" version = "1.0.149" @@ -2955,10 +2982,12 @@ dependencies = [ "dora-operator-api-c", "dora-runtime", "dora-tracing", + "dunce", "duration-str", "env_logger 0.11.7", "eyre", "futures", + "git2", "inquire", "log", "notify 5.2.0", @@ -3004,6 +3033,8 @@ dependencies = [ "dora-message", "dunce", "eyre", + "git2", + "itertools 0.14.0", "log", "once_cell", "schemars", @@ -3013,6 +3044,7 @@ dependencies = [ "serde_yaml 0.9.34+deprecated", "tokio", "tracing", + "url", "uuid 1.16.0", "which", ] @@ -3039,6 +3071,7 @@ dependencies = [ "futures", "futures-concurrency", "git2", + "itertools 0.14.0", "serde_json", "serde_yaml 0.8.26", "shared-memory-server", @@ -3067,6 +3100,7 @@ dependencies = [ name = "dora-examples" version = "0.0.0" dependencies = [ + "dora-cli", "dora-coordinator", "dora-core", "dora-download", @@ -3108,7 +3142,7 @@ dependencies = [ "log", "once_cell", "schemars", - "semver 1.0.26", + "semver", "serde", "serde-with-expand-env", "serde_yaml 0.9.34+deprecated", @@ -3196,7 +3230,7 @@ name = "dora-node-api-python" version = "0.3.10" dependencies = [ "arrow 54.2.1", - "dora-daemon", + "dora-cli", "dora-download", "dora-node-api", "dora-operator-api-python", @@ -3455,7 +3489,7 @@ dependencies = [ "rust_decimal", "serde", "thiserror 1.0.69", - "time 0.3.41", + "time", ] [[package]] @@ -3492,6 +3526,31 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18aade80d5e09429040243ce1143ddc08a92d7a22820ac512610410a4dd5214f" +[[package]] +name = "ed25519" +version = "2.2.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "pkcs8 0.10.2", + "signature 2.2.0", +] + +[[package]] +name = "ed25519-dalek" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871" +dependencies = [ + "curve25519-dalek", + "ed25519", + "serde", + "sha2", + "signature 2.2.0", + "subtle", + "zeroize", +] + [[package]] name = "eframe" version = "0.31.1" @@ -4099,6 +4158,12 @@ dependencies = [ "anyhow", ] +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "filetime" version = "0.2.25" @@ -5073,7 +5138,7 @@ dependencies = [ "dirs 5.0.1", "futures", "http 1.3.1", - "indicatif 0.17.11", + "indicatif", "libc", "log", "num_cpus", @@ -5204,7 +5269,7 @@ dependencies = [ "http 0.2.12", "http-serde", "serde", - "time 0.3.41", + "time", ] [[package]] @@ -5602,18 +5667,6 @@ dependencies = [ "serde", ] -[[package]] -name = "indicatif" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7baab56125e25686df467fe470785512329883aab42696d661247aca2a2896e4" -dependencies = [ - "console", - "lazy_static", - "number_prefix 0.3.0", - "regex", -] - [[package]] name = "indicatif" version = "0.17.11" @@ -5621,7 +5674,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235" dependencies = [ "console", - "number_prefix 0.4.0", + "number_prefix", "portable-atomic", "rayon", "unicode-width 0.2.0", @@ -6730,7 +6783,7 @@ dependencies = [ "hf-hub", "image", "indexmap 2.8.0", - "indicatif 0.17.11", + "indicatif", "interprocess", "itertools 0.13.0", "llguidance", @@ -7367,12 +7420,6 @@ dependencies = [ "libc", ] -[[package]] -name = "number_prefix" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a" - [[package]] name = "number_prefix" version = "0.4.0" @@ -8382,7 +8429,7 @@ dependencies = [ "indexmap 2.8.0", "quick-xml 0.32.0", "serde", - "time 0.3.41", + "time", ] [[package]] @@ -8880,15 +8927,6 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" -[[package]] -name = "quick-xml" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26aab6b48e2590e4a64d1ed808749ba06257882b461d01ca71baeb747074a6dd" -dependencies = [ - "memchr", -] - [[package]] name = "quick-xml" version = "0.30.0" @@ -9238,7 +9276,7 @@ dependencies = [ "serde_json", "sha2", "thiserror 1.0.69", - "time 0.3.41", + "time", "url", "uuid 1.16.0", "web-sys", @@ -9325,7 +9363,7 @@ dependencies = [ "cargo_metadata 0.18.1", "glob", "sha2", - "time 0.3.41", + "time", "unindent", "walkdir", ] @@ -9760,7 +9798,7 @@ dependencies = [ "serde_bytes", "static_assertions", "thiserror 1.0.69", - "time 0.3.41", + "time", "typenum", "uuid 1.16.0", "web-time", @@ -10241,7 +10279,7 @@ dependencies = [ "strum 0.26.3", "strum_macros 0.26.4", "sublime_fuzzy", - "time 0.3.41", + "time", "url", ] @@ -11327,7 +11365,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" dependencies = [ - "semver 1.0.26", + "semver", ] [[package]] @@ -11743,34 +11781,39 @@ dependencies = [ "libc", ] +[[package]] +name = "self-replace" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03ec815b5eab420ab893f63393878d89c90fdd94c0bcc44c07abb8ad95552fb7" +dependencies = [ + "fastrand 2.3.0", + "tempfile", + "windows-sys 0.52.0", +] + [[package]] name = "self_update" -version = "0.27.0" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fb85f1802f7b987237b8525c0fde86ea86f31c957c1875467c727d5b921179c" +checksum = "d832c086ece0dacc29fb2947bb4219b8f6e12fe9e40b7108f9e57c4224e47b5c" dependencies = [ "either", "flate2", - "hyper 0.14.32", - "indicatif 0.15.0", + "hyper 1.6.0", + "indicatif", "log", - "quick-xml 0.20.0", + "quick-xml 0.37.2", "regex", - "reqwest 0.11.27", - "semver 0.11.0", + "reqwest 0.12.15", + "self-replace", + "semver", "serde_json", "tar", "tempfile", - "zip 0.5.13", -] - -[[package]] -name = "semver" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6" -dependencies = [ - "semver-parser", + "urlencoding", + "zip 2.4.2", + "zipsign-api", ] [[package]] @@ -11782,15 +11825,6 @@ dependencies = [ "serde", ] -[[package]] -name = "semver-parser" -version = "0.10.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9900206b54a3527fdc7b8a938bffd94a568bac4f4aa8113b209df75a09c0dec2" -dependencies = [ - "pest", -] - [[package]] name = "seq-macro" version = "0.3.6" @@ -11957,7 +11991,7 @@ dependencies = [ "serde_derive", "serde_json", "serde_with_macros", - "time 0.3.41", + "time", ] [[package]] @@ -13015,17 +13049,6 @@ dependencies = [ "weezl", ] -[[package]] -name = "time" -version = "0.1.45" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" -dependencies = [ - "libc", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi 0.3.9", -] - [[package]] name = "time" version = "0.3.41" @@ -13188,7 +13211,7 @@ dependencies = [ "derive_builder", "esaxx-rs", "getrandom 0.2.15", - "indicatif 0.17.11", + "indicatif", "itertools 0.13.0", "lazy_static", "log", @@ -14074,12 +14097,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -15367,7 +15384,7 @@ dependencies = [ "oid-registry", "rusticata-macros", "thiserror 1.0.69", - "time 0.3.41", + "time", ] [[package]] @@ -15979,7 +15996,7 @@ dependencies = [ "rustls 0.23.25", "rustls-webpki 0.102.8", "serde", - "time 0.3.41", + "time", "tokio", "tokio-util", "tracing", @@ -16030,7 +16047,7 @@ dependencies = [ "rustls-pki-types", "rustls-webpki 0.102.8", "secrecy", - "time 0.3.41", + "time", "tokio", "tokio-util", "tracing", @@ -16115,7 +16132,7 @@ dependencies = [ "rustls-webpki 0.102.8", "secrecy", "socket2 0.5.8", - "time 0.3.41", + "time", "tls-listener", "tokio", "tokio-rustls 0.26.2", @@ -16601,29 +16618,44 @@ dependencies = [ [[package]] name = "zip" -version = "0.5.13" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"93ab48844d61251bb3835145c521d88aa4031d7139e8485990f60ca911fa0815" +checksum = "9cc23c04387f4da0374be4533ad1208cbb091d5c11d070dfef13676ad6497164" dependencies = [ - "byteorder", + "arbitrary", "crc32fast", + "crossbeam-utils", + "displaydoc", + "indexmap 2.8.0", + "num_enum", "thiserror 1.0.69", - "time 0.1.45", ] [[package]] name = "zip" -version = "1.1.4" +version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cc23c04387f4da0374be4533ad1208cbb091d5c11d070dfef13676ad6497164" +checksum = "fabe6324e908f85a1c52063ce7aa26b68dcb7eb6dbc83a2d148403c9bc3eba50" dependencies = [ "arbitrary", "crc32fast", "crossbeam-utils", "displaydoc", "indexmap 2.8.0", - "num_enum", - "thiserror 1.0.69", + "memchr", + "thiserror 2.0.12", + "time", +] + +[[package]] +name = "zipsign-api" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dba6063ff82cdbd9a765add16d369abe81e520f836054e997c2db217ceca40c0" +dependencies = [ + "base64 0.22.1", + "ed25519-dalek", + "thiserror 2.0.12", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a353bf17..d2489a52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ dora-metrics = { version = "0.3.10", path = "libraries/extensions/telemetry/metr dora-download = { version = "0.3.10", path = "libraries/extensions/download" } shared-memory-server = { version = "0.3.10", path = "libraries/shared-memory-server" } communication-layer-request-reply = { version = "0.3.10", path = "libraries/communication-layer/request-reply" } +dora-cli = { version = "0.3.10", path = "binaries/cli" } dora-runtime = { version = "0.3.10", path = "binaries/runtime" } dora-daemon = { version = "0.3.10", path = "binaries/daemon" } dora-coordinator = { version = "0.3.10", path = "binaries/coordinator" } @@ -88,6 +89,7 @@ pyo3 = { version = "0.23", features = [ "multiple-pymethods", ] } pythonize = "0.23" +git2 = { version = "0.18.0", features = ["vendored-openssl"] } [package] name = "dora-examples" @@ -104,6 +106,7 @@ ros2-examples = [] [dev-dependencies] eyre = "0.6.8" tokio = "1.24.2" +dora-cli = { workspace = true } dora-coordinator = { workspace = true } dora-core = { workspace = true } dora-message = { workspace = true } diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 54ebff5c..c06fbbaa 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -24,7 +24,7 @@ eyre = "0.6" serde_yaml = "0.8.23" flume = "0.10.14" dora-runtime = { workspace = true, features = ["tracing", "metrics", "python"] } -dora-daemon = { workspace = true } +dora-cli = { workspace = true } dora-download = { workspace = true } arrow = { workspace = true, features = ["pyarrow"] } pythonize = { workspace = true } diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index e2a249a9..2d3634cd 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -6,7 +6,6 @@ use std::sync::Arc; use std::time::Duration; use arrow::pyarrow::{FromPyArrow, ToPyArrow}; -use dora_daemon::Daemon; use dora_download::download_file; use dora_node_api::dora_core::config::NodeId; use dora_node_api::dora_core::descriptor::source_is_url; @@ -382,19 +381,7 @@ pub fn resolve_dataflow(dataflow: String) -> eyre::Result { #[pyfunction] #[pyo3(signature = (dataflow_path, uv=None))] pub fn run(dataflow_path: String, uv: Option) -> eyre::Result<()> { - let dataflow_path = resolve_dataflow(dataflow_path).context("could not resolve dataflow")?; - let rt = 
tokio::runtime::Builder::new_multi_thread()
-        .enable_all()
-        .build()
-        .context("tokio runtime failed")?;
-    let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv.unwrap_or_default()))?;
-    match result.is_ok() {
-        true => Ok(()),
-        false => Err(eyre::eyre!(
-            "Dataflow failed to run with error: {:?}",
-            result.node_results
-        )),
-    }
+    dora_cli::command::run(dataflow_path, uv.unwrap_or_default())
 }
 
 #[pymodule]
diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml
index 20d42015..96b67a76 100644
--- a/binaries/cli/Cargo.toml
+++ b/binaries/cli/Cargo.toml
@@ -50,7 +50,7 @@ tabwriter = "1.4.0"
 log = { version = "0.4.21", features = ["serde"] }
 colored = "2.1.0"
 env_logger = "0.11.3"
-self_update = { version = "0.27.0", features = [
+self_update = { version = "0.42.0", features = [
     "rustls",
     "archive-zip",
     "archive-tar",
@@ -60,7 +60,8 @@ pyo3 = { workspace = true, features = [
     "extension-module",
     "abi3",
 ], optional = true }
-
+dunce = "1.0.5"
+git2 = { workspace = true }
 
 [lib]
 name = "dora_cli"
diff --git a/binaries/cli/src/command/build/distributed.rs b/binaries/cli/src/command/build/distributed.rs
new file mode 100644
index 00000000..9e7fca67
--- /dev/null
+++ b/binaries/cli/src/command/build/distributed.rs
@@ -0,0 +1,107 @@
+use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
+use dora_core::descriptor::Descriptor;
+use dora_message::{
+    cli_to_coordinator::ControlRequest,
+    common::{GitSource, LogMessage},
+    coordinator_to_cli::ControlRequestReply,
+    id::NodeId,
+    BuildId,
+};
+use eyre::{bail, Context};
+use std::{
+    collections::BTreeMap,
+    net::{SocketAddr, TcpStream},
+};
+
+use crate::{output::print_log_message, session::DataflowSession};
+
+pub fn build_distributed_dataflow(
+    session: &mut TcpRequestReplyConnection,
+    dataflow: Descriptor,
+    git_sources: &BTreeMap<NodeId, GitSource>,
+    dataflow_session: &DataflowSession,
+    local_working_dir: Option<std::path::PathBuf>,
+    uv: bool,
+) -> eyre::Result<BuildId> {
+    let build_id = {
+        let reply_raw = session
+            .request(
+                &serde_json::to_vec(&ControlRequest::Build {
+                    session_id: dataflow_session.session_id,
+                    dataflow,
+                    git_sources: git_sources.clone(),
+                    prev_git_sources: dataflow_session.git_sources.clone(),
+                    local_working_dir,
+                    uv,
+                })
+                .unwrap(),
+            )
+            .wrap_err("failed to send start dataflow message")?;
+
+        let result: ControlRequestReply =
+            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
+        match result {
+            ControlRequestReply::DataflowBuildTriggered { build_id } => {
+                eprintln!("dataflow build triggered: {build_id}");
+                build_id
+            }
+            ControlRequestReply::Error(err) => bail!("{err}"),
+            other => bail!("unexpected start dataflow reply: {other:?}"),
+        }
+    };
+    Ok(build_id)
+}
+
+pub fn wait_until_dataflow_built(
+    build_id: BuildId,
+    session: &mut TcpRequestReplyConnection,
+    coordinator_socket: SocketAddr,
+    log_level: log::LevelFilter,
+) -> eyre::Result<BuildId> {
+    // subscribe to log messages
+    let mut log_session = TcpConnection {
+        stream: TcpStream::connect(coordinator_socket)
+            .wrap_err("failed to connect to dora coordinator")?,
+    };
+    log_session
+        .send(
+            &serde_json::to_vec(&ControlRequest::BuildLogSubscribe {
+                build_id,
+                level: log_level,
+            })
+            .wrap_err("failed to serialize message")?,
+        )
+        .wrap_err("failed to send build log subscribe request to coordinator")?;
+    std::thread::spawn(move || {
+        while let Ok(raw) = log_session.receive() {
+            let parsed: eyre::Result<LogMessage> =
+                serde_json::from_slice(&raw).context("failed to parse log message");
+            match parsed {
+                Ok(log_message) => {
+                    print_log_message(log_message);
+                }
+                Err(err) => {
+                    tracing::warn!("failed to parse log message: {err:?}")
+                }
+            }
+        }
+    });
+
+    let reply_raw = session
+        .request(&serde_json::to_vec(&ControlRequest::WaitForBuild { build_id }).unwrap())
+        .wrap_err("failed to send WaitForBuild message")?;
+
+    let result: ControlRequestReply =
+        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
+    match result {
+        ControlRequestReply::DataflowBuildFinished { build_id, result } => match result {
+            Ok(()) => {
+                eprintln!("dataflow build finished successfully");
+                Ok(build_id)
+            }
+            Err(err) => bail!("{err}"),
+        },
+        ControlRequestReply::Error(err) => bail!("{err}"),
+        other => bail!("unexpected start dataflow reply: {other:?}"),
+    }
+}
diff --git a/binaries/cli/src/command/build/git.rs b/binaries/cli/src/command/build/git.rs
new file mode 100644
index 00000000..18faba87
--- /dev/null
+++ b/binaries/cli/src/command/build/git.rs
@@ -0,0 +1,45 @@
+use dora_message::{common::GitSource, descriptor::GitRepoRev};
+use eyre::Context;
+
+pub fn fetch_commit_hash(repo_url: String, rev: Option<GitRepoRev>) -> eyre::Result<GitSource> {
+    let mut remote = git2::Remote::create_detached(repo_url.as_bytes())
+        .with_context(|| format!("failed to create git remote for {repo_url}"))?;
+    let connection = remote
+        .connect_auth(git2::Direction::Fetch, None, None)
+        .with_context(|| format!("failed to open connection to {repo_url}"))?;
+    let references = connection
+        .list()
+        .with_context(|| format!("failed to list git references of {repo_url}"))?;
+
+    let expected_name = match &rev {
+        Some(GitRepoRev::Branch(branch)) => format!("refs/heads/{branch}"),
+        Some(GitRepoRev::Tag(tag)) => format!("refs/tags/{tag}"),
+        Some(GitRepoRev::Rev(rev)) => rev.clone(),
+        None => "HEAD".into(),
+    };
+
+    let mut commit_hash = None;
+    for head in references {
+        if head.name() == expected_name {
+            commit_hash = Some(head.oid().to_string());
+            break;
+        }
+    }
+
+    if commit_hash.is_none() {
+        if let Some(GitRepoRev::Rev(rev)) = &rev {
+            // rev might be a commit hash instead of a reference
+            if rev.is_ascii() && rev.bytes().all(|b| b.is_ascii_alphanumeric()) {
+                commit_hash = Some(rev.clone());
+            }
+        }
+    }
+
+    match commit_hash {
+        Some(commit_hash) => Ok(GitSource {
+            repo: repo_url,
+            commit_hash,
+        }),
+        None => eyre::bail!("no matching commit for `{rev:?}`"),
+    }
+}
diff --git a/binaries/cli/src/command/build/local.rs b/binaries/cli/src/command/build/local.rs
new file mode 100644
index 00000000..ac28eeca
--- /dev/null
+++ b/binaries/cli/src/command/build/local.rs
@@ -0,0 +1,101 @@
+use std::{collections::BTreeMap, path::PathBuf};
+
+use dora_core::{
+    build::{BuildInfo, BuildLogger, Builder, GitManager},
+    descriptor::{Descriptor, DescriptorExt},
+};
+use dora_message::{common::GitSource, id::NodeId};
+use eyre::Context;
+
+use crate::session::DataflowSession;
+
+pub fn build_dataflow_locally(
+    dataflow: Descriptor,
+    git_sources: &BTreeMap<NodeId, GitSource>,
+    dataflow_session: &DataflowSession,
+    working_dir: PathBuf,
+    uv: bool,
+) -> eyre::Result<BuildInfo> {
+    let runtime = tokio::runtime::Runtime::new()?;
+
+    runtime.block_on(build_dataflow(
+        dataflow,
+        git_sources,
+        dataflow_session,
+        working_dir,
+        uv,
+    ))
+}
+
+async fn build_dataflow(
+    dataflow: Descriptor,
+    git_sources: &BTreeMap<NodeId, GitSource>,
+    dataflow_session: &DataflowSession,
+    base_working_dir: PathBuf,
+    uv: bool,
+) -> eyre::Result<BuildInfo> {
+    let builder = Builder {
+        session_id: dataflow_session.session_id,
+        base_working_dir,
+        uv,
+    };
+    let nodes = dataflow.resolve_aliases_and_set_defaults()?;
+
+
+    let mut git_manager = GitManager::default();
+    let prev_git_sources = &dataflow_session.git_sources;
+
+    let mut tasks = Vec::new();
+
+    // build nodes
+    for node in nodes.into_values() {
+        let node_id = node.id.clone();
+        let git_source = git_sources.get(&node_id).cloned();
+        let prev_git_source = prev_git_sources.get(&node_id).cloned();
+
+        let task = builder
+            .clone()
+            .build_node(
+                node,
+                git_source,
+                prev_git_source,
+                LocalBuildLogger {
+                    node_id: node_id.clone(),
+                },
+                &mut git_manager,
+            )
+            .await
+            .wrap_err_with(|| format!("failed to build node `{node_id}`"))?;
+        tasks.push((node_id, task));
+    }
+
+    let mut info = BuildInfo {
+        node_working_dirs: Default::default(),
+    };
+    for (node_id, task) in tasks {
+        let node = task
+            .await
+            .with_context(|| format!("failed to build node `{node_id}`"))?;
+        info.node_working_dirs
+            .insert(node_id, node.node_working_dir);
+    }
+    Ok(info)
+}
+
+struct LocalBuildLogger {
+    node_id: NodeId,
+}
+
+impl BuildLogger for LocalBuildLogger {
+    type Clone = Self;
+
+    async fn log_message(&mut self, level: log::Level, message: impl Into<String> + Send) {
+        let message: String = message.into();
+        println!("{}: \t{level}: \t{message}", self.node_id);
+    }
+
+    async fn try_clone(&self) -> eyre::Result<Self::Clone> {
+        Ok(LocalBuildLogger {
+            node_id: self.node_id.clone(),
+        })
+    }
+}
diff --git a/binaries/cli/src/command/build/mod.rs b/binaries/cli/src/command/build/mod.rs
new file mode 100644
index 00000000..fff1d452
--- /dev/null
+++ b/binaries/cli/src/command/build/mod.rs
@@ -0,0 +1,162 @@
+use communication_layer_request_reply::TcpRequestReplyConnection;
+use dora_core::{
+    descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt},
+    topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST},
+};
+use dora_message::{descriptor::NodeSource, BuildId};
+use eyre::Context;
+use std::collections::BTreeMap;
+
+use crate::{connect_to_coordinator, resolve_dataflow, session::DataflowSession};
+
+use distributed::{build_distributed_dataflow, wait_until_dataflow_built};
+use local::build_dataflow_locally;
+
+mod distributed;
+mod git;
+mod local;
+
+pub fn build(
+    dataflow: String,
+    coordinator_addr: Option<std::net::IpAddr>,
+    coordinator_port: Option<u16>,
+    uv: bool,
+    force_local: bool,
+) -> eyre::Result<()> {
+    let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
+    let dataflow_descriptor =
+        Descriptor::blocking_read(&dataflow_path).wrap_err("Failed to read yaml dataflow")?;
+    let mut dataflow_session =
+        DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;
+
+    let mut git_sources = BTreeMap::new();
+    let resolved_nodes = dataflow_descriptor
+        .resolve_aliases_and_set_defaults()
+        .context("failed to resolve nodes")?;
+    for (node_id, node) in resolved_nodes {
+        if let CoreNodeKind::Custom(CustomNode {
+            source: NodeSource::GitBranch { repo, rev },
+            ..
+        }) = node.kind
+        {
+            let source = git::fetch_commit_hash(repo, rev)
+                .with_context(|| format!("failed to find commit hash for `{node_id}`"))?;
+            git_sources.insert(node_id, source);
+        }
+    }
+
+    let session = connect_to_coordinator_with_defaults(coordinator_addr, coordinator_port);
+
+    let build_kind = if force_local {
+        // user explicitly requested a local build
+        BuildKind::Local
+    } else if coordinator_addr.is_some() || coordinator_port.is_some() {
+        // explicit coordinator address or port set -> there should be a coordinator running
+        BuildKind::ThroughCoordinator {
+            coordinator_session: session.context("failed to connect to coordinator")?,
+        }
+    } else {
+        match session {
+            Ok(coordinator_session) => {
+                // we found a local coordinator instance at default port -> use it for building
+                BuildKind::ThroughCoordinator {
+                    coordinator_session,
+                }
+            }
+            Err(_) => {
+                // no coordinator instance found -> do a local build
+                BuildKind::Local
+            }
+        }
+    };
+
+    match build_kind {
+        BuildKind::Local => {
+            println!("running local build");
+            // use dataflow dir as base working dir
+            let local_working_dir = dunce::canonicalize(&dataflow_path)
+                .context("failed to canonicalize dataflow path")?
+                .parent()
+                .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
+                .to_owned();
+            let build_info = build_dataflow_locally(
+                dataflow_descriptor,
+                &git_sources,
+                &dataflow_session,
+                local_working_dir,
+                uv,
+            )?;
+
+            dataflow_session.git_sources = git_sources;
+            // generate a random BuildId and store the associated build info
+            dataflow_session.build_id = Some(BuildId::generate());
+            dataflow_session.local_build = Some(build_info);
+            dataflow_session
+                .write_out_for_dataflow(&dataflow_path)
+                .context("failed to write out dataflow session file")?;
+        }
+        BuildKind::ThroughCoordinator {
+            mut coordinator_session,
+        } => {
+            let local_working_dir = super::local_working_dir(
+                &dataflow_path,
+                &dataflow_descriptor,
+                &mut *coordinator_session,
+            )?;
+            let build_id = build_distributed_dataflow(
+                &mut *coordinator_session,
+                dataflow_descriptor,
+                &git_sources,
+                &dataflow_session,
+                local_working_dir,
+                uv,
+            )?;
+
+            dataflow_session.git_sources = git_sources;
+            dataflow_session
+                .write_out_for_dataflow(&dataflow_path)
+                .context("failed to write out dataflow session file")?;
+
+            // wait until dataflow build is finished
+
+            wait_until_dataflow_built(
+                build_id,
+                &mut *coordinator_session,
+                coordinator_socket(coordinator_addr, coordinator_port),
+                log::LevelFilter::Info,
+            )?;
+
+            dataflow_session.build_id = Some(build_id);
+            dataflow_session.local_build = None;
+            dataflow_session
+                .write_out_for_dataflow(&dataflow_path)
+                .context("failed to write out dataflow session file")?;
+        }
+    };
+
+    Ok(())
+}
+
+enum BuildKind {
+    Local,
+    ThroughCoordinator {
+        coordinator_session: Box<TcpRequestReplyConnection>,
+    },
+}
+
+fn connect_to_coordinator_with_defaults(
+    coordinator_addr: Option<std::net::IpAddr>,
+    coordinator_port: Option<u16>,
+) -> std::io::Result<Box<TcpRequestReplyConnection>> {
+    let coordinator_socket = coordinator_socket(coordinator_addr, coordinator_port);
+    connect_to_coordinator(coordinator_socket)
+}
+
+fn coordinator_socket(
+    coordinator_addr: Option<std::net::IpAddr>,
+    coordinator_port: Option<u16>,
+) -> std::net::SocketAddr {
+    let coordinator_addr = coordinator_addr.unwrap_or(LOCALHOST);
+    let coordinator_port = coordinator_port.unwrap_or(DORA_COORDINATOR_PORT_CONTROL_DEFAULT);
+    (coordinator_addr, coordinator_port).into()
+}
diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/command/check.rs
similarity index 100%
rename from binaries/cli/src/check.rs
rename to binaries/cli/src/command/check.rs
diff --git a/binaries/cli/src/logs.rs b/binaries/cli/src/command/logs.rs
similarity index 100%
rename from binaries/cli/src/logs.rs
rename to binaries/cli/src/command/logs.rs
diff --git a/binaries/cli/src/command/mod.rs b/binaries/cli/src/command/mod.rs
new file mode 100644
index 00000000..77654440
--- /dev/null
+++ b/binaries/cli/src/command/mod.rs
@@ -0,0 +1,60 @@
+pub use build::build;
+pub use logs::logs;
+pub use run::run;
+pub use start::start;
+
+use std::path::{Path, PathBuf};
+
+use communication_layer_request_reply::TcpRequestReplyConnection;
+use dora_core::descriptor::Descriptor;
+use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
+use eyre::{bail, Context, ContextCompat};
+
+mod build;
+pub mod check;
+mod logs;
+mod run;
+mod start;
+pub mod up;
+
+fn local_working_dir(
+    dataflow_path: &Path,
+    dataflow_descriptor: &Descriptor,
+    coordinator_session: &mut TcpRequestReplyConnection,
+) -> eyre::Result<Option<PathBuf>> {
+    Ok(
+        if dataflow_descriptor
+            .nodes
+            .iter()
+            .all(|n| n.deploy.machine.is_none())
+            && cli_and_daemon_on_same_machine(coordinator_session)?
+        {
+            Some(
+                dunce::canonicalize(dataflow_path)
+                    .context("failed to canonicalize dataflow file path")?
+                    .parent()
+                    .context("dataflow path has no parent dir")?
+                    .to_owned(),
+            )
+        } else {
+            None
+        },
+    )
+}
+
+fn cli_and_daemon_on_same_machine(session: &mut TcpRequestReplyConnection) -> eyre::Result<bool> {
+    let reply_raw = session
+        .request(&serde_json::to_vec(&ControlRequest::CliAndDefaultDaemonOnSameMachine).unwrap())
+        .wrap_err("failed to send start dataflow message")?;
+
+    let result: ControlRequestReply =
+        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
+    match result {
+        ControlRequestReply::CliAndDefaultDaemonIps {
+            default_daemon,
+            cli,
+        } => Ok(default_daemon.is_some() && default_daemon == cli),
+        ControlRequestReply::Error(err) => bail!("{err}"),
+        other => bail!("unexpected start dataflow reply: {other:?}"),
+    }
+}
diff --git a/binaries/cli/src/command/run.rs b/binaries/cli/src/command/run.rs
new file mode 100644
index 00000000..df01d16e
--- /dev/null
+++ b/binaries/cli/src/command/run.rs
@@ -0,0 +1,23 @@
+use dora_daemon::Daemon;
+use eyre::Context;
+use tokio::runtime::Builder;
+
+use crate::{handle_dataflow_result, resolve_dataflow, session::DataflowSession};
+
+pub fn run(dataflow: String, uv: bool) -> Result<(), eyre::Error> {
+    let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
+    let dataflow_session =
+        DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;
+    let rt = Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .context("tokio runtime failed")?;
+    let result = rt.block_on(Daemon::run_dataflow(
+        &dataflow_path,
+        dataflow_session.build_id,
+        dataflow_session.local_build,
+        dataflow_session.session_id,
+        uv,
+    ))?;
+    handle_dataflow_result(result, None)
+}
diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/command/start/attach.rs
similarity index 86%
rename from binaries/cli/src/attach.rs
rename to binaries/cli/src/command/start/attach.rs
index e40be1d8..05d776e0 100644
--- a/binaries/cli/src/attach.rs
+++ b/binaries/cli/src/command/start/attach.rs
@@ -1,4 +1,3 @@
-use colored::Colorize;
 use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
 use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor, DescriptorExt};
 use dora_message::cli_to_coordinator::ControlRequest;
@@ -16,6 +15,7 @@ use tracing::{error, info};
 use uuid::Uuid;
 
 use crate::handle_dataflow_result;
+use crate::output::print_log_message;
 
 pub fn attach_dataflow(
     dataflow: Descriptor,
@@ -183,42 +183,6 @@ pub fn attach_dataflow(
     }
 }
 
-pub fn print_log_message(log_message: LogMessage) {
-    let LogMessage {
-        dataflow_id,
-        node_id,
-        daemon_id,
-        level,
-        target,
-        module_path: _,
-        file: _,
-        line: _,
-        message,
-    } = log_message;
-    let level = match level {
-        log::Level::Error => "ERROR".red(),
-        log::Level::Warn => "WARN ".yellow(),
-        log::Level::Info => "INFO ".green(),
-        other => format!("{other:5}").normal(),
-    };
-    let dataflow = format!(" dataflow `{dataflow_id}`").cyan();
-    let daemon = match daemon_id {
-        Some(id) => format!(" on daemon `{id}`"),
-        None => " on default daemon".to_string(),
-    }
-    .bright_black();
-    let node = match node_id {
-        Some(node_id) => format!(" {node_id}").bold(),
-        None => "".normal(),
-    };
-    let target = match target {
-        Some(target) => format!(" {target}").dimmed(),
-        None => "".normal(),
-    };
-
-    println!("{level}{dataflow}{daemon}{node}{target}: {message}");
-}
-
 enum AttachEvent {
     Control(ControlRequest),
     Log(eyre::Result<LogMessage>),
diff --git a/binaries/cli/src/command/start/mod.rs b/binaries/cli/src/command/start/mod.rs
new file mode 100644
index 00000000..5275a62d
--- /dev/null
+++ b/binaries/cli/src/command/start/mod.rs
@@ -0,0 +1,167 @@
+use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
+use dora_core::descriptor::{Descriptor, DescriptorExt};
+use dora_message::{
+    cli_to_coordinator::ControlRequest, common::LogMessage, coordinator_to_cli::ControlRequestReply,
+};
+use eyre::{bail, Context};
+use std::{
+    net::{SocketAddr, TcpStream},
+    path::PathBuf,
+};
+use uuid::Uuid;
+
+use crate::{
+    connect_to_coordinator, output::print_log_message, resolve_dataflow, session::DataflowSession,
+};
+use attach::attach_dataflow;
+
+mod attach;
+
+pub fn start(
+    dataflow: String,
+    name: Option<String>,
+    coordinator_socket: SocketAddr,
+    attach: bool,
+    detach: bool,
+    hot_reload: bool,
+    uv: bool,
+) -> eyre::Result<()> {
+    let (dataflow, dataflow_descriptor, mut session, dataflow_id) =
+        start_dataflow(dataflow, name, coordinator_socket, uv)?;
+
+    let attach = match (attach, detach) {
+        (true, true) => eyre::bail!("both `--attach` and `--detach` are given"),
+        (true, false) => true,
+        (false, true) => false,
+        (false, false) => {
+            println!("attaching to dataflow (use `--detach` to run in background)");
+            true
+        }
+    };
+
+    if attach {
+        let log_level = env_logger::Builder::new()
+            .filter_level(log::LevelFilter::Info)
+            .parse_default_env()
+            .build()
+            .filter();
+
+        attach_dataflow(
+            dataflow_descriptor,
+            dataflow,
+            dataflow_id,
+            &mut *session,
+            hot_reload,
+            coordinator_socket,
+            log_level,
+        )
+    } else {
+        // wait until dataflow is started
+        wait_until_dataflow_started(
+            dataflow_id,
+            &mut session,
+            coordinator_socket,
+            log::LevelFilter::Info,
+        )
+    }
+}
+
+fn start_dataflow(
+    dataflow: String,
+    name: Option<String>,
+    coordinator_socket: SocketAddr,
+    uv: bool,
+) -> Result<(PathBuf, Descriptor, Box<TcpRequestReplyConnection>, Uuid), eyre::Error> {
+    let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
+    let dataflow_descriptor =
+        Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
+    let dataflow_session =
+        DataflowSession::read_session(&dataflow).context("failed to read DataflowSession")?;
+
+    let mut session = connect_to_coordinator(coordinator_socket)
+        .wrap_err("failed to connect to dora coordinator")?;
+    let local_working_dir =
+        super::local_working_dir(&dataflow, &dataflow_descriptor, &mut *session)?;
+
+    let dataflow_id = {
+        let dataflow = dataflow_descriptor.clone();
+        let session: &mut TcpRequestReplyConnection = &mut *session;
+        let reply_raw = session
+            .request(
+                &serde_json::to_vec(&ControlRequest::Start {
+                    build_id: dataflow_session.build_id,
+                    session_id: dataflow_session.session_id,
+                    dataflow,
+                    name,
+                    local_working_dir,
+                    uv,
+                })
+                .unwrap(),
+            )
+            .wrap_err("failed to send start dataflow message")?;
+
+        let result: ControlRequestReply =
+            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
+        match result {
+            ControlRequestReply::DataflowStartTriggered { uuid } => {
+                eprintln!("dataflow start triggered: {uuid}");
+                uuid
+            }
+            ControlRequestReply::Error(err) => bail!("{err}"),
+            other => bail!("unexpected start dataflow reply: {other:?}"),
+        }
+    };
+    Ok((dataflow, dataflow_descriptor, session, dataflow_id))
+}
+
+fn wait_until_dataflow_started(
+    dataflow_id: Uuid,
+    session: &mut Box<TcpRequestReplyConnection>,
+    coordinator_addr: SocketAddr,
+    log_level: log::LevelFilter,
+) -> eyre::Result<()> {
+    // subscribe to log messages
+    let mut log_session = TcpConnection {
+        stream: TcpStream::connect(coordinator_addr)
+            .wrap_err("failed to connect to dora coordinator")?,
+    };
+    log_session
+        .send(
+            &serde_json::to_vec(&ControlRequest::LogSubscribe {
+                dataflow_id,
+                level: log_level,
+            })
+            .wrap_err("failed to serialize message")?,
+        )
+        .wrap_err("failed to send log subscribe request to coordinator")?;
+    std::thread::spawn(move || {
+        while let Ok(raw) = log_session.receive() {
+            let parsed: eyre::Result<LogMessage> =
+                serde_json::from_slice(&raw).context("failed to parse log message");
+            match parsed {
+                Ok(log_message) => {
+                    print_log_message(log_message);
+                }
+                Err(err) => {
+                    tracing::warn!("failed to parse log message: {err:?}")
+                }
+            }
+        }
+    });
+
+    let reply_raw = session
+        .request(&serde_json::to_vec(&ControlRequest::WaitForSpawn { dataflow_id }).unwrap())
+        .wrap_err("failed to send start dataflow message")?;
+
+    let result: ControlRequestReply =
+        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
+    match result {
+        ControlRequestReply::DataflowSpawned { uuid } => {
+            eprintln!("dataflow started: {uuid}");
+        }
+        ControlRequestReply::Error(err) => bail!("{err}"),
+        other => bail!("unexpected start dataflow reply: {other:?}"),
+    }
+    Ok(())
+}
diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/command/up.rs
similarity index 98%
rename from binaries/cli/src/up.rs
rename to binaries/cli/src/command/up.rs
index 16f1a4c1..03eead4a 100644
--- a/binaries/cli/src/up.rs
+++ b/binaries/cli/src/command/up.rs
@@ -1,4 +1,4 @@
-use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST};
+use crate::{command::check::daemon_running, connect_to_coordinator, LOCALHOST};
 use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT;
 use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
 use eyre::{bail, Context, ContextCompat};
diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs
index 1bbc6cc0..aeaca232 100644
--- a/binaries/cli/src/lib.rs
+++ b/binaries/cli/src/lib.rs
@@ -1,8 +1,5 @@
-use attach::{attach_dataflow, print_log_message};
 use colored::Colorize;
-use communication_layer_request_reply::{
-    RequestReplyLayer, TcpConnection, TcpLayer, TcpRequestReplyConnection,
-};
+use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
 use dora_coordinator::Event;
 use 
dora_core::{ descriptor::{source_is_url, Descriptor, DescriptorExt}, @@ -15,7 +12,6 @@ use dora_daemon::Daemon; use dora_download::download_file; use dora_message::{ cli_to_coordinator::ControlRequest, - common::LogMessage, coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus}, }; #[cfg(feature = "tracing")] @@ -24,11 +20,7 @@ use dora_tracing::{set_up_tracing_opts, FileLogging}; use duration_str::parse; use eyre::{bail, Context}; use formatting::FormatDataflowError; -use std::{ - env::current_dir, - io::Write, - net::{SocketAddr, TcpStream}, -}; +use std::{env::current_dir, io::Write, net::SocketAddr}; use std::{ net::{IpAddr, Ipv4Addr}, path::PathBuf, @@ -39,13 +31,12 @@ use tokio::runtime::Builder; use tracing::level_filters::LevelFilter; use uuid::Uuid; -mod attach; -mod check; +pub mod command; mod formatting; mod graph; -mod logs; +pub mod output; +pub mod session; mod template; -mod up; const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); @@ -90,14 +81,17 @@ enum Command { #[clap(value_name = "PATH")] dataflow: String, /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, + #[clap(long, value_name = "IP")] + coordinator_addr: Option, /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, + #[clap(long, value_name = "PORT")] + coordinator_port: Option, // Use UV to build nodes. #[clap(long, action)] uv: bool, + // Run build on local machine + #[clap(long, action)] + local: bool, }, /// Generate a new project or node. Choose the language between Rust, Python, C or C++. New { @@ -298,14 +292,14 @@ enum Lang { } pub fn lib_main(args: Args) { - if let Err(err) = run(args) { + if let Err(err) = run_cli(args) { eprintln!("\n\n{}", "[ERROR]".bold().red()); eprintln!("{err:?}"); std::process::exit(1); } } -fn run(args: Args) -> eyre::Result<()> { +fn run_cli(args: Args) -> eyre::Result<()> { #[cfg(feature = "tracing")] match &args.command { Command::Daemon { @@ -347,12 +341,6 @@ fn run(args: Args) -> eyre::Result<()> { } }; - let log_level = env_logger::Builder::new() - .filter_level(log::LevelFilter::Info) - .parse_default_env() - .build() - .filter(); - match args.command { Command::Check { dataflow, @@ -367,9 +355,9 @@ fn run(args: Args) -> eyre::Result<()> { .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; - check::check_environment((coordinator_addr, coordinator_port).into())? + command::check::check_environment((coordinator_addr, coordinator_port).into())? 
} - None => check::check_environment((coordinator_addr, coordinator_port).into())?, + None => command::check::check_environment((coordinator_addr, coordinator_port).into())?, }, Command::Graph { dataflow, @@ -383,34 +371,15 @@ fn run(args: Args) -> eyre::Result<()> { coordinator_addr, coordinator_port, uv, - } => { - let coordinator_socket = (coordinator_addr, coordinator_port).into(); - let (_, _, mut session, uuid) = - start_dataflow(dataflow, None, coordinator_socket, uv, true)?; - // wait until build is finished - wait_until_dataflow_started( - uuid, - &mut session, - true, - coordinator_socket, - log::LevelFilter::Info, - )?; - } + local, + } => command::build(dataflow, coordinator_addr, coordinator_port, uv, local)?, Command::New { args, internal_create_with_path_dependencies, } => template::create(args, internal_create_with_path_dependencies)?, - Command::Run { dataflow, uv } => { - let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?; - let rt = Builder::new_multi_thread() - .enable_all() - .build() - .context("tokio runtime failed")?; - let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv))?; - handle_dataflow_result(result, None)? - } + Command::Run { dataflow, uv } => command::run(dataflow, uv)?, Command::Up { config } => { - up::up(config.as_deref())?; + command::up::up(config.as_deref())?; } Command::Logs { dataflow, @@ -425,7 +394,7 @@ fn run(args: Args) -> eyre::Result<()> { if let Some(dataflow) = dataflow { let uuid = Uuid::parse_str(&dataflow).ok(); let name = if uuid.is_some() { None } else { Some(dataflow) }; - logs::logs(&mut *session, uuid, name, node)? + command::logs(&mut *session, uuid, name, node)? } else { let active: Vec = list.get_active(); @@ -434,7 +403,7 @@ fn run(args: Args) -> eyre::Result<()> { [uuid] => uuid.clone(), _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, }; - logs::logs(&mut *session, Some(uuid.uuid), None, node)? + command::logs(&mut *session, Some(uuid.uuid), None, node)? } } Command::Start { @@ -448,39 +417,15 @@ fn run(args: Args) -> eyre::Result<()> { uv, } => { let coordinator_socket = (coordinator_addr, coordinator_port).into(); - let (dataflow, dataflow_descriptor, mut session, dataflow_id) = - start_dataflow(dataflow, name, coordinator_socket, uv, false)?; - - let attach = match (attach, detach) { - (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), - (true, false) => true, - (false, true) => false, - (false, false) => { - println!("attaching to dataflow (use `--detach` to run in background)"); - true - } - }; - - if attach { - attach_dataflow( - dataflow_descriptor, - dataflow, - dataflow_id, - &mut *session, - hot_reload, - coordinator_socket, - log_level, - )? - } else { - // wait until dataflow is started - wait_until_dataflow_started( - dataflow_id, - &mut session, - false, - coordinator_socket, - log::LevelFilter::Info, - )?; - } + command::start( + dataflow, + name, + coordinator_socket, + attach, + detach, + hot_reload, + uv, + )? 
} Command::List { coordinator_addr, @@ -510,7 +455,7 @@ fn run(args: Args) -> eyre::Result<()> { config, coordinator_addr, coordinator_port, - } => up::destroy( + } => command::up::destroy( config.as_deref(), (coordinator_addr, coordinator_port).into(), )?, @@ -560,8 +505,11 @@ fn run(args: Args) -> eyre::Result<()> { coordinator_addr ); } + let dataflow_session = + DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?; - let result = Daemon::run_dataflow(&dataflow_path, false).await?; + let result = Daemon::run_dataflow(&dataflow_path, + dataflow_session.build_id, dataflow_session.local_build, dataflow_session.session_id, false).await?; handle_dataflow_result(result, None) } None => { @@ -632,114 +580,6 @@ fn run(args: Args) -> eyre::Result<()> { Ok(()) } -fn start_dataflow( - dataflow: String, - name: Option, - coordinator_socket: SocketAddr, - uv: bool, - build_only: bool, -) -> Result<(PathBuf, Descriptor, Box, Uuid), eyre::Error> { - let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; - let dataflow_descriptor = - Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; - let working_dir = dataflow - .canonicalize() - .context("failed to canonicalize dataflow path")? - .parent() - .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? - .to_owned(); - let mut session = connect_to_coordinator(coordinator_socket) - .wrap_err("failed to connect to dora coordinator")?; - let dataflow_id = { - let dataflow = dataflow_descriptor.clone(); - let session: &mut TcpRequestReplyConnection = &mut *session; - let reply_raw = session - .request( - &serde_json::to_vec(&ControlRequest::Start { - dataflow, - name, - local_working_dir: working_dir, - uv, - build_only, - }) - .unwrap(), - ) - .wrap_err("failed to send start dataflow message")?; - - let result: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match result { - ControlRequestReply::DataflowStartTriggered { uuid } => { - if build_only { - eprintln!("dataflow build triggered"); - } else { - eprintln!("dataflow start triggered: {uuid}"); - } - uuid - } - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected start dataflow reply: {other:?}"), - } - }; - Ok((dataflow, dataflow_descriptor, session, dataflow_id)) -} - -fn wait_until_dataflow_started( - dataflow_id: Uuid, - session: &mut Box, - build_only: bool, - coordinator_addr: SocketAddr, - log_level: log::LevelFilter, -) -> eyre::Result<()> { - // subscribe to log messages - let mut log_session = TcpConnection { - stream: TcpStream::connect(coordinator_addr) - .wrap_err("failed to connect to dora coordinator")?, - }; - log_session - .send( - &serde_json::to_vec(&ControlRequest::LogSubscribe { - dataflow_id, - level: log_level, - }) - .wrap_err("failed to serialize message")?, - ) - .wrap_err("failed to send log subscribe request to coordinator")?; - std::thread::spawn(move || { - while let Ok(raw) = log_session.receive() { - let parsed: eyre::Result = - serde_json::from_slice(&raw).context("failed to parse log message"); - match parsed { - Ok(log_message) => { - print_log_message(log_message); - } - Err(err) => { - tracing::warn!("failed to parse log message: {err:?}") - } - } - } - }); - - let reply_raw = session - .request(&serde_json::to_vec(&ControlRequest::WaitForSpawn { dataflow_id }).unwrap()) - .wrap_err("failed to send start dataflow message")?; - - let result: ControlRequestReply = - 
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
-    match result {
-        ControlRequestReply::DataflowSpawned { uuid } => {
-            if build_only {
-                eprintln!("dataflow build finished");
-            } else {
-                eprintln!("dataflow started: {uuid}");
-            }
-        }
-        ControlRequestReply::Error(err) => bail!("{err}"),
-        other => bail!("unexpected start dataflow reply: {other:?}"),
-    }
-    Ok(())
-}
-
 fn stop_dataflow_interactive(
     grace_duration: Option<Duration>,
     session: &mut TcpRequestReplyConnection,
@@ -890,6 +730,8 @@ use pyo3::{
     wrap_pyfunction, Bound, PyResult, Python,
 };
 
+use crate::session::DataflowSession;
+
 #[cfg(feature = "python")]
 #[pyfunction]
 fn py_main(_py: Python) -> PyResult<()> {
diff --git a/binaries/cli/src/output.rs b/binaries/cli/src/output.rs
new file mode 100644
index 00000000..ad35ad67
--- /dev/null
+++ b/binaries/cli/src/output.rs
@@ -0,0 +1,48 @@
+use colored::Colorize;
+use dora_message::common::LogMessage;
+
+pub fn print_log_message(log_message: LogMessage) {
+    let LogMessage {
+        build_id,
+        dataflow_id,
+        node_id,
+        daemon_id,
+        level,
+        target,
+        module_path: _,
+        file: _,
+        line: _,
+        message,
+    } = log_message;
+    let level = match level {
+        log::Level::Error => "ERROR".red(),
+        log::Level::Warn => "WARN ".yellow(),
+        log::Level::Info => "INFO ".green(),
+        other => format!("{other:5}").normal(),
+    };
+    let dataflow = if let Some(dataflow_id) = dataflow_id {
+        format!(" dataflow `{dataflow_id}`").cyan()
+    } else {
+        String::new().cyan()
+    };
+    let build = if let Some(build_id) = build_id {
+        format!(" build `{build_id}`").cyan()
+    } else {
+        String::new().cyan()
+    };
+    let daemon = match daemon_id {
+        Some(id) => format!(" on daemon `{id}`"),
+        None => " on default daemon".to_string(),
+    }
+    .bright_black();
+    let node = match node_id {
+        Some(node_id) => format!(" {node_id}").bold(),
+        None => "".normal(),
+    };
+    let target = match target {
+        Some(target) => format!(" {target}").dimmed(),
+        None => "".normal(),
+    };
+
+    println!("{level}{build}{dataflow}{daemon}{node}{target}: {message}");
+}
diff --git a/binaries/cli/src/session.rs b/binaries/cli/src/session.rs
new file mode 100644
index 00000000..29609e54
--- /dev/null
+++ b/binaries/cli/src/session.rs
@@ -0,0 +1,73 @@
+use std::{
+    collections::BTreeMap,
+    path::{Path, PathBuf},
+};
+
+use dora_core::build::BuildInfo;
+use dora_message::{common::GitSource, id::NodeId, BuildId, SessionId};
+use eyre::{Context, ContextCompat};
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct DataflowSession {
+    pub build_id: Option<BuildId>,
+    pub session_id: SessionId,
+    pub git_sources: BTreeMap<NodeId, GitSource>,
+    pub local_build: Option<BuildInfo>,
+}
+
+impl Default for DataflowSession {
+    fn default() -> Self {
+        Self {
+            build_id: None,
+            session_id: SessionId::generate(),
+            git_sources: Default::default(),
+            local_build: Default::default(),
+        }
+    }
+}
+
+impl DataflowSession {
+    pub fn read_session(dataflow_path: &Path) -> eyre::Result<Self> {
+        let session_file = session_file_path(dataflow_path)?;
+        if session_file.exists() {
+            if let Ok(parsed) = deserialize(&session_file) {
+                return Ok(parsed);
+            } else {
+                tracing::warn!("failed to read dataflow session file, regenerating (you might need to run `dora build` again)");
+            }
+        }
+
+        let default_session = DataflowSession::default();
+        default_session.write_out_for_dataflow(dataflow_path)?;
+        Ok(default_session)
+    }
+
+    pub fn write_out_for_dataflow(&self, dataflow_path: &Path) -> eyre::Result<()> {
+        let session_file = session_file_path(dataflow_path)?;
+        std::fs::write(session_file, self.serialize()?)
+            .context("failed to write dataflow session file")?;
+        Ok(())
+    }
+
+    fn serialize(&self) -> eyre::Result<String> {
+        serde_yaml::to_string(&self).context("failed to serialize dataflow session file")
+    }
+}
+
+fn deserialize(session_file: &Path) -> eyre::Result<DataflowSession> {
+    std::fs::read_to_string(&session_file)
+        .context("failed to read DataflowSession file")
+        .and_then(|s| {
+            serde_yaml::from_str(&s).context("failed to deserialize DataflowSession file")
+        })
+}
+
+fn session_file_path(dataflow_path: &Path) -> eyre::Result<PathBuf> {
+    let file_stem = dataflow_path
+        .file_stem()
+        .wrap_err("dataflow path has no file stem")?
+        .to_str()
+        .wrap_err("dataflow file stem is not valid utf-8")?;
+    let session_file = dataflow_path.with_file_name(format!("{file_stem}.dora-session.yaml"));
+    Ok(session_file)
+}
diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs
index c0e92417..446233a8 100644
--- a/binaries/coordinator/src/control.rs
+++ b/binaries/coordinator/src/control.rs
@@ -2,7 +2,9 @@ use crate::{
     tcp_utils::{tcp_receive, tcp_send},
     Event,
 };
-use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
+use dora_message::{
+    cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply, BuildId,
+};
 use eyre::{eyre, Context};
 use futures::{
     future::{self, Either},
@@ -79,6 +81,7 @@ async fn handle_requests(
     tx: mpsc::Sender<Event>,
     _finish_tx: mpsc::Sender<()>,
 ) {
+    let peer_addr = connection.peer_addr().ok();
     loop {
         let next_request = tcp_receive(&mut connection).map(Either::Left);
         let coordinator_stopped = tx.closed().map(Either::Right);
@@ -114,11 +117,29 @@ async fn handle_requests(
                 break;
             }
 
-            let result = match request {
+            if let Ok(ControlRequest::BuildLogSubscribe { build_id, level }) = request {
+                let _ = tx
+                    .send(ControlEvent::BuildLogSubscribe {
+                        build_id,
+                        level,
+                        connection,
+                    })
+                    .await;
+                break;
+            }
+
+            let mut result = match request {
                 Ok(request) => handle_request(request, &tx).await,
                 Err(err) => Err(err),
             };
 
+            if let Ok(ControlRequestReply::CliAndDefaultDaemonIps { cli, .. 
}) = &mut result { + if cli.is_none() { + // fill cli IP address in reply + *cli = peer_addr.map(|s| s.ip()); + } + } + let reply = result.unwrap_or_else(|err| ControlRequestReply::Error(format!("{err:?}"))); let serialized: Vec = match serde_json::to_vec(&reply).wrap_err("failed to serialize ControlRequestReply") { @@ -179,6 +200,11 @@ pub enum ControlEvent { level: log::LevelFilter, connection: TcpStream, }, + BuildLogSubscribe { + build_id: BuildId, + level: log::LevelFilter, + connection: TcpStream, + }, Error(eyre::Report), } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 1302662c..7f5f7d3f 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -5,22 +5,27 @@ use crate::{ pub use control::ControlEvent; use dora_core::{ config::{NodeId, OperatorId}, + descriptor::DescriptorExt, uhlc::{self, HLC}, }; use dora_message::{ cli_to_coordinator::ControlRequest, - common::DaemonId, + common::{DaemonId, GitSource}, coordinator_to_cli::{ ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult, DataflowStatus, LogLevel, LogMessage, }, - coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped}, + coordinator_to_daemon::{ + BuildDataflowNodes, DaemonCoordinatorEvent, RegisterResult, Timestamped, + }, daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult}, descriptor::{Descriptor, ResolvedNode}, + BuildId, DataflowId, SessionId, }; use eyre::{bail, eyre, ContextCompat, Result, WrapErr}; use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt}; use futures_concurrency::stream::Merge; +use itertools::Itertools; use log_subscriber::LogSubscriber; use run::SpawnedDataflow; use std::{ @@ -139,6 +144,10 @@ impl DaemonConnections { } } + fn get(&self, id: &DaemonId) -> Option<&DaemonConnection> { + self.daemons.get(id) + } + fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> { self.daemons.get_mut(id) } @@ -194,10 +203,12 @@ async fn start_inner( let mut events = (abortable_events, daemon_events).merge(); - let mut running_dataflows: HashMap = HashMap::new(); - let mut dataflow_results: HashMap> = + let mut running_builds: HashMap = HashMap::new(); + + let mut running_dataflows: HashMap = HashMap::new(); + let mut dataflow_results: HashMap> = HashMap::new(); - let mut archived_dataflows: HashMap = HashMap::new(); + let mut archived_dataflows: HashMap = HashMap::new(); let mut daemon_connections = DaemonConnections::default(); while let Some(event) = events.next().await { @@ -351,9 +362,10 @@ async fn start_inner( let mut finished_dataflow = entry.remove(); let dataflow_id = finished_dataflow.uuid; send_log_message( - &mut finished_dataflow, + &mut finished_dataflow.log_subscribers, &LogMessage { - dataflow_id, + build_id: None, + dataflow_id: Some(dataflow_id), node_id: None, daemon_id: None, level: LogLevel::Info, @@ -380,7 +392,7 @@ async fn start_inner( } if !matches!( finished_dataflow.spawn_result, - SpawnResult::Spawned { .. } + CachedResult::Cached { .. 
} ) { log::error!("pending spawn result on dataflow finish"); } @@ -399,12 +411,56 @@ async fn start_inner( reply_sender, } => { match request { + ControlRequest::Build { + session_id, + dataflow, + git_sources, + prev_git_sources, + local_working_dir, + uv, + } => { + // assign a random build id + let build_id = BuildId::generate(); + + let result = build_dataflow( + build_id, + session_id, + dataflow, + git_sources, + prev_git_sources, + local_working_dir, + &clock, + uv, + &mut daemon_connections, + ) + .await; + match result { + Ok(build) => { + running_builds.insert(build_id, build); + let _ = reply_sender.send(Ok( + ControlRequestReply::DataflowBuildTriggered { build_id }, + )); + } + Err(err) => { + let _ = reply_sender.send(Err(err)); + } + } + } + ControlRequest::WaitForBuild { build_id } => { + if let Some(build) = running_builds.get_mut(&build_id) { + build.build_result.register(reply_sender); + } else { + let _ = + reply_sender.send(Err(eyre!("unknown build id {build_id}"))); + } + } ControlRequest::Start { + build_id, + session_id, dataflow, name, local_working_dir, uv, - build_only, } => { let name = name.or_else(|| names::Generator::default().next()); @@ -419,13 +475,14 @@ async fn start_inner( } } let dataflow = start_dataflow( + build_id, + session_id, dataflow, local_working_dir, name, &mut daemon_connections, &clock, uv, - build_only, ) .await?; Ok(dataflow) @@ -652,6 +709,27 @@ async fn start_inner( "LogSubscribe request should be handled separately" ))); } + ControlRequest::BuildLogSubscribe { .. } => { + let _ = reply_sender.send(Err(eyre::eyre!( + "BuildLogSubscribe request should be handled separately" + ))); + } + ControlRequest::CliAndDefaultDaemonOnSameMachine => { + let mut default_daemon_ip = None; + if let Some(default_id) = daemon_connections.unnamed().next() { + if let Some(connection) = daemon_connections.get(default_id) { + if let Ok(addr) = connection.stream.peer_addr() { + default_daemon_ip = Some(addr.ip()); + } + } + } + let _ = reply_sender.send(Ok( + ControlRequestReply::CliAndDefaultDaemonIps { + default_daemon: default_daemon_ip, + cli: None, // filled later + }, + )); + } } } ControlEvent::Error(err) => tracing::error!("{err:?}"), @@ -666,6 +744,17 @@ async fn start_inner( .push(LogSubscriber::new(level, connection)); } } + ControlEvent::BuildLogSubscribe { + build_id, + level, + connection, + } => { + if let Some(build) = running_builds.get_mut(&build_id) { + build + .log_subscribers + .push(LogSubscriber::new(level, connection)); + } + } }, Event::DaemonHeartbeatInterval => { let mut disconnected = BTreeSet::new(); @@ -721,14 +810,52 @@ async fn start_inner( } } Event::Log(message) => { - if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) { - send_log_message(dataflow, &message).await; + if let Some(dataflow_id) = &message.dataflow_id { + if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) { + send_log_message(&mut dataflow.log_subscribers, &message).await; + } + } + if let Some(build_id) = message.build_id { + if let Some(build) = running_builds.get_mut(&build_id) { + send_log_message(&mut build.log_subscribers, &message).await; + } } } Event::DaemonExit { daemon_id } => { tracing::info!("Daemon `{daemon_id}` exited"); daemon_connections.remove(&daemon_id); } + Event::DataflowBuildResult { + build_id, + daemon_id, + result, + } => match running_builds.get_mut(&build_id) { + Some(build) => { + build.pending_build_results.remove(&daemon_id); + match result { + Ok(()) => {} + Err(err) => { + 
build.errors.push(format!("{err:?}")); + } + }; + if build.pending_build_results.is_empty() { + tracing::info!("dataflow build finished: `{build_id}`"); + let mut build = running_builds.remove(&build_id).unwrap(); + let result = if build.errors.is_empty() { + Ok(()) + } else { + Err(format!("build failed: {}", build.errors.join("\n\n"))) + }; + + build.build_result.set_result(Ok( + ControlRequestReply::DataflowBuildFinished { build_id, result }, + )); + } + } + None => { + tracing::warn!("received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map"); + } + }, Event::DataflowSpawnResult { dataflow_id, daemon_id, @@ -739,21 +866,10 @@ async fn start_inner( match result { Ok(()) => { if dataflow.pending_spawn_results.is_empty() { - tracing::info!( - "successfully {} dataflow `{dataflow_id}`", - if dataflow.build_only { - "built" - } else { - "spawned" - } - ); + tracing::info!("successfully spawned dataflow `{dataflow_id}`",); dataflow.spawn_result.set_result(Ok( ControlRequestReply::DataflowSpawned { uuid: dataflow_id }, )); - - if dataflow.build_only { - running_dataflows.remove(&dataflow_id); - } } } Err(err) => { @@ -783,8 +899,8 @@ async fn start_inner( Ok(()) } -async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage) { - for subscriber in &mut dataflow.log_subscribers { +async fn send_log_message(log_subscribers: &mut Vec, message: &LogMessage) { + for subscriber in log_subscribers.iter_mut() { let send_result = tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message)); @@ -792,7 +908,7 @@ async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage) subscriber.close(); } } - dataflow.log_subscribers.retain(|s| !s.is_closed()); + log_subscribers.retain(|s| !s.is_closed()); } fn dataflow_result( @@ -859,6 +975,15 @@ async fn send_heartbeat_message( .wrap_err("failed to send heartbeat message to daemon") } +struct RunningBuild { + errors: Vec, + build_result: CachedResult, + + log_subscribers: Vec, + + pending_build_results: BTreeSet, +} + struct RunningDataflow { name: Option, uuid: Uuid, @@ -869,26 +994,24 @@ struct RunningDataflow { exited_before_subscribe: Vec, nodes: BTreeMap, - spawn_result: SpawnResult, + spawn_result: CachedResult, stop_reply_senders: Vec>>, log_subscribers: Vec, pending_spawn_results: BTreeSet, - - build_only: bool, } -pub enum SpawnResult { +pub enum CachedResult { Pending { result_senders: Vec>>, }, - Spawned { + Cached { result: eyre::Result, }, } -impl Default for SpawnResult { +impl Default for CachedResult { fn default() -> Self { Self::Pending { result_senders: Vec::new(), @@ -896,14 +1019,14 @@ impl Default for SpawnResult { } } -impl SpawnResult { +impl CachedResult { fn register( &mut self, reply_sender: tokio::sync::oneshot::Sender>, ) { match self { - SpawnResult::Pending { result_senders } => result_senders.push(reply_sender), - SpawnResult::Spawned { result } => { + CachedResult::Pending { result_senders } => result_senders.push(reply_sender), + CachedResult::Cached { result } => { Self::send_result_to(result, reply_sender); } } @@ -911,13 +1034,13 @@ impl SpawnResult { fn set_result(&mut self, result: eyre::Result) { match self { - SpawnResult::Pending { result_senders } => { + CachedResult::Pending { result_senders } => { for sender in result_senders.drain(..) { Self::send_result_to(&result, sender); } - *self = SpawnResult::Spawned { result }; + *self = CachedResult::Cached { result }; } - SpawnResult::Spawned { .. } => {} + CachedResult::Cached { .. 
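Aside (editor's sketch): the `DataflowBuildResult` handling above collects one result per daemon and only finishes the build once every daemon has reported, joining any accumulated errors. A minimal, self-contained sketch of that bookkeeping, with simplified stand-in types rather than the actual dora ones:

```rust
// Simplified stand-ins, not the dora types: the daemons that still have to
// report, plus the errors collected so far.
use std::collections::BTreeSet;

struct RunningBuild {
    errors: Vec<String>,
    pending: BTreeSet<&'static str>,
}

impl RunningBuild {
    /// Returns `Some(final_result)` once the last daemon has reported.
    fn on_daemon_result(
        &mut self,
        daemon: &str,
        result: Result<(), String>,
    ) -> Option<Result<(), String>> {
        self.pending.remove(daemon);
        if let Err(err) = result {
            self.errors.push(err);
        }
        if self.pending.is_empty() {
            Some(if self.errors.is_empty() {
                Ok(())
            } else {
                Err(format!("build failed: {}", self.errors.join("\n\n")))
            })
        } else {
            None
        }
    }
}

fn main() {
    let mut build = RunningBuild {
        errors: Vec::new(),
        pending: BTreeSet::from(["daemon-a", "daemon-b"]),
    };
    assert!(build.on_daemon_result("daemon-a", Ok(())).is_none());
    let finished = build.on_daemon_result("daemon-b", Err("cargo build failed".into()));
    assert!(matches!(finished, Some(Err(_))));
}
```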
} => {} } } @@ -1123,26 +1246,135 @@ async fn retrieve_logs( reply_logs.map_err(|err| eyre!(err)) } +#[allow(clippy::too_many_arguments)] +#[tracing::instrument(skip(daemon_connections, clock))] +async fn build_dataflow( + build_id: BuildId, + session_id: SessionId, + dataflow: Descriptor, + git_sources: BTreeMap, + prev_git_sources: BTreeMap, + local_working_dir: Option, + clock: &HLC, + uv: bool, + daemon_connections: &mut DaemonConnections, +) -> eyre::Result { + let nodes = dataflow.resolve_aliases_and_set_defaults()?; + + let mut git_sources_by_daemon = git_sources + .into_iter() + .into_grouping_map_by(|(id, _)| nodes.get(id).and_then(|n| n.deploy.machine.as_ref())) + .collect(); + let mut prev_git_sources_by_daemon = prev_git_sources + .into_iter() + .into_grouping_map_by(|(id, _)| nodes.get(id).and_then(|n| n.deploy.machine.as_ref())) + .collect(); + + let nodes_by_daemon = nodes.values().into_group_map_by(|n| &n.deploy.machine); + + let mut daemons = BTreeSet::new(); + for (machine, nodes_on_machine) in &nodes_by_daemon { + let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect(); + tracing::debug!( + "Running dataflow build `{session_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})" + ); + + let build_command = BuildDataflowNodes { + build_id, + session_id, + local_working_dir: local_working_dir.clone(), + git_sources: git_sources_by_daemon + .remove(&machine.as_ref()) + .unwrap_or_default(), + prev_git_sources: prev_git_sources_by_daemon + .remove(&machine.as_ref()) + .unwrap_or_default(), + dataflow_descriptor: dataflow.clone(), + nodes_on_machine, + uv, + }; + let message = serde_json::to_vec(&Timestamped { + inner: DaemonCoordinatorEvent::Build(build_command), + timestamp: clock.new_timestamp(), + })?; + + let daemon_id = build_dataflow_on_machine(daemon_connections, machine.as_deref(), &message) + .await + .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?; + daemons.insert(daemon_id); + } + + tracing::info!("successfully triggered dataflow build `{session_id}`",); + + Ok(RunningBuild { + errors: Vec::new(), + build_result: CachedResult::default(), + log_subscribers: Vec::new(), + pending_build_results: daemons, + }) +} + +async fn build_dataflow_on_machine( + daemon_connections: &mut DaemonConnections, + machine: Option<&str>, + message: &[u8], +) -> Result { + let daemon_id = match machine { + Some(machine) => daemon_connections + .get_matching_daemon_id(machine) + .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))? + .clone(), + None => daemon_connections + .unnamed() + .next() + .wrap_err("no unnamed daemon connections")? + .clone(), + }; + + let daemon_connection = daemon_connections + .get_mut(&daemon_id) + .wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?; + tcp_send(&mut daemon_connection.stream, message) + .await + .wrap_err("failed to send build message to daemon")?; + + let reply_raw = tcp_receive(&mut daemon_connection.stream) + .await + .wrap_err("failed to receive build reply from daemon")?; + match serde_json::from_slice(&reply_raw) + .wrap_err("failed to deserialize build reply from daemon")? 
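Aside (editor's sketch): `build_dataflow` uses the newly added `itertools` dependency to group the resolved nodes by their target machine before sending one build command per daemon. The grouping step in isolation, with hypothetical node data (requires the `itertools` crate):

```rust
// Hypothetical node data; `machine: None` stands for the default (unnamed) daemon.
use itertools::Itertools;

struct Node {
    id: String,
    machine: Option<String>,
}

fn main() {
    let nodes = vec![
        Node { id: "camera".into(), machine: Some("robot".into()) },
        Node { id: "plot".into(), machine: None },
        Node { id: "detector".into(), machine: Some("robot".into()) },
    ];

    // one entry per daemon, listing the nodes that daemon is responsible for
    let by_machine = nodes.iter().into_group_map_by(|n| n.machine.clone());

    for (machine, nodes_on_machine) in &by_machine {
        let ids: Vec<&str> = nodes_on_machine.iter().map(|n| n.id.as_str()).collect();
        println!("machine {machine:?} -> nodes {ids:?}");
    }
}
```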
+ { + DaemonCoordinatorReply::TriggerBuildResult(result) => result + .map_err(|e| eyre!(e)) + .wrap_err("daemon returned an error")?, + _ => bail!("unexpected reply"), + } + Ok(daemon_id) +} + +#[allow(clippy::too_many_arguments)] async fn start_dataflow( + build_id: Option, + session_id: SessionId, dataflow: Descriptor, - working_dir: PathBuf, + local_working_dir: Option, name: Option, daemon_connections: &mut DaemonConnections, clock: &HLC, uv: bool, - build_only: bool, ) -> eyre::Result { let SpawnedDataflow { uuid, daemons, nodes, } = spawn_dataflow( + build_id, + session_id, dataflow, - working_dir, + local_working_dir, daemon_connections, clock, uv, - build_only, ) .await?; Ok(RunningDataflow { @@ -1156,11 +1388,10 @@ async fn start_dataflow( exited_before_subscribe: Default::default(), daemons: daemons.clone(), nodes, - spawn_result: SpawnResult::default(), + spawn_result: CachedResult::default(), stop_reply_senders: Vec::new(), log_subscribers: Vec::new(), pending_spawn_results: daemons, - build_only, }) } @@ -1235,6 +1466,11 @@ pub enum Event { DaemonExit { daemon_id: dora_message::common::DaemonId, }, + DataflowBuildResult { + build_id: BuildId, + daemon_id: DaemonId, + result: eyre::Result<()>, + }, DataflowSpawnResult { dataflow_id: uuid::Uuid, daemon_id: DaemonId, @@ -1264,6 +1500,7 @@ impl Event { Event::CtrlC => "CtrlC", Event::Log(_) => "Log", Event::DaemonExit { .. } => "DaemonExit", + Event::DataflowBuildResult { .. } => "DataflowBuildResult", Event::DataflowSpawnResult { .. } => "DataflowSpawnResult", } } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 39e17bca..ab7e3b9d 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -112,6 +112,16 @@ pub async fn handle_connection( break; } } + DaemonEvent::BuildResult { build_id, result } => { + let event = Event::DataflowBuildResult { + build_id, + daemon_id, + result: result.map_err(|err| eyre::eyre!(err)), + }; + if events_tx.send(event).await.is_err() { + break; + } + } DaemonEvent::SpawnResult { dataflow_id, result, diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 425f0213..ca89fb87 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -10,6 +10,7 @@ use dora_message::{ daemon_to_coordinator::DaemonCoordinatorReply, descriptor::{Descriptor, ResolvedNode}, id::NodeId, + BuildId, SessionId, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use itertools::Itertools; @@ -21,12 +22,13 @@ use uuid::{NoContext, Timestamp, Uuid}; #[tracing::instrument(skip(daemon_connections, clock))] pub(super) async fn spawn_dataflow( + build_id: Option, + session_id: SessionId, dataflow: Descriptor, - working_dir: PathBuf, + local_working_dir: Option, daemon_connections: &mut DaemonConnections, clock: &HLC, uv: bool, - build_only: bool, ) -> eyre::Result { let nodes = dataflow.resolve_aliases_and_set_defaults()?; let uuid = Uuid::new_v7(Timestamp::now(NoContext)); @@ -37,18 +39,18 @@ pub(super) async fn spawn_dataflow( for (machine, nodes_on_machine) in &nodes_by_daemon { let spawn_nodes = nodes_on_machine.iter().map(|n| n.id.clone()).collect(); tracing::debug!( - "{} dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})", - if build_only { "Building" } else { "Spawning" } + "Spawning dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})" ); let spawn_command = SpawnDataflowNodes { + build_id, + session_id, dataflow_id: uuid, - working_dir: 
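Aside (editor's sketch): both `RunningBuild::build_result` and `RunningDataflow::spawn_result` use the `CachedResult` type introduced above, which remembers subscribers that arrive before the result is known and answers later ones from the cached value. A simplified, runnable version of the same pattern built on std channels instead of tokio oneshots:

```rust
// Same pattern as `CachedResult`, reduced to std channels: waiters that register
// before the result arrives are remembered, later ones are answered immediately.
use std::sync::mpsc::{channel, Sender};

enum Cached<T: Clone> {
    Pending { waiters: Vec<Sender<T>> },
    Ready { value: T },
}

impl<T: Clone> Cached<T> {
    fn register(&mut self, waiter: Sender<T>) {
        match self {
            Cached::Pending { waiters } => waiters.push(waiter),
            Cached::Ready { value } => {
                let _ = waiter.send(value.clone());
            }
        }
    }

    fn set(&mut self, value: T) {
        if let Cached::Pending { waiters } = self {
            for waiter in waiters.drain(..) {
                let _ = waiter.send(value.clone());
            }
            *self = Cached::Ready { value };
        }
        // a second `set` is ignored, like the coordinator's `set_result`
    }
}

fn main() {
    let (tx, rx) = channel();
    let mut result = Cached::Pending { waiters: vec![tx] };
    result.set("spawned".to_string());
    assert_eq!(rx.recv().unwrap(), "spawned");

    // registering after the result is known answers from the cache
    let (tx_late, rx_late) = channel();
    result.register(tx_late);
    assert_eq!(rx_late.recv().unwrap(), "spawned");
}
```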
working_dir.clone(), + local_working_dir: local_working_dir.clone(), nodes: nodes.clone(), dataflow_descriptor: dataflow.clone(), spawn_nodes, uv, - build_only, }; let message = serde_json::to_vec(&Timestamped { inner: DaemonCoordinatorEvent::Spawn(spawn_command), @@ -57,19 +59,11 @@ pub(super) async fn spawn_dataflow( let daemon_id = spawn_dataflow_on_machine(daemon_connections, machine.as_deref(), &message) .await - .wrap_err_with(|| { - format!( - "failed to {} dataflow on machine `{machine:?}`", - if build_only { "build" } else { "spawn" } - ) - })?; + .wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine:?}`"))?; daemons.insert(daemon_id); } - tracing::info!( - "successfully triggered dataflow {} `{uuid}`", - if build_only { "build" } else { "spawn" } - ); + tracing::info!("successfully triggered dataflow spawn `{uuid}`",); Ok(SpawnedDataflow { uuid, diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 6b9f7381..fdfd3596 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -45,5 +45,6 @@ crossbeam = "0.8.4" crossbeam-skiplist = "0.1.3" zenoh = "1.1.1" url = "2.5.4" -git2 = { version = "0.18.0", features = ["vendored-openssl"] } +git2 = { workspace = true } dunce = "1.0.5" +itertools = "0.14" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 364ca6ae..f7a16c9a 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -2,6 +2,7 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; use crossbeam::queue::ArrayQueue; use dora_core::{ + build::{self, BuildInfo, GitManager}, config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId}, descriptor::{ read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode, @@ -12,10 +13,11 @@ use dora_core::{ }; use dora_message::{ common::{ - DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus, + DaemonId, DataMessage, DropToken, GitSource, LogLevel, NodeError, NodeErrorCause, + NodeExitStatus, }, coordinator_to_cli::DataflowResult, - coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes}, + coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes}, daemon_to_coordinator::{ CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult, }, @@ -24,7 +26,7 @@ use dora_message::{ descriptor::NodeSource, metadata::{self, ArrowTypeInfo}, node_to_daemon::{DynamicNodeEvent, Timestamped}, - DataflowId, + BuildId, DataflowId, SessionId, }; use dora_node_api::{arrow::datatypes::DataType, Parameter}; use eyre::{bail, eyre, Context, ContextCompat, Result}; @@ -38,6 +40,7 @@ use socket_stream_utils::socket_stream_send; use spawn::Spawner; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, + env::current_dir, future::Future, net::SocketAddr, path::{Path, PathBuf}, @@ -101,12 +104,14 @@ pub struct Daemon { logger: DaemonLogger, - repos_in_use: BTreeMap>, + sessions: BTreeMap, + builds: BTreeMap, + git_manager: GitManager, } type DaemonRunResult = BTreeMap>>; -struct NodePrepareTask { +struct NodeBuildTask { node_id: NodeId, dynamic_node: bool, task: F, @@ -148,12 +153,19 @@ impl Daemon { None, clock, Some(remote_daemon_events_tx), + Default::default(), ) .await .map(|_| ()) } - pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result { + pub async fn run_dataflow( + dataflow_path: &Path, + build_id: Option, + local_build: Option, + session_id: SessionId, + uv: bool, + ) -> eyre::Result { let 
working_dir = dataflow_path .canonicalize() .context("failed to canonicalize dataflow path")? @@ -167,13 +179,14 @@ impl Daemon { let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext)); let spawn_command = SpawnDataflowNodes { + build_id, + session_id, dataflow_id, - working_dir, + local_working_dir: Some(working_dir), spawn_nodes: nodes.keys().cloned().collect(), nodes, dataflow_descriptor: descriptor, uv, - build_only: false, }; let clock = Arc::new(HLC::default()); @@ -204,6 +217,16 @@ impl Daemon { Some(exit_when_done), clock.clone(), None, + if let Some(local_build) = local_build { + let Some(build_id) = build_id else { + bail!("no build_id, but local_build set") + }; + let mut builds = BTreeMap::new(); + builds.insert(build_id, local_build); + builds + } else { + Default::default() + }, ); let spawn_result = reply_rx @@ -235,6 +258,7 @@ impl Daemon { exit_when_done: Option>, clock: Arc, remote_daemon_events_tx: Option>>>, + builds: BTreeMap, ) -> eyre::Result { let coordinator_connection = match coordinator_addr { Some(addr) => { @@ -298,7 +322,9 @@ impl Daemon { clock, zenoh_session, remote_daemon_events_tx, - repos_in_use: Default::default(), + git_manager: Default::default(), + builds, + sessions: Default::default(), }; let dora_events = ReceiverStream::new(dora_events_rx); @@ -418,14 +444,41 @@ impl Daemon { .await?; } }, + Event::BuildDataflowResult { + build_id, + session_id, + result, + } => { + let (build_info, result) = match result { + Ok(build_info) => (Some(build_info), Ok(())), + Err(err) => (None, Err(err)), + }; + if let Some(build_info) = build_info { + self.builds.insert(build_id, build_info); + if let Some(old_build_id) = self.sessions.insert(session_id, build_id) { + self.builds.remove(&old_build_id); + } + } + if let Some(connection) = &mut self.coordinator_connection { + let msg = serde_json::to_vec(&Timestamped { + inner: CoordinatorRequest::Event { + daemon_id: self.daemon_id.clone(), + event: DaemonEvent::BuildResult { + build_id, + result: result.map_err(|err| format!("{err:?}")), + }, + }, + timestamp: self.clock.new_timestamp(), + })?; + socket_stream_send(connection, &msg).await.wrap_err( + "failed to send BuildDataflowResult message to dora-coordinator", + )?; + } + } Event::SpawnDataflowResult { dataflow_id, result, - build_only, } => { - if build_only { - self.running.remove(&dataflow_id); - } if let Some(connection) = &mut self.coordinator_connection { let msg = serde_json::to_vec(&Timestamped { inner: CoordinatorRequest::Event { @@ -437,9 +490,9 @@ impl Daemon { }, timestamp: self.clock.new_timestamp(), })?; - socket_stream_send(connection, &msg) - .await - .wrap_err("failed to send Exit message to dora-coordinator")?; + socket_stream_send(connection, &msg).await.wrap_err( + "failed to send SpawnDataflowResult message to dora-coordinator", + )?; } } } @@ -476,35 +529,93 @@ impl Daemon { reply_tx: Sender>, ) -> eyre::Result { let status = match event { + DaemonCoordinatorEvent::Build(BuildDataflowNodes { + build_id, + session_id, + local_working_dir, + git_sources, + prev_git_sources, + dataflow_descriptor, + nodes_on_machine, + uv, + }) => { + match dataflow_descriptor.communication.remote { + dora_core::config::RemoteCommunicationConfig::Tcp => {} + } + + let base_working_dir = self.base_working_dir(local_working_dir, session_id)?; + + let result = self + .build_dataflow( + build_id, + session_id, + base_working_dir, + git_sources, + prev_git_sources, + dataflow_descriptor, + nodes_on_machine, + uv, + ) + .await; + let (trigger_result, 
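Aside (editor's sketch): on `BuildDataflowResult`, the daemon keeps at most one cached build per session, dropping the previous build's info when a session is rebuilt. The map bookkeeping in isolation, with stand-in key types instead of `BuildId`/`SessionId`:

```rust
// Stand-in key types (u32 / &str) instead of BuildId / SessionId.
use std::collections::BTreeMap;

fn main() {
    let mut builds: BTreeMap<u32, &str> = BTreeMap::new(); // build id -> build info
    let mut sessions: BTreeMap<&str, u32> = BTreeMap::new(); // session id -> latest build id

    for (session, build_id, info) in [("s1", 1, "first build"), ("s1", 2, "rebuild")] {
        builds.insert(build_id, info);
        if let Some(old_build_id) = sessions.insert(session, build_id) {
            // the session was rebuilt: the previous build info is no longer needed
            builds.remove(&old_build_id);
        }
    }

    assert_eq!(sessions["s1"], 2);
    assert_eq!(builds.len(), 1);
}
```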
result_task) = match result { + Ok(result_task) => (Ok(()), Some(result_task)), + Err(err) => (Err(format!("{err:?}")), None), + }; + let reply = DaemonCoordinatorReply::TriggerBuildResult(trigger_result); + let _ = reply_tx.send(Some(reply)).map_err(|_| { + error!("could not send `TriggerBuildResult` reply from daemon to coordinator") + }); + + let result_tx = self.events_tx.clone(); + let clock = self.clock.clone(); + if let Some(result_task) = result_task { + tokio::spawn(async move { + let message = Timestamped { + inner: Event::BuildDataflowResult { + build_id, + session_id, + result: result_task.await, + }, + timestamp: clock.new_timestamp(), + }; + let _ = result_tx + .send(message) + .map_err(|_| { + error!( + "could not send `BuildResult` reply from daemon to coordinator" + ) + }) + .await; + }); + } + + RunStatus::Continue + } DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes { + build_id, + session_id, dataflow_id, - working_dir, + local_working_dir, nodes, dataflow_descriptor, spawn_nodes, uv, - build_only, }) => { match dataflow_descriptor.communication.remote { dora_core::config::RemoteCommunicationConfig::Tcp => {} } - // Use the working directory if it exists, otherwise use the working directory where the daemon is spawned - let working_dir = if working_dir.exists() { - working_dir - } else { - std::env::current_dir().wrap_err("failed to get current working dir")? - }; + let base_working_dir = self.base_working_dir(local_working_dir, session_id)?; let result = self .spawn_dataflow( + build_id, dataflow_id, - working_dir, + base_working_dir, nodes, dataflow_descriptor, spawn_nodes, uv, - build_only, ) .await; let (trigger_result, result_task) = match result { @@ -524,7 +635,6 @@ impl Daemon { inner: Event::SpawnDataflowResult { dataflow_id, result: result_task.await, - build_only, }, timestamp: clock.new_timestamp(), }; @@ -770,15 +880,100 @@ impl Daemon { } } + #[allow(clippy::too_many_arguments)] + async fn build_dataflow( + &mut self, + build_id: BuildId, + session_id: SessionId, + base_working_dir: PathBuf, + git_sources: BTreeMap, + prev_git_sources: BTreeMap, + dataflow_descriptor: Descriptor, + local_nodes: BTreeSet, + uv: bool, + ) -> eyre::Result>> { + let builder = build::Builder { + session_id, + base_working_dir, + uv, + }; + let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?; + + let mut tasks = Vec::new(); + + // build nodes + for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) { + let dynamic_node = node.kind.dynamic(); + + let node_id = node.id.clone(); + let mut logger = self.logger.for_node_build(build_id, node_id.clone()); + logger.log(LogLevel::Info, "building").await; + let git_source = git_sources.get(&node_id).cloned(); + let prev_git_source = prev_git_sources.get(&node_id).cloned(); + + let logger_cloned = logger + .try_clone_impl() + .await + .wrap_err("failed to clone logger")?; + + match builder + .clone() + .build_node( + node, + git_source, + prev_git_source, + logger_cloned, + &mut self.git_manager, + ) + .await + .wrap_err_with(|| format!("failed to build node `{node_id}`")) + { + Ok(result) => { + tasks.push(NodeBuildTask { + node_id, + task: result, + dynamic_node, + }); + } + Err(err) => { + logger.log(LogLevel::Error, format!("{err:?}")).await; + return Err(err); + } + } + } + + let task = async move { + let mut info = BuildInfo { + node_working_dirs: Default::default(), + }; + for task in tasks { + let NodeBuildTask { + node_id, + dynamic_node, + task, + } = task; + let node = task + .await + 
.with_context(|| format!("failed to build node `{node_id}`"))?; + info.node_working_dirs + .insert(node_id, node.node_working_dir); + } + Ok(info) + }; + + Ok(task) + } + + #[allow(clippy::too_many_arguments)] async fn spawn_dataflow( &mut self, - dataflow_id: uuid::Uuid, - working_dir: PathBuf, + build_id: Option, + dataflow_id: DataflowId, + base_working_dir: PathBuf, nodes: BTreeMap, dataflow_descriptor: Descriptor, spawn_nodes: BTreeSet, uv: bool, - build_only: bool, ) -> eyre::Result>> { let mut logger = self .logger @@ -790,7 +985,8 @@ impl Daemon { RunningDataflow::new(dataflow_id, self.daemon_id.clone(), &dataflow_descriptor); let dataflow = match self.running.entry(dataflow_id) { std::collections::hash_map::Entry::Vacant(entry) => { - self.working_dir.insert(dataflow_id, working_dir.clone()); + self.working_dir + .insert(dataflow_id, base_working_dir.clone()); entry.insert(dataflow) } std::collections::hash_map::Entry::Occupied(_) => { @@ -800,6 +996,11 @@ impl Daemon { let mut stopped = Vec::new(); + let node_working_dirs = build_id + .and_then(|build_id| self.builds.get(&build_id)) + .map(|info| info.node_working_dirs.clone()) + .unwrap_or_default(); + // calculate info about mappings for node in nodes.values() { let local = spawn_nodes.contains(&node.id); @@ -838,12 +1039,10 @@ impl Daemon { let spawner = Spawner { dataflow_id, - working_dir, daemon_tx: self.events_tx.clone(), dataflow_descriptor, clock: self.clock.clone(), uv, - build_only, }; let mut tasks = Vec::new(); @@ -869,19 +1068,18 @@ impl Daemon { logger .log(LogLevel::Info, Some("daemon".into()), "spawning") .await; + let node_working_dir = node_working_dirs + .get(&node_id) + .unwrap_or(&base_working_dir) + .clone(); match spawner .clone() - .prepare_node( - node, - node_stderr_most_recent, - &mut logger, - &mut self.repos_in_use, - ) + .spawn_node(node, node_working_dir, node_stderr_most_recent, &mut logger) .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) { Ok(result) => { - tasks.push(NodePrepareTask { + tasks.push(NodeBuildTask { node_id, task: result, dynamic_node, @@ -979,7 +1177,7 @@ impl Daemon { async fn spawn_prepared_nodes( dataflow_id: Uuid, mut logger: DataflowLogger<'_>, - tasks: Vec>>>, + tasks: Vec>>>, events_tx: mpsc::Sender>, clock: Arc, ) -> eyre::Result<()> { @@ -995,7 +1193,7 @@ impl Daemon { let mut failed_to_prepare = None; let mut prepared_nodes = Vec::new(); for task in tasks { - let NodePrepareTask { + let NodeBuildTask { node_id, dynamic_node, task, @@ -1567,9 +1765,12 @@ impl Daemon { .clone(), }; - self.repos_in_use.values_mut().for_each(|dataflows| { - dataflows.remove(&dataflow_id); - }); + self.git_manager + .clones_in_use + .values_mut() + .for_each(|dataflows| { + dataflows.remove(&dataflow_id); + }); logger .log( @@ -1799,6 +2000,34 @@ impl Daemon { } Ok(RunStatus::Continue) } + + fn base_working_dir( + &self, + local_working_dir: Option, + session_id: SessionId, + ) -> eyre::Result { + match local_working_dir { + Some(working_dir) => { + // check that working directory exists + if working_dir.exists() { + Ok(working_dir) + } else { + bail!( + "working directory does not exist: {}", + working_dir.display(), + ) + } + } + None => { + // use subfolder of daemon working dir + let daemon_working_dir = + current_dir().context("failed to get daemon working dir")?; + Ok(daemon_working_dir + .join("_work") + .join(session_id.to_string())) + } + } + } } async fn set_up_event_stream( @@ -2272,10 +2501,14 @@ pub enum Event { dynamic_node: bool, result: Result, }, + 
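Aside (editor's sketch): `base_working_dir` falls back to a `_work/<session-id>` subfolder of the daemon's current directory when the CLI passes no local working dir. A minimal version of that fallback (the directory layout here is only an assumption for illustration; the real code also bails if an explicit dir does not exist):

```rust
// Assumed layout for illustration: `<daemon cwd>/_work/<session id>` as fallback.
use std::path::PathBuf;

fn base_working_dir(local: Option<PathBuf>, session_id: &str) -> std::io::Result<PathBuf> {
    match local {
        // an explicit working dir from the CLI wins (the real code also checks it exists)
        Some(dir) => Ok(dir),
        // otherwise use a session-scoped subfolder of the daemon's working dir
        None => Ok(std::env::current_dir()?.join("_work").join(session_id)),
    }
}

fn main() -> std::io::Result<()> {
    println!("{}", base_working_dir(None, "0195f3a0-example-session")?.display());
    Ok(())
}
```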
BuildDataflowResult { + build_id: BuildId, + session_id: SessionId, + result: eyre::Result, + }, SpawnDataflowResult { dataflow_id: Uuid, result: eyre::Result<()>, - build_only: bool, }, } @@ -2298,6 +2531,7 @@ impl Event { Event::SecondCtrlC => "SecondCtrlC", Event::DaemonError(_) => "DaemonError", Event::SpawnNodeResult { .. } => "SpawnNodeResult", + Event::BuildDataflowResult { .. } => "BuildDataflowResult", Event::SpawnDataflowResult { .. } => "SpawnDataflowResult", } } diff --git a/binaries/daemon/src/log.rs b/binaries/daemon/src/log.rs index 26488361..c5fe171a 100644 --- a/binaries/daemon/src/log.rs +++ b/binaries/daemon/src/log.rs @@ -4,10 +4,11 @@ use std::{ sync::Arc, }; -use dora_core::{config::NodeId, uhlc}; +use dora_core::{build::BuildLogger, config::NodeId, uhlc}; use dora_message::{ common::{DaemonId, LogLevel, LogMessage, Timestamped}, daemon_to_coordinator::{CoordinatorRequest, DaemonEvent}, + BuildId, }; use eyre::Context; use tokio::net::TcpStream; @@ -81,7 +82,7 @@ impl<'a> DataflowLogger<'a> { message: impl Into, ) { self.logger - .log(level, self.dataflow_id, node_id, target, message) + .log(level, Some(self.dataflow_id), node_id, target, message) .await } @@ -93,6 +94,44 @@ impl<'a> DataflowLogger<'a> { } } +pub struct NodeBuildLogger<'a> { + build_id: BuildId, + node_id: NodeId, + logger: CowMut<'a, DaemonLogger>, +} + +impl NodeBuildLogger<'_> { + pub async fn log(&mut self, level: LogLevel, message: impl Into) { + self.logger + .log_build(self.build_id, level, Some(self.node_id.clone()), message) + .await + } + + pub async fn try_clone_impl(&self) -> eyre::Result> { + Ok(NodeBuildLogger { + build_id: self.build_id, + node_id: self.node_id.clone(), + logger: CowMut::Owned(self.logger.try_clone().await?), + }) + } +} + +impl BuildLogger for NodeBuildLogger<'_> { + type Clone = NodeBuildLogger<'static>; + + fn log_message( + &mut self, + level: LogLevel, + message: impl Into + Send, + ) -> impl std::future::Future + Send { + self.log(level, message) + } + + fn try_clone(&self) -> impl std::future::Future> + Send { + self.try_clone_impl() + } +} + pub struct DaemonLogger { daemon_id: DaemonId, logger: Logger, @@ -106,6 +145,14 @@ impl DaemonLogger { } } + pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger { + NodeBuildLogger { + build_id, + node_id, + logger: CowMut::Borrowed(self), + } + } + pub fn inner(&self) -> &Logger { &self.logger } @@ -113,12 +160,13 @@ impl DaemonLogger { pub async fn log( &mut self, level: LogLevel, - dataflow_id: Uuid, + dataflow_id: Option, node_id: Option, target: Option, message: impl Into, ) { let message = LogMessage { + build_id: None, daemon_id: Some(self.daemon_id.clone()), dataflow_id, node_id, @@ -132,6 +180,28 @@ impl DaemonLogger { self.logger.log(message).await } + pub async fn log_build( + &mut self, + build_id: BuildId, + level: LogLevel, + node_id: Option, + message: impl Into, + ) { + let message = LogMessage { + build_id: Some(build_id), + daemon_id: Some(self.daemon_id.clone()), + dataflow_id: None, + node_id, + level, + target: Some("build".into()), + module_path: None, + file: None, + line: None, + message: message.into(), + }; + self.logger.log(message).await + } + pub(crate) fn daemon_id(&self) -> &DaemonId { &self.daemon_id } @@ -181,7 +251,8 @@ impl Logger { match message.level { LogLevel::Error => { tracing::error!( - dataflow_id = message.dataflow_id.to_string(), + build_id = ?message.build_id.map(|id| id.to_string()), + dataflow_id = ?message.dataflow_id.map(|id| 
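Aside (editor's sketch): with `build_id` and `dataflow_id` both optional on `LogMessage`, the coordinator can route a log line to build subscribers, dataflow subscribers, or both. A simplified routing sketch with stand-in id types:

```rust
// Simplified ids (u64) instead of BuildId and the dataflow Uuid.
struct LogMessage {
    build_id: Option<u64>,
    dataflow_id: Option<u64>,
    message: String,
}

fn route(msg: &LogMessage) {
    if let Some(id) = msg.dataflow_id {
        println!("-> dataflow {id} log subscribers: {}", msg.message);
    }
    if let Some(id) = msg.build_id {
        println!("-> build {id} log subscribers: {}", msg.message);
    }
}

fn main() {
    route(&LogMessage { build_id: Some(7), dataflow_id: None, message: "building node".into() });
    route(&LogMessage { build_id: None, dataflow_id: Some(42), message: "spawning node".into() });
}
```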
id.to_string()), node_id = ?message.node_id.map(|id| id.to_string()), target = message.target, module_path = message.module_path, @@ -193,7 +264,8 @@ impl Logger { } LogLevel::Warn => { tracing::warn!( - dataflow_id = message.dataflow_id.to_string(), + build_id = ?message.build_id.map(|id| id.to_string()), + dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), node_id = ?message.node_id.map(|id| id.to_string()), target = message.target, module_path = message.module_path, @@ -205,7 +277,8 @@ impl Logger { } LogLevel::Info => { tracing::info!( - dataflow_id = message.dataflow_id.to_string(), + build_id = ?message.build_id.map(|id| id.to_string()), + dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), node_id = ?message.node_id.map(|id| id.to_string()), target = message.target, module_path = message.module_path, @@ -217,7 +290,8 @@ impl Logger { } LogLevel::Debug => { tracing::debug!( - dataflow_id = message.dataflow_id.to_string(), + build_id = ?message.build_id.map(|id| id.to_string()), + dataflow_id = ?message.dataflow_id.map(|id| id.to_string()), node_id = ?message.node_id.map(|id| id.to_string()), target = message.target, module_path = message.module_path, diff --git a/binaries/daemon/src/spawn/git.rs b/binaries/daemon/src/spawn/git.rs deleted file mode 100644 index 9803d1f2..00000000 --- a/binaries/daemon/src/spawn/git.rs +++ /dev/null @@ -1,286 +0,0 @@ -use crate::log::NodeLogger; -use dora_message::{common::LogLevel, descriptor::GitRepoRev, DataflowId}; -use eyre::{ContextCompat, WrapErr}; -use git2::FetchOptions; -use std::{ - collections::{BTreeMap, BTreeSet}, - path::{Path, PathBuf}, -}; -use url::Url; -use uuid::Uuid; - -pub struct GitFolder { - /// The URL of the git repository. - repo_addr: String, - /// The branch, tag, or git revision to checkout. - rev: Option, - /// The directory that should contain the checked-out repository. - clone_dir: PathBuf, - /// Specifies whether an existing repo should be reused. 
- reuse: ReuseOptions, -} - -impl GitFolder { - pub fn choose_clone_dir( - dataflow_id: uuid::Uuid, - repo_addr: String, - rev: Option, - target_dir: &Path, - repos_in_use: &mut BTreeMap>, - ) -> eyre::Result { - let repo_url = Url::parse(&repo_addr).context("failed to parse git repository URL")?; - - let base_dir = { - let base = { - let mut path = - target_dir.join(repo_url.host_str().context("git URL has no hostname")?); - - path.extend(repo_url.path_segments().context("no path in git URL")?); - path - }; - match &rev { - None => base, - Some(rev) => match rev { - GitRepoRev::Branch(branch) => base.join("branch").join(branch), - GitRepoRev::Tag(tag) => base.join("tag").join(tag), - GitRepoRev::Rev(rev) => base.join("rev").join(rev), - }, - } - }; - let clone_dir = if clone_dir_exists(&base_dir, repos_in_use) { - let used_by_other = used_by_other_dataflow(dataflow_id, &base_dir, repos_in_use); - if used_by_other { - // don't reuse, choose new directory - // (TODO reuse if still up to date) - - let dir_name = base_dir.file_name().unwrap().to_str().unwrap(); - let mut i = 1; - loop { - let new_path = base_dir.with_file_name(format!("{dir_name}-{i}")); - if clone_dir_exists(&new_path, repos_in_use) - && used_by_other_dataflow(dataflow_id, &new_path, repos_in_use) - { - i += 1; - } else { - break new_path; - } - } - } else { - base_dir - } - } else { - base_dir - }; - let clone_dir = dunce::simplified(&clone_dir).to_owned(); - - let reuse = if clone_dir_exists(&clone_dir, repos_in_use) { - let empty = BTreeSet::new(); - let in_use = repos_in_use.get(&clone_dir).unwrap_or(&empty); - let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id); - if used_by_other_dataflow { - // The directory is currently in use by another dataflow. We currently don't - // support reusing the same clone across multiple dataflow runs. Above, we - // choose a new directory if we detect such a case. So this `if` branch - // should never be reached. - eyre::bail!("clone_dir is already in use by other dataflow") - } else if in_use.is_empty() { - // The cloned repo is not used by any dataflow, so we can safely reuse it. However, - // the clone might be still on an older commit, so we need to do a `git fetch` - // before we reuse it. - ReuseOptions::ReuseAfterFetch - } else { - // This clone is already used for another node of this dataflow. We will do a - // `git fetch` operation for the first node of this dataflow, so we don't need - // to do it again for other nodes of the dataflow. So we can simply reuse the - // directory without doing any additional git operations. 
- ReuseOptions::Reuse - } - } else { - ReuseOptions::NewClone - }; - repos_in_use - .entry(clone_dir.clone()) - .or_default() - .insert(dataflow_id); - - Ok(GitFolder { - clone_dir, - reuse, - repo_addr, - rev, - }) - } - - pub async fn prepare(self, logger: &mut NodeLogger<'_>) -> eyre::Result { - let GitFolder { - clone_dir, - reuse, - repo_addr, - rev, - } = self; - - let rev_str = rev_str(&rev); - let refname = rev.clone().map(|rev| match rev { - GitRepoRev::Branch(branch) => format!("refs/remotes/origin/{branch}"), - GitRepoRev::Tag(tag) => format!("refs/tags/{tag}"), - GitRepoRev::Rev(rev) => rev, - }); - - match reuse { - ReuseOptions::NewClone => { - let repository = clone_into(&repo_addr, &rev, &clone_dir, logger).await?; - checkout_tree(&repository, refname)?; - } - ReuseOptions::ReuseAfterFetch => { - logger - .log( - LogLevel::Info, - None, - format!("fetching changes and reusing {repo_addr}{rev_str}"), - ) - .await; - let refname_cloned = refname.clone(); - let clone_dir = clone_dir.clone(); - let repository = fetch_changes(clone_dir, refname_cloned).await?; - checkout_tree(&repository, refname)?; - } - ReuseOptions::Reuse => { - logger - .log( - LogLevel::Info, - None, - format!("reusing up-to-date {repo_addr}{rev_str}"), - ) - .await; - } - }; - Ok(clone_dir) - } -} - -fn used_by_other_dataflow( - dataflow_id: uuid::Uuid, - clone_dir_base: &PathBuf, - repos_in_use: &mut BTreeMap>, -) -> bool { - let empty = BTreeSet::new(); - let in_use = repos_in_use.get(clone_dir_base).unwrap_or(&empty); - let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id); - used_by_other_dataflow -} - -enum ReuseOptions { - /// Create a new clone of the repository. - NewClone, - /// Reuse an existing up-to-date clone of the repository. - Reuse, - /// Update an older clone of the repository, then reuse it. 
- ReuseAfterFetch, -} - -fn rev_str(rev: &Option) -> String { - match rev { - Some(GitRepoRev::Branch(branch)) => format!(" (branch {branch})"), - Some(GitRepoRev::Tag(tag)) => format!(" (tag {tag})"), - Some(GitRepoRev::Rev(rev)) => format!(" (rev {rev})"), - None => String::new(), - } -} - -async fn clone_into( - repo_addr: &String, - rev: &Option, - clone_dir: &Path, - logger: &mut NodeLogger<'_>, -) -> eyre::Result { - if let Some(parent) = clone_dir.parent() { - tokio::fs::create_dir_all(parent) - .await - .context("failed to create parent directory for git clone")?; - } - - let rev_str = rev_str(rev); - logger - .log( - LogLevel::Info, - None, - format!("cloning {repo_addr}{rev_str} into {}", clone_dir.display()), - ) - .await; - let rev: Option = rev.clone(); - let clone_into = clone_dir.to_owned(); - let repo_addr = repo_addr.clone(); - let task = tokio::task::spawn_blocking(move || { - let mut builder = git2::build::RepoBuilder::new(); - let mut fetch_options = git2::FetchOptions::new(); - fetch_options.download_tags(git2::AutotagOption::All); - builder.fetch_options(fetch_options); - if let Some(GitRepoRev::Branch(branch)) = &rev { - builder.branch(branch); - } - builder - .clone(&repo_addr, &clone_into) - .context("failed to clone repo") - }); - let repo = task.await??; - Ok(repo) -} - -async fn fetch_changes( - repo_dir: PathBuf, - refname: Option, -) -> Result { - let fetch_changes = tokio::task::spawn_blocking(move || { - let repository = git2::Repository::open(&repo_dir).context("failed to open git repo")?; - - { - let mut remote = repository - .find_remote("origin") - .context("failed to find remote `origin` in repo")?; - remote - .connect(git2::Direction::Fetch) - .context("failed to connect to remote")?; - let default_branch = remote - .default_branch() - .context("failed to get default branch for remote")?; - let fetch = match &refname { - Some(refname) => refname, - None => default_branch - .as_str() - .context("failed to read default branch as string")?, - }; - let mut fetch_options = FetchOptions::new(); - fetch_options.download_tags(git2::AutotagOption::All); - remote - .fetch(&[&fetch], Some(&mut fetch_options), None) - .context("failed to fetch from git repo")?; - } - Result::<_, eyre::Error>::Ok(repository) - }); - let repository = fetch_changes.await??; - Ok(repository) -} - -fn checkout_tree(repository: &git2::Repository, refname: Option) -> eyre::Result<()> { - if let Some(refname) = refname { - let (object, reference) = repository - .revparse_ext(&refname) - .context("failed to parse ref")?; - repository - .checkout_tree(&object, None) - .context("failed to checkout ref")?; - match reference { - Some(reference) => repository - .set_head(reference.name().context("failed to get reference_name")?) 
- .context("failed to set head")?, - None => repository - .set_head_detached(object.id()) - .context("failed to set detached head")?, - } - } - Ok(()) -} - -fn clone_dir_exists(dir: &PathBuf, repos_in_use: &BTreeMap>) -> bool { - repos_in_use.contains_key(dir) || dir.exists() -} diff --git a/binaries/daemon/src/spawn/mod.rs b/binaries/daemon/src/spawn/mod.rs index 9bf15360..055045a9 100644 --- a/binaries/daemon/src/spawn/mod.rs +++ b/binaries/daemon/src/spawn/mod.rs @@ -7,11 +7,10 @@ use aligned_vec::{AVec, ConstAlign}; use crossbeam::queue::ArrayQueue; use dora_arrow_convert::IntoArrow; use dora_core::{ - build::run_build_command, config::DataId, descriptor::{ - resolve_path, source_is_url, CustomNode, Descriptor, OperatorDefinition, OperatorSource, - PythonSource, ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, SHELL_SOURCE, + resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource, + ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, SHELL_SOURCE, }, get_python_path, uhlc::HLC, @@ -21,7 +20,6 @@ use dora_message::{ common::{LogLevel, LogMessage}, daemon_to_coordinator::{DataMessage, NodeExitStatus, Timestamped}, daemon_to_node::{NodeConfig, RuntimeConfig}, - descriptor::EnvValue, id::NodeId, DataflowId, }; @@ -31,9 +29,7 @@ use dora_node_api::{ Metadata, }; use eyre::{ContextCompat, WrapErr}; -use git::GitFolder; use std::{ - collections::{BTreeMap, BTreeSet}, future::Future, path::{Path, PathBuf}, process::Stdio, @@ -46,27 +42,23 @@ use tokio::{ }; use tracing::error; -mod git; - #[derive(Clone)] pub struct Spawner { pub dataflow_id: DataflowId, - pub working_dir: PathBuf, pub daemon_tx: mpsc::Sender>, pub dataflow_descriptor: Descriptor, /// clock is required for generating timestamps when dropping messages early because queue is full pub clock: Arc, pub uv: bool, - pub build_only: bool, } impl Spawner { - pub async fn prepare_node( + pub async fn spawn_node( self, node: ResolvedNode, + node_working_dir: PathBuf, node_stderr_most_recent: Arc>, logger: &mut NodeLogger<'_>, - repos_in_use: &mut BTreeMap>, ) -> eyre::Result>> { let dataflow_id = self.dataflow_id; let node_id = node.id.clone(); @@ -101,24 +93,6 @@ impl Spawner { dynamic: node.kind.dynamic(), }; - let prepared_git = if let dora_core::descriptor::CoreNodeKind::Custom(CustomNode { - source: dora_message::descriptor::NodeSource::GitBranch { repo, rev }, - .. 
- }) = &node.kind - { - let target_dir = self.working_dir.join("build"); - let git_folder = GitFolder::choose_clone_dir( - self.dataflow_id, - repo.clone(), - rev.clone(), - &target_dir, - repos_in_use, - )?; - Some(git_folder) - } else { - None - }; - let mut logger = logger .try_clone() .await @@ -126,10 +100,10 @@ impl Spawner { let task = async move { self.prepare_node_inner( node, + node_working_dir, &mut logger, dataflow_id, node_config, - prepared_git, node_stderr_most_recent, ) .await @@ -138,33 +112,21 @@ impl Spawner { } async fn prepare_node_inner( - mut self, + self, node: ResolvedNode, + node_working_dir: PathBuf, logger: &mut NodeLogger<'_>, dataflow_id: uuid::Uuid, node_config: NodeConfig, - git_folder: Option, node_stderr_most_recent: Arc>, ) -> eyre::Result { let (command, error_msg) = match &node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { - let build_dir = match git_folder { - Some(git_folder) => git_folder.prepare(logger).await?, - None => self.working_dir.clone(), - }; - - if let Some(build) = &n.build { - self.build_node(logger, &node.env, build_dir.clone(), build) - .await?; - } - let mut command = if self.build_only { - None - } else { - path_spawn_command(&build_dir, self.uv, logger, n, true).await? - }; + let mut command = + path_spawn_command(&node_working_dir, self.uv, logger, n, true).await?; if let Some(command) = &mut command { - command.current_dir(&self.working_dir); + command.current_dir(&node_working_dir); command.stdin(Stdio::null()); command.env( @@ -205,14 +167,6 @@ impl Spawner { (command, error_msg) } dora_core::descriptor::CoreNodeKind::Runtime(n) => { - // run build commands - for operator in &n.operators { - if let Some(build) = &operator.config.build { - self.build_node(logger, &node.env, self.working_dir.clone(), build) - .await?; - } - } - let python_operators: Vec<&OperatorDefinition> = n .operators .iter() @@ -224,9 +178,7 @@ impl Spawner { .iter() .any(|x| !matches!(x.config.source, OperatorSource::Python { .. 
})); - let mut command = if self.build_only { - None - } else if !python_operators.is_empty() && !other_operators { + let mut command = if !python_operators.is_empty() && !other_operators { // Use python to spawn runtime if there is a python operator // TODO: Handle multi-operator runtime once sub-interpreter is supported @@ -304,7 +256,7 @@ impl Spawner { }; if let Some(command) = &mut command { - command.current_dir(&self.working_dir); + command.current_dir(&node_working_dir); command.env( "DORA_RUNTIME_CONFIG", @@ -337,7 +289,7 @@ impl Spawner { Ok(PreparedNode { command, spawn_error_msg: error_msg, - working_dir: self.working_dir, + node_working_dir, dataflow_id, node, node_config, @@ -346,50 +298,12 @@ impl Spawner { node_stderr_most_recent, }) } - - async fn build_node( - &mut self, - logger: &mut NodeLogger<'_>, - node_env: &Option>, - working_dir: PathBuf, - build: &String, - ) -> Result<(), eyre::Error> { - logger - .log( - LogLevel::Info, - None, - format!("running build command: `{build}"), - ) - .await; - let build = build.to_owned(); - let uv = self.uv; - let node_env = node_env.clone(); - let mut logger = logger.try_clone().await.context("failed to clone logger")?; - let (stdout_tx, mut stdout) = tokio::sync::mpsc::channel(10); - let task = tokio::task::spawn_blocking(move || { - run_build_command(&build, &working_dir, uv, &node_env, stdout_tx) - .context("build command failed") - }); - tokio::spawn(async move { - while let Some(line) = stdout.recv().await { - logger - .log( - LogLevel::Info, - Some("build command".into()), - line.unwrap_or_else(|err| format!("io err: {}", err.kind())), - ) - .await; - } - }); - task.await??; - Ok(()) - } } pub struct PreparedNode { command: Option, spawn_error_msg: String, - working_dir: PathBuf, + node_working_dir: PathBuf, dataflow_id: DataflowId, node: ResolvedNode, node_config: NodeConfig, @@ -430,7 +344,7 @@ impl PreparedNode { .await; let dataflow_dir: PathBuf = self - .working_dir + .node_working_dir .join("out") .join(self.dataflow_id.to_string()); if !dataflow_dir.exists() { @@ -438,7 +352,7 @@ impl PreparedNode { } let (tx, mut rx) = mpsc::channel(10); let mut file = File::create(log::log_path( - &self.working_dir, + &self.node_working_dir, &self.dataflow_id, &self.node.id, )) @@ -642,7 +556,8 @@ impl PreparedNode { cloned_logger .log(LogMessage { daemon_id: Some(daemon_id.clone()), - dataflow_id, + dataflow_id: Some(dataflow_id), + build_id: None, level: LogLevel::Info, node_id: Some(node_id.clone()), target: Some("stdout".into()), diff --git a/examples/benchmark/run.rs b/examples/benchmark/run.rs index 8e0076bc..b6bed6fe 100644 --- a/examples/benchmark/run.rs +++ b/examples/benchmark/run.rs @@ -11,12 +11,26 @@ async fn main() -> eyre::Result<()> { .wrap_err("failed to set working dir")?; let dataflow = Path::new("dataflow.yml"); + build_dataflow(dataflow).await?; run_dataflow(dataflow).await?; Ok(()) } +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--release"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); diff --git a/examples/c++-dataflow/.gitignore b/examples/c++-dataflow/.gitignore index 
5761abcf..d255f72c 100644 --- a/examples/c++-dataflow/.gitignore +++ b/examples/c++-dataflow/.gitignore @@ -1 +1,2 @@ *.o +/build diff --git a/examples/c++-ros2-dataflow/.gitignore b/examples/c++-ros2-dataflow/.gitignore index 5761abcf..d255f72c 100644 --- a/examples/c++-ros2-dataflow/.gitignore +++ b/examples/c++-ros2-dataflow/.gitignore @@ -1 +1,2 @@ *.o +/build diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 130d43c1..4d8a40e1 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,3 +1,4 @@ +use dora_cli::session::DataflowSession; use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::{read_as_descriptor, DescriptorExt}, @@ -37,6 +38,7 @@ async fn main() -> eyre::Result<()> { .wrap_err("failed to set working dir")?; let dataflow = Path::new("dataflow.yml"); + build_dataflow(dataflow).await?; let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); let coordinator_bind = SocketAddr::new( @@ -138,15 +140,19 @@ async fn start_dataflow( .check(&working_dir) .wrap_err("could not validate yaml")?; + let dataflow_session = + DataflowSession::read_session(dataflow).context("failed to read DataflowSession")?; + let (reply_sender, reply) = oneshot::channel(); coordinator_events_tx .send(Event::Control(ControlEvent::IncomingRequest { request: ControlRequest::Start { + build_id: dataflow_session.build_id, + session_id: dataflow_session.session_id, dataflow: dataflow_descriptor, - local_working_dir: working_dir, + local_working_dir: Some(working_dir), name: None, uv: false, - build_only: false, }, reply_sender, })) @@ -228,6 +234,18 @@ async fn destroy(coordinator_events_tx: &Sender) -> eyre::Result<()> { } } +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); diff --git a/examples/python-ros2-dataflow/run.rs b/examples/python-ros2-dataflow/run.rs index 2873426e..23b254e2 100644 --- a/examples/python-ros2-dataflow/run.rs +++ b/examples/python-ros2-dataflow/run.rs @@ -40,6 +40,15 @@ async fn main() -> eyre::Result<()> { async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); + // First build the dataflow (install requirements) + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow).arg("--uv"); + if !cmd.status().await?.success() { + bail!("failed to run dataflow"); + }; + let mut cmd = tokio::process::Command::new(&cargo); cmd.arg("run"); cmd.arg("--package").arg("dora-cli"); diff --git a/examples/rust-dataflow-git/.gitignore b/examples/rust-dataflow-git/.gitignore new file mode 100644 index 00000000..dfdc87e3 --- /dev/null +++ b/examples/rust-dataflow-git/.gitignore @@ -0,0 +1,2 @@ +/build +/git diff --git a/examples/rust-dataflow-git/dataflow.yml b/examples/rust-dataflow-git/dataflow.yml index f4bca5df..a64b2170 100644 --- a/examples/rust-dataflow-git/dataflow.yml +++ b/examples/rust-dataflow-git/dataflow.yml @@ -1,7 +1,7 @@ nodes: - id: rust-node git: 
https://github.com/dora-rs/dora.git - rev: e31b2a34 # pinned commit, update this when changing the message crate + rev: 64a2dc9c # pinned commit, update this when changing the message crate build: cargo build -p rust-dataflow-example-node path: target/debug/rust-dataflow-example-node inputs: @@ -11,7 +11,7 @@ nodes: - id: rust-status-node git: https://github.com/dora-rs/dora.git - rev: e31b2a34 # pinned commit, update this when changing the message crate + rev: 64a2dc9c # pinned commit, update this when changing the message crate build: cargo build -p rust-dataflow-example-status-node path: target/debug/rust-dataflow-example-status-node inputs: @@ -22,7 +22,7 @@ nodes: - id: rust-sink git: https://github.com/dora-rs/dora.git - rev: e31b2a34 # pinned commit, update this when changing the message crate + rev: 64a2dc9c # pinned commit, update this when changing the message crate build: cargo build -p rust-dataflow-example-sink path: target/debug/rust-dataflow-example-sink inputs: diff --git a/examples/rust-dataflow-git/run.rs b/examples/rust-dataflow-git/run.rs index 6a6a8782..490c5c57 100644 --- a/examples/rust-dataflow-git/run.rs +++ b/examples/rust-dataflow-git/run.rs @@ -16,12 +16,25 @@ async fn main() -> eyre::Result<()> { } else { Path::new("dataflow.yml") }; + build_dataflow(dataflow).await?; run_dataflow(dataflow).await?; Ok(()) } +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); diff --git a/examples/rust-dataflow-url/.gitignore b/examples/rust-dataflow-url/.gitignore new file mode 100644 index 00000000..796b96d1 --- /dev/null +++ b/examples/rust-dataflow-url/.gitignore @@ -0,0 +1 @@ +/build diff --git a/examples/rust-dataflow-url/run.rs b/examples/rust-dataflow-url/run.rs index e93a5d28..6f511970 100644 --- a/examples/rust-dataflow-url/run.rs +++ b/examples/rust-dataflow-url/run.rs @@ -11,12 +11,25 @@ async fn main() -> eyre::Result<()> { .wrap_err("failed to set working dir")?; let dataflow = Path::new("dataflow.yml"); + build_dataflow(dataflow).await?; run_dataflow(dataflow).await?; Ok(()) } +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); diff --git a/examples/rust-dataflow/run.rs b/examples/rust-dataflow/run.rs index 6a6a8782..490c5c57 100644 --- a/examples/rust-dataflow/run.rs +++ b/examples/rust-dataflow/run.rs @@ -16,12 +16,25 @@ async fn main() -> eyre::Result<()> { } else { Path::new("dataflow.yml") }; + build_dataflow(dataflow).await?; run_dataflow(dataflow).await?; Ok(()) } +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + 
cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 7d9233b1..af467ffe 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -19,8 +19,11 @@ which = "5.0.0" uuid = { version = "1.7", features = ["serde", "v7"] } tracing = "0.1" serde-with-expand-env = "1.1.0" -tokio = { version = "1.24.1", features = ["fs", "process", "sync"] } +tokio = { version = "1.24.1", features = ["fs", "process", "sync", "rt"] } schemars = "0.8.19" serde_json = "1.0.117" log = { version = "0.4.21", features = ["serde"] } dunce = "1.0.5" +url = "2.5.4" +git2 = { workspace = true } +itertools = "0.14" diff --git a/libraries/core/src/build.rs b/libraries/core/src/build/build_command.rs similarity index 100% rename from libraries/core/src/build.rs rename to libraries/core/src/build/build_command.rs diff --git a/libraries/core/src/build/git.rs b/libraries/core/src/build/git.rs new file mode 100644 index 00000000..f53a5c9e --- /dev/null +++ b/libraries/core/src/build/git.rs @@ -0,0 +1,353 @@ +use crate::build::BuildLogger; +use dora_message::{common::LogLevel, descriptor::GitRepoRev, DataflowId, SessionId}; +use eyre::{bail, ContextCompat, WrapErr}; +use git2::FetchOptions; +use itertools::Itertools; +use std::{ + collections::{BTreeMap, BTreeSet}, + path::{Path, PathBuf}, +}; +use url::Url; + +#[derive(Default)] +pub struct GitManager { + /// Directories that are currently in use by running dataflows. + pub clones_in_use: BTreeMap>, + /// Builds that are prepared, but not done yet. + prepared_builds: BTreeMap, + reuse_for: BTreeMap, +} + +#[derive(Default)] +struct PreparedBuild { + /// Clone dirs that will be created during the build process. + /// + /// This allows subsequent nodes to reuse the dirs. + planned_clone_dirs: BTreeSet, +} + +impl GitManager { + pub fn choose_clone_dir( + &mut self, + session_id: SessionId, + repo_url: Url, + commit_hash: String, + prev_commit_hash: Option, + target_dir: &Path, + ) -> eyre::Result { + let clone_dir = Self::clone_dir_path(&target_dir, &repo_url, &commit_hash)?; + + if let Some(using) = self.clones_in_use.get(&clone_dir) { + if !using.is_empty() { + // The directory is currently in use by another dataflow. Rebuilding + // while a dataflow is running could lead to unintended behavior. + eyre::bail!( + "the build directory is still in use by the following \ + dataflows, please stop them before rebuilding: {}", + using.iter().join(", ") + ) + } + } + + let reuse = if self.clone_dir_ready(session_id, &clone_dir) { + // The directory already contains a checkout of the commit we're interested in. + // So we can simply reuse the directory without doing any additional git + // operations. 
+ ReuseOptions::Reuse { + dir: clone_dir.clone(), + } + } else if let Some(previous_commit_hash) = prev_commit_hash { + // we might be able to update a previous clone + let prev_clone_dir = + Self::clone_dir_path(&target_dir, &repo_url, &previous_commit_hash)?; + + if self + .clones_in_use + .get(&prev_clone_dir) + .map(|ids| !ids.is_empty()) + .unwrap_or(false) + { + // previous clone is still in use -> we cannot rename it, but we can copy it + ReuseOptions::CopyAndFetch { + from: prev_clone_dir, + target_dir: clone_dir.clone(), + commit_hash, + } + } else if prev_clone_dir.exists() { + // there is an unused previous clone that is not in use -> rename it + ReuseOptions::RenameAndFetch { + from: prev_clone_dir, + target_dir: clone_dir.clone(), + commit_hash, + } + } else { + // no existing clone associated with previous build id + ReuseOptions::NewClone { + target_dir: clone_dir.clone(), + repo_url, + commit_hash, + } + } + } else { + // no previous build that we can reuse + ReuseOptions::NewClone { + target_dir: clone_dir.clone(), + repo_url, + commit_hash, + } + }; + self.register_ready_clone_dir(session_id, clone_dir); + + Ok(GitFolder { reuse }) + } + + pub fn in_use(&self, dir: &Path) -> bool { + self.clones_in_use + .get(dir) + .map(|ids| !ids.is_empty()) + .unwrap_or(false) + } + + pub fn clone_dir_ready(&self, session_id: SessionId, dir: &Path) -> bool { + self.prepared_builds + .get(&session_id) + .map(|p| p.planned_clone_dirs.contains(dir)) + .unwrap_or(false) + || dir.exists() + } + + pub fn register_ready_clone_dir(&mut self, session_id: SessionId, dir: PathBuf) -> bool { + self.prepared_builds + .entry(session_id) + .or_default() + .planned_clone_dirs + .insert(dir) + } + + fn clone_dir_path( + base_dir: &Path, + repo_url: &Url, + commit_hash: &String, + ) -> eyre::Result { + let mut path = base_dir.join(repo_url.host_str().context("git URL has no hostname")?); + path.extend(repo_url.path_segments().context("no path in git URL")?); + let path = path.join(commit_hash); + Ok(dunce::simplified(&path).to_owned()) + } +} + +pub struct GitFolder { + /// Specifies whether an existing repo should be reused. 
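Aside (editor's sketch): `clone_dir_path` derives a per-commit clone directory from the repository URL's host, its path segments, and the commit hash. A small standalone version (requires the `url` crate; the real module additionally normalizes the result with `dunce::simplified`):

```rust
// Requires the `url` crate; error handling reduced to Option for brevity.
use std::path::{Path, PathBuf};
use url::Url;

fn clone_dir_path(base_dir: &Path, repo_url: &Url, commit_hash: &str) -> Option<PathBuf> {
    let mut path = base_dir.join(repo_url.host_str()?);
    path.extend(repo_url.path_segments()?);
    Some(path.join(commit_hash))
}

fn main() {
    let repo = Url::parse("https://github.com/dora-rs/dora.git").unwrap();
    let dir = clone_dir_path(Path::new("git"), &repo, "64a2dc9c").unwrap();
    // prints something like: git/github.com/dora-rs/dora.git/64a2dc9c
    println!("{}", dir.display());
}
```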
+    reuse: ReuseOptions,
+}
+
+impl GitFolder {
+    pub async fn prepare(self, logger: &mut impl BuildLogger) -> eyre::Result<PathBuf> {
+        let GitFolder { reuse } = self;
+
+        eprintln!("reuse: {reuse:?}");
+        let clone_dir = match reuse {
+            ReuseOptions::NewClone {
+                target_dir,
+                repo_url,
+                commit_hash,
+            } => {
+                logger
+                    .log_message(
+                        LogLevel::Info,
+                        format!(
+                            "cloning {repo_url}#{commit_hash} into {}",
+                            target_dir.display()
+                        ),
+                    )
+                    .await;
+                let clone_target = target_dir.clone();
+                let checkout_result = tokio::task::spawn_blocking(move || {
+                    let repository = clone_into(repo_url.clone(), &clone_target)
+                        .with_context(|| format!("failed to clone git repo from `{repo_url}`"))?;
+                    checkout_tree(&repository, &commit_hash)
+                        .with_context(|| format!("failed to checkout commit `{commit_hash}`"))
+                })
+                .await
+                .unwrap();
+
+                match checkout_result {
+                    Ok(()) => target_dir,
+                    Err(err) => {
+                        logger
+                            .log_message(LogLevel::Error, format!("{err:?}"))
+                            .await;
+                        // remove erroneous clone again
+                        if let Err(err) = std::fs::remove_dir_all(target_dir) {
+                            logger
+                                .log_message(
+                                    LogLevel::Error,
+                                    format!(
+                                        "failed to remove clone dir after clone/checkout error: {}",
+                                        err.kind()
+                                    ),
+                                )
+                                .await;
+                        }
+                        bail!(err)
+                    }
+                }
+            }
+            ReuseOptions::CopyAndFetch {
+                from,
+                target_dir,
+                commit_hash,
+            } => {
+                tokio::fs::copy(&from, &target_dir)
+                    .await
+                    .context("failed to copy repo clone")?;
+
+                logger
+                    .log_message(
+                        LogLevel::Info,
+                        format!("fetching changes after copying {}", from.display()),
+                    )
+                    .await;
+
+                let repository = fetch_changes(&target_dir, None).await?;
+                checkout_tree(&repository, &commit_hash)?;
+                target_dir
+            }
+            ReuseOptions::RenameAndFetch {
+                from,
+                target_dir,
+                commit_hash,
+            } => {
+                tokio::fs::rename(&from, &target_dir)
+                    .await
+                    .context("failed to rename repo clone")?;
+
+                logger
+                    .log_message(
+                        LogLevel::Info,
+                        format!("fetching changes after renaming {}", from.display()),
+                    )
+                    .await;
+
+                let repository = fetch_changes(&target_dir, None).await?;
+                checkout_tree(&repository, &commit_hash)?;
+                target_dir
+            }
+            ReuseOptions::Reuse { dir } => {
+                logger
+                    .log_message(
+                        LogLevel::Info,
+                        format!("reusing up-to-date {}", dir.display()),
+                    )
+                    .await;
+                dir
+            }
+        };
+        Ok(clone_dir)
+    }
+}
+
+#[derive(Debug)]
+enum ReuseOptions {
+    /// Create a new clone of the repository.
+    NewClone {
+        target_dir: PathBuf,
+        repo_url: Url,
+        commit_hash: String,
+    },
+    /// Reuse an existing up-to-date clone of the repository.
+    Reuse { dir: PathBuf },
+    /// Copy an older clone of the repository and fetch changes, then reuse it.
+    CopyAndFetch {
+        from: PathBuf,
+        target_dir: PathBuf,
+        commit_hash: String,
+    },
+    /// Rename an older clone of the repository and fetch changes, then reuse it.
+    RenameAndFetch {
+        from: PathBuf,
+        target_dir: PathBuf,
+        commit_hash: String,
+    },
+}
+
+fn rev_str(rev: &Option<GitRepoRev>) -> String {
+    match rev {
+        Some(GitRepoRev::Branch(branch)) => format!(" (branch {branch})"),
+        Some(GitRepoRev::Tag(tag)) => format!(" (tag {tag})"),
+        Some(GitRepoRev::Rev(rev)) => format!(" (rev {rev})"),
+        None => String::new(),
+    }
+}
+
+fn clone_into(repo_addr: Url, clone_dir: &Path) -> eyre::Result<git2::Repository> {
+    if let Some(parent) = clone_dir.parent() {
+        std::fs::create_dir_all(parent)
+            .context("failed to create parent directory for git clone")?;
+    }
+
+    let clone_dir = clone_dir.to_owned();
+
+    let mut builder = git2::build::RepoBuilder::new();
+    let mut fetch_options = git2::FetchOptions::new();
+    fetch_options.download_tags(git2::AutotagOption::All);
+    builder.fetch_options(fetch_options);
+    builder
+        .clone(repo_addr.as_str(), &clone_dir)
+        .context("failed to clone repo")
+}
+
+async fn fetch_changes(
+    repo_dir: &Path,
+    refname: Option<String>,
+) -> Result<git2::Repository, eyre::Error> {
+    let repo_dir = repo_dir.to_owned();
+    let fetch_changes = tokio::task::spawn_blocking(move || {
+        let repository = git2::Repository::open(&repo_dir).context("failed to open git repo")?;
+
+        {
+            let mut remote = repository
+                .find_remote("origin")
+                .context("failed to find remote `origin` in repo")?;
+            remote
+                .connect(git2::Direction::Fetch)
+                .context("failed to connect to remote")?;
+            let default_branch = remote
+                .default_branch()
+                .context("failed to get default branch for remote")?;
+            let fetch = match &refname {
+                Some(refname) => refname.as_str(),
+                None => default_branch
+                    .as_str()
+                    .context("failed to read default branch as string")?,
+            };
+            let mut fetch_options = FetchOptions::new();
+            fetch_options.download_tags(git2::AutotagOption::All);
+            remote
+                .fetch(&[&fetch], Some(&mut fetch_options), None)
+                .context("failed to fetch from git repo")?;
+        }
+        Result::<_, eyre::Error>::Ok(repository)
+    });
+    let repository = fetch_changes.await??;
+    Ok(repository)
+}
+
+fn checkout_tree(repository: &git2::Repository, commit_hash: &str) -> eyre::Result<()> {
+    let (object, reference) = repository
+        .revparse_ext(commit_hash)
+        .context("failed to parse ref")?;
+    repository
+        .checkout_tree(&object, None)
+        .context("failed to checkout ref")?;
+    match reference {
+        Some(reference) => repository
+            .set_head(reference.name().context("failed to get reference_name")?)
+            .context("failed to set head")?,
+        None => repository
+            .set_head_detached(object.id())
+            .context("failed to set detached head")?,
+    }
+
+    Ok(())
+}
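For orientation, here is a small illustration of the on-disk layout that `clone_dir_path` appears to produce: base directory, then the URL host, then the repository's path segments, then the pinned commit hash, simplified via `dunce`. The repository URL, the hash, and the helper name `example_clone_dir` below are placeholders, not values from this diff.

```rust
use std::path::{Path, PathBuf};

use url::Url;

// Hedged sketch mirroring the path construction in `GitManager::clone_dir_path`.
fn example_clone_dir(base: &Path) -> PathBuf {
    let repo = Url::parse("https://github.com/dora-rs/dora").unwrap();
    let mut path = base.join(repo.host_str().unwrap());
    path.extend(repo.path_segments().unwrap()); // appends "dora-rs" and "dora"
    path.join("4f8a9c0deadbeef") // placeholder commit hash
    // e.g. base = "build/git" gives "build/git/github.com/dora-rs/dora/4f8a9c0deadbeef"
}
```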
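The `BuildLogger` trait uses return-position `impl Future` plus an associated `Clone` type instead of requiring `Clone` on the logger itself. Below is a minimal sketch of an implementation that writes to stderr; it is not part of this diff, the `StderrLogger` name is made up, and it assumes the re-exported paths `dora_core::build::BuildLogger` and `dora_message::common::LogLevel` plus an `eyre` dependency.

```rust
use std::future::Future;

use dora_core::build::BuildLogger;
use dora_message::common::LogLevel;

// Hypothetical logger: prints every message to stderr and clones by copy.
#[derive(Clone)]
struct StderrLogger;

impl BuildLogger for StderrLogger {
    type Clone = Self;

    fn log_message(
        &mut self,
        level: LogLevel,
        message: impl Into<String> + Send,
    ) -> impl Future<Output = ()> + Send {
        let message = message.into();
        async move { eprintln!("[{level}] {message}") }
    }

    fn try_clone(&self) -> impl Future<Output = eyre::Result<Self::Clone>> + Send {
        let clone = self.clone();
        async move { Ok(clone) }
    }
}
```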
diff --git a/libraries/core/src/build/mod.rs b/libraries/core/src/build/mod.rs
new file mode 100644
index 00000000..3995b222
--- /dev/null
+++ b/libraries/core/src/build/mod.rs
@@ -0,0 +1,139 @@
+pub use git::GitManager;
+pub use logger::BuildLogger;
+
+use url::Url;
+
+use std::{collections::BTreeMap, future::Future, path::PathBuf};
+
+use crate::descriptor::ResolvedNode;
+use dora_message::{
+    common::{GitSource, LogLevel},
+    descriptor::{CoreNodeKind, EnvValue},
+    id::NodeId,
+    SessionId,
+};
+use eyre::Context;
+
+use build_command::run_build_command;
+use git::GitFolder;
+
+mod build_command;
+mod git;
+mod logger;
+
+#[derive(Clone)]
+pub struct Builder {
+    pub session_id: SessionId,
+    pub base_working_dir: PathBuf,
+    pub uv: bool,
+}
+
+impl Builder {
+    pub async fn build_node(
+        self,
+        node: ResolvedNode,
+        git: Option<GitSource>,
+        prev_git: Option<GitSource>,
+        mut logger: impl BuildLogger,
+        git_manager: &mut GitManager,
+    ) -> eyre::Result<impl Future<Output = eyre::Result<BuiltNode>>> {
+        let prepared_git = if let Some(GitSource { repo, commit_hash }) = git {
+            let repo_url = Url::parse(&repo).context("failed to parse git repository URL")?;
+            let target_dir = self.base_working_dir.join("git");
+            let prev_hash = prev_git.filter(|p| p.repo == repo).map(|p| p.commit_hash);
+            let git_folder = git_manager.choose_clone_dir(
+                self.session_id,
+                repo_url,
+                commit_hash,
+                prev_hash,
+                &target_dir,
+            )?;
+            Some(git_folder)
+        } else {
+            None
+        };
+
+        let task = async move { self.build_node_inner(node, &mut logger, prepared_git).await };
+        Ok(task)
+    }
+
+    async fn build_node_inner(
+        self,
+        node: ResolvedNode,
+        logger: &mut impl BuildLogger,
+        git_folder: Option<GitFolder>,
+    ) -> eyre::Result<BuiltNode> {
+        logger.log_message(LogLevel::Debug, "building node").await;
+        let node_working_dir = match &node.kind {
+            CoreNodeKind::Custom(n) => {
+                let node_working_dir = match git_folder {
+                    Some(git_folder) => git_folder.prepare(logger).await?,
+                    None => self.base_working_dir,
+                };
+
+                if let Some(build) = &n.build {
+                    build_node(logger, &node.env, node_working_dir.clone(), build, self.uv).await?;
+                }
+                node_working_dir
+            }
+            CoreNodeKind::Runtime(n) => {
+                // run build commands
+                for operator in &n.operators {
+                    if let Some(build) = &operator.config.build {
+                        build_node(
+                            logger,
+                            &node.env,
+                            self.base_working_dir.clone(),
+                            build,
+                            self.uv,
+                        )
+                        .await?;
+                    }
+                }
+                self.base_working_dir.clone()
+            }
+        };
+        Ok(BuiltNode { node_working_dir })
+    }
+}
+
+async fn build_node(
+    logger: &mut impl BuildLogger,
+    node_env: &Option<BTreeMap<String, EnvValue>>,
+    working_dir: PathBuf,
+    build: &String,
+    uv: bool,
+) -> eyre::Result<()> {
+    logger
+        .log_message(LogLevel::Info, format!("running build command: `{build}`"))
+        .await;
+    let build = build.to_owned();
+    let node_env = node_env.clone();
+    let mut logger = logger.try_clone().await.context("failed to clone logger")?;
+    let (stdout_tx, mut stdout) = tokio::sync::mpsc::channel(10);
+    let task = tokio::task::spawn_blocking(move || {
+        run_build_command(&build, &working_dir, uv, &node_env, stdout_tx)
+            .context("build command failed")
+    });
+    tokio::spawn(async move {
+        while let Some(line) = stdout.recv().await {
+            logger
+                .log_message(
+                    LogLevel::Info,
+                    line.unwrap_or_else(|err| format!("io err: {}", err.kind())),
+                )
+                .await;
+        }
+    });
+    task.await??;
+    Ok(())
+}
+
+pub struct BuiltNode {
+    pub node_working_dir: PathBuf,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct BuildInfo {
+    pub node_working_dirs: BTreeMap<NodeId, PathBuf>,
+}
diff --git a/libraries/core/src/git.rs b/libraries/core/src/git.rs
new file mode 100644
index 00000000..e69de29b
diff --git a/libraries/core/src/lib.rs b/libraries/core/src/lib.rs
index 90f2c564..f18a6059 100644
--- a/libraries/core/src/lib.rs
+++ b/libraries/core/src/lib.rs
@@ -9,6 +9,7 @@ pub use dora_message::{config, uhlc};
 
 pub mod build;
 pub mod descriptor;
+pub mod git;
 pub mod metadata;
 pub mod topics;
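Taken together, the new `build` module appears intended to be driven in two stages: `Builder::build_node` synchronously reserves a clone directory through the `GitManager` (and can fail early if that directory is still used by a running dataflow), then hands back a future that performs the actual clone/fetch and runs the node's build command. A hedged usage sketch follows; the function name, crate paths, and the minimal error handling are assumptions, not code from this diff.

```rust
use std::path::PathBuf;

use dora_core::{
    build::{BuildLogger, Builder, GitManager},
    descriptor::ResolvedNode,
};
use dora_message::{common::GitSource, SessionId};

// Hypothetical driver for building a single node.
async fn build_one_node(
    node: ResolvedNode,
    git: Option<GitSource>,
    logger: impl BuildLogger,
    git_manager: &mut GitManager,
    base_working_dir: PathBuf,
) -> eyre::Result<PathBuf> {
    let builder = Builder {
        session_id: SessionId::generate(),
        base_working_dir,
        uv: false,
    };
    // Stage 1: choose/reserve the git clone directory while holding the GitManager.
    let build_task = builder
        .build_node(node, git, None, logger, git_manager)
        .await?;
    // Stage 2: run the clone/fetch and the build command; no GitManager access needed.
    let built = build_task.await?;
    Ok(built.node_working_dir)
}
```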
diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs
index 456bb1bd..bf3d3a03 100644
--- a/libraries/message/src/cli_to_coordinator.rs
+++ b/libraries/message/src/cli_to_coordinator.rs
@@ -1,22 +1,48 @@
-use std::{path::PathBuf, time::Duration};
+use std::{collections::BTreeMap, path::PathBuf, time::Duration};
 
 use uuid::Uuid;
 
 use crate::{
+    common::GitSource,
     descriptor::Descriptor,
     id::{NodeId, OperatorId},
+    BuildId, SessionId,
 };
 
 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
 pub enum ControlRequest {
+    Build {
+        session_id: SessionId,
+        dataflow: Descriptor,
+        git_sources: BTreeMap<NodeId, GitSource>,
+        prev_git_sources: BTreeMap<NodeId, GitSource>,
+        /// Allows overwriting the base working dir when CLI and daemon are
+        /// running on the same machine.
+        ///
+        /// Must not be used for multi-machine dataflows.
+        ///
+        /// Note that nodes with git sources still use a subdirectory of
+        /// the base working dir.
+        local_working_dir: Option<PathBuf>,
+        uv: bool,
+    },
+    WaitForBuild {
+        build_id: BuildId,
+    },
     Start {
+        build_id: Option<BuildId>,
+        session_id: SessionId,
         dataflow: Descriptor,
         name: Option<String>,
-        // TODO: remove this once we figure out deploying of node/operator
-        // binaries from CLI to coordinator/daemon
-        local_working_dir: PathBuf,
+        /// Allows overwriting the base working dir when CLI and daemon are
+        /// running on the same machine.
+        ///
+        /// Must not be used for multi-machine dataflows.
+        ///
+        /// Note that nodes with git sources still use a subdirectory of
+        /// the base working dir.
+        local_working_dir: Option<PathBuf>,
         uv: bool,
-        build_only: bool,
     },
     WaitForSpawn {
         dataflow_id: Uuid,
@@ -50,4 +76,9 @@ pub enum ControlRequest {
         dataflow_id: Uuid,
         level: log::LevelFilter,
     },
+    BuildLogSubscribe {
+        build_id: BuildId,
+        level: log::LevelFilter,
+    },
+    CliAndDefaultDaemonOnSameMachine,
 }
diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs
index 015b163e..83591811 100644
--- a/libraries/message/src/common.rs
+++ b/libraries/message/src/common.rs
@@ -5,14 +5,15 @@ use aligned_vec::{AVec, ConstAlign};
 use eyre::Context as _;
 use uuid::Uuid;
 
-use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, DataflowId};
+use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId};
 
 pub use log::Level as LogLevel;
 
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 #[must_use]
 pub struct LogMessage {
-    pub dataflow_id: DataflowId,
+    pub build_id: Option<BuildId>,
+    pub dataflow_id: Option<DataflowId>,
     pub node_id: Option<NodeId>,
     pub daemon_id: Option<DaemonId>,
     pub level: LogLevel,
@@ -239,3 +240,9 @@ impl std::fmt::Display for DaemonId {
         write!(f, "{}", self.uuid)
     }
 }
+
+#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
+pub struct GitSource {
+    pub repo: String,
+    pub commit_hash: String,
+}
diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs
index 87eb7ae7..02243468 100644
--- a/libraries/message/src/coordinator_to_cli.rs
+++ b/libraries/message/src/coordinator_to_cli.rs
@@ -1,23 +1,46 @@
-use std::collections::{BTreeMap, BTreeSet};
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    net::IpAddr,
+};
 
 use uuid::Uuid;
 
 pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus};
-use crate::{common::DaemonId, id::NodeId};
+use crate::{common::DaemonId, id::NodeId, BuildId};
 
 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
 pub enum ControlRequestReply {
     Error(String),
     CoordinatorStopped,
-    DataflowStartTriggered { uuid: Uuid },
-    DataflowSpawned { uuid: Uuid },
-    DataflowReloaded { uuid: Uuid },
-    DataflowStopped { uuid: Uuid, result: DataflowResult },
+    DataflowBuildTriggered {
+        build_id: BuildId,
+    },
+    DataflowBuildFinished {
+        build_id: BuildId,
+        result: Result<(), String>,
+    },
+    DataflowStartTriggered {
+        uuid: Uuid,
+    },
+    DataflowSpawned {
+        uuid: Uuid,
+    },
+    DataflowReloaded {
+        uuid: Uuid,
+    },
+    DataflowStopped {
+        uuid: Uuid,
+        result: DataflowResult,
+    },
     DataflowList(DataflowList),
     DestroyOk,
     DaemonConnected(bool),
     ConnectedDaemons(BTreeSet<DaemonId>),
     Logs(Vec<LogMessage>),
+    CliAndDefaultDaemonIps {
+        default_daemon: Option<IpAddr>,
+        cli: Option<IpAddr>,
+    },
 }
 
 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
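On the wire, building is now a separate request/reply exchange: the CLI sends `ControlRequest::Build`, the coordinator answers with `DataflowBuildTriggered { build_id }`, and the CLI can then send `WaitForBuild { build_id }` (and optionally `BuildLogSubscribe`) before starting the dataflow. Below is a sketch of assembling the `Build` request; it assumes the source maps are keyed by `NodeId`, and the repository URL and helper name are placeholders rather than values from this diff.

```rust
use std::collections::BTreeMap;

use dora_message::{
    cli_to_coordinator::ControlRequest,
    common::GitSource,
    descriptor::Descriptor,
    id::NodeId,
    SessionId,
};

// Hypothetical CLI-side helper: build a dataflow that pins one node to a git commit.
fn build_request(dataflow: Descriptor, node_id: NodeId, commit_hash: String) -> ControlRequest {
    let git_source = GitSource {
        repo: "https://github.com/dora-rs/dora".into(), // placeholder repo URL
        commit_hash,
    };
    ControlRequest::Build {
        session_id: SessionId::generate(),
        dataflow,
        git_sources: BTreeMap::from([(node_id, git_source)]),
        prev_git_sources: BTreeMap::new(),
        local_working_dir: None, // only allowed when CLI and daemon share a machine
        uv: false,
    }
}
```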
diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs
index 8c68a6ca..69da8923 100644
--- a/libraries/message/src/coordinator_to_daemon.rs
+++ b/libraries/message/src/coordinator_to_daemon.rs
@@ -5,10 +5,10 @@ use std::{
 };
 
 use crate::{
-    common::DaemonId,
+    common::{DaemonId, GitSource},
     descriptor::{Descriptor, ResolvedNode},
     id::{NodeId, OperatorId},
-    DataflowId,
+    BuildId, DataflowId, SessionId,
 };
 
 pub use crate::common::Timestamped;
@@ -33,6 +33,7 @@ impl RegisterResult {
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 pub enum DaemonCoordinatorEvent {
+    Build(BuildDataflowNodes),
     Spawn(SpawnDataflowNodes),
     AllNodesReady {
         dataflow_id: DataflowId,
@@ -55,13 +56,40 @@ pub enum DaemonCoordinatorEvent {
     Heartbeat,
 }
 
+#[derive(Debug, serde::Deserialize, serde::Serialize)]
+pub struct BuildDataflowNodes {
+    pub build_id: BuildId,
+    pub session_id: SessionId,
+    /// Allows overwriting the base working dir when CLI and daemon are
+    /// running on the same machine.
+    ///
+    /// Must not be used for multi-machine dataflows.
+    ///
+    /// Note that nodes with git sources still use a subdirectory of
+    /// the base working dir.
+    pub local_working_dir: Option<PathBuf>,
+    pub git_sources: BTreeMap<NodeId, GitSource>,
+    pub prev_git_sources: BTreeMap<NodeId, GitSource>,
+    pub dataflow_descriptor: Descriptor,
+    pub nodes_on_machine: BTreeSet<NodeId>,
+    pub uv: bool,
+}
+
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 pub struct SpawnDataflowNodes {
+    pub build_id: Option<BuildId>,
+    pub session_id: SessionId,
     pub dataflow_id: DataflowId,
-    pub working_dir: PathBuf,
+    /// Allows overwriting the base working dir when CLI and daemon are
+    /// running on the same machine.
+    ///
+    /// Must not be used for multi-machine dataflows.
+    ///
+    /// Note that nodes with git sources still use a subdirectory of
+    /// the base working dir.
+    pub local_working_dir: Option<PathBuf>,
     pub nodes: BTreeMap<NodeId, ResolvedNode>,
     pub dataflow_descriptor: Descriptor,
     pub spawn_nodes: BTreeSet<NodeId>,
     pub uv: bool,
-    pub build_only: bool,
 }
diff --git a/libraries/message/src/daemon_to_coordinator.rs b/libraries/message/src/daemon_to_coordinator.rs
index 309697be..ccafb0a5 100644
--- a/libraries/message/src/daemon_to_coordinator.rs
+++ b/libraries/message/src/daemon_to_coordinator.rs
@@ -3,7 +3,9 @@ use std::collections::BTreeMap;
 pub use crate::common::{
     DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped,
 };
-use crate::{common::DaemonId, current_crate_version, id::NodeId, versions_compatible, DataflowId};
+use crate::{
+    common::DaemonId, current_crate_version, id::NodeId, versions_compatible, BuildId, DataflowId,
+};
 
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
 pub enum CoordinatorRequest {
@@ -46,6 +48,10 @@ impl DaemonRegisterRequest {
 
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
 pub enum DaemonEvent {
+    BuildResult {
+        build_id: BuildId,
+        result: Result<(), String>,
+    },
     SpawnResult {
         dataflow_id: DataflowId,
         result: Result<(), String>,
@@ -77,6 +83,7 @@ impl DataflowDaemonResult {
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 pub enum DaemonCoordinatorReply {
+    TriggerBuildResult(Result<(), String>),
     TriggerSpawnResult(Result<(), String>),
     ReloadResult(Result<(), String>),
     StopResult(Result<(), String>),
diff --git a/libraries/message/src/descriptor.rs b/libraries/message/src/descriptor.rs
index 02f660d4..ca583c0b 100644
--- a/libraries/message/src/descriptor.rs
+++ b/libraries/message/src/descriptor.rs
@@ -253,6 +253,12 @@ pub enum NodeSource {
     },
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
+pub enum ResolvedNodeSource {
+    Local,
+    GitCommit { repo: String, commit_hash: String },
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
 pub enum GitRepoRev {
     Branch(String),
diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs
index 9d1870e0..365eab9f 100644
--- a/libraries/message/src/lib.rs
+++ b/libraries/message/src/lib.rs
@@ -24,9 +24,44 @@ pub mod coordinator_to_cli;
 
 pub use arrow_data;
 pub use arrow_schema;
+use uuid::Uuid;
 
 pub type DataflowId = uuid::Uuid;
 
+#[derive(
+    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
+)]
+pub struct SessionId(uuid::Uuid);
+
+impl SessionId {
+    pub fn generate() -> Self {
+        Self(Uuid::new_v4())
+    }
+}
+
+impl std::fmt::Display for SessionId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "SessionId({})", self.0)
+    }
+}
+
+#[derive(
+    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash,
+)]
+pub struct BuildId(uuid::Uuid);
+
+impl BuildId {
+    pub fn generate() -> Self {
+        Self(Uuid::new_v4())
+    }
+}
+
+impl std::fmt::Display for BuildId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "BuildId({})", self.0)
+    }
+}
+
 fn current_crate_version() -> semver::Version {
     let crate_version_raw = env!("CARGO_PKG_VERSION");
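The new `SessionId` and `BuildId` types are plain UUID newtypes. A tiny sketch of how they are created and displayed; the interpretation in the comments is an assumption based on how the messages above use them, not a statement from this diff.

```rust
use dora_message::{BuildId, SessionId};

fn main() {
    // Presumably one SessionId per CLI session and one BuildId per triggered build.
    let session = SessionId::generate();
    let build = BuildId::generate();
    // The Display impls wrap the inner UUID, e.g. "SessionId(<uuid>)" / "BuildId(<uuid>)".
    println!("{session} / {build}");
}
```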