From c05d4cd9ff83f6e16fc51dfadba7151fc044b625 Mon Sep 17 00:00:00 2001 From: sjfhsjfh Date: Wed, 25 Jun 2025 23:06:42 +0800 Subject: [PATCH] chore: revert deletions --- binaries/cli/src/command/build.rs | 134 ------------ binaries/cli/src/command/build/distributed.rs | 107 +++++++++ binaries/cli/src/command/build/git.rs | 45 ++++ binaries/cli/src/command/build/local.rs | 121 +++++++++++ binaries/cli/src/command/build/mod.rs | 205 ++++++++++++++++++ 5 files changed, 478 insertions(+), 134 deletions(-) delete mode 100644 binaries/cli/src/command/build.rs create mode 100644 binaries/cli/src/command/build/distributed.rs create mode 100644 binaries/cli/src/command/build/git.rs create mode 100644 binaries/cli/src/command/build/local.rs create mode 100644 binaries/cli/src/command/build/mod.rs diff --git a/binaries/cli/src/command/build.rs b/binaries/cli/src/command/build.rs deleted file mode 100644 index d781142e..00000000 --- a/binaries/cli/src/command/build.rs +++ /dev/null @@ -1,134 +0,0 @@ -use super::{default_tracing, Executable}; -use crate::common::resolve_dataflow; -use dora_core::{ - config::OperatorId, - descriptor::{Descriptor, DescriptorExt, NodeExt, SINGLE_OPERATOR_DEFAULT_ID}, -}; -use dora_message::descriptor::EnvValue; -use eyre::{eyre, Context}; -use std::{collections::BTreeMap, path::Path, process::Command}; - -#[derive(Debug, clap::Args)] -/// Run build commands provided in the given dataflow. -pub struct Build { - /// Path to the dataflow descriptor file - #[clap(value_name = "PATH")] - dataflow: String, - // Use UV to build nodes. - #[clap(long, action)] - uv: bool, -} - -impl Executable for Build { - fn execute(self) -> eyre::Result<()> { - default_tracing()?; - build(self.dataflow, self.uv) - } -} - -pub fn build(dataflow: String, uv: bool) -> eyre::Result<()> { - let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; - let descriptor = Descriptor::blocking_read(&dataflow)?; - let dataflow_absolute = if dataflow.is_relative() { - std::env::current_dir().unwrap().join(dataflow) - } else { - dataflow.to_owned() - }; - let working_dir = dataflow_absolute.parent().unwrap(); - - let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string()); - - for node in descriptor.nodes { - match node.kind()? { - dora_core::descriptor::NodeKind::Standard(_) => { - run_build_command(node.build.as_deref(), working_dir, uv, node.env.clone()) - .with_context(|| { - format!("build command failed for standard node `{}`", node.id) - })? - } - dora_core::descriptor::NodeKind::Runtime(runtime_node) => { - for operator in &runtime_node.operators { - run_build_command( - operator.config.build.as_deref(), - working_dir, - uv, - node.env.clone(), - ) - .with_context(|| { - format!( - "build command failed for operator `{}/{}`", - node.id, operator.id - ) - })?; - } - } - dora_core::descriptor::NodeKind::Custom(custom_node) => run_build_command( - custom_node.build.as_deref(), - working_dir, - uv, - node.env.clone(), - ) - .with_context(|| format!("build command failed for custom node `{}`", node.id))?, - dora_core::descriptor::NodeKind::Operator(operator) => run_build_command( - operator.config.build.as_deref(), - working_dir, - uv, - node.env.clone(), - ) - .with_context(|| { - format!( - "build command failed for operator `{}/{}`", - node.id, - operator.id.as_ref().unwrap_or(&default_op_id) - ) - })?, - } - } - - Ok(()) -} - -fn run_build_command( - build: Option<&str>, - working_dir: &Path, - uv: bool, - envs: Option>, -) -> eyre::Result<()> { - if let Some(build) = build { - let lines = build.lines().collect::>(); - for build_line in lines { - let mut split = build_line.split_whitespace(); - - let program = split - .next() - .ok_or_else(|| eyre!("build command is empty"))?; - let mut cmd = if uv && (program == "pip" || program == "pip3") { - let mut cmd = Command::new("uv"); - cmd.arg("pip"); - cmd - } else { - Command::new(program) - }; - cmd.args(split); - - // Inject Environment Variables - if let Some(envs) = envs.clone() { - for (key, value) in envs { - let value = value.to_string(); - cmd.env(key, value); - } - } - - cmd.current_dir(working_dir); - let exit_status = cmd - .status() - .wrap_err_with(|| format!("failed to run `{}`", build))?; - if !exit_status.success() { - return Err(eyre!("build command `{build_line}` returned {exit_status}")); - } - } - Ok(()) - } else { - Ok(()) - } -} diff --git a/binaries/cli/src/command/build/distributed.rs b/binaries/cli/src/command/build/distributed.rs new file mode 100644 index 00000000..1f6591fe --- /dev/null +++ b/binaries/cli/src/command/build/distributed.rs @@ -0,0 +1,107 @@ +use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; +use dora_core::descriptor::Descriptor; +use dora_message::{ + cli_to_coordinator::ControlRequest, + common::{GitSource, LogMessage}, + coordinator_to_cli::ControlRequestReply, + id::NodeId, + BuildId, +}; +use eyre::{bail, Context}; +use std::{ + collections::BTreeMap, + net::{SocketAddr, TcpStream}, +}; + +use crate::{output::print_log_message, session::DataflowSession}; + +pub fn build_distributed_dataflow( + session: &mut TcpRequestReplyConnection, + dataflow: Descriptor, + git_sources: &BTreeMap, + dataflow_session: &DataflowSession, + local_working_dir: Option, + uv: bool, +) -> eyre::Result { + let build_id = { + let reply_raw = session + .request( + &serde_json::to_vec(&ControlRequest::Build { + session_id: dataflow_session.session_id, + dataflow, + git_sources: git_sources.clone(), + prev_git_sources: dataflow_session.git_sources.clone(), + local_working_dir, + uv, + }) + .unwrap(), + ) + .wrap_err("failed to send start dataflow message")?; + + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowBuildTriggered { build_id } => { + eprintln!("dataflow build triggered: {build_id}"); + build_id + } + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected start dataflow reply: {other:?}"), + } + }; + Ok(build_id) +} + +pub fn wait_until_dataflow_built( + build_id: BuildId, + session: &mut TcpRequestReplyConnection, + coordinator_socket: SocketAddr, + log_level: log::LevelFilter, +) -> eyre::Result { + // subscribe to log messages + let mut log_session = TcpConnection { + stream: TcpStream::connect(coordinator_socket) + .wrap_err("failed to connect to dora coordinator")?, + }; + log_session + .send( + &serde_json::to_vec(&ControlRequest::BuildLogSubscribe { + build_id, + level: log_level, + }) + .wrap_err("failed to serialize message")?, + ) + .wrap_err("failed to send build log subscribe request to coordinator")?; + std::thread::spawn(move || { + while let Ok(raw) = log_session.receive() { + let parsed: eyre::Result = + serde_json::from_slice(&raw).context("failed to parse log message"); + match parsed { + Ok(log_message) => { + print_log_message(log_message, false, true); + } + Err(err) => { + tracing::warn!("failed to parse log message: {err:?}") + } + } + } + }); + + let reply_raw = session + .request(&serde_json::to_vec(&ControlRequest::WaitForBuild { build_id }).unwrap()) + .wrap_err("failed to send WaitForBuild message")?; + + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowBuildFinished { build_id, result } => match result { + Ok(()) => { + eprintln!("dataflow build finished successfully"); + Ok(build_id) + } + Err(err) => bail!("{err}"), + }, + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected start dataflow reply: {other:?}"), + } +} \ No newline at end of file diff --git a/binaries/cli/src/command/build/git.rs b/binaries/cli/src/command/build/git.rs new file mode 100644 index 00000000..18faba87 --- /dev/null +++ b/binaries/cli/src/command/build/git.rs @@ -0,0 +1,45 @@ +use dora_message::{common::GitSource, descriptor::GitRepoRev}; +use eyre::Context; + +pub fn fetch_commit_hash(repo_url: String, rev: Option) -> eyre::Result { + let mut remote = git2::Remote::create_detached(repo_url.as_bytes()) + .with_context(|| format!("failed to create git remote for {repo_url}"))?; + let connection = remote + .connect_auth(git2::Direction::Fetch, None, None) + .with_context(|| format!("failed to open connection to {repo_url}"))?; + let references = connection + .list() + .with_context(|| format!("failed to list git references of {repo_url}"))?; + + let expected_name = match &rev { + Some(GitRepoRev::Branch(branch)) => format!("refs/heads/{branch}"), + Some(GitRepoRev::Tag(tag)) => format!("refs/tags/{tag}"), + Some(GitRepoRev::Rev(rev)) => rev.clone(), + None => "HEAD".into(), + }; + + let mut commit_hash = None; + for head in references { + if head.name() == expected_name { + commit_hash = Some(head.oid().to_string()); + break; + } + } + + if commit_hash.is_none() { + if let Some(GitRepoRev::Rev(rev)) = &rev { + // rev might be a commit hash instead of a reference + if rev.is_ascii() && rev.bytes().all(|b| b.is_ascii_alphanumeric()) { + commit_hash = Some(rev.clone()); + } + } + } + + match commit_hash { + Some(commit_hash) => Ok(GitSource { + repo: repo_url, + commit_hash, + }), + None => eyre::bail!("no matching commit for `{rev:?}`"), + } +} diff --git a/binaries/cli/src/command/build/local.rs b/binaries/cli/src/command/build/local.rs new file mode 100644 index 00000000..6fed9747 --- /dev/null +++ b/binaries/cli/src/command/build/local.rs @@ -0,0 +1,121 @@ +use std::{collections::BTreeMap, path::PathBuf}; + +use colored::Colorize; +use dora_core::{ + build::{BuildInfo, BuildLogger, Builder, GitManager, LogLevelOrStdout, PrevGitSource}, + descriptor::{Descriptor, DescriptorExt}, +}; +use dora_message::{common::GitSource, id::NodeId}; +use eyre::Context; + +use crate::session::DataflowSession; + +pub fn build_dataflow_locally( + dataflow: Descriptor, + git_sources: &BTreeMap, + dataflow_session: &DataflowSession, + working_dir: PathBuf, + uv: bool, +) -> eyre::Result { + let runtime = tokio::runtime::Runtime::new()?; + + runtime.block_on(build_dataflow( + dataflow, + git_sources, + dataflow_session, + working_dir, + uv, + )) +} + +async fn build_dataflow( + dataflow: Descriptor, + git_sources: &BTreeMap, + dataflow_session: &DataflowSession, + base_working_dir: PathBuf, + uv: bool, +) -> eyre::Result { + let builder = Builder { + session_id: dataflow_session.session_id, + base_working_dir, + uv, + }; + let nodes = dataflow.resolve_aliases_and_set_defaults()?; + + let mut git_manager = GitManager::default(); + let prev_git_sources = &dataflow_session.git_sources; + + let mut tasks = Vec::new(); + + // build nodes + for node in nodes.into_values() { + let node_id = node.id.clone(); + let git_source = git_sources.get(&node_id).cloned(); + let prev_git_source = prev_git_sources.get(&node_id).cloned(); + let prev_git = prev_git_source.map(|prev_source| PrevGitSource { + still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source), + git_source: prev_source, + }); + + let task = builder + .clone() + .build_node( + node, + git_source, + prev_git, + LocalBuildLogger { + node_id: node_id.clone(), + }, + &mut git_manager, + ) + .await + .wrap_err_with(|| format!("failed to build node `{node_id}`"))?; + tasks.push((node_id, task)); + } + + let mut info = BuildInfo { + node_working_dirs: Default::default(), + }; + for (node_id, task) in tasks { + let node = task + .await + .with_context(|| format!("failed to build node `{node_id}`"))?; + info.node_working_dirs + .insert(node_id, node.node_working_dir); + } + Ok(info) +} + +struct LocalBuildLogger { + node_id: NodeId, +} + +impl BuildLogger for LocalBuildLogger { + type Clone = Self; + + async fn log_message( + &mut self, + level: impl Into + Send, + message: impl Into + Send, + ) { + let level = match level.into() { + LogLevelOrStdout::LogLevel(level) => match level { + log::Level::Error => "ERROR ".red(), + log::Level::Warn => "WARN ".yellow(), + log::Level::Info => "INFO ".green(), + log::Level::Debug => "DEBUG ".bright_blue(), + log::Level::Trace => "TRACE ".dimmed(), + }, + LogLevelOrStdout::Stdout => "stdout".italic().dimmed(), + }; + let node = self.node_id.to_string().bold().bright_black(); + let message: String = message.into(); + println!("{node}: {level} {message}"); + } + + async fn try_clone(&self) -> eyre::Result { + Ok(LocalBuildLogger { + node_id: self.node_id.clone(), + }) + } +} \ No newline at end of file diff --git a/binaries/cli/src/command/build/mod.rs b/binaries/cli/src/command/build/mod.rs new file mode 100644 index 00000000..32eed2e0 --- /dev/null +++ b/binaries/cli/src/command/build/mod.rs @@ -0,0 +1,205 @@ +use communication_layer_request_reply::TcpRequestReplyConnection; +use dora_core::{ + descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt}, + topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST}, +}; +use dora_message::{descriptor::NodeSource, BuildId}; +use eyre::Context; +use std::{collections::BTreeMap, net::IpAddr}; + +use super::{default_tracing, Executable}; +use crate::{ + common::{connect_to_coordinator, local_working_dir, resolve_dataflow}, + session::DataflowSession, +}; + +use distributed::{build_distributed_dataflow, wait_until_dataflow_built}; +use local::build_dataflow_locally; + +mod distributed; +mod git; +mod local; + +#[derive(Debug, clap::Args)] +/// Run build commands provided in the given dataflow. +pub struct Build { + /// Path to the dataflow descriptor file + #[clap(value_name = "PATH")] + dataflow: String, + /// Address of the dora coordinator + #[clap(long, value_name = "IP")] + coordinator_addr: Option, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT")] + coordinator_port: Option, + // Use UV to build nodes. + #[clap(long, action)] + uv: bool, + // Run build on local machine + #[clap(long, action)] + local: bool, +} + +impl Executable for Build { + fn execute(self) -> eyre::Result<()> { + default_tracing()?; + build( + self.dataflow, + self.coordinator_addr, + self.coordinator_port, + self.uv, + self.local, + ) + } +} + +pub fn build( + dataflow: String, + coordinator_addr: Option, + coordinator_port: Option, + uv: bool, + force_local: bool, +) -> eyre::Result<()> { + let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?; + let dataflow_descriptor = + Descriptor::blocking_read(&dataflow_path).wrap_err("Failed to read yaml dataflow")?; + let mut dataflow_session = + DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?; + + let mut git_sources = BTreeMap::new(); + let resolved_nodes = dataflow_descriptor + .resolve_aliases_and_set_defaults() + .context("failed to resolve nodes")?; + for (node_id, node) in resolved_nodes { + if let CoreNodeKind::Custom(CustomNode { + source: NodeSource::GitBranch { repo, rev }, + .. + }) = node.kind + { + let source = git::fetch_commit_hash(repo, rev) + .with_context(|| format!("failed to find commit hash for `{node_id}`"))?; + git_sources.insert(node_id, source); + } + } + + let session = || connect_to_coordinator_with_defaults(coordinator_addr, coordinator_port); + + let build_kind = if force_local { + log::info!("Building locally, as requested through `--force-local`"); + BuildKind::Local + } else if dataflow_descriptor.nodes.iter().all(|n| n.deploy.is_none()) { + log::info!("Building locally because dataflow does not contain any `deploy` sections"); + BuildKind::Local + } else if coordinator_addr.is_some() || coordinator_port.is_some() { + log::info!("Building through coordinator, using the given coordinator socket information"); + // explicit coordinator address or port set -> there should be a coordinator running + BuildKind::ThroughCoordinator { + coordinator_session: session().context("failed to connect to coordinator")?, + } + } else { + match session() { + Ok(coordinator_session) => { + // we found a local coordinator instance at default port -> use it for building + log::info!("Found local dora coordinator instance -> building through coordinator"); + BuildKind::ThroughCoordinator { + coordinator_session, + } + } + Err(_) => { + log::warn!("No dora coordinator instance found -> trying a local build"); + // no coordinator instance found -> do a local build + BuildKind::Local + } + } + }; + + match build_kind { + BuildKind::Local => { + log::info!("running local build"); + // use dataflow dir as base working dir + let local_working_dir = dunce::canonicalize(&dataflow_path) + .context("failed to canonicalize dataflow path")? + .parent() + .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? + .to_owned(); + let build_info = build_dataflow_locally( + dataflow_descriptor, + &git_sources, + &dataflow_session, + local_working_dir, + uv, + )?; + + dataflow_session.git_sources = git_sources; + // generate a random BuildId and store the associated build info + dataflow_session.build_id = Some(BuildId::generate()); + dataflow_session.local_build = Some(build_info); + dataflow_session + .write_out_for_dataflow(&dataflow_path) + .context("failed to write out dataflow session file")?; + } + BuildKind::ThroughCoordinator { + mut coordinator_session, + } => { + let local_working_dir = local_working_dir( + &dataflow_path, + &dataflow_descriptor, + &mut *coordinator_session, + )?; + let build_id = build_distributed_dataflow( + &mut *coordinator_session, + dataflow_descriptor, + &git_sources, + &dataflow_session, + local_working_dir, + uv, + )?; + + dataflow_session.git_sources = git_sources; + dataflow_session + .write_out_for_dataflow(&dataflow_path) + .context("failed to write out dataflow session file")?; + + // wait until dataflow build is finished + + wait_until_dataflow_built( + build_id, + &mut *coordinator_session, + coordinator_socket(coordinator_addr, coordinator_port), + log::LevelFilter::Info, + )?; + + dataflow_session.build_id = Some(build_id); + dataflow_session.local_build = None; + dataflow_session + .write_out_for_dataflow(&dataflow_path) + .context("failed to write out dataflow session file")?; + } + }; + + Ok(()) +} + +enum BuildKind { + Local, + ThroughCoordinator { + coordinator_session: Box, + }, +} + +fn connect_to_coordinator_with_defaults( + coordinator_addr: Option, + coordinator_port: Option, +) -> std::io::Result> { + let coordinator_socket = coordinator_socket(coordinator_addr, coordinator_port); + connect_to_coordinator(coordinator_socket) +} + +fn coordinator_socket( + coordinator_addr: Option, + coordinator_port: Option, +) -> std::net::SocketAddr { + let coordinator_addr = coordinator_addr.unwrap_or(LOCALHOST); + let coordinator_port = coordinator_port.unwrap_or(DORA_COORDINATOR_PORT_CONTROL_DEFAULT); + (coordinator_addr, coordinator_port).into() +}