|
- #!/usr/bin/env python3
- # -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
- # vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
- #
- # THIS FILE IS PART OF adtools PROJECT
- #
- # DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
- # Version 2, December 2004
- #
- # Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
- #
- # Everyone is permitted to copy and distribute verbatim or modified
- # copies of this license document, and changing it is allowed as long
- # as the name is changed.
- #
- # DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
- # TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
- #
- # 0. You just DO WHAT THE FUCK YOU WANT TO.
- #
- # Copyright (C) 2021- donkey <anjingyu_ws@foxmail.com>
-
- import os
- import time
- import json
- import subprocess
- import platform
- import logging
- import multiprocessing
- import re
- import tempfile
- import traceback
- import locale
- import shutil
- import tarfile
- import random
- from contextlib import contextmanager
-
- import crayons
-
- import requests
- from requests.auth import HTTPBasicAuth
-
-
# Names exported by ``from <module> import *``.
__all__ = ["AdUtil", "AdGitHelper", "AdMakeHelper", "WindowsToolchainHelper"]
__author__ = ['"donkey" <anjingyu_ws@foxmail.com>']
# Absolute path of the directory that contains this file.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
-
-
class AdtoolsLogger(logging.Logger):
    """``logging.Logger`` variant that colours each message by severity.

    Every level method wraps the message with a ``crayons`` colour and then
    hands it, together with the untouched positional/keyword arguments, to
    the stock ``logging.Logger`` implementation.
    """

    def debug(self, msg, *args, **kwargs):
        """Log ``msg`` at DEBUG level, rendered in cyan."""
        painted = crayons.cyan(msg)
        super().debug(painted, *args, **kwargs)

    def info(self, msg, *args, **kwargs):
        """Log ``msg`` at INFO level, rendered in green."""
        painted = crayons.green(msg)
        super().info(painted, *args, **kwargs)

    def error(self, msg, *args, **kwargs):
        """Log ``msg`` at ERROR level, rendered in red."""
        painted = crayons.red(msg)
        super().error(painted, *args, **kwargs)

    def warning(self, msg, *args, **kwargs):
        """Log ``msg`` at WARNING level, rendered in yellow."""
        painted = crayons.yellow(msg)
        super().warning(painted, *args, **kwargs)

    def critical(self, msg, *args, **kwargs):
        """Log ``msg`` at CRITICAL level, rendered in red."""
        painted = crayons.red(msg)
        super().critical(painted, *args, **kwargs)
-
-
class Logger:
    """Singleton that owns the coloured ``adtools`` console logger.

    Use :meth:`instance` to obtain the shared object; calling the
    constructor a second time raises ``RuntimeError``.
    """

    DEFAULT_NAME = "adtools"
    PLAIN_FORMAT_STRING = "[{asctime}][{levelname:1.1s}] {message}"
    DEFAULT_LEVEL = logging.DEBUG

    __instance = None

    @staticmethod
    def instance():
        """Return the shared ``Logger``, building it on first access."""
        if Logger.__instance is None:
            Logger()
        return Logger.__instance

    def __init__(self):
        """Register ``self`` as the singleton and wire up the console handler.

        Raises:
            RuntimeError: if the singleton has already been created.
        """
        if Logger.__instance is not None:
            raise RuntimeError("This class is a singleton!")

        Logger.__instance = self

        # Loggers created from now on are AdtoolsLogger instances.
        logging.setLoggerClass(AdtoolsLogger)
        # NOTE(review): logging.disable(level) suppresses records at `level`
        # AND below for the whole process, so with DEFAULT_LEVEL == DEBUG the
        # DEBUG records stay hidden until enable() is called. Confirm this is
        # the intended default.
        logging.disable(Logger.DEFAULT_LEVEL)

        self.__logger = logging.getLogger(Logger.DEFAULT_NAME)

        # Console handler using the "{"-style plain format.
        self.__console = logging.StreamHandler()
        self.__console.setFormatter(
            logging.Formatter(Logger.PLAIN_FORMAT_STRING, style="{")
        )
        self.__console.setLevel(Logger.DEFAULT_LEVEL)

        self.__logger.addHandler(self.__console)
        self.__logger.setLevel(Logger.DEFAULT_LEVEL)

    def enable(self):
        """Lift the process-wide ``logging.disable`` threshold."""
        logging.disable(logging.NOTSET)

    def disable(self):
        """Suppress every record up to and including CRITICAL, process-wide."""
        logging.disable(logging.CRITICAL)

    @property
    def logger(self):
        """The underlying logger object named ``DEFAULT_NAME``."""
        return self.__logger
