You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

watchpoint_handler.py 23 kB

5 years ago
5 years ago
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """Define the watchpoint stream handler."""
  16. from mindinsight.debugger.conditionmgr.condition import ValueTypeEnum
  17. from mindinsight.debugger.common.exceptions.exceptions import DebuggerParamValueError, \
  18. DebuggerParamTypeError
  19. from mindinsight.debugger.common.log import LOGGER as log
  20. from mindinsight.debugger.common.utils import is_scope_type
  21. from mindinsight.debugger.proto.debug_grpc_pb2 import SetCMD
  22. from mindinsight.debugger.stream_cache.watchpoint import Watchpoint, WatchpointHit, \
  23. WatchNodeTree
  24. from mindinsight.debugger.stream_handler.base_handler import StreamHandlerBase
  25. class WatchpointHandler(StreamHandlerBase):
  26. """Watchpoint Handler."""
  27. def __init__(self):
  28. self._watchpoints = {}
  29. # list of ids of new created watchpoints
  30. self._created_watchpoints = []
  31. # list of SetCMD of watchpoints to be deleted
  32. self._deleted_watchpoints = []
  33. # dict of <id, Watchpoint> of watchpoints to be updated
  34. self._updated_watchpoints = {}
  35. # the collection of watched node full names, which have been sent to MindSpore
  36. self._latest_id = 0
  37. self._cache_set_cmd = {}
  38. # whether the watchpoint list has been changed since last step
  39. self._outdated = False
  40. def put(self, value):
  41. """
  42. Put Watchpoint into watchpoint handler.
  43. Args:
  44. value (Watchpoint): The name of nodes that have been chosen.
  45. """
  46. new_id = value.watchpoint_id
  47. self._watchpoints[new_id] = value
  48. self._created_watchpoints.append(new_id)
  49. self._updated_watchpoints[new_id] = value
  50. self._latest_id = new_id
  51. log.debug("Put watchpoint %d into cache.", new_id)
  52. def sync_set_cmd(self, set_cmds):
  53. """Clean temp watchpoints."""
  54. self._outdated = False
  55. self._created_watchpoints = []
  56. self._deleted_watchpoints = []
  57. self._updated_watchpoints = {}
  58. for set_cmd in set_cmds:
  59. self._cache_set_cmd[set_cmd.id] = set_cmd
  60. def clean_cache_set_cmd(self, set_cmd):
  61. """Clean cache set command."""
  62. self._cache_set_cmd.pop(set_cmd.id, None)
  63. def get_watchpoint_by_id(self, watchpoint_id):
  64. """Get watchpoint by watchpoint id."""
  65. res = self.get(watchpoint_id)
  66. watchpoint = res.get('watch_points')[0]
  67. return watchpoint
  68. def get(self, filter_condition=None):
  69. """
  70. Get the watchpoints.
  71. Args:
  72. filter_condition (Union[None, int]): The filter conditions. Get watchpoint by
  73. id. If None, return all watchpoint. Default: None.
  74. Returns:
  75. dict, the watchpoint list.
  76. """
  77. reply = []
  78. if not filter_condition:
  79. # get watch condition list
  80. for _, watchpoint in self._watchpoints.items():
  81. watchpoint_info = watchpoint.get_watch_condition_info()
  82. reply.append(watchpoint_info)
  83. else:
  84. self.validate_watchpoint_id(filter_condition)
  85. reply = [self._watchpoints.get(filter_condition)]
  86. log.debug("get the watch points with filter_condition:%s", filter_condition)
  87. return {'watch_points': reply}
  88. def get_pending_commands(self, graph_stream):
  89. """
  90. Get all watchpoint in SetCMD proto format.
  91. Args:
  92. graph_stream (GraphHandler): Graph handler.
  93. Returns:
  94. list[SetCMD], updated watchpoint to be sent to MindSpore.
  95. """
  96. res = []
  97. for _, watchpoint in self._updated_watchpoints.items():
  98. # construct set command with leaf nodes
  99. watch_nodes = watchpoint.get_watch_nodes()
  100. leaf_watch_nodes = self._expand_to_leaf_nodes(graph_stream, watch_nodes)
  101. res.append(watchpoint.get_pending_cmd(leaf_watch_nodes))
  102. res.extend(self._deleted_watchpoints)
  103. for _, set_cmd in self._cache_set_cmd.items():
  104. res.append(set_cmd)
  105. return res
  106. @staticmethod
  107. def _expand_to_leaf_nodes(graph_stream, watch_nodes):
  108. """
  109. Get all leaf node basic info according to watch nodes.
  110. Args:
  111. graph_stream (GraphHandler): Graph handler.
  112. watch_nodes (list[NodeBasicInfo]): The list of watch node basic infos.
  113. Returns:
  114. list[NodeBasicInfo], expanded leaf basic node infos.
  115. """
  116. leaf_watch_nodes = []
  117. for node in watch_nodes:
  118. if is_scope_type(node.type):
  119. pure_node_name = None
  120. if len(node.name.split('/')) > 1:
  121. graph_name, pure_node_name = node.name.split('/', 1)
  122. else:
  123. graph_name = node.name
  124. search_node_infos = graph_stream.get_node_basic_info_by_scope(pure_node_name, graph_name=graph_name)
  125. leaf_watch_nodes.extend(search_node_infos)
  126. else:
  127. leaf_watch_nodes.append(node)
  128. return leaf_watch_nodes
  129. def is_recheckable(self):
  130. """
  131. Check if current status is able to recheck.
  132. Returns:
  133. bool, if enable to recheck.
  134. """
  135. return self._outdated
  136. def set_watch_nodes(self, graph, graph_stream, watch_point_id, graph_name=None):
  137. """
  138. set watch nodes for graph.
  139. Args:
  140. graph (dict): The graph with list of nodes.
  141. graph_stream (GraphHandler): The graph handler.
  142. watch_point_id (int): The id of watchpoint.
  143. graph_name (str): The graph name.
  144. """
  145. if not (watch_point_id and graph):
  146. return
  147. log.debug("add watch flags")
  148. watchpoint = self._watchpoints.get(watch_point_id)
  149. self._set_watch_status_recursively(graph, graph_stream, watchpoint, graph_name)
  150. def _set_watch_status_recursively(self, graph, graph_stream, watchpoint, graph_name=None):
  151. """Set watch status to graph."""
  152. if graph.get('children'):
  153. self._set_watch_status_recursively(
  154. graph.get('children'), graph_stream, watchpoint, graph_name)
  155. if graph.get('nodes'):
  156. _ = self._set_watch_state_for_nodes(graph['nodes'], graph_stream, watchpoint, graph_name)
  157. def _set_watch_state_for_nodes(self, nodes, graph_stream, watchpoint, graph_name):
  158. """
  159. Set watch state for nodes.
  160. Args:
  161. nodes (list[Node]): List of node info.
  162. Returns:
  163. int, the number of all watched nodes.
  164. """
  165. all_watched_num = 0
  166. valid_node_num = len(nodes)
  167. # initialize the state of current node.
  168. state = WatchNodeTree.NOT_WATCH
  169. for node in nodes:
  170. node_name = node.get('name')
  171. # search result could have `nodes` in nodes object
  172. if node.get('nodes'):
  173. flag = self._set_watch_state_for_nodes(node.get('nodes'), graph_stream, watchpoint, graph_name)
  174. else:
  175. full_name = graph_stream.get_full_name(node_name, graph_name)
  176. new_node_name = node_name if graph_name is None else '/'.join([graph_name, node_name])
  177. flag = watchpoint.get_node_status(new_node_name, node.get('type'), full_name)
  178. node['watched'] = flag
  179. if flag == WatchNodeTree.NOT_WATCH:
  180. continue
  181. state = WatchNodeTree.PARTIAL_WATCH
  182. if flag == WatchNodeTree.INVALID:
  183. valid_node_num -= 1
  184. elif flag == WatchNodeTree.TOTAL_WATCH:
  185. all_watched_num += 1
  186. # update the watch status of current node
  187. if not valid_node_num:
  188. state = WatchNodeTree.INVALID
  189. elif all_watched_num == valid_node_num:
  190. state = WatchNodeTree.TOTAL_WATCH
  191. return state
  192. def create_watchpoint(self, condition_mgr, watch_condition, watch_nodes=None, watch_point_id=None, name=None):
  193. """
  194. Create watchpoint.
  195. Args:
  196. condition_mgr (ConditionMgr): Instance of ConditionMgr.
  197. watch_condition (dict): The watch condition.
  198. "condition": {
  199. id: "tensor_too_large",
  200. "params": [
  201. {
  202. "name": "abs_mean_gt",
  203. "value": 1.1
  204. }
  205. ]
  206. }
  207. - id (str): Id of condition.
  208. - param (list[dict]): The list of param for this condition.
  209. watch_nodes (list[NodeBasicInfo]): The list of node basic info.
  210. watch_point_id (int): The id of watchpoint.
  211. name (str): The name of watchpoint.
  212. Returns:
  213. int, the new id of watchpoint.
  214. """
  215. validate_watch_condition(condition_mgr, watch_condition)
  216. watch_condition = set_default_param(condition_mgr, watch_condition)
  217. new_id = self._latest_id + 1
  218. watchpoint = Watchpoint(new_id, watch_condition, name)
  219. if watch_nodes:
  220. watchpoint.add_nodes(watch_nodes)
  221. elif watch_point_id:
  222. self.validate_watchpoint_id(watch_point_id)
  223. watchpoint.copy_nodes_from(self._watchpoints.get(watch_point_id))
  224. self.put(watchpoint)
  225. self._outdated = True
  226. return new_id
  227. def update_watchpoint(self, watch_point_id, watch_nodes, watched=False):
  228. """
  229. Update watchpoint.
  230. Args:
  231. watch_point_id (int): The id of watchpoint.
  232. watch_nodes (list[NodeBasicInfo]): The list of node basic info.
  233. watched (bool): The update operator on nodes. If False, remove nodes from watch nodes.
  234. If True, add nodes to watch nodes. Default: False.
  235. """
  236. self.validate_watchpoint_id(watch_point_id)
  237. watchpoint = self._watchpoints.get(watch_point_id)
  238. if watched:
  239. watchpoint.add_nodes(watch_nodes)
  240. else:
  241. watchpoint.remove_nodes(watch_nodes)
  242. self._updated_watchpoints[watch_point_id] = watchpoint
  243. self._outdated = True
  244. log.debug("Update watchpoint %d in cache.", watch_point_id)
  245. def delete_watchpoint(self, watch_point_id=None):
  246. """
  247. Delete watchpoint.
  248. Args:
  249. watch_point_id (Union[None, int]): The id of watchpoint.
  250. If None, delete all watchpoints. Default: None.
  251. """
  252. if watch_point_id is None:
  253. watch_point_ids = [sub_id for sub_id, _ in self._watchpoints.items()]
  254. else:
  255. self.validate_watchpoint_id(watch_point_id)
  256. watch_point_ids = [watch_point_id]
  257. for single_id in watch_point_ids:
  258. self._delete_single_watchpoint(single_id)
  259. self._outdated = True
  260. def _delete_single_watchpoint(self, watch_point_id):
  261. """
  262. Delete single watchpoint.
  263. Args:
  264. watch_point_id (int): The id of watchpoint.
  265. """
  266. self._watchpoints.pop(watch_point_id)
  267. # if the watchpoint has not been created by MindSpore, clean the relative cache directly
  268. if watch_point_id in self._created_watchpoints:
  269. self._created_watchpoints.remove(watch_point_id)
  270. self._updated_watchpoints.pop(watch_point_id)
  271. log.debug("Cancel create watchpoint %d in cache.", watch_point_id)
  272. return
  273. set_cmd = SetCMD()
  274. set_cmd.id = watch_point_id
  275. set_cmd.delete = True
  276. self._deleted_watchpoints.append(set_cmd)
  277. log.debug("Delete watchpoint %d in cache.", watch_point_id)
  278. def validate_watchpoint_id(self, watch_point_id):
  279. """Validate watchpoint id."""
  280. if not isinstance(watch_point_id, int):
  281. log.error("Invalid watchpoint id %s. The watch point id should be int.", watch_point_id)
  282. raise DebuggerParamTypeError("Watchpoint id should be int type.")
  283. if watch_point_id and watch_point_id not in self._watchpoints:
  284. log.error("Invalid watchpoint id: %d.", watch_point_id)
  285. raise DebuggerParamValueError("Invalid watchpoint id: {}".format(watch_point_id))
  286. class WatchpointHitHandler(StreamHandlerBase):
  287. """Watchpoint hit handler."""
  288. def __init__(self):
  289. # dict of <ui node_name, dict of <slot, WatchpointHit>>,
  290. self._hits = {}
  291. @property
  292. def empty(self):
  293. """Whether the watchpoint hit is empty."""
  294. return not self._hits
  295. def put(self, value):
  296. """
  297. Put value into watchpoint hit cache. Called by grpc server.
  298. Args:
  299. value (dict): The watchpoint hit info.
  300. - tensor_proto (TensorProto): The message about hit tensor.
  301. - watchpoint (Watchpoint): The Watchpoint that a node hit.
  302. - node_name (str): The UI node name.
  303. - graph_name (str): The graph name.
  304. """
  305. watchpoint_hit = WatchpointHit(
  306. tensor_proto=value.get('tensor_proto'),
  307. watchpoint=value.get('watchpoint'),
  308. node_name=value.get('node_name'),
  309. graph_name=value.get('graph_name')
  310. )
  311. if 'error_code' in value.keys():
  312. watchpoint_hit.error_code = value.get('error_code')
  313. # get all hit watchpoints according to node name ans tensor slot
  314. watchpoint_hits = self._get_watchpoints_by_tensor_name(watchpoint_hit.node_name,
  315. watchpoint_hit.slot)
  316. if watchpoint_hit not in watchpoint_hits:
  317. watchpoint_hits.append(watchpoint_hit)
  318. def _get_watchpoints_by_tensor_name(self, node_name, slot):
  319. """
  320. Get hit tensors according to ui node name and slot.
  321. Args:
  322. node_name (str): The node name.
  323. slot (str): The tensor slot.
  324. Returns:
  325. list, list of watchpoints.
  326. """
  327. hit_node = self._hits.get(node_name)
  328. if hit_node is None:
  329. hit_node = {}
  330. self._hits[node_name] = hit_node
  331. hit_tensors = hit_node.get(slot)
  332. if hit_tensors is None:
  333. hit_tensors = []
  334. hit_node[slot] = hit_tensors
  335. return hit_tensors
  336. def get(self, filter_condition=None):
  337. """
  338. Get watchpoint hit list.
  339. Args:
  340. filter_condition (str): Get the watchpoint hit according to specified node name.
  341. If not given, get all watchpoint hits. Default: None.
  342. Returns:
  343. dict, the watchpoint hit list.
  344. """
  345. if filter_condition is None:
  346. log.debug("Get all watchpoint hit list.")
  347. reply = self.get_watchpoint_hits()
  348. else:
  349. log.debug("Get the watchpoint for node: <%s>.", filter_condition)
  350. reply = self._hits.get(filter_condition)
  351. return reply
  352. def get_watchpoint_hits(self):
  353. """Return the list of watchpoint hits."""
  354. watch_point_hits = []
  355. for node_name, watchpoint_hits in self._hits.items():
  356. tensors = []
  357. graph_name = None
  358. for slot, tensor_hits in watchpoint_hits.items():
  359. if graph_name is None:
  360. graph_name = tensor_hits[0].graph_name
  361. tensor_info = self._get_tensor_hit_info(slot, tensor_hits)
  362. tensors.append(tensor_info)
  363. watch_point_hits.append({
  364. 'node_name': node_name,
  365. 'tensors': tensors,
  366. 'graph_name': graph_name
  367. })
  368. return {'watch_point_hits': watch_point_hits}
  369. @staticmethod
  370. def _get_tensor_hit_info(slot, tensor_hits):
  371. """
  372. Get watchpoint hit info of specified tensor.
  373. Args:
  374. slot (str): Slot id.
  375. tensor_hits (list): A list of watchpoint hit objects that the tensor hit.
  376. Returns:
  377. dict, tensor hit info.
  378. """
  379. res = {}
  380. watch_points = []
  381. error_codes = set()
  382. for tensor_hit in tensor_hits:
  383. error_code = tensor_hit.error_code
  384. watchpoint = tensor_hit.watchpoint
  385. watchpoint['error_code'] = error_code
  386. watch_points.append(watchpoint)
  387. error_codes.add(error_code)
  388. summarized_error_code = error_codes.pop()
  389. while error_codes:
  390. temp = error_codes.pop()
  391. summarized_error_code = summarized_error_code | temp
  392. if watch_points:
  393. res = {
  394. 'slot': slot,
  395. 'summarized_error_code': summarized_error_code,
  396. 'watch_points': watch_points
  397. }
  398. return res
  399. def _is_tensor_hit(self, tensor_name):
  400. """
  401. Check if the tensor is record in hit cache.
  402. Args:
  403. tensor_name (str): The name of ui tensor name.
  404. Returns:
  405. bool, if the tensor is hit.
  406. """
  407. node_name, slot = tensor_name.rsplit(':', 1)
  408. watchpoint_hits = self._hits.get(node_name, {}).get(slot)
  409. return bool(watchpoint_hits)
  410. def update_tensor_history(self, tensor_history):
  411. """
  412. Add hit flag to tensor history.
  413. Args:
  414. tensor_history (dict): The tensor history.
  415. """
  416. if not self._hits:
  417. return
  418. # add hit tensor names to `tensor_names`
  419. for tensor_info in tensor_history.get('tensor_history'):
  420. tensor_name = tensor_info['name']
  421. hit_flag = self._is_tensor_hit(tensor_name)
  422. tensor_info['is_hit'] = hit_flag
  423. def get_tensor_hit_infos(self, tensor_name):
  424. """
  425. Get all hit information of a tensor.
  426. Args:
  427. tensor_name (str): Tensor name showed on UI.
  428. Returns:
  429. dict, tensor hit info.
  430. """
  431. tensor_hit_info = {}
  432. if self._is_tensor_hit(tensor_name):
  433. node_name, slot = tensor_name.rsplit(':', 1)
  434. tensor_hits = self._get_watchpoints_by_tensor_name(node_name, slot)
  435. tensor_hit_info = self._get_tensor_hit_info(slot, tensor_hits)
  436. return tensor_hit_info
  437. def validate_watch_condition(condition_mgr, watch_condition):
  438. """Validate watch condition."""
  439. if not isinstance(watch_condition, dict):
  440. log.error("<watch_condition> should be dict. %s received.", watch_condition)
  441. raise DebuggerParamTypeError("<watch_condition> should be dict.")
  442. # validate condition_id
  443. condition_id = watch_condition.get('id')
  444. if condition_id not in condition_mgr.conditions.keys():
  445. log.error("Invalid watch condition. Acceptable values are <%s>. %s received.",
  446. str(condition_mgr.conditions.keys()), condition_id)
  447. raise DebuggerParamValueError("Invalid watch condition value.")
  448. # validate param
  449. validate_watch_condition_params(condition_mgr, watch_condition)
  450. def validate_watch_condition_params(condition_mgr, watch_condition):
  451. """
  452. Validate watch condition parameters.
  453. Args:
  454. condition_mgr (ConditionMgr): Instance of ConditionMgr.
  455. watch_condition (dict): Watch condition.
  456. - id (str): Condition id. Should be in WATCHPOINT_CONDITION_MAPPING.
  457. - param (list): Condition value. Should be given for comparison condition. The value
  458. will be translated to np.float32.
  459. """
  460. condition_id = watch_condition.get('id')
  461. params = watch_condition.get('params')
  462. condition = condition_mgr.get_condition(condition_id)
  463. if condition_id in condition_mgr.get_no_param_condition():
  464. if params:
  465. log.error("No param is expected for %s condition", condition_id)
  466. raise DebuggerParamValueError("No param is expected.")
  467. return
  468. for param in params:
  469. condition_param_name = param.get("name")
  470. if condition_param_name not in condition.names:
  471. log.error("Invalid name of parameter for condition: %s, available values: %s",
  472. condition_id, condition.names)
  473. raise DebuggerParamValueError("Invalid name of parameter.")
  474. condition_param = condition.get_parameter_definition(condition_param_name)
  475. if condition_param.type.name in (ValueTypeEnum.FLOAT64.name, ValueTypeEnum.INT64.name) \
  476. and not isinstance(param.get("value"), (float, int)):
  477. log.error("Number param should be given for condition: %s", condition_id)
  478. raise DebuggerParamValueError("Number param should be given.")
  479. if condition_param.type.name == ValueTypeEnum.BOOL.name \
  480. and not isinstance(param.get("value"), bool):
  481. log.error("Bool param should be given for condition: %s", condition_id)
  482. raise DebuggerParamValueError("Bool param should be given.")
  483. if not condition_param.is_valid(param.get("value")):
  484. log.error("Param %s out of range for condition: %s", condition_param_name, condition_id)
  485. raise DebuggerParamValueError("Parameter out of range.")
  486. def set_default_param(condition_mgr, watch_condition):
  487. """
  488. Set default param.
  489. Args:
  490. condition_mgr (ConditionMgr): Instance of ConditionMgr.
  491. watch_condition (dict): The watch condition.
  492. "condition": {
  493. id: "tensor_too_large",
  494. "params": [
  495. {
  496. "name": "abs_mean_gt",
  497. "value": 1.1
  498. }
  499. ]
  500. }
  501. - id (str): Id of condition.
  502. - param (list[dict]): The list of param for this condition.
  503. Returns:
  504. dict, the new watch_condition.
  505. """
  506. condition_id = watch_condition.get('id')
  507. condition = condition_mgr.get_condition(condition_id)
  508. for param in condition.parameters:
  509. if not param.visible_on_ui and not param.support_disable:
  510. watch_condition["params"].append({
  511. "name": param.name,
  512. "value": param.default_value
  513. })
  514. watch_condition["abbr"] = condition.abbr
  515. return watch_condition