You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

test_profiling_startstop.py 9.4 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. # Copyright 2021 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ==============================================================================
  15. """
  16. Test MindData Profiling Start and Stop Support
  17. """
  18. import json
  19. import os
  20. import numpy as np
  21. import pytest
  22. import mindspore.common.dtype as mstype
  23. import mindspore.dataset as ds
  24. import mindspore._c_dataengine as cde
  25. import mindspore.dataset.transforms.c_transforms as C
  26. FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
  27. DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
  28. SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"
  29. # Add file name to rank id mapping so that each profiling file name is unique,
  30. # to support parallel test execution
  31. file_name_map_rank_id = {"test_profiling_early_stop": "0",
  32. "test_profiling_delay_start": "1",
  33. "test_profiling_start_start": "2",
  34. "test_profiling_stop_stop": "3",
  35. "test_profiling_stop_nostart": "4"}
  36. @pytest.mark.forked
  37. class TestMindDataProfilingStartStop:
  38. """
  39. Test MindData Profiling Manager Start-Stop Support
  40. """
  41. def setup_class(self):
  42. """
  43. Run once for the class
  44. """
  45. self._pipeline_file = "./pipeline_profiling"
  46. self._cpu_util_file = "./minddata_cpu_utilization"
  47. self._dataset_iterator_file = "./dataset_iterator_profiling"
  48. def setup_method(self):
  49. """
  50. Run before each test function.
  51. """
  52. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  53. file_id = file_name_map_rank_id[file_name]
  54. self.pipeline_file = self._pipeline_file + "_" + file_id + ".json"
  55. self.cpu_util_file = self._cpu_util_file + "_" + file_id + ".json"
  56. self.dataset_iterator_file = self._dataset_iterator_file + "_" + file_id + ".txt"
  57. # Confirm MindData Profiling files do not yet exist
  58. assert os.path.exists(self.pipeline_file) is False
  59. assert os.path.exists(self.cpu_util_file) is False
  60. assert os.path.exists(self.dataset_iterator_file) is False
  61. # Set the MindData Profiling related environment variables
  62. os.environ['RANK_ID'] = file_id
  63. os.environ['DEVICE_ID'] = file_id
  64. def teardown_method(self):
  65. """
  66. Run after each test function.
  67. """
  68. # Delete MindData profiling files generated from the test.
  69. if os.path.exists(self.pipeline_file):
  70. os.remove(self.pipeline_file)
  71. if os.path.exists(self.cpu_util_file):
  72. os.remove(self.cpu_util_file)
  73. if os.path.exists(self.dataset_iterator_file):
  74. os.remove(self.dataset_iterator_file)
  75. # Disable MindData Profiling related environment variables
  76. del os.environ['RANK_ID']
  77. del os.environ['DEVICE_ID']
  78. def confirm_pipeline_file(self, num_ops, op_list=None):
  79. """
  80. Confirm pipeline JSON file with <num_ops> in the pipeline and the given optional list of ops
  81. """
  82. with open(self.pipeline_file) as file1:
  83. data = json.load(file1)
  84. op_info = data["op_info"]
  85. # Confirm ops in pipeline file
  86. assert len(op_info) == num_ops
  87. if op_list:
  88. for i in range(num_ops):
  89. assert op_info[i]["op_type"] in op_list
  90. def confirm_cpuutil_file(self, num_pipeline_ops):
  91. """
  92. Confirm CPU utilization JSON file with <num_pipeline_ops> in the pipeline
  93. """
  94. with open(self.cpu_util_file) as file1:
  95. data = json.load(file1)
  96. op_info = data["op_info"]
  97. assert len(op_info) == num_pipeline_ops
  98. def confirm_dataset_iterator_file(self):
  99. """
  100. Confirm dataset iterator file exists
  101. """
  102. assert os.path.exists(self.dataset_iterator_file)
  103. def test_profiling_early_stop(self):
  104. """
  105. Test MindData Profiling with Early Stop; profile for some iterations and then stop profiling
  106. """
  107. def source1():
  108. for i in range(8000):
  109. yield (np.array([i]),)
  110. # Get instance pointer for MindData profiling manager
  111. md_profiler = cde.GlobalContext.profiling_manager()
  112. # Initialize MindData profiling manager
  113. md_profiler.init()
  114. # Start MindData Profiling
  115. md_profiler.start()
  116. # Create this basic and common pipeline
  117. # Leaf/Source-Op -> Map -> Batch
  118. data1 = ds.GeneratorDataset(source1, ["col1"])
  119. type_cast_op = C.TypeCast(mstype.int32)
  120. data1 = data1.map(operations=type_cast_op, input_columns="col1")
  121. data1 = data1.batch(16)
  122. num_iter = 0
  123. # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
  124. for _ in data1.create_dict_iterator(num_epochs=2):
  125. if num_iter == 400:
  126. # Stop MindData Profiling and Save MindData Profiling Output
  127. md_profiler.stop()
  128. md_profiler.save(os.getcwd())
  129. num_iter += 1
  130. assert num_iter == 500
  131. # Confirm the content of the profiling files, including 4 ops in the pipeline JSON file
  132. self.confirm_pipeline_file(4, ["GeneratorOp", "BatchOp", "MapOp", "EpochCtrlOp"])
  133. self.confirm_cpuutil_file(4)
  134. self.confirm_dataset_iterator_file()
  135. def test_profiling_delay_start(self):
  136. """
  137. Test MindData Profiling with Delayed Start; profile for subset of iterations
  138. """
  139. def source1():
  140. for i in range(8000):
  141. yield (np.array([i]),)
  142. # Get instance pointer for MindData profiling manager
  143. md_profiler = cde.GlobalContext.profiling_manager()
  144. # Initialize MindData profiling manager
  145. md_profiler.init()
  146. # Create this basic and common pipeline
  147. # Leaf/Source-Op -> Map -> Batch
  148. data1 = ds.GeneratorDataset(source1, ["col1"])
  149. type_cast_op = C.TypeCast(mstype.int32)
  150. data1 = data1.map(operations=type_cast_op, input_columns="col1")
  151. data1 = data1.batch(16)
  152. num_iter = 0
  153. # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is not added to the pipeline
  154. for _ in data1.create_dict_iterator(num_epochs=1):
  155. if num_iter == 5:
  156. # Start MindData Profiling
  157. md_profiler.start()
  158. elif num_iter == 400:
  159. # Stop MindData Profiling and Save MindData Profiling Output
  160. md_profiler.stop()
  161. md_profiler.save(os.getcwd())
  162. num_iter += 1
  163. assert num_iter == 500
  164. # Confirm the content of the profiling files, including 3 ops in the pipeline JSON file
  165. self.confirm_pipeline_file(3, ["GeneratorOp", "BatchOp", "MapOp"])
  166. self.confirm_cpuutil_file(3)
  167. self.confirm_dataset_iterator_file()
  168. def test_profiling_start_start(self):
  169. """
  170. Test MindData Profiling with Start followed by Start - user error scenario
  171. """
  172. # Get instance pointer for MindData profiling manager
  173. md_profiler = cde.GlobalContext.profiling_manager()
  174. # Initialize MindData profiling manager
  175. md_profiler.init()
  176. # Start MindData Profiling
  177. md_profiler.start()
  178. with pytest.raises(RuntimeError) as info:
  179. # Reissue Start MindData Profiling
  180. md_profiler.start()
  181. assert "MD ProfilingManager is already running." in str(info)
  182. # Stop MindData Profiling
  183. md_profiler.stop()
  184. def test_profiling_stop_stop(self):
  185. """
  186. Test MindData Profiling with Stop followed by Stop - user warning scenario
  187. """
  188. # Get instance pointer for MindData profiling manager
  189. md_profiler = cde.GlobalContext.profiling_manager()
  190. # Initialize MindData profiling manager
  191. md_profiler.init()
  192. # Start MindData Profiling
  193. md_profiler.start()
  194. # Stop MindData Profiling and Save MindData Profiling Output
  195. md_profiler.stop()
  196. md_profiler.save(os.getcwd())
  197. # Reissue Stop MindData Profiling
  198. # A warning "MD ProfilingManager had already stopped" is produced.
  199. md_profiler.stop()
  200. def test_profiling_stop_nostart(self):
  201. """
  202. Test MindData Profiling with Stop not without prior Start - user error scenario
  203. """
  204. # Get instance pointer for MindData profiling manager
  205. md_profiler = cde.GlobalContext.profiling_manager()
  206. # Initialize MindData profiling manager
  207. md_profiler.init()
  208. with pytest.raises(RuntimeError) as info:
  209. # Stop MindData Profiling - without prior Start()
  210. md_profiler.stop()
  211. assert "MD ProfilingManager has not started yet." in str(info)