-
-
# Create the singleton eagerly at import time and expose one-letter
# shortcuts for the five severity levels of the shared logger.
_adtools_log = Logger.instance().logger
D = _adtools_log.debug
I = _adtools_log.info
E = _adtools_log.error
W = _adtools_log.warning
C = _adtools_log.critical
-
-
class AdUtil:
    """Stateless helpers for shell commands, files, archives and host info."""

    # Host operating system name as reported by ``platform.system()``.
    SYSTEM = platform.system()
    # ``(language, encoding)`` pair; only the encoding (index 1) is used, to
    # decode captured command output.
    try:
        DEFAULT_LOCALE = locale.getdefaultlocale()
    except (AttributeError, ValueError):
        # getdefaultlocale() is deprecated and can raise ValueError for locale
        # strings it cannot parse; importing this module must not fail then.
        DEFAULT_LOCALE = (None, locale.getpreferredencoding(False))

    @staticmethod
    def _decode_output(raw: bytes) -> str:
        """Decode command output with the locale encoding, else UTF-8."""
        encoding = AdUtil.DEFAULT_LOCALE[1]
        if encoding:
            try:
                return raw.decode(encoding)
            except (LookupError, UnicodeDecodeError):
                pass
        # Last resort never raises: undecodable bytes become U+FFFD.
        return raw.decode("utf8", errors="replace")

    @staticmethod
    def run_command(cmd, need_output=False) -> tuple:
        """Run ``cmd`` through the shell.

        Args:
            cmd: command line string, or a list/tuple of fragments that are
                joined with single spaces (callers quote fragments themselves).
            need_output: when true, capture stdout+stderr and return it.

        Returns:
            ``(returncode, output)``; ``output`` is ``""`` unless
            ``need_output`` is true.

        Raises:
            RuntimeError: if ``cmd`` is empty.
        """
        if not cmd:
            raise RuntimeError("Command is invalid!")

        if isinstance(cmd, (list, tuple)):
            cmd = " ".join(cmd)

        D(f"Running Command: {cmd}")

        if not need_output:
            return subprocess.call(args=cmd, shell=True), ""

        try:
            raw = subprocess.check_output(
                args=cmd, shell=True, stderr=subprocess.STDOUT
            )
        except subprocess.CalledProcessError as e:
            return e.returncode, AdUtil._decode_output(e.output)
        return 0, AdUtil._decode_output(raw)

    @staticmethod
    def remove(path: str) -> bool:
        """Delete a symlink, directory tree or regular file.

        Returns:
            ``True`` on success; ``False`` if ``path`` does not exist or the
            deletion failed (the error is logged).

        Raises:
            RuntimeError: if ``path`` exists but is none of the three kinds.
        """
        # A (possibly dangling) link must be tested first: os.path.exists()
        # follows links and would report a dangling one as missing.
        if os.path.islink(path):
            remover = os.unlink
        elif not os.path.exists(path):
            W(f"[{path}] is not existent!")
            return False
        elif os.path.isdir(path):
            remover = shutil.rmtree
        elif os.path.isfile(path):
            remover = os.remove
        else:
            raise RuntimeError(
                f"[{path}] is not a directory or a regular file or a link!"
            )

        try:
            remover(path)
        except Exception as e:
            E(str(e))
            return False
        return True

    @staticmethod
    def mkdirs(dirpath: str) -> bool:
        """Create ``dirpath`` and any missing parents.

        Returns:
            ``True`` if the directory exists afterwards, ``False`` if it could
            not be created (including when a non-directory is in the way).
        """
        try:
            os.makedirs(os.path.abspath(dirpath), exist_ok=True)
        except OSError:
            return False
        return True

    @staticmethod
    def mklink(src, target) -> bool:
        """Create symlink ``target`` pointing at the absolute path of ``src``.

        Returns:
            ``True`` once the link is created, ``False`` if ``src`` is missing.

        Raises:
            OSError: propagated from ``os.symlink`` (e.g. ``target`` exists).
        """
        # NOTE(anjingyu): We will always create a link to absolute path by default
        if not os.path.exists(src):
            W(f"The source path<{src}> is not existent!")
            return False

        os.symlink(os.path.abspath(src), target)
        return True

    @staticmethod
    def command_exists(cmd: str) -> bool:
        """Return whether ``cmd`` can be resolved by the shell."""
        if AdUtil.SYSTEM == "Windows":
            probe = f"where {cmd} 1>nul 2>&1"
        else:
            probe = f"command -v {cmd} 1>/dev/null 2>&1"

        ret, _ = AdUtil.run_command(probe)
        return ret == 0

    @staticmethod
    def random_str(expected_len: int = 8) -> str:
        """Return ``expected_len`` random ASCII letters/digits (not for secrets)."""
        candidate = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
        return "".join(random.choices(candidate, k=expected_len))

    @staticmethod
    def request_json(url, **kargw) -> tuple:
        """Perform an HTTP request and decode the JSON body.

        Args:
            url: target URL.
            **kargw: forwarded to ``requests.request``; ``method`` defaults to
                ``"GET"``.

        Returns:
            ``(succ, payload)``: ``succ`` is ``False`` on a non-OK status or
            on any exception (whose traceback is printed); ``payload`` is the
            decoded JSON or ``None``.
        """
        kargw["url"] = url
        kargw.setdefault("method", "GET")

        try:
            resp = requests.request(**kargw)
            if not resp.ok:
                return False, None
            return True, resp.json()
        except Exception:
            traceback.print_exc()
            return False, None

    @staticmethod
    @contextmanager
    def chdir(dir):
        """Context manager that switches the working directory to ``dir``.

        The previous directory is restored on exit, including when the body
        raises.
        """
        cwd = os.path.abspath(os.getcwd())
        os.chdir(dir)
        try:
            yield
        finally:
            os.chdir(cwd)

    @staticmethod
    @contextmanager
    def time_consumed():
        """Context manager that logs the body's wall time in milliseconds."""
        start = time.time()
        yield
        D("Time Consumed: {:.2f} ms".format((time.time() - start) * 1000))

    @staticmethod
    def arch() -> str:
        """Guess the architecture of current platform.

        Returns one of ``x64``, ``x86``, ``arm64``, ``armhf``, ``riscv64``,
        or the raw ``uname -m`` value when it is not recognised.
        """
        if AdUtil.SYSTEM == "Windows":
            # Only 64bit Windows has this environment variable
            # - AMD64
            # - IA64
            # - ARM64
            # - EM64T (for Windows-XP 64)
            arch = os.getenv("PROCESSOR_ARCHITECTURE", "").upper()
            if os.getenv("PROCESSOR_ARCHITEW6432", "") or arch in (
                "AMD64",
                "IA64",
                "EM64T",
            ):
                return "x64"
            if arch == "ARM64":
                return "arm64"
            # "X86" and anything unrecognised are both reported as x86.
            return "x86"

        # Linux or Darwin
        machine = AdUtil.run_command("uname -m", True)[1].strip()
        if machine.startswith("arm") and machine != "arm64":
            return "armhf"
        if machine in ("arm64", "aarch64"):
            return "arm64"
        if machine == "riscv64":
            return "riscv64"
        if machine in ("x86_64", "amd64", "ia64"):
            return "x64"
        return machine

    @classmethod
    def gen_list_header(cls, num: int):
        """Yield ``"[i/num]"`` labels for ``i`` in ``1..num``.

        ``i`` is zero-padded to the number of digits of ``num``.
        """
        width = len(str(num))
        for idx in range(1, num + 1):
            yield "[{:0>{}d}/{}]".format(idx, width, num)

    @classmethod
    def get_files(cls, dirpath, ext=None, recursively=True):
        """Collect files below ``dirpath``, optionally filtered by suffix.

        Args:
            dirpath: directory to scan, or a single file path.
            ext: a suffix string, a list/tuple of suffixes, or a falsy value
                to accept every file.
            recursively: descend into sub-directories when true.

        Returns:
            List of matching paths (``dirpath`` joined with entry names);
            empty when ``dirpath`` is neither a file nor a directory.
        """
        if not ext:
            suffixes = None
        elif isinstance(ext, (list, tuple)):
            suffixes = tuple(ext)
        else:
            suffixes = (ext,)

        def wanted(name):
            return suffixes is None or name.endswith(suffixes)

        if os.path.isfile(dirpath):
            return [dirpath] if wanted(dirpath) else []
        if not os.path.isdir(dirpath):
            return []

        found = []
        for entry in os.listdir(dirpath):
            full_path = os.path.join(dirpath, entry)
            if os.path.isfile(full_path):
                if wanted(entry):
                    found.append(full_path)
            elif recursively and os.path.isdir(full_path):
                found.extend(cls.get_files(full_path, ext, recursively))
        return found

    @classmethod
    def pack_tar(cls, filepath, files, fc=lambda x: (x, x)):
        """Pack ``files`` into a gzip-compressed tar archive at ``filepath``.

        Args:
            filepath: destination archive; missing parent dirs are created.
            files: sized sequence of items to pack.
            fc: maps each item to ``(path_on_disk, name_in_archive)``.

        Returns:
            Absolute path of the archive.

        Raises:
            Exception: whatever packing raised; the partial archive is removed
            and the error logged first.
        """
        fpath = os.path.abspath(filepath)
        # Derive the directory from the absolute path: for a bare file name
        # dirname(filepath) is "" and os.makedirs("") fails.
        os.makedirs(os.path.dirname(fpath), exist_ok=True)

        archive = tarfile.open(name=fpath, mode="w:gz")
        try:
            # The ``with`` closes the archive before the cleanup below runs
            # (an open file cannot be unlinked on Windows).
            with archive:
                for f, head in zip(files, AdUtil.gen_list_header(len(files))):
                    src, arcname = fc(f)
                    archive.add(name=src, arcname=arcname)
                    I(f"{head} -> {src}")
                I("Files packed.")
        except Exception as e:
            os.unlink(fpath)
            E(str(e))
            raise

        return fpath

    @classmethod
    def format_size(cls, size, fraction=2) -> str:
        """Format a byte count using 1024-based units (``B`` .. ``EB``).

        Values beyond the largest unit stay expressed in ``EB``.
        """
        units = ["B", "KB", "MB", "GB", "TB", "PB", "EB"]
        last = len(units) - 1
        idx = 0

        while idx < last:
            scaled = size / 1024.0
            if scaled < 1.0:
                break
            size = scaled
            idx += 1

        return "{:.{}f} {}".format(size, fraction, units[idx])

    @classmethod
    def home(cls) -> str:
        """Return the user's home directory.

        On Windows the first of ``USERPROFILE``, ``HOME``, ``~`` naming an
        existing directory wins, else ``""``. Elsewhere the absolute form of
        ``$HOME`` is returned.
        """
        if AdUtil.SYSTEM != "Windows":
            return os.path.abspath(os.getenv("HOME", ""))

        for env in ["USERPROFILE", "HOME", "~"]:
            home_dir = os.getenv(env, "")
            if home_dir and os.path.isdir(home_dir):
                return os.path.abspath(home_dir)
        return ""

    @classmethod
    def get_cpu_number(cls) -> int:
        """Return ``multiprocessing.cpu_count()``."""
        return multiprocessing.cpu_count()

    @classmethod
    def get_recommend_cpu_number(cls) -> int:
        """Get the recommended CPU core count for tasks,
        so that the tasks will never freeze the entire system.

        More than 8 cores -> 8; 3..8 cores -> cores - 2; otherwise 1.
        """
        cores = AdUtil.get_cpu_number()
        if cores > 8:
            return 8
        if cores > 2:
            return cores - 2
        return 1
