@@ -1,134 +0,0 @@
use super::{default_tracing, Executable};
use crate::common::resolve_dataflow;
use dora_core::{
    config::OperatorId,
    descriptor::{Descriptor, DescriptorExt, NodeExt, SINGLE_OPERATOR_DEFAULT_ID},
};
use dora_message::descriptor::EnvValue;
use eyre::{eyre, Context};
use std::{collections::BTreeMap, path::Path, process::Command};

#[derive(Debug, clap::Args)]
/// Run build commands provided in the given dataflow.
pub struct Build {
    /// Path to the dataflow descriptor file
    #[clap(value_name = "PATH")]
    dataflow: String,
    /// Use UV to build nodes.
    #[clap(long, action)]
    uv: bool,
}

impl Executable for Build {
    fn execute(self) -> eyre::Result<()> {
        default_tracing()?;
        build(self.dataflow, self.uv)
    }
}

pub fn build(dataflow: String, uv: bool) -> eyre::Result<()> {
    let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
    let descriptor = Descriptor::blocking_read(&dataflow)?;
    let dataflow_absolute = if dataflow.is_relative() {
        std::env::current_dir().unwrap().join(dataflow)
    } else {
        dataflow.to_owned()
    };
    let working_dir = dataflow_absolute.parent().unwrap();
    let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());

    for node in descriptor.nodes {
        match node.kind()? {
            dora_core::descriptor::NodeKind::Standard(_) => {
                run_build_command(node.build.as_deref(), working_dir, uv, node.env.clone())
                    .with_context(|| {
                        format!("build command failed for standard node `{}`", node.id)
                    })?
            }
            dora_core::descriptor::NodeKind::Runtime(runtime_node) => {
                for operator in &runtime_node.operators {
                    run_build_command(
                        operator.config.build.as_deref(),
                        working_dir,
                        uv,
                        node.env.clone(),
                    )
                    .with_context(|| {
                        format!(
                            "build command failed for operator `{}/{}`",
                            node.id, operator.id
                        )
                    })?;
                }
            }
            dora_core::descriptor::NodeKind::Custom(custom_node) => run_build_command(
                custom_node.build.as_deref(),
                working_dir,
                uv,
                node.env.clone(),
            )
            .with_context(|| format!("build command failed for custom node `{}`", node.id))?,
            dora_core::descriptor::NodeKind::Operator(operator) => run_build_command(
                operator.config.build.as_deref(),
                working_dir,
                uv,
                node.env.clone(),
            )
            .with_context(|| {
                format!(
                    "build command failed for operator `{}/{}`",
                    node.id,
                    operator.id.as_ref().unwrap_or(&default_op_id)
                )
            })?,
        }
    }
    Ok(())
}

fn run_build_command(
    build: Option<&str>,
    working_dir: &Path,
    uv: bool,
    envs: Option<BTreeMap<String, EnvValue>>,
) -> eyre::Result<()> {
    if let Some(build) = build {
        let lines = build.lines().collect::<Vec<_>>();
        for build_line in lines {
            let mut split = build_line.split_whitespace();
            let program = split
                .next()
                .ok_or_else(|| eyre!("build command is empty"))?;
            let mut cmd = if uv && (program == "pip" || program == "pip3") {
                let mut cmd = Command::new("uv");
                cmd.arg("pip");
                cmd
            } else {
                Command::new(program)
            };
            cmd.args(split);
            // inject environment variables
            if let Some(envs) = envs.clone() {
                for (key, value) in envs {
                    let value = value.to_string();
                    cmd.env(key, value);
                }
            }
            cmd.current_dir(working_dir);
            let exit_status = cmd
                .status()
                .wrap_err_with(|| format!("failed to run `{build_line}`"))?;
            if !exit_status.success() {
                return Err(eyre!("build command `{build_line}` returned {exit_status}"));
            }
        }
        Ok(())
    } else {
        Ok(())
    }
}
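
A side note on the deleted helper: the only non-obvious step is the `uv` substitution, which rewrites a leading `pip` or `pip3` into `uv pip` before spawning the process. Below is a minimal, self-contained sketch of that logic; the `rewrite` helper is hypothetical, introduced here only for illustration.

use std::process::Command;

// Hypothetical helper mirroring the substitution in `run_build_command`:
// a leading `pip` or `pip3` becomes `uv pip`; other programs run unchanged.
fn rewrite(build_line: &str, uv: bool) -> Command {
    let mut parts = build_line.split_whitespace();
    let program = parts.next().expect("build line must not be empty");
    let mut cmd = if uv && (program == "pip" || program == "pip3") {
        let mut cmd = Command::new("uv");
        cmd.arg("pip");
        cmd
    } else {
        Command::new(program)
    };
    cmd.args(parts);
    cmd
}

fn main() {
    let cmd = rewrite("pip install -r requirements.txt", true);
    // prints `"uv" ["pip", "install", "-r", "requirements.txt"]`
    println!("{:?} {:?}", cmd.get_program(), cmd.get_args().collect::<Vec<_>>());
}
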
@@ -0,0 +1,107 @@
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::descriptor::Descriptor;
use dora_message::{
    cli_to_coordinator::ControlRequest,
    common::{GitSource, LogMessage},
    coordinator_to_cli::ControlRequestReply,
    id::NodeId,
    BuildId,
};
use eyre::{bail, Context};
use std::{
    collections::BTreeMap,
    net::{SocketAddr, TcpStream},
};

use crate::{output::print_log_message, session::DataflowSession};

pub fn build_distributed_dataflow(
    session: &mut TcpRequestReplyConnection,
    dataflow: Descriptor,
    git_sources: &BTreeMap<NodeId, GitSource>,
    dataflow_session: &DataflowSession,
    local_working_dir: Option<std::path::PathBuf>,
    uv: bool,
) -> eyre::Result<BuildId> {
    let build_id = {
        let reply_raw = session
            .request(
                &serde_json::to_vec(&ControlRequest::Build {
                    session_id: dataflow_session.session_id,
                    dataflow,
                    git_sources: git_sources.clone(),
                    prev_git_sources: dataflow_session.git_sources.clone(),
                    local_working_dir,
                    uv,
                })
                .unwrap(),
            )
| .wrap_err("failed to send start dataflow message")?; | |||
        let result: ControlRequestReply =
            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
        match result {
            ControlRequestReply::DataflowBuildTriggered { build_id } => {
                eprintln!("dataflow build triggered: {build_id}");
                build_id
            }
            ControlRequestReply::Error(err) => bail!("{err}"),
| other => bail!("unexpected start dataflow reply: {other:?}"), | |||
        }
    };
    Ok(build_id)
}

pub fn wait_until_dataflow_built(
    build_id: BuildId,
    session: &mut TcpRequestReplyConnection,
    coordinator_socket: SocketAddr,
    log_level: log::LevelFilter,
) -> eyre::Result<BuildId> {
    // subscribe to log messages
    let mut log_session = TcpConnection {
        stream: TcpStream::connect(coordinator_socket)
            .wrap_err("failed to connect to dora coordinator")?,
    };
    log_session
        .send(
            &serde_json::to_vec(&ControlRequest::BuildLogSubscribe {
                build_id,
                level: log_level,
            })
            .wrap_err("failed to serialize message")?,
        )
        .wrap_err("failed to send build log subscribe request to coordinator")?;
    std::thread::spawn(move || {
        while let Ok(raw) = log_session.receive() {
            let parsed: eyre::Result<LogMessage> =
                serde_json::from_slice(&raw).context("failed to parse log message");
            match parsed {
                Ok(log_message) => {
                    print_log_message(log_message, false, true);
                }
                Err(err) => {
                    tracing::warn!("failed to parse log message: {err:?}")
                }
            }
        }
    });

    let reply_raw = session
        .request(&serde_json::to_vec(&ControlRequest::WaitForBuild { build_id }).unwrap())
        .wrap_err("failed to send WaitForBuild message")?;
    let result: ControlRequestReply =
        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
    match result {
        ControlRequestReply::DataflowBuildFinished { build_id, result } => match result {
            Ok(()) => {
                eprintln!("dataflow build finished successfully");
                Ok(build_id)
            }
            Err(err) => bail!("{err}"),
        },
        ControlRequestReply::Error(err) => bail!("{err}"),
| other => bail!("unexpected start dataflow reply: {other:?}"), | |||
    }
}
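
The control flow above is a two-connection pattern: one connection blocks on `WaitForBuild` while a second one streams serialized `LogMessage` values that a background thread prints as they arrive. Here is a self-contained sketch of that consumer loop, using an `mpsc` channel in place of the TCP connection and a hypothetical `FakeLogMessage` standing in for `LogMessage` (assumes the `serde` derive feature and `serde_json`):

use std::sync::mpsc;

// Hypothetical stand-in for `dora_message::common::LogMessage`.
#[derive(Debug, serde::Deserialize)]
struct FakeLogMessage {
    level: String,
    message: String,
}

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    // a background thread plays the role of `log_session.receive()`
    let printer = std::thread::spawn(move || {
        while let Ok(raw) = rx.recv() {
            match serde_json::from_slice::<FakeLogMessage>(&raw) {
                Ok(msg) => println!("{}: {}", msg.level, msg.message),
                Err(err) => eprintln!("failed to parse log message: {err}"),
            }
        }
    });
    tx.send(br#"{"level":"INFO","message":"build started"}"#.to_vec())
        .unwrap();
    drop(tx); // closing the channel ends the loop, like a dropped connection
    printer.join().unwrap();
}
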
@@ -0,0 +1,45 @@
use dora_message::{common::GitSource, descriptor::GitRepoRev};
use eyre::Context;

pub fn fetch_commit_hash(repo_url: String, rev: Option<GitRepoRev>) -> eyre::Result<GitSource> {
    let mut remote = git2::Remote::create_detached(repo_url.as_bytes())
        .with_context(|| format!("failed to create git remote for {repo_url}"))?;
    let connection = remote
        .connect_auth(git2::Direction::Fetch, None, None)
        .with_context(|| format!("failed to open connection to {repo_url}"))?;
    let references = connection
        .list()
        .with_context(|| format!("failed to list git references of {repo_url}"))?;

    let expected_name = match &rev {
        Some(GitRepoRev::Branch(branch)) => format!("refs/heads/{branch}"),
        Some(GitRepoRev::Tag(tag)) => format!("refs/tags/{tag}"),
        Some(GitRepoRev::Rev(rev)) => rev.clone(),
        None => "HEAD".into(),
    };

    let mut commit_hash = None;
    for head in references {
        if head.name() == expected_name {
            commit_hash = Some(head.oid().to_string());
            break;
        }
    }
    if commit_hash.is_none() {
        if let Some(GitRepoRev::Rev(rev)) = &rev {
            // rev might be a commit hash instead of a reference
            if rev.is_ascii() && rev.bytes().all(|b| b.is_ascii_alphanumeric()) {
                commit_hash = Some(rev.clone());
            }
        }
    }

    match commit_hash {
        Some(commit_hash) => Ok(GitSource {
            repo: repo_url,
            commit_hash,
        }),
        None => eyre::bail!("no matching commit for `{rev:?}`"),
    }
}
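
A usage sketch for `fetch_commit_hash`: pinning a branch to its current tip, much like `git ls-remote`. The repository URL and branch name are placeholders, and network access is assumed:

use dora_message::descriptor::GitRepoRev;

fn main() -> eyre::Result<()> {
    // resolve the tip of `main` to a fixed commit hash
    let source = fetch_commit_hash(
        "https://github.com/dora-rs/dora.git".into(),
        Some(GitRepoRev::Branch("main".into())),
    )?;
    println!("{} @ {}", source.repo, source.commit_hash);
    Ok(())
}

Pinning to a hash at build time means every machine in a distributed build checks out the same revision, even if the branch moves while the build is in flight.
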
@@ -0,0 +1,121 @@
use std::{collections::BTreeMap, path::PathBuf};

use colored::Colorize;
use dora_core::{
    build::{BuildInfo, BuildLogger, Builder, GitManager, LogLevelOrStdout, PrevGitSource},
    descriptor::{Descriptor, DescriptorExt},
};
use dora_message::{common::GitSource, id::NodeId};
use eyre::Context;

use crate::session::DataflowSession;

pub fn build_dataflow_locally(
    dataflow: Descriptor,
    git_sources: &BTreeMap<NodeId, GitSource>,
    dataflow_session: &DataflowSession,
    working_dir: PathBuf,
    uv: bool,
) -> eyre::Result<BuildInfo> {
    let runtime = tokio::runtime::Runtime::new()?;
    runtime.block_on(build_dataflow(
        dataflow,
        git_sources,
        dataflow_session,
        working_dir,
        uv,
    ))
}

async fn build_dataflow(
    dataflow: Descriptor,
    git_sources: &BTreeMap<NodeId, GitSource>,
    dataflow_session: &DataflowSession,
    base_working_dir: PathBuf,
    uv: bool,
) -> eyre::Result<BuildInfo> {
    let builder = Builder {
        session_id: dataflow_session.session_id,
        base_working_dir,
        uv,
    };
    let nodes = dataflow.resolve_aliases_and_set_defaults()?;
    let mut git_manager = GitManager::default();

    let prev_git_sources = &dataflow_session.git_sources;

    let mut tasks = Vec::new();
    // build nodes
    for node in nodes.into_values() {
        let node_id = node.id.clone();
        let git_source = git_sources.get(&node_id).cloned();
        let prev_git_source = prev_git_sources.get(&node_id).cloned();
        let prev_git = prev_git_source.map(|prev_source| PrevGitSource {
            still_needed_for_this_build: git_sources.values().any(|s| s == &prev_source),
            git_source: prev_source,
        });
        let task = builder
            .clone()
            .build_node(
                node,
                git_source,
                prev_git,
                LocalBuildLogger {
                    node_id: node_id.clone(),
                },
                &mut git_manager,
            )
            .await
            .wrap_err_with(|| format!("failed to build node `{node_id}`"))?;
        tasks.push((node_id, task));
    }

    let mut info = BuildInfo {
        node_working_dirs: Default::default(),
    };
    for (node_id, task) in tasks {
        let node = task
            .await
            .with_context(|| format!("failed to build node `{node_id}`"))?;
        info.node_working_dirs
            .insert(node_id, node.node_working_dir);
    }
    Ok(info)
}

struct LocalBuildLogger {
    node_id: NodeId,
}

impl BuildLogger for LocalBuildLogger {
    type Clone = Self;

    async fn log_message(
        &mut self,
        level: impl Into<LogLevelOrStdout> + Send,
        message: impl Into<String> + Send,
    ) {
        let level = match level.into() {
            LogLevelOrStdout::LogLevel(level) => match level {
                log::Level::Error => "ERROR ".red(),
                log::Level::Warn => "WARN  ".yellow(),
                log::Level::Info => "INFO  ".green(),
                log::Level::Debug => "DEBUG ".bright_blue(),
                log::Level::Trace => "TRACE ".dimmed(),
            },
            LogLevelOrStdout::Stdout => "stdout".italic().dimmed(),
        };
        let node = self.node_id.to_string().bold().bright_black();
        let message: String = message.into();
        println!("{node}: {level} {message}");
    }

    async fn try_clone(&self) -> eyre::Result<Self::Clone> {
        Ok(LocalBuildLogger {
            node_id: self.node_id.clone(),
        })
    }
}
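
`LocalBuildLogger` is the only `BuildLogger` implementation here, but other sinks follow the same shape. As a sketch, a hypothetical color-free variant (useful when piping build output into a file or a CI log) could mirror it like this, assuming the same trait bounds that `LocalBuildLogger` satisfies above:

// Hypothetical alternative sink; not part of the diff.
struct PlainBuildLogger {
    node_id: NodeId,
}

impl BuildLogger for PlainBuildLogger {
    type Clone = Self;

    async fn log_message(
        &mut self,
        level: impl Into<LogLevelOrStdout> + Send,
        message: impl Into<String> + Send,
    ) {
        // no colors or styling, so the output survives redirection
        let level = match level.into() {
            LogLevelOrStdout::LogLevel(level) => level.to_string(),
            LogLevelOrStdout::Stdout => "stdout".to_string(),
        };
        println!("{}: {} {}", self.node_id, level, message.into());
    }

    async fn try_clone(&self) -> eyre::Result<Self::Clone> {
        Ok(PlainBuildLogger {
            node_id: self.node_id.clone(),
        })
    }
}
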
@@ -0,0 +1,205 @@
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::{
    descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt},
    topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST},
};
use dora_message::{descriptor::NodeSource, BuildId};
use eyre::Context;
use std::{collections::BTreeMap, net::IpAddr};

use super::{default_tracing, Executable};
use crate::{
    common::{connect_to_coordinator, local_working_dir, resolve_dataflow},
    session::DataflowSession,
};
use distributed::{build_distributed_dataflow, wait_until_dataflow_built};
use local::build_dataflow_locally;

mod distributed;
mod git;
mod local;

#[derive(Debug, clap::Args)]
/// Run build commands provided in the given dataflow.
pub struct Build {
    /// Path to the dataflow descriptor file
    #[clap(value_name = "PATH")]
    dataflow: String,
    /// Address of the dora coordinator
    #[clap(long, value_name = "IP")]
    coordinator_addr: Option<IpAddr>,
    /// Port number of the coordinator control server
    #[clap(long, value_name = "PORT")]
    coordinator_port: Option<u16>,
    /// Use UV to build nodes.
    #[clap(long, action)]
    uv: bool,
    /// Run the build on the local machine
    #[clap(long, action)]
    local: bool,
}

impl Executable for Build {
    fn execute(self) -> eyre::Result<()> {
        default_tracing()?;
        build(
            self.dataflow,
            self.coordinator_addr,
            self.coordinator_port,
            self.uv,
            self.local,
        )
    }
}

pub fn build(
    dataflow: String,
    coordinator_addr: Option<IpAddr>,
    coordinator_port: Option<u16>,
    uv: bool,
    force_local: bool,
) -> eyre::Result<()> {
    let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
    let dataflow_descriptor =
        Descriptor::blocking_read(&dataflow_path).wrap_err("failed to read yaml dataflow")?;
    let mut dataflow_session =
        DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;

    let mut git_sources = BTreeMap::new();
    let resolved_nodes = dataflow_descriptor
        .resolve_aliases_and_set_defaults()
        .context("failed to resolve nodes")?;
    for (node_id, node) in resolved_nodes {
        if let CoreNodeKind::Custom(CustomNode {
            source: NodeSource::GitBranch { repo, rev },
            ..
        }) = node.kind
        {
            let source = git::fetch_commit_hash(repo, rev)
                .with_context(|| format!("failed to find commit hash for `{node_id}`"))?;
            git_sources.insert(node_id, source);
        }
    }

    let session = || connect_to_coordinator_with_defaults(coordinator_addr, coordinator_port);

    let build_kind = if force_local {
| log::info!("Building locally, as requested through `--force-local`"); | |||
        BuildKind::Local
    } else if dataflow_descriptor.nodes.iter().all(|n| n.deploy.is_none()) {
        log::info!("Building locally because dataflow does not contain any `deploy` sections");
        BuildKind::Local
    } else if coordinator_addr.is_some() || coordinator_port.is_some() {
        log::info!("Building through coordinator, using the given coordinator socket information");
        // explicit coordinator address or port set -> there should be a coordinator running
        BuildKind::ThroughCoordinator {
            coordinator_session: session().context("failed to connect to coordinator")?,
        }
    } else {
        match session() {
            Ok(coordinator_session) => {
                // we found a local coordinator instance at default port -> use it for building
                log::info!("Found local dora coordinator instance -> building through coordinator");
                BuildKind::ThroughCoordinator {
                    coordinator_session,
                }
            }
            Err(_) => {
                log::warn!("No dora coordinator instance found -> trying a local build");
                // no coordinator instance found -> do a local build
                BuildKind::Local
            }
        }
    };

    match build_kind {
        BuildKind::Local => {
            log::info!("running local build");
            // use dataflow dir as base working dir
            let local_working_dir = dunce::canonicalize(&dataflow_path)
                .context("failed to canonicalize dataflow path")?
                .parent()
                .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
                .to_owned();
            let build_info = build_dataflow_locally(
                dataflow_descriptor,
                &git_sources,
                &dataflow_session,
                local_working_dir,
                uv,
            )?;

            dataflow_session.git_sources = git_sources;
            // generate a random BuildId and store the associated build info
            dataflow_session.build_id = Some(BuildId::generate());
            dataflow_session.local_build = Some(build_info);
            dataflow_session
                .write_out_for_dataflow(&dataflow_path)
                .context("failed to write out dataflow session file")?;
        }
        BuildKind::ThroughCoordinator {
            mut coordinator_session,
        } => {
            let local_working_dir = local_working_dir(
                &dataflow_path,
                &dataflow_descriptor,
                &mut *coordinator_session,
            )?;
            let build_id = build_distributed_dataflow(
                &mut *coordinator_session,
                dataflow_descriptor,
                &git_sources,
                &dataflow_session,
                local_working_dir,
                uv,
            )?;

            dataflow_session.git_sources = git_sources;
            dataflow_session
                .write_out_for_dataflow(&dataflow_path)
                .context("failed to write out dataflow session file")?;

            // wait until dataflow build is finished
            wait_until_dataflow_built(
                build_id,
                &mut *coordinator_session,
                coordinator_socket(coordinator_addr, coordinator_port),
                log::LevelFilter::Info,
            )?;

            dataflow_session.build_id = Some(build_id);
            dataflow_session.local_build = None;
            dataflow_session
                .write_out_for_dataflow(&dataflow_path)
                .context("failed to write out dataflow session file")?;
        }
    };

    Ok(())
}

enum BuildKind {
    Local,
    ThroughCoordinator {
        coordinator_session: Box<TcpRequestReplyConnection>,
    },
}

fn connect_to_coordinator_with_defaults(
    coordinator_addr: Option<std::net::IpAddr>,
    coordinator_port: Option<u16>,
) -> std::io::Result<Box<TcpRequestReplyConnection>> {
    let coordinator_socket = coordinator_socket(coordinator_addr, coordinator_port);
    connect_to_coordinator(coordinator_socket)
}

fn coordinator_socket(
    coordinator_addr: Option<std::net::IpAddr>,
    coordinator_port: Option<u16>,
) -> std::net::SocketAddr {
    let coordinator_addr = coordinator_addr.unwrap_or(LOCALHOST);
    let coordinator_port = coordinator_port.unwrap_or(DORA_COORDINATOR_PORT_CONTROL_DEFAULT);
    (coordinator_addr, coordinator_port).into()
}
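
Finally, the socket fallback: a missing address defaults to `LOCALHOST` and a missing port to `DORA_COORDINATOR_PORT_CONTROL_DEFAULT`, so a bare `dora build dataflow.yaml` talks to a local coordinator if one is running. A small sketch of that behavior, assuming `coordinator_socket` and the `dora_core::topics` constants are in scope:

use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST};

fn main() {
    // with no overrides, the CLI targets the local coordinator control port
    let socket = coordinator_socket(None, None);
    assert_eq!(socket.ip(), LOCALHOST);
    assert_eq!(socket.port(), DORA_COORDINATOR_PORT_CONTROL_DEFAULT);
    println!("default coordinator socket: {socket}");
}
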