You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

test_profiling_startstop.py 9.4 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. # Copyright 2021 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ==============================================================================
  15. """
  16. Test MindData Profiling Start and Stop Support
  17. """
  18. import json
  19. import os
  20. import numpy as np
  21. import pytest
  22. import mindspore.common.dtype as mstype
  23. import mindspore.dataset as ds
  24. import mindspore._c_dataengine as cde
  25. import mindspore.dataset.transforms.c_transforms as C
  26. FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
  27. DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
  28. SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"
  29. # Add file name to rank id mapping so that each profiling file name is unique,
  30. # to support parallel test execution
  31. file_name_map_rank_id = {"test_profiling_early_stop": "0",
  32. "test_profiling_delay_start": "1",
  33. "test_profiling_start_start": "2",
  34. "test_profiling_stop_stop": "3",
  35. "test_profiling_stop_nostart": "4"}
  36. class TestMindDataProfilingStartStop:
  37. """
  38. Test MindData Profiling Manager Start-Stop Support
  39. """
  40. def setup_class(self):
  41. """
  42. Run once for the class
  43. """
  44. self._PIPELINE_FILE = "./pipeline_profiling"
  45. self._CPU_UTIL_FILE = "./minddata_cpu_utilization"
  46. self._DATASET_ITERATOR_FILE = "./dataset_iterator_profiling"
  47. def setup_method(self):
  48. """
  49. Run before each test function.
  50. """
  51. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  52. file_id = file_name_map_rank_id[file_name]
  53. self.pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
  54. self.cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
  55. self.dataset_iterator_file = self._DATASET_ITERATOR_FILE + "_" + file_id + ".txt"
  56. # Confirm MindData Profiling files do not yet exist
  57. assert os.path.exists(self.pipeline_file) is False
  58. assert os.path.exists(self.cpu_util_file) is False
  59. assert os.path.exists(self.dataset_iterator_file) is False
  60. # Set the MindData Profiling related environment variables
  61. os.environ['RANK_ID'] = file_id
  62. os.environ['DEVICE_ID'] = file_id
  63. def teardown_method(self):
  64. """
  65. Run after each test function.
  66. """
  67. # Delete MindData profiling files generated from the test.
  68. if os.path.exists(self.pipeline_file):
  69. os.remove(self.pipeline_file)
  70. if os.path.exists(self.cpu_util_file):
  71. os.remove(self.cpu_util_file)
  72. if os.path.exists(self.dataset_iterator_file):
  73. os.remove(self.dataset_iterator_file)
  74. # Disable MindData Profiling related environment variables
  75. del os.environ['RANK_ID']
  76. del os.environ['DEVICE_ID']
  77. def confirm_pipeline_file(self, num_ops, op_list=None):
  78. """
  79. Confirm pipeline JSON file with <num_ops> in the pipeline and the given optional list of ops
  80. """
  81. with open(self.pipeline_file) as file1:
  82. data = json.load(file1)
  83. op_info = data["op_info"]
  84. # Confirm ops in pipeline file
  85. assert len(op_info) == num_ops
  86. if op_list:
  87. for i in range(num_ops):
  88. assert op_info[i]["op_type"] in op_list
  89. def confirm_cpuutil_file(self, num_pipeline_ops):
  90. """
  91. Confirm CPU utilization JSON file with <num_pipeline_ops> in the pipeline
  92. """
  93. with open(self.cpu_util_file) as file1:
  94. data = json.load(file1)
  95. op_info = data["op_info"]
  96. assert len(op_info) == num_pipeline_ops
  97. def confirm_dataset_iterator_file(self):
  98. """
  99. Confirm dataset iterator file exists
  100. """
  101. assert os.path.exists(self.dataset_iterator_file)
  102. def test_profiling_early_stop(self):
  103. """
  104. Test MindData Profiling with Early Stop; profile for some iterations and then stop profiling
  105. """
  106. def source1():
  107. for i in range(8000):
  108. yield (np.array([i]),)
  109. # Get instance pointer for MindData profiling manager
  110. md_profiler = cde.GlobalContext.profiling_manager()
  111. # Initialize MindData profiling manager
  112. md_profiler.init()
  113. # Start MindData Profiling
  114. md_profiler.start()
  115. # Create this basic and common pipeline
  116. # Leaf/Source-Op -> Map -> Batch
  117. data1 = ds.GeneratorDataset(source1, ["col1"])
  118. type_cast_op = C.TypeCast(mstype.int32)
  119. data1 = data1.map(operations=type_cast_op, input_columns="col1")
  120. data1 = data1.batch(16)
  121. num_iter = 0
  122. # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
  123. for _ in data1.create_dict_iterator(num_epochs=2):
  124. if num_iter == 400:
  125. # Stop MindData Profiling and Save MindData Profiling Output
  126. md_profiler.stop()
  127. md_profiler.save(os.getcwd())
  128. num_iter += 1
  129. assert num_iter == 500
  130. # Confirm the content of the profiling files, including 4 ops in the pipeline JSON file
  131. self.confirm_pipeline_file(4, ["GeneratorOp", "BatchOp", "MapOp", "EpochCtrlOp"])
  132. self.confirm_cpuutil_file(4)
  133. self.confirm_dataset_iterator_file()
  134. def test_profiling_delay_start(self):
  135. """
  136. Test MindData Profiling with Delayed Start; profile for subset of iterations
  137. """
  138. def source1():
  139. for i in range(8000):
  140. yield (np.array([i]),)
  141. # Get instance pointer for MindData profiling manager
  142. md_profiler = cde.GlobalContext.profiling_manager()
  143. # Initialize MindData profiling manager
  144. md_profiler.init()
  145. # Create this basic and common pipeline
  146. # Leaf/Source-Op -> Map -> Batch
  147. data1 = ds.GeneratorDataset(source1, ["col1"])
  148. type_cast_op = C.TypeCast(mstype.int32)
  149. data1 = data1.map(operations=type_cast_op, input_columns="col1")
  150. data1 = data1.batch(16)
  151. num_iter = 0
  152. # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is not added to the pipeline
  153. for _ in data1.create_dict_iterator(num_epochs=1):
  154. if num_iter == 5:
  155. # Start MindData Profiling
  156. md_profiler.start()
  157. elif num_iter == 400:
  158. # Stop MindData Profiling and Save MindData Profiling Output
  159. md_profiler.stop()
  160. md_profiler.save(os.getcwd())
  161. num_iter += 1
  162. assert num_iter == 500
  163. # Confirm the content of the profiling files, including 3 ops in the pipeline JSON file
  164. self.confirm_pipeline_file(3, ["GeneratorOp", "BatchOp", "MapOp"])
  165. self.confirm_cpuutil_file(3)
  166. self.confirm_dataset_iterator_file()
  167. def test_profiling_start_start(self):
  168. """
  169. Test MindData Profiling with Start followed by Start - user error scenario
  170. """
  171. # Get instance pointer for MindData profiling manager
  172. md_profiler = cde.GlobalContext.profiling_manager()
  173. # Initialize MindData profiling manager
  174. md_profiler.init()
  175. # Start MindData Profiling
  176. md_profiler.start()
  177. with pytest.raises(RuntimeError) as info:
  178. # Reissue Start MindData Profiling
  179. md_profiler.start()
  180. assert "MD ProfilingManager is already running." in str(info)
  181. # Stop MindData Profiling
  182. md_profiler.stop()
  183. def test_profiling_stop_stop(self):
  184. """
  185. Test MindData Profiling with Stop followed by Stop - user warning scenario
  186. """
  187. # Get instance pointer for MindData profiling manager
  188. md_profiler = cde.GlobalContext.profiling_manager()
  189. # Initialize MindData profiling manager
  190. md_profiler.init()
  191. # Start MindData Profiling
  192. md_profiler.start()
  193. # Stop MindData Profiling and Save MindData Profiling Output
  194. md_profiler.stop()
  195. md_profiler.save(os.getcwd())
  196. # Reissue Stop MindData Profiling
  197. # A warning "MD ProfilingManager had already stopped" is produced.
  198. md_profiler.stop()
  199. def test_profiling_stop_nostart(self):
  200. """
  201. Test MindData Profiling with Stop not without prior Start - user error scenario
  202. """
  203. # Get instance pointer for MindData profiling manager
  204. md_profiler = cde.GlobalContext.profiling_manager()
  205. # Initialize MindData profiling manager
  206. md_profiler.init()
  207. with pytest.raises(RuntimeError) as info:
  208. # Stop MindData Profiling - without prior Start()
  209. md_profiler.stop()
  210. assert "MD ProfilingManager has not started yet." in str(info)