-
class NexusHelper:
    """Small client for a Nexus3 repository: fetch+extract and upload files.

    URLs are built as ``{host}/repository/{repo}/{group}/{name}``.
    """

    def __init__(self, host: str = "", auth_info: tuple = ()):
        """Resolve the server address and credentials.

        Args:
            host: base URL; falls back to ``$NEXUS_HOST`` when empty.
            auth_info: ``(username, password)``; falls back to
                ``$NEXUS_USERNAME`` / ``$NEXUS_PASSWORD``, and to anonymous
                access when neither is available.

        Raises:
            Exception: if neither ``host`` nor ``$NEXUS_HOST`` is set.
        """
        self._host = host or os.getenv("NEXUS_HOST", "")
        if not self._host:
            raise Exception(
                "Missing Required Arguments: One of host and NEXUS_HOST must be set!"
            )

        # Drop one trailing slash so URL building can always insert its own.
        if self._host.endswith("/"):
            self._host = self._host[:-1]

        env_user = os.getenv("NEXUS_USERNAME", "")
        env_password = os.getenv("NEXUS_PASSWORD", "")

        if auth_info and len(auth_info) == 2:
            self._auth_info = HTTPBasicAuth(auth_info[0], auth_info[1])
        # Load user and password from environment variables if they exist
        elif env_user and env_password:
            self._auth_info = HTTPBasicAuth(env_user, env_password)
        else:
            self._auth_info = None
            W("Please make sure the Nexus3 service can be accessed by anonymous users.")

    def fetch(self, repo, group, fname, fpath) -> bool:
        """Download ``fname`` and unpack it next to the download location.

        Args:
            repo, group, fname: components of the remote URL.
            fpath: destination; an existing directory receives the file under
                its remote base name, anything else is used as the file path
                (missing parent directories are created).

        Returns:
            ``True`` when a ``.tar.gz`` was downloaded, extracted and the
            downloaded archive removed. ``False`` when the remote file is
            missing, the download fails, the archive cannot be read, or the
            file is not a ``.tar.gz`` (the downloaded file is then kept).
        """
        url = f"{self._host}/repository/{repo}/{group}/{fname}"

        probe = requests.head(url, auth=self._auth_info)
        if probe.status_code != 200:
            W(f"The file {fname} is not existent!")
            return False

        fpath = os.path.abspath(fpath)
        if os.path.isdir(fpath):
            fpath = os.path.join(fpath, os.path.basename(url))
        else:
            os.makedirs(os.path.dirname(fpath), exist_ok=True)

        # The context manager releases the streamed connection in every case.
        with requests.get(url, auth=self._auth_info, stream=True) as resp:
            if not resp.ok:
                W(f"Failed to download {fname}: HTTP {resp.status_code}")
                return False
            with open(fpath, "wb") as fd:
                for chunk in resp.iter_content(chunk_size=40960):
                    fd.write(chunk)

        if not url.endswith(".tar.gz"):
            W("Unsupported format, only .tar.gz is supported!")
            return False

        try:
            with tarfile.open(name=fpath, mode="r:gz") as tarf:
                # NOTE(review): extractall() trusts member paths inside the
                # archive; only use this against repositories you control.
                tarf.extractall(path=os.path.dirname(fpath))
        except tarfile.ReadError:
            W(f"Unable to extract file: {fpath}!")
            return False

        os.remove(fpath)
        return True

    def __put_one_file(
        self, repo, group, file_path, parent_dir_len, ignore_structure=False
    ):
        """PUT one local file to the repository.

        The remote name is the base name when ``ignore_structure`` is true,
        otherwise ``file_path`` with its first ``parent_dir_len`` characters
        removed and backslashes turned into ``/``.

        Returns:
            ``True`` if the server answered with an OK status, else ``False``
            (exceptions are printed and reported as ``False``).
        """
        if ignore_structure:
            fname = os.path.basename(file_path)
        else:
            fname = file_path[parent_dir_len:].replace("\\", "/")
        url = f"{self._host}/repository/{repo}/{group}/{fname}"

        try:
            # Read inside ``with`` so the handle is closed deterministically.
            with open(file_path, "rb") as fh:
                payload = fh.read()

            r = requests.put(url, auth=self._auth_info, data=payload)
            if not r.ok:
                E(f"HTTP Status Code: {r.status_code}")
            return r.ok
        except Exception:
            traceback.print_exc()
            return False

    def upload(self, repo, group, filepath):
        """Upload a file, or every file below a directory.

        For a directory the relative layout is reproduced on the server; the
        upload stops at the first failure.

        Returns:
            ``True`` if everything was uploaded, ``False`` on the first
            failure or when ``filepath`` is neither a file nor a directory.
        """
        try:
            if os.path.isdir(filepath):
                one_dir = filepath
                # Guarantee a trailing separator so slicing the prefix off a
                # walked path leaves a clean relative name.
                if one_dir[-1] in ["\\", "/"]:
                    parent_dir = one_dir
                else:
                    parent_dir = "".join([one_dir, os.path.sep])
                parent_dir_len = len(parent_dir)

                for path, _, file_list in os.walk(one_dir):
                    for file_name in file_list:
                        if not self.__put_one_file(
                            repo, group, os.path.join(path, file_name), parent_dir_len
                        ):
                            E("Failed to put file: {} to {}".format(file_name, repo))
                            return False
            elif os.path.isfile(filepath):
                fp = os.path.abspath(filepath)
                if not self.__put_one_file(
                    repo, group, fp, len(os.path.dirname(fp)) + 1
                ):
                    E("Failed to put file: {} to {}".format(fp, repo))
                    return False
            else:
                E(f"[{filepath}] is not a directory or a regular file!")
                return False
        except Exception:
            traceback.print_exc()
            return False

        return True
