You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

profiling.py 12 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """profiling api file."""
  16. import os
  17. import time
  18. from tabulate import tabulate
  19. from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser
  20. from mindinsight.profiler.parser.framework_parser import FrameworkParser
  21. from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser
  22. from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser
  23. from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
  24. from mindinsight.profiler.analyser.integrator import Integrator
  25. from mindinsight.profiler.common._utils import get_file_names, fwrite_format
  26. from mindinsight.profiler.common.validator.validate_path import \
  27. validate_and_normalize_path
  28. from mindinsight.profiler.common.validator.checkparam import \
  29. check_bool, check_subgraph
  30. from mindinsight.profiler.common.log import logger
  31. from mindinsight.utils.exceptions import MindInsightException
# Base directory where the ada profiling service generates per-job output dirs.
profiling_log_base_path = "/var/log/npu/profiling"
  33. class Profiler:
  34. """Performance profiling tool."""
  35. _base_profiling_container_path = "/var/log/npu/profiling/container"
  36. _hwts_output_filename_target = "output_format_data_hwts_"
  37. _opcompute_output_filename_target = "output_op_compute_time_"
  38. _aicpu_op_output_filename_target = "output_data_preprocess_aicpu_"
  39. def __init__(self, subgraph='all', is_detail=True, is_show_op_path=False, output_path='./data',
  40. optypes_to_deal='', optypes_not_deal='Variable', job_id=""):
  41. """
  42. Init profiling service, called berfore network training.
  43. Args:
  44. subgraph(str): which subgraph to monit and anlayse, can be 'all', 'Default', 'Gradients'.
  45. is_detail(Bool): whether to show profiling data for op_instace level, only show optype level if False.
  46. is_show_op_path(Bool): whether to save the full path for each op instace.
  47. output_path(Bool): output data path.
  48. optypes_to_deal(List): Op type names, the data of which optype should be collected and analysed,
  49. will deal with all op if null.
  50. optypes_not_deal(List): Op type names, the data of which optype will not be collected and analysed.
  51. """
  52. dev_id = os.getenv('DEVICE_ID')
  53. if not dev_id:
  54. dev_id = "0"
  55. logger.error("Fail to get DEVICE_ID, use 0 instead.")
  56. self._dev_id = dev_id
  57. self._container_path = os.path.join(self._base_profiling_container_path, dev_id)
  58. data_path = os.path.join(self._container_path, "data")
  59. if not os.path.exists(data_path):
  60. os.makedirs(data_path)
  61. self._output_path = validate_and_normalize_path(output_path,
  62. 'Profiler output path (' + output_path + ')')
  63. self._output_path = os.path.join(self._output_path, "profiler")
  64. if not os.path.exists(self._output_path):
  65. os.makedirs(self._output_path)
  66. os.environ['PROFILING_MODE'] = 'true'
  67. os.environ['PROFILING_OPTIONS'] = 'training_trace:task_trace'
  68. os.environ['AICPU_PROFILING_MODE'] = 'true'
  69. os.environ['PROFILING_DIR'] = str(self._container_path)
  70. self._subgraph = check_subgraph(subgraph)
  71. self._valid_optype_name = optypes_to_deal.split(",") if optypes_to_deal else []
  72. self._filt_optype_names = optypes_not_deal.split(",") if optypes_not_deal else []
  73. self._detail = check_bool(is_detail, 'is_detail')
  74. self._withfullpath = check_bool(is_show_op_path, 'is_show_op_path')
  75. self._profiling_job_id = job_id
  76. self._start_time = int(time.time() * 10000000)
  77. logger.info("Profiling: profiling start time: %d", self._start_time)
  78. def analyse(self):
  79. """Collect and analyze performance data, called after training or during training."""
  80. try:
  81. from mindspore.communication.management import release
  82. release()
  83. except ImportError:
  84. logger.error("Profiling: fail to import release from mindspore.")
  85. logger.info("begin profiler analyse")
  86. job_id = self._get_profiling_job_id()
  87. if not job_id:
  88. msg = ("Fail to get profiling job, please check whether job dir was generated under path %s"\
  89. %profiling_log_base_path)
  90. raise RuntimeError(msg)
  91. logger.info("Profiling: job id is %s ", job_id)
  92. source_path = os.path.join(profiling_log_base_path, job_id)
  93. # parse hwts.log.data.45.dev file, and get task profiling data
  94. hwts_output_filename = self._hwts_output_filename_target + self._dev_id + ".txt"
  95. hwts_output_filename = os.path.join(self._output_path, hwts_output_filename)
  96. hwtslog_parser = HWTSLogParser(source_path, hwts_output_filename)
  97. result = hwtslog_parser.execute()
  98. if not result:
  99. logger.error("Profiling: fail to parse hwts log file.")
  100. return
  101. # parse Framework file, and get the relation of op and tasks
  102. framework_parser = FrameworkParser(job_id, self._dev_id, self._output_path)
  103. framework_parser.parse()
  104. op_task_dict = framework_parser.to_task_id_full_op_name_dict()
  105. if not op_task_dict:
  106. logger.error("Profiling: fail to parse framework files.")
  107. return
  108. # get op compute time from hwts data and framework data, write output_op_compute_time.txt
  109. opcompute_output_filename = self._opcompute_output_filename_target + self._dev_id + ".txt"
  110. opcompute_output_filename = os.path.join(self._output_path, opcompute_output_filename)
  111. optime_parser = OPComputeTimeParser(hwts_output_filename, opcompute_output_filename, op_task_dict)
  112. optime_parser.execute()
  113. # parse DATA_PREPROCESS.dev.AICPU file, write output_data_preprocess_aicpu_x.txt
  114. output_data_preprocess_aicpu = self._aicpu_op_output_filename_target + self._dev_id + ".txt"
  115. output_data_preprocess_aicpu = os.path.join(self._output_path, output_data_preprocess_aicpu)
  116. aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu)
  117. aicpu_data_parser.execute()
  118. # analyse op compute time info
  119. try:
  120. self._analyser_op_info()
  121. except MindInsightException as err:
  122. logger.error(err.message)
  123. def __del__(self):
  124. """Disable the profiling collection service, called after training."""
  125. os.environ['PROFILING_MODE'] = str("false")
  126. def _get_profiling_job_id(self):
  127. """Get profiling job id, which was generated by ada service.
  128. Returns:
  129. str: profiling jon id.
  130. """
  131. if self._profiling_job_id:
  132. return self._profiling_job_id
  133. job_id = ""
  134. cmd = "ls -t " + profiling_log_base_path + "|grep JOB|awk '{print $1}'"
  135. r = os.popen(cmd)
  136. profiling_job_dirs = r.readlines()
  137. r.close()
  138. for item in profiling_job_dirs:
  139. path = os.path.join(profiling_log_base_path, item.strip())
  140. log_file = get_file_names(path, "host_start.log")
  141. if not log_file:
  142. logger.error("Profiling: job path %s, host_start.log not exist.", path)
  143. continue
  144. log_file = os.path.join(path, log_file[0])
  145. item_dict = self._parse_host_start_log(log_file)
  146. if not item_dict:
  147. logger.error("Profiling: job path %s, fail to get job start info.", path)
  148. continue
  149. if self._start_time > int(item_dict["start_time"]):
  150. logger.info("Profiling: job path %s, start_time %s, training start_time %d.",
  151. path, item_dict["start_time"], self._start_time)
  152. break
  153. if self._dev_id != item_dict["device_id"]:
  154. logger.info("Profiling: job path %s, dev id %s, training device id %s.",
  155. path, item_dict["device_id"], self._dev_id)
  156. continue
  157. job_id = item.strip()
  158. break
  159. return job_id
  160. def _parse_host_start_log(self, input_file):
  161. """
  162. Parse host start log file, get the device id and start time of the job.
  163. Args:
  164. input_file(str): the file path of the host start log file.
  165. Returns:
  166. dict: job start time and device id.
  167. """
  168. item_dict = {}
  169. for line in open(input_file):
  170. if "Device" in line:
  171. item_dict["device_id"] = line[7:len(line)-2]
  172. elif "clock_realtime" in line:
  173. item_dict["start_time"] = line[16:len(line)-3]
  174. return item_dict
  175. def _analyser_op_info(self):
  176. """Analyser the operator information."""
  177. integrator = Integrator(self._output_path, self._dev_id)
  178. integrator.integrate()
  179. aicore_type_result = self._query_op_type_info()
  180. detail_file_path = os.path.join(
  181. self._output_path,
  182. 'output_op_compute_time_detail_{}.txt'.format(self._dev_id)
  183. )
  184. fwrite_format(detail_file_path, data_source='title:op compute time')
  185. display_names = [
  186. 'optype_name', 'compute_time(ms, per-step)',
  187. 'called_times(per-step)', 'percent'
  188. ]
  189. data_source = tabulate(aicore_type_result, display_names)
  190. fwrite_format(detail_file_path, data_source=data_source, is_print=True)
  191. if self._detail:
  192. op_type_order = [item[0] for item in aicore_type_result]
  193. aicore_detail_result = self._query_op_detail_info(op_type_order)
  194. fwrite_format(detail_file_path, data_source='', is_print=True)
  195. fwrite_format(detail_file_path, data_source='Detail:', is_print=True)
  196. data_source = tabulate(
  197. aicore_detail_result.get('object'),
  198. aicore_detail_result.get('col_name')
  199. )
  200. fwrite_format(detail_file_path, data_source=data_source, is_print=True)
  201. def _query_op_type_info(self):
  202. """
  203. Query AICORE operator type information.
  204. Returns:
  205. list[list], the AICORE operator type and execution time information.
  206. """
  207. condition = {
  208. 'sort_condition': {
  209. 'name': 'execution_time',
  210. 'type': 'descending'
  211. }
  212. }
  213. analyser = AnalyserFactory.instance().get_analyser(
  214. 'aicore_type', self._output_path, self._dev_id
  215. )
  216. result = analyser.query(condition)
  217. return result.get('object')
  218. def _query_op_detail_info(self, op_type_order):
  219. """
  220. Query AICORE operator detail information.
  221. Args:
  222. op_type_order(list): The name of the op type in order.
  223. Returns:
  224. dict, the AICORE operator detail information.
  225. """
  226. op_type_condition = {}
  227. if self._valid_optype_name:
  228. op_type_condition['in'] = self._valid_optype_name
  229. if self._filt_optype_names:
  230. op_type_condition['not_in'] = self._filt_optype_names
  231. subgraph_condition = {}
  232. if self._subgraph != 'all':
  233. subgraph_condition['in'] = [self._subgraph]
  234. filter_condition = {
  235. 'op_type': op_type_condition,
  236. 'subgraph': subgraph_condition,
  237. 'is_display_detail': False,
  238. 'is_display_full_op_name': self._withfullpath
  239. }
  240. analyser = AnalyserFactory.instance().get_analyser(
  241. 'aicore_detail', self._output_path, self._dev_id
  242. )
  243. result = analyser.query_and_sort_by_op_type(
  244. filter_condition, op_type_order
  245. )
  246. return result