You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

minddata_pipeline_parser.py 10 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """The parser for parsing minddata pipeline files."""
  16. import csv
  17. import json
  18. import os
  19. import stat
  20. from queue import Queue
  21. from mindspore.profiler.common.exceptions.exceptions import \
  22. ProfilerPathErrorException, ProfilerFileNotFoundException, \
  23. ProfilerDirNotFoundException, ProfilerRawFileException
  24. from mindspore import log as logger
  25. from mindspore.profiler.common.validator.validate_path import \
  26. validate_and_normalize_path
  27. class MinddataPipelineParser:
  28. """
  29. Thr parser for parsing minddata pipeline files.
  30. Args:
  31. source_dir (str): The minddata pipeline source dir.
  32. device_id (str): The device ID.
  33. output_path (str): The directory of the parsed file. Default: `./`.
  34. Raises:
  35. ProfilerPathErrorException: If the minddata pipeline file path or
  36. the output path is invalid.
  37. ProfilerFileNotFoundException: If the minddata pipeline file or
  38. the output dir does not exist.
  39. """
  40. _raw_pipeline_file_name = 'pipeline_profiling_{}.json'
  41. _parsed_pipeline_file_name = 'minddata_pipeline_raw_{}.csv'
  42. _col_names = [
  43. 'op_id', 'op_type', 'num_workers', 'output_queue_size',
  44. 'output_queue_average_size', 'output_queue_length',
  45. 'output_queue_usage_rate', 'sample_interval', 'parent_id', 'children_id'
  46. ]
  47. def __init__(self, source_dir, device_id, output_path='./'):
  48. self._device_id = device_id
  49. self._pipeline_path = self._get_pipeline_path(source_dir)
  50. self._save_path = self._get_save_path(output_path)
  51. @property
  52. def save_path(self):
  53. """
  54. The property of save path.
  55. Returns:
  56. str, the save path.
  57. """
  58. return self._save_path
  59. def parse(self):
  60. """
  61. Parse the minddata pipeline files.
  62. Raises:
  63. ProfilerRawFileException: If fails to parse the raw file of
  64. minddata pipeline or the file is empty.
  65. """
  66. with open(self._pipeline_path, 'r') as file:
  67. try:
  68. pipeline_info = json.load(file)
  69. except (json.JSONDecodeError, TypeError) as err:
  70. logger.warning(err)
  71. raise ProfilerRawFileException(
  72. 'Fail to parse minddata pipeline file.'
  73. )
  74. if not pipeline_info:
  75. logger.warning('The minddata pipeline file is empty.')
  76. raise ProfilerRawFileException(
  77. 'The minddata pipeline file is empty.'
  78. )
  79. self._parse_and_save(pipeline_info)
  80. def _get_pipeline_path(self, source_dir):
  81. """
  82. Get the minddata pipeline file path.
  83. Args:
  84. source_dir (str): The minddata pipeline source dir.
  85. Returns:
  86. str, the minddata pipeline file path.
  87. """
  88. pipeline_path = os.path.join(
  89. source_dir,
  90. self._raw_pipeline_file_name.format(self._device_id)
  91. )
  92. try:
  93. pipeline_path = validate_and_normalize_path(pipeline_path)
  94. except RuntimeError:
  95. logger.warning('Minddata pipeline file is invalid.')
  96. raise ProfilerPathErrorException('Minddata pipeline file is invalid.')
  97. if not os.path.isfile(pipeline_path):
  98. logger.warning(
  99. 'The minddata pipeline file <%s> not found.', pipeline_path
  100. )
  101. raise ProfilerFileNotFoundException(pipeline_path)
  102. return pipeline_path
  103. def _get_save_path(self, output_path):
  104. """
  105. Get the save path.
  106. Args:
  107. output_path (str): The output dir.
  108. Returns:
  109. str, the save path.
  110. """
  111. try:
  112. output_dir = validate_and_normalize_path(output_path)
  113. except ValidationError:
  114. logger.warning('Output path is invalid.')
  115. raise ProfilerPathErrorException('Output path is invalid.')
  116. if not os.path.isdir(output_dir):
  117. logger.warning('The output dir <%s> not found.', output_dir)
  118. raise ProfilerDirNotFoundException(output_dir)
  119. return os.path.join(
  120. output_dir, self._parsed_pipeline_file_name.format(self._device_id)
  121. )
  122. def _parse_and_save(self, pipeline_info):
  123. """
  124. Parse and save the parsed minddata pipeline file.
  125. Args:
  126. pipeline_info (dict): The pipeline info reads from the raw file of
  127. the minddata pipeline.
  128. Raises:
  129. ProfilerRawFileException: If the format of minddata pipeline raw
  130. file is wrong.
  131. """
  132. sample_interval = pipeline_info.get('sampling_interval')
  133. op_info = pipeline_info.get('op_info')
  134. if sample_interval is None or not op_info:
  135. raise ProfilerRawFileException(
  136. 'The format of minddata pipeline raw file is wrong.'
  137. )
  138. op_id_info_cache = {}
  139. for item in op_info:
  140. if not item:
  141. raise ProfilerRawFileException(
  142. 'The content of minddata pipeline raw file is wrong.'
  143. )
  144. op_id_info_cache[item.get('op_id')] = item
  145. with open(self._save_path, 'w') as save_file:
  146. csv_writer = csv.writer(save_file)
  147. csv_writer.writerow(self._col_names)
  148. self._parse_and_save_op_info(
  149. csv_writer, op_id_info_cache, sample_interval
  150. )
  151. os.chmod(self._save_path, stat.S_IREAD | stat.S_IWRITE)
  152. def _parse_and_save_op_info(self, csv_writer, op_id_info_cache,
  153. sample_interval):
  154. """
  155. Parse and save the minddata pipeline operator information.
  156. Args:
  157. csv_writer (csv.writer): The csv writer.
  158. op_id_info_cache (dict): The operator id and information cache.
  159. sample_interval (int): The sample interval.
  160. Raises:
  161. ProfilerRawFileException: If the operator that id is 0 does not exist.
  162. """
  163. queue = Queue()
  164. root_node = op_id_info_cache.get(0)
  165. if not root_node:
  166. raise ProfilerRawFileException(
  167. 'The format of minddata pipeline raw file is wrong, '
  168. 'the operator that id is 0 does not exist.'
  169. )
  170. root_node['parent_id'] = None
  171. queue.put_nowait(root_node)
  172. while not queue.empty():
  173. node = queue.get_nowait()
  174. self._update_child_node(node, op_id_info_cache)
  175. csv_writer.writerow(self._get_op_info(node, sample_interval))
  176. op_id = node.get('op_id')
  177. children_ids = node.get('children')
  178. if not children_ids:
  179. continue
  180. for child_op_id in children_ids:
  181. sub_node = op_id_info_cache.get(child_op_id)
  182. sub_node['parent_id'] = op_id
  183. queue.put_nowait(sub_node)
  184. def _update_child_node(self, node, op_id_info_cache):
  185. """
  186. Updates the child node information of the operator.
  187. Args:
  188. node (dict): The node represents an operator.
  189. op_id_info_cache (dict): The operator id and information cache.
  190. """
  191. child_op_ids = node.get('children')
  192. if not child_op_ids:
  193. return
  194. queue = Queue()
  195. self._cp_list_item_to_queue(child_op_ids, queue)
  196. new_child_op_ids = []
  197. while not queue.empty():
  198. child_op_id = queue.get_nowait()
  199. child_node = op_id_info_cache.get(child_op_id)
  200. if child_node is None:
  201. continue
  202. metrics = child_node.get('metrics')
  203. if not metrics or not metrics.get('output_queue'):
  204. op_ids = child_node.get('children')
  205. if op_ids:
  206. self._cp_list_item_to_queue(op_ids, queue)
  207. else:
  208. new_child_op_ids.append(child_op_id)
  209. node['children'] = new_child_op_ids
  210. def _get_op_info(self, op_node, sample_interval):
  211. """
  212. Get the operator information.
  213. Args:
  214. op_node (dict): The node represents an operator.
  215. sample_interval (int): The sample interval.
  216. Returns:
  217. list[str, int, float], the operator information.
  218. """
  219. queue_size = None
  220. queue_average_size = None
  221. queue_length = None
  222. queue_usage_rate = None
  223. metrics = op_node.get('metrics')
  224. if metrics:
  225. output_queue = metrics.get('output_queue')
  226. if output_queue:
  227. queue_size = output_queue.get('size')
  228. queue_average_size = sum(queue_size) / len(queue_size)
  229. queue_length = output_queue.get('length')
  230. queue_usage_rate = queue_average_size / queue_length
  231. children_id = op_node.get('children')
  232. op_info = [
  233. op_node.get('op_id'),
  234. op_node.get('op_type'),
  235. op_node.get('num_workers'),
  236. queue_size,
  237. queue_average_size,
  238. queue_length,
  239. queue_usage_rate,
  240. sample_interval,
  241. op_node.get('parent_id'),
  242. children_id if children_id else None
  243. ]
  244. return op_info
  245. def _cp_list_item_to_queue(self, inner_list, queue):
  246. """
  247. Copy the contents of a list to a queue.
  248. Args:
  249. inner_list (list): The list.
  250. queue (Queue): The target queue.
  251. """
  252. for item in inner_list:
  253. queue.put_nowait(item)