-
-
class AdGitHelper:
    """Thin wrapper around the ``git`` command line.

    Any attribute that is not defined here becomes a git sub-command:
    ``helper.rev_parse("HEAD")`` runs ``git rev-parse HEAD``. Every call
    returns ``(returncode, output)``.
    """

    def __init__(self, git=None, dry_run=False):
        """
        Args:
            git: path of the git executable; ``"git"`` from PATH by default.
            dry_run: print the commands instead of running them.
        """
        self.__git = os.path.abspath(git) if git else "git"
        self.__dry_run = dry_run

    def command(self, cmd, *args, **kwargs):
        """Parse and run the git sub-command.

        Keyword names are converted to options by replacing ``_`` with ``-``
        (so ``__count`` becomes ``--count``). A name ending in ``_`` yields
        ``option=value``, any other name ``option value``; a falsy value
        yields the bare option. Options precede positional ``args``.

        Returns:
            ``(returncode, output)``; ``(0, "Dry running...")`` in dry-run
            mode and ``(-1, "Invalid command")`` if running raised.
        """
        options = []
        for key, val in kwargs.items():
            flag = key.strip().replace("_", "-")
            if flag.endswith("-") and flag != "--":
                flag = flag[:-1]
                options.append("{}={}".format(flag, val) if val else flag)
            else:
                options.append("{} {}".format(flag, val) if val else flag)
        options.extend(args)

        cmds = "{} {} {}".format(self.__git, cmd, " ".join(options)).strip()
        try:
            if self.__dry_run:
                print("adgit: {}".format(crayons.cyan(cmds)))
                return (0, "Dry running...")
            rc, rs = AdUtil.run_command(cmds, True)
            return (rc, rs)
        except Exception:
            return (-1, "Invalid command")

    def __getattr__(self, attr_str):
        """Turn an unknown attribute into a callable git sub-command.

        Except for ``init`` and ``clone`` the returned callable first checks
        that the working directory is a repository and otherwise answers
        ``(-100, "Not a valid git repo")``.
        """
        # Protocol probes (copy, pickle, hasattr(obj, "__iter__"), ...) must
        # see a normal AttributeError instead of a git command.
        if attr_str.startswith("__") and attr_str.endswith("__"):
            raise AttributeError(attr_str)

        def func(*list_params, **dict_params):
            cmd_list = ["init", "clone"]
            cmd = attr_str.replace("_", "-")
            if cmd not in cmd_list and not self.is_git_dir():
                return (-100, "Not a valid git repo")
            return self.command(cmd, *list_params, **dict_params)

        return func

    def __setattr__(self, s, v):
        self.__dict__[s] = v

    def is_git_dir(self, cur_dir="."):
        """Return whether ``cur_dir`` is inside a git repository."""
        with AdUtil.chdir(cur_dir):
            code, _ = self.command("rev-parse", "--git-dir")
            return code == 0

    def current_branch(self, cur_dir="."):
        """Return the checked-out branch name, or ``None`` on failure."""
        with AdUtil.chdir(cur_dir):
            code, branch = self.command("rev-parse", "--abbrev-ref", "HEAD")
            if code != 0:
                W(f"Failed to run command: code<{code}>\n{branch}")
                return None

            return branch.strip()

    def head_log_info(self, cur_dir="."):
        """Return commit/author/date text of HEAD, or ``"UNKNOWN"``."""
        with AdUtil.chdir(cur_dir):
            code, version_info = self.command(
                "log", "-1", '--format="Commit: %H\nAuthor: %aN <%aE>\nDateTime: %aD"'
            )
            if code != 0:
                W(f"Failed to get version information in <{cur_dir}>")
                return "UNKNOWN"
            return version_info

    def url(self, cur_dir="."):
        """Return the URL of the first remote listed by ``git remote -v``.

        Returns ``None`` if the command fails or no remote is configured.
        """
        with AdUtil.chdir(cur_dir):
            code, branches = self.remote(_v=None)
            if code != 0:
                E(f"Failed to get remote url: code<{code}>\n{branches}")
                return None

            # Each line reads "<name> <url> (fetch|push)"; skip blank or
            # malformed lines instead of indexing past the end.
            for url_line in branches.splitlines():
                fields = url_line.split()
                if len(fields) >= 2:
                    return fields[1].strip()
            return None

    def has_branch(self, branch_name, cur_dir="."):
        """Return whether local branch ``branch_name`` exists."""
        with AdUtil.chdir(cur_dir):
            code, _ = self.command(
                "rev-parse", "--verify", f"refs/heads/{branch_name}"
            )
            return code == 0

    def has_remote_branch(self, branch_name, repo="origin", cur_dir="."):
        """Return whether remote-tracking branch ``repo/branch_name`` exists."""
        with AdUtil.chdir(cur_dir):
            code, _ = self.command(
                "rev-parse", "--verify", f"refs/remotes/{repo}/{branch_name}"
            )
            return code == 0

    def get_version(self, branch_name="HEAD", cur_dir="."):
        """Derive a version string for the repository in ``cur_dir``.

        The base is, in order of preference: the highest (string-sorted) tag
        containing HEAD, the last path component of the upstream branch, the
        local branch name, or ``"0.0.0"``. The commit count of
        ``branch_name`` is appended as a final component when available.

        Returns:
            ``(version, is_tag)``.
        """
        with AdUtil.chdir(cur_dir):
            info = ""
            is_tag = False

            # Check whether current commit is a tag(or a list of tags)
            # Always retrieve the latest one.
            # NOTE(review): the sort is lexicographic, so "1.10" orders before
            # "1.9" - confirm this is acceptable for the tag scheme in use.
            code, tags = self.tag("HEAD", __contains=None)
            if code == 0 and tags:
                tags = [t.strip() for t in tags.split()]
                tags.sort(reverse=True)
                info = tags[0]
                is_tag = True

            if not info:
                # Check whether current branch is a development branch, named as x.x.x
                code, rbranch = self.rev_parse("HEAD@{u}", __abbrev_ref=None)
                if code == 0:
                    # Strip first: the raw output ends with a newline that
                    # would otherwise end up inside the version string.
                    info = rbranch.strip().split("/")[-1]
                else:
                    # No upstream configured for current local branch,
                    # so just use local branch name.
                    info = self.current_branch()

            # If failed to get branch and tag, just statistics the count of commits.
            if not info:
                info = "0.0.0"

            # Get the description of version
            code, revision = self.rev_list(branch_name, __count=None)
            if code != 0:
                W(
                    f"Failed to get revision from commit record in branch<{branch_name}>: {revision}"
                )
            else:
                info = f"{info}.{revision.strip()}"

            return info, is_tag

    def get_revision(self, branch_name="HEAD", cur_dir="."):
        """Describe HEAD as a dict; ``{}`` if ``git log`` fails.

        {
            "date": "Sun, 27 Sep 2020 23:50:17 +0800",
            "commit": "1f845d066b96181a7f7a8f6429086674f53d2c42",
            "short_commit": "1f845d0",
            "author": "author: author@email.com",
            "version": "3.18.2.88",
            "istag": true
        }
        """
        ver_obj = {}
        with AdUtil.chdir(cur_dir):
            # Already inside ``cur_dir``: passing it again would apply a
            # relative path twice.
            info, is_tag = self.get_version(branch_name)

            code, version_info = self.command(
                "log",
                "-1",
                r'--format="{\"date\": \"%aD\", \"commit\": \"%H\", \"short_commit\": \"%h\", \"author\": \"%aN: %aE\", \"version\": \"\"}"',
            )
            if code != 0:
                W(f"Failed to get revision information in <{cur_dir}>")
            else:
                ver_obj = json.loads(version_info)
                ver_obj["version"] = info.strip()
                ver_obj["istag"] = is_tag
        return ver_obj
