
minddata_analyser.py

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Data process analyser."""
import os

from mindinsight.profiler.analyser.base_analyser import BaseAnalyser


class MinddataAnalyser(BaseAnalyser):
    """The Minddata profiling analyser."""

    DEVICE_QUEUE_EMPTY_WARNING_THRESHOLD = 0.7
    DEVICE_QUEUE_NOT_EMPTY_THRESHOLD = 0.95

    def analyse_get_next_info(self, info_type="all"):
        """
        Analyse the get_next operation info.

        Args:
            info_type (str): The info type to return. By default both queue and time
                info are returned; other options are ["queue", "time"].

        Returns:
            dict, queue size info.
            dict, time cost info.
        """
        # init queue info result
        queue_info = dict()
        queue_size_list = []
        empty_step_count = 0
        # init time info result
        time_info = dict()
        time_list = []
        total_cost = 0

        file_name = "minddata_aicpu_" + self._device_id + ".txt"
        file_path = MinddataAnalyser.find_target_file(self._profiling_dir, file_name)
        if file_path:
            with open(file_path) as data_file:
                for line in data_file.readlines():
                    # Each record is expected to look like (inferred from the parsing below):
                    # GetNext_dequeue_wait <start_time> <end_time> <queue_size>
                    node_info = line.split()
                    if node_info and node_info[0] == "GetNext_dequeue_wait":
                        # analyse target info type
                        if len(node_info) > 3 and info_type in ["all", "queue"]:
                            queue_size_list.append(int(node_info[3]))
                            if node_info[3] == '0':
                                empty_step_count += 1
                        if len(node_info) > 2 and info_type in ["all", "time"]:
                            one_step_cost_time = (float(node_info[2]) - float(node_info[1])) / 1e3
                            time_list.append(one_step_cost_time)
                            total_cost += one_step_cost_time

            # summarize the collected queue and time records
            if info_type in ["all", "queue"]:
                queue_info["size"] = len(queue_size_list)
                queue_info["info"] = {"queue": queue_size_list}
                queue_info["summary"] = {
                    "queue_summary": {
                        "empty_queue": empty_step_count
                    }
                }
            if info_type in ["all", "time"]:
                time_info["size"] = len(time_list)
                time_info["info"] = {"get_next": time_list}
                time_info["summary"] = {
                    "time_summary": {
                        "avg_cost": "0" if not time_list else str(total_cost / len(time_list))
                    }
                }

        return queue_info, time_info

    def analyse_device_queue_info(self, info_type="all"):
        """
        Analyse the device_queue operation info.

        Args:
            info_type (str): The info type to return. By default both queue and time
                info are returned; other options are ["queue", "time"].

        Returns:
            dict, queue size info.
            dict, time cost info.
        """
        # init queue info result
        queue_info = dict()
        get_time_list, push_time_list, total_time_list = [], [], []
        total_cost, total_push, total_get = 0, 0, 0
        # init time info result
        time_info = dict()
        queue_size_list = []
        empty_step, full_step = 0, 0

        device_queue_file_name = "device_queue_profiling_" + self._device_id + ".txt"
        device_queue_file_path = MinddataAnalyser.find_target_file(self._profiling_dir, device_queue_file_name)
        feed_file_name = "dataset_iterator_profiling_" + self._device_id + ".txt"
        feed_file_path = MinddataAnalyser.find_target_file(self._profiling_dir, feed_file_name)
        file_path = ""
        if device_queue_file_path:
            file_path = device_queue_file_path
        elif feed_file_path:
            file_path = feed_file_path

        if file_path:
            with open(file_path) as data_file:
                for line in data_file.readlines():
                    op_info = line.split()
                    # time info
                    if op_info and op_info[0] == "0" and info_type in ["all", "time"]:
                        # sub_type: 0 get_time, 1 push_time, 2 total_time
                        # op_info: 2: step num, 3: cost time
                        if op_info[1] == "0":
                            get_time_list.append([int(op_info[2]), float(op_info[3])])
                            total_cost += float(op_info[3])
                        elif op_info[1] == "1":
                            push_time_list.append([int(op_info[2]), float(op_info[3])])
                            total_push += float(op_info[3])
                        elif op_info[1] == "2":
                            total_time_list.append([int(op_info[2]), float(op_info[3])])
                            total_get += float(op_info[3])
                    # queue info: op_info 2: step num, 3: queue size;
                    # a step counts as full when op_info[1] equals the queue size
                    elif op_info and op_info[0] == "1" and info_type in ["all", "queue"]:
                        queue_size_list.append([int(op_info[2]), int(op_info[3])])
                        if op_info[1] == op_info[3]:
                            full_step += 1
                        if op_info[3] == "0":
                            empty_step += 1

            if info_type in ["all", "time"]:
                total_time_list = MinddataAnalyser.sort_step(total_time_list)
                push_time_list = MinddataAnalyser.sort_step(push_time_list)
                get_time_list = MinddataAnalyser.sort_step(get_time_list)

                time_info["size"] = len(total_time_list)
                time_info["info"] = {"total_cost": total_time_list,
                                     "push_cost": push_time_list,
                                     "get_cost": get_time_list}
                time_info["summary"] = {"time_summary": {"avg_cost": total_cost / time_info["size"]}}
                time_info["summary"]["time_summary"]["get_cost"] = total_get / time_info["size"]
                time_info["summary"]["time_summary"]["push_cost"] = total_push / time_info["size"]

            if info_type in ["all", "queue"]:
                queue_size_list = MinddataAnalyser.sort_step(queue_size_list)

                queue_info["size"] = len(queue_size_list)
                queue_info["info"] = {"queue": queue_size_list}
                queue_info["summary"] = {"queue_summary": {"empty_queue": empty_step}}
                queue_info["summary"]["queue_summary"]["full_queue"] = full_step

        return queue_info, time_info

    @staticmethod
    def analyse_queue_summary(get_next_queue_info, device_queue_info):
        """
        Analyse the queue summary info.

        Args:
            get_next_queue_info (dict): The get_next queue info returned by the analyser.
            device_queue_info (dict): The device queue info returned by the analyser.

        Returns:
            dict, the summary of the queues.
        """
        if get_next_queue_info and device_queue_info:
            result = {"data_process": {"status": "normal"},
                      "device_queue_op": {"status": "normal"},
                      "tdt": {"status": "normal"},
                      "get_next": {"status": "normal"}}

            get_next_queue_empty_count = get_next_queue_info.get(
                "summary", {}).get("queue_summary", {}).get("empty_queue", 0)
            result["get_next_queue_info"] = {
                "summary": {
                    "empty_batch_count": get_next_queue_empty_count,
                    "total_batch": get_next_queue_info.get("size")
                }
            }

            device_queue_empty_count = device_queue_info.get(
                "summary", {}).get("queue_summary", {}).get("empty_queue", 0)
            device_queue_full_count = device_queue_info.get(
                "summary", {}).get("queue_summary", {}).get("full_queue", 0)
            result["device_queue_info"] = {
                "summary": {
                    "empty_batch_count": device_queue_empty_count,
                    "full_batch_count": device_queue_full_count,
                    "total_batch": device_queue_info.get("size")
                }
            }

            if get_next_queue_empty_count:
                if device_queue_empty_count > device_queue_info.get("size", 0) * \
                        MinddataAnalyser.DEVICE_QUEUE_EMPTY_WARNING_THRESHOLD:
                    result["data_process"]["status"] = "warning"
                elif device_queue_empty_count < device_queue_info.get("size", 0) * \
                        MinddataAnalyser.DEVICE_QUEUE_NOT_EMPTY_THRESHOLD:
                    result["tdt"]["status"] = "warning"
                    result["device_queue_op"]["status"] = "warning"
        elif device_queue_info and not get_next_queue_info:
            result = {"data_process": {"status": "normal"},
                      "fpbp": {"status": "normal"}}

            device_queue_empty_count = device_queue_info.get(
                "summary", {}).get("queue_summary", {}).get("empty_queue", 0)
            device_queue_full_count = device_queue_info.get(
                "summary", {}).get("queue_summary", {}).get("full_queue", 0)
            result["device_queue_info"] = {
                "summary": {
                    "empty_batch_count": device_queue_empty_count,
                    "full_batch_count": device_queue_full_count,
                    "total_batch": device_queue_info.get("size")
                }
            }

            if device_queue_empty_count > device_queue_info.get("size", 0) * 0.7:
                result["data_process"]["status"] = "warning"
        else:
            result = {}

        return result

    @staticmethod
    def sort_step(step_info_list):
        """
        Sort the list by the first item (step number) and return the list of second items.

        Args:
            step_info_list (list): The step info, each item contains [step_num, info].

        Returns:
            list, the info list sorted by step.
        """
        step_info_list.sort(key=lambda x: x[0])
        result = []
        for item in step_info_list:
            result.append(item[1])
        return result

    @staticmethod
    def find_target_file(file_dir, file_name):
        """
        Find the target file in the directory, and return the found file's absolute path or "".

        Args:
            file_dir (str): The directory to search.
            file_name (str): The target file name.

        Returns:
            str, the absolute file path, or "" if the file is not found.
        """
        target_file_path = ""
        for root_path, _, file_names in os.walk(file_dir):
            for item in file_names:
                if item == file_name:
                    target_file_path = os.path.join(root_path, file_name)
        return target_file_path

    def _filter(self, filter_condition):
        """
        Filter the profiling data according to the filter condition.

        Args:
            filter_condition (dict): The filter condition.
        """

    def _load(self):
        """Load data according to the parsed profiling files."""