-
-
class AdMakeHelper(object):
    """Parse the cmake script and retrieve all the direct and in-direct dependencies"""

    # Keywords that end the module list inside the respective cmake call.
    _MODULE_STOP_WORDS = ("RELEASE", "DIRECTORIES", "BRANCHES")
    _THIRD_STOP_WORDS = ("VERSIONS", "DEBUG")

    def __init__(self, cur_dir="."):
        """
        Args:
            cur_dir: directory of the module to analyse. Sibling modules are
                looked up in ``$ADMAKE_WS_DIR`` when set, otherwise in the
                parent directory of ``cur_dir``.
        """
        self._dir = os.path.abspath(cur_dir)
        ws_dir = os.getenv("ADMAKE_WS_DIR", "")
        # Use the absolute path: dirname(".") would be "" and resolve modules
        # against the working directory instead of the parent directory.
        self._ws_dir = ws_dir if ws_dir else os.path.dirname(self._dir)

    def modules(self):
        """Return ``(modules, third_party_modules)`` needed by this module.

        Both lists are de-duplicated and unordered; they contain direct and
        transitive dependencies and never the module itself.
        """
        modules = list()
        third_modules = list()
        deps_dict = dict()
        mods, third_mods = self._modules(self._dir)
        cur_mod = os.path.basename(self._dir)

        for mod in mods:
            if mod != cur_mod:
                deps_dict[mod] = False
                modules.append(mod)

        for mod in third_mods:
            deps_dict[mod] = True
            third_modules.append(mod)

        all_modules = list()
        for mod in mods:
            all_modules.extend(self._deps(os.path.join(self._ws_dir, mod), deps_dict))

        for mod, third in all_modules:
            if third:
                third_modules.append(mod)
            elif mod != cur_mod:
                modules.append(mod)

        return list(set(modules)), list(set(third_modules))

    def _deps(self, cur_dir=".", deps_dict=None):
        """Recursively collect dependencies not yet recorded in ``deps_dict``.

        Args:
            cur_dir: module directory to inspect.
            deps_dict: ``name -> is_third_party`` map of modules already
                seen; updated in place. A fresh map is used when omitted.

        Returns:
            List of ``(name, is_third_party)`` tuples for new modules.
        """
        if deps_dict is None:
            deps_dict = {}

        mods, third_mods = self._modules(cur_dir)
        deps = list()

        for names, is_third in ((mods, False), (third_mods, True)):
            for mod in names:
                if mod in deps_dict:
                    continue
                deps_dict[mod] = is_third
                deps.append((mod, is_third))
                deps.extend(self._deps(os.path.join(self._ws_dir, mod), deps_dict))

        return deps

    def _modules(self, cur_dir="."):
        """Return the direct ``(modules, third_party_modules)`` of ``cur_dir``.

        Both ``cmake/<name>.cmake`` and ``CMakeLists.txt`` are scanned when
        present.
        """
        abspath = os.path.abspath(cur_dir)
        module_name = os.path.basename(abspath)
        candidates = (
            # If this file is existent, the project may be a library
            os.path.join(abspath, "cmake", "{}.cmake".format(module_name)),
            os.path.join(abspath, "CMakeLists.txt"),
        )

        mods = list()
        third_mods = list()
        for script in candidates:
            if os.path.isfile(script):
                mods.extend(self._find_modules_in_file(script))
                third_mods.extend(self._find_3rd_modules_in_file(script))

        return list(set(mods)), list(set(third_mods))

    @staticmethod
    def _collect_call_args(txt, pattern, stop_words):
        """Return the leading arguments of every cmake call matching ``pattern``.

        Arguments are taken up to, not including, the first stop word.
        """
        result = list()
        for arg_text in re.findall(pattern, txt, re.M | re.S):
            for token in arg_text.split():
                if token in stop_words:
                    break
                result.append(token)
        return result

    def _find_3rd_modules_in_file(self, file_path):
        """Return names listed in ``append_3rd_modules(...)`` calls."""
        with open(file_path) as f:
            txt = f.read()
        return self._collect_call_args(
            txt, r"append_3rd_modules\s*\((.*?)\)", self._THIRD_STOP_WORDS
        )

    def _find_modules_in_file(self, file_path):
        """Return names listed in ``append_modules(...)`` calls.

        Every literal occurrence of ``admsg`` in the file is added as well.
        """
        with open(file_path) as f:
            txt = f.read()
        result = self._collect_call_args(
            txt, r"append_modules\s*\((.*?)\)", self._MODULE_STOP_WORDS
        )
        result.extend(re.findall("admsg", txt))
        return result
-
-
class WindowsToolchainHelper:
    """Locate a usable C/C++ toolchain on Windows (MinGW or Visual Studio)."""

    def __init__(self):
        pass

    def check_tools(self, vsver: str = "") -> tuple:
        """Pick a toolchain.

        Args:
            vsver: ``"mingw"``/``"gcc"`` to prefer MinGW, a name such as
                ``"vs2019"`` to require that Visual Studio line, or empty to
                take the newest usable Visual Studio.

        Returns:
            ``(key, description)`` or ``None`` if nothing suitable was found.
        """
        vsver = vsver.lower()

        if vsver in ("mingw", "gcc"):
            if AdUtil.command_exists("gcc") and AdUtil.command_exists("g++"):
                return ("mingw", "MinGW Makefiles")

        return self.__check_msvc(vsver)

    def __parse_load_win32env(
        self, installation_path: str, required_keys: list = None
    ) -> bool:
        """Run ``vcvarsall.bat x64`` and import its variables into ``os.environ``.

        Args:
            installation_path: Visual Studio installation root.
            required_keys: upper-case variable names to import; every
                variable is imported when empty or omitted.

        Returns:
            ``False`` if the batch file could not be run, else ``True``.
        """
        if installation_path.endswith(os.sep):
            installation_path = installation_path[:-1]

        os.environ["VSCMD_SKIP_SENDTELEMETRY"] = "yes"
        vcvarsall_bat = os.path.join(
            installation_path, "VC", "Auxiliary", "Build", "vcvarsall.bat"
        )

        code, output = AdUtil.run_command(
            [f'call "{vcvarsall_bat}"', "x64 >nul", "&&", "set"], True
        )
        if code != 0:
            return False

        wanted = set(required_keys) if required_keys else None

        for line in output.splitlines():
            name, sep, value = line.strip().partition("=")
            name = name.strip().upper()
            # Anything that is not a NAME=VALUE pair is ignored.
            if not sep or not name:
                continue
            if wanted is None or name in wanted:
                os.environ[name] = value.strip()

        return True

    def __check_msvc(self, vsver: str = "") -> tuple:
        """Find a Visual Studio instance via ``vswhere`` and load its environment.

        Candidates are tried from the newest product line to the oldest and
        the first one whose environment loads is returned, so the imported
        variables always belong to the reported toolchain.

        Returns:
            ``("vs<line>", "<product name> <channel> <line>")`` or ``None``.
        """
        # MSVC
        program_files_x86 = os.getenv("PROGRAMFILES(X86)", "")
        vswhere = os.path.join(
            program_files_x86, "Microsoft Visual Studio", "Installer", "vswhere.exe"
        )
        if not program_files_x86 or not os.path.isfile(vswhere):
            E(
                "Can not find vswhere, admake can only support modern VS(version > 2015) and supported by vswhere"
            )
            return None

        cmd_list = [
            f'"{vswhere}"',
            "-nologo",
            "-prerelease",
            "-products",
            "*",
            "-legacy",
            "-format",
            "json",
        ]
        rcode, vswhere_output = AdUtil.run_command(cmd_list, True)
        if rcode != 0:
            E(f"Failed to run vswhere: {vswhere_output}")
            return None

        vsver = vsver.lower()

        candidates = []
        for vs_instance in json.loads(vswhere_output):
            # Skip invalid instance
            if "catalog" not in vs_instance or "installationPath" not in vs_instance:
                continue

            vs_ver = "vs{}".format(vs_instance["catalog"]["productLineVersion"])
            if vsver and vs_ver != vsver:
                continue

            candidates.append((vs_ver, vs_instance))

        # Select the newest version first ("vs2022" > "vs2019" > "vs2017").
        candidates.sort(key=lambda item: item[0], reverse=True)

        for vs_ver, vs_instance in candidates:
            catalog = vs_instance["catalog"]
            if not self.__parse_load_win32env(
                vs_instance["installationPath"], ["PATH", "LIB", "LIBPATH", "INCLUDE"]
            ):
                W(f"Failed to load environment for {catalog['productLineVersion']}")
                continue

            vs_name = catalog["productName"]
            vs_id = vs_instance["channelId"].split(".")[1]
            return (vs_ver, f"{vs_name} {vs_id} {catalog['productLineVersion']}")

        return None
-
-
class LibMerger:
    """Merge several static libraries into one archive using ``ar``."""

    def __init__(self, ar="ar"):
        """
        Args:
            ar: archiver command name or file path (see :meth:`set_ar`).
        """
        self._ar = ar
        # True: ``_ar`` is a command resolved via PATH and used verbatim;
        # False: it is a file path and gets quoted. Must be set here because
        # merge() reads it even when set_ar() was never called.
        self._is_cmd = not (ar and os.path.isfile(ar))
        self._merged_list = list()
        self._base_archive = None

    def set_ar(self, ar):
        """Select the archiver.

        Raises:
            RuntimeError: if ``ar`` is neither an existing file nor a command
            found in PATH.
        """
        if os.path.isfile(ar):
            self._ar = os.path.abspath(ar)
            self._is_cmd = False
        elif AdUtil.command_exists(ar):
            self._ar = ar
            self._is_cmd = True
        else:
            raise RuntimeError(f"{ar} is not a valid file path or command in PATH!")

    def set_base_archive(self, file_path):
        """Use ``file_path`` as the starting content of the merged archive."""
        # It is a patch, because of some libraries(such as base)
        # contains some .o files with the same name(but are stored in different directories in compiling time),
        # in archive, .a only contains the files without directory information, so when we unpack
        # the .o files, they will be override.
        # This patch will ignore the above problem, use the base archive file as output file,
        # we never need to unpack the base archive file.
        if os.path.isfile(file_path) and file_path.endswith((".a", ".lib")):
            self._base_archive = os.path.abspath(file_path)
        else:
            W("{} is not a valid archive file!".format(os.path.abspath(file_path)))

    def add_libraries(self, lib_list, *libs):
        """Queue libraries for merging.

        Args:
            lib_list: one path or a list of paths (the list is not modified).
            *libs: further paths.

        Entries that are not existing ``.a``/``.lib`` files are skipped with
        a warning.
        """
        # Copy so that the caller's list is never extended.
        queued = list(lib_list) if isinstance(lib_list, list) else [lib_list]
        queued.extend(libs)

        for f in queued:
            f_abspath = os.path.abspath(f)
            # .a for *Unix
            # .lib for Windows
            if os.path.isfile(f) and f.endswith((".a", ".lib")):
                self._merged_list.append(f_abspath)
            else:
                W(f"[{f_abspath}] is not a valid library(.a|.lib)")

    def add_directories(self, dir_list, *dirs):
        """Queue every ``.a`` file found directly inside the given directories.

        Args:
            dir_list: one directory or a list of them (the list is not modified).
            *dirs: further directories.
        """
        queued = list(dir_list) if isinstance(dir_list, list) else [dir_list]
        queued.extend(dirs)

        for d in queued:
            self._merged_list.extend(AdUtil.get_files(os.path.abspath(d), ".a", False))

    def merge(self, output):
        """Create ``output`` from the base archive and all queued libraries.

        Each queued library is extracted into its own temporary directory and
        its members are appended to ``output``. The temporary directories are
        removed even when a step fails.

        Raises:
            Exception: if no archiver is configured or an ``ar`` call fails
            (the message carries the tool's output).
        """
        if not self._ar:
            raise Exception("Please set the TOOL `AR` via invoking set_ar first!")

        # make sure the directory of the output file is existent
        output_file = os.path.abspath(output)
        AdUtil.mkdirs(os.path.dirname(output_file))

        if os.path.isfile(output_file):
            AdUtil.remove(output_file)

        ar_cmd = self._ar if self._is_cmd else f'"{self._ar}"'

        lib_dirs = list()  # Save all the directories, contain *.o
        try:
            for lib in self._merged_list:
                tmp_dir = tempfile.mkdtemp()
                # Register immediately so the directory is cleaned up even if
                # the extraction below fails.
                lib_dirs.append(tmp_dir)
                with AdUtil.chdir(tmp_dir):
                    dest_lib = os.path.join(tmp_dir, "lib.a")
                    shutil.copyfile(lib, dest_lib)
                    # extract all the *.o
                    code, msg = AdUtil.run_command(
                        [ar_cmd, "-x", f'"{dest_lib}"'], True
                    )
                    if code != 0:
                        raise Exception(msg)

                    AdUtil.remove(dest_lib)

            # Regard the base archive file as output file,
            # so we will never unpack the base archive.
            if self._base_archive:
                shutil.copyfile(self._base_archive, output_file)

            for l_dir in lib_dirs:
                with AdUtil.chdir(l_dir):
                    source_objs = [f'"{f}"' for f in os.listdir(l_dir)]
                    cmd_list = [
                        ar_cmd,
                        # quick append object(.o) to a library(.a)
                        "-q",
                        f'"{output_file}"',
                        " ".join(source_objs),
                    ]
                    code, msg = AdUtil.run_command(cmd_list, True)
                    if code != 0:
                        raise Exception(msg)
        finally:
            for l_dir in lib_dirs:
                # remove the temporary directory
                AdUtil.remove(l_dir)
-
-
if __name__ == "__main__":
    # Manual smoke test: emit one record per log level, then query the git
    # repository of the current working directory.
    rule = "=" * 80
    print(rule)
    print("Run default test code")
    for emit, text in (
        (I, "Testing [√]"),
        (D, "Testing [੦]"),
        (W, "Testing [=]"),
        (E, "Testing [×]"),
    ):
        emit(text)

    git_helper = AdGitHelper()
    checks = (
        git_helper.is_git_dir,
        git_helper.current_branch,
        git_helper.url,
        lambda: git_helper.has_branch("master"),
        git_helper.head_log_info,
    )
    for check in checks:
        print(check())
    print(rule)
|