You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number; they can include dashes ('-') and can be up to 35 characters long.

test_profiling.py 21 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. # Copyright 2020-2021 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ==============================================================================
  15. """
  16. Testing profiling support in DE
  17. """
  18. import json
  19. import os
  20. import numpy as np
  21. import mindspore.common.dtype as mstype
  22. import mindspore.dataset as ds
  23. import mindspore.dataset.transforms.c_transforms as C
  24. import mindspore.dataset.vision.c_transforms as vision
  25. import mindspore._c_dataengine as cde
# TFRecordDataset test data: data files, dataset root and schema used by the pipelines
FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"
# add file name to rank id mapping to avoid file writing crash
# (each test writes its profiling output files suffixed with a unique rank id,
# so no two tests ever collide on the same output filename)
file_name_map_rank_id = {"test_profiling_simple_pipeline": "0",
                         "test_profiling_complex_pipeline": "1",
                         "test_profiling_inline_ops_pipeline1": "2",
                         "test_profiling_inline_ops_pipeline2": "3",
                         "test_profiling_sampling_interval": "4",
                         "test_profiling_basic_pipeline": "5",
                         "test_profiling_cifar10_pipeline": "6",
                         "test_profiling_seq_pipelines_epochctrl3": "7",
                         "test_profiling_seq_pipelines_epochctrl2": "8",
                         "test_profiling_seq_pipelines_repeat": "9"}
  40. class TestMinddataProfilingManager:
  41. """
  42. Test MinddataProfilingManager
  43. """
  44. def setup_class(self):
  45. """
  46. Run once for the class
  47. """
  48. # Get instance pointer for MindData profiling manager
  49. self.md_profiler = cde.GlobalContext.profiling_manager()
  50. self._PIPELINE_FILE = "./pipeline_profiling"
  51. self._CPU_UTIL_FILE = "./minddata_cpu_utilization"
  52. self._DATASET_ITERATOR_FILE = "./dataset_iterator_profiling"
  53. def setup_method(self):
  54. """
  55. Run before each test function.
  56. """
  57. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  58. file_id = file_name_map_rank_id[file_name]
  59. pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
  60. cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
  61. dataset_iterator_file = self._DATASET_ITERATOR_FILE + "_" + file_id + ".txt"
  62. # Confirm MindData Profiling files do not yet exist
  63. assert os.path.exists(pipeline_file) is False
  64. assert os.path.exists(cpu_util_file) is False
  65. assert os.path.exists(dataset_iterator_file) is False
  66. # Set the MindData Profiling related environment variables
  67. os.environ['RANK_ID'] = file_id
  68. os.environ['DEVICE_ID'] = file_id
  69. # Initialize MindData profiling manager with current working directory
  70. self.md_profiler.init("./")
  71. # Start MindData Profiling
  72. self.md_profiler.start()
  73. def teardown_method(self):
  74. """
  75. Run after each test function.
  76. """
  77. # Stop MindData Profiling
  78. self.md_profiler.stop()
  79. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  80. file_id = file_name_map_rank_id[file_name]
  81. pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
  82. cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
  83. dataset_iterator_file = self._DATASET_ITERATOR_FILE + "_" + file_id + ".txt"
  84. # Delete MindData profiling files generated from the test.
  85. os.remove(pipeline_file)
  86. os.remove(cpu_util_file)
  87. os.remove(dataset_iterator_file)
  88. # Disable MindData Profiling related environment variables
  89. del os.environ['RANK_ID']
  90. del os.environ['DEVICE_ID']
  91. def confirm_cpuutil(self, num_pipeline_ops, cpu_uti_file):
  92. """
  93. Confirm CPU utilization JSON file with <num_pipeline_ops> in the pipeline
  94. """
  95. with open(cpu_uti_file) as file1:
  96. data = json.load(file1)
  97. op_info = data["op_info"]
  98. assert len(op_info) == num_pipeline_ops
  99. def confirm_ops_in_pipeline(self, num_ops, op_list, pipeline_file):
  100. """
  101. Confirm pipeline JSON file with <num_ops> are in the pipeline and the given list of ops
  102. """
  103. with open(pipeline_file) as file1:
  104. data = json.load(file1)
  105. op_info = data["op_info"]
  106. # Confirm ops in pipeline file
  107. assert len(op_info) == num_ops
  108. for i in range(num_ops):
  109. assert op_info[i]["op_type"] in op_list
  110. def test_profiling_simple_pipeline(self):
  111. """
  112. Generator -> Shuffle -> Batch
  113. """
  114. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  115. file_id = file_name_map_rank_id[file_name]
  116. pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
  117. cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
  118. dataset_iterator_file = self._DATASET_ITERATOR_FILE + "_" + file_id + ".txt"
  119. source = [(np.array([x]),) for x in range(1024)]
  120. data1 = ds.GeneratorDataset(source, ["data"])
  121. data1 = data1.shuffle(64)
  122. data1 = data1.batch(32)
  123. # try output shape type and dataset size and make sure no profiling file is generated
  124. assert data1.output_shapes() == [[32, 1]]
  125. assert [str(tp) for tp in data1.output_types()] == ["int64"]
  126. assert data1.get_dataset_size() == 32
  127. # Confirm profiling files do not (yet) exist
  128. assert os.path.exists(pipeline_file) is False
  129. assert os.path.exists(cpu_util_file) is False
  130. assert os.path.exists(dataset_iterator_file) is False
  131. for _ in data1:
  132. pass
  133. # Confirm profiling files now exist
  134. assert os.path.exists(pipeline_file) is True
  135. assert os.path.exists(cpu_util_file) is True
  136. assert os.path.exists(dataset_iterator_file) is True
  137. def test_profiling_complex_pipeline(self):
  138. """
  139. Generator -> Map ->
  140. -> Zip
  141. TFReader -> Shuffle ->
  142. """
  143. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  144. file_id = file_name_map_rank_id[file_name]
  145. pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
  146. cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
  147. source = [(np.array([x]),) for x in range(1024)]
  148. data1 = ds.GeneratorDataset(source, ["gen"])
  149. data1 = data1.map(operations=[(lambda x: x + 1)], input_columns=["gen"])
  150. pattern = DATASET_ROOT + "/test.data"
  151. data2 = ds.TFRecordDataset(pattern, SCHEMA_FILE, shuffle=ds.Shuffle.FILES)
  152. data2 = data2.shuffle(4)
  153. data3 = ds.zip((data1, data2))
  154. for _ in data3:
  155. pass
  156. with open(pipeline_file) as f:
  157. data = json.load(f)
  158. op_info = data["op_info"]
  159. assert len(op_info) == 5
  160. for i in range(5):
  161. if op_info[i]["op_type"] != "ZipOp":
  162. assert "size" in op_info[i]["metrics"]["output_queue"]
  163. assert "length" in op_info[i]["metrics"]["output_queue"]
  164. else:
  165. # Note: Zip is an inline op and hence does not have metrics information
  166. assert op_info[i]["metrics"] is None
  167. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  168. self.confirm_cpuutil(5, cpu_util_file)
  169. def test_profiling_inline_ops_pipeline1(self):
  170. """
  171. Test pipeline with inline ops: Concat and EpochCtrl
  172. Generator ->
  173. Concat -> EpochCtrl
  174. Generator ->
  175. """
  176. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  177. file_id = file_name_map_rank_id[file_name]
  178. pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
  179. cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
  180. # In source1 dataset: Number of rows is 3; its values are 0, 1, 2
  181. def source1():
  182. for i in range(3):
  183. yield (np.array([i]),)
  184. # In source2 dataset: Number of rows is 7; its values are 3, 4, 5 ... 9
  185. def source2():
  186. for i in range(3, 10):
  187. yield (np.array([i]),)
  188. data1 = ds.GeneratorDataset(source1, ["col1"])
  189. data2 = ds.GeneratorDataset(source2, ["col1"])
  190. data3 = data1.concat(data2)
  191. num_iter = 0
  192. # Note: set num_epochs=2 in create_tuple_iterator(), so that EpochCtrl op is added to the pipeline
  193. # Here i refers to index, d refers to data element
  194. for i, d in enumerate(data3.create_tuple_iterator(num_epochs=2, output_numpy=True)):
  195. num_iter += 1
  196. t = d
  197. assert i == t[0][0]
  198. assert num_iter == 10
  199. # Confirm pipeline is created with EpochCtrl op
  200. with open(pipeline_file) as f:
  201. data = json.load(f)
  202. op_info = data["op_info"]
  203. assert len(op_info) == 4
  204. for i in range(4):
  205. # Note: The following ops are inline ops: Concat, EpochCtrl
  206. if op_info[i]["op_type"] in ("ConcatOp", "EpochCtrlOp"):
  207. # Confirm these inline ops do not have metrics information
  208. assert op_info[i]["metrics"] is None
  209. else:
  210. assert "size" in op_info[i]["metrics"]["output_queue"]
  211. assert "length" in op_info[i]["metrics"]["output_queue"]
  212. # Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file
  213. self.confirm_cpuutil(4, cpu_util_file)
  214. def test_profiling_inline_ops_pipeline2(self):
  215. """
  216. Test pipeline with many inline ops
  217. Generator -> Rename -> Skip -> Repeat -> Take
  218. """
  219. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  220. file_id = file_name_map_rank_id[file_name]
  221. pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
  222. cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
  223. # In source1 dataset: Number of rows is 10; its values are 0, 1, 2, 3, 4, 5 ... 9
  224. def source1():
  225. for i in range(10):
  226. yield (np.array([i]),)
  227. data1 = ds.GeneratorDataset(source1, ["col1"])
  228. data1 = data1.rename(input_columns=["col1"], output_columns=["newcol1"])
  229. data1 = data1.skip(2)
  230. data1 = data1.repeat(2)
  231. data1 = data1.take(12)
  232. for _ in data1:
  233. pass
  234. with open(pipeline_file) as f:
  235. data = json.load(f)
  236. op_info = data["op_info"]
  237. assert len(op_info) == 5
  238. for i in range(5):
  239. # Check for these inline ops
  240. if op_info[i]["op_type"] in ("RenameOp", "RepeatOp", "SkipOp", "TakeOp"):
  241. # Confirm these inline ops do not have metrics information
  242. assert op_info[i]["metrics"] is None
  243. else:
  244. assert "size" in op_info[i]["metrics"]["output_queue"]
  245. assert "length" in op_info[i]["metrics"]["output_queue"]
  246. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  247. self.confirm_cpuutil(5, cpu_util_file)
  248. def test_profiling_sampling_interval(self):
  249. """
  250. Test non-default monitor sampling interval
  251. """
  252. interval_origin = ds.config.get_monitor_sampling_interval()
  253. ds.config.set_monitor_sampling_interval(30)
  254. interval = ds.config.get_monitor_sampling_interval()
  255. assert interval == 30
  256. source = [(np.array([x]),) for x in range(1024)]
  257. data1 = ds.GeneratorDataset(source, ["data"])
  258. data1 = data1.shuffle(64)
  259. data1 = data1.batch(32)
  260. for _ in data1:
  261. pass
  262. ds.config.set_monitor_sampling_interval(interval_origin)
  263. def test_profiling_basic_pipeline(self):
  264. """
  265. Test with this basic pipeline
  266. Generator -> Map -> Batch -> Repeat -> EpochCtrl
  267. """
  268. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  269. file_id = file_name_map_rank_id[file_name]
  270. pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
  271. cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
  272. def source1():
  273. for i in range(8000):
  274. yield (np.array([i]),)
  275. # Create this basic and common pipeline
  276. # Leaf/Source-Op -> Map -> Batch -> Repeat
  277. data1 = ds.GeneratorDataset(source1, ["col1"])
  278. type_cast_op = C.TypeCast(mstype.int32)
  279. data1 = data1.map(operations=type_cast_op, input_columns="col1")
  280. data1 = data1.batch(16)
  281. data1 = data1.repeat(2)
  282. num_iter = 0
  283. # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
  284. for _ in data1.create_dict_iterator(num_epochs=2):
  285. num_iter += 1
  286. assert num_iter == 1000
  287. with open(pipeline_file) as f:
  288. data = json.load(f)
  289. op_info = data["op_info"]
  290. assert len(op_info) == 5
  291. for i in range(5):
  292. # Check for inline ops
  293. if op_info[i]["op_type"] in ("EpochCtrlOp", "RepeatOp"):
  294. # Confirm these inline ops do not have metrics information
  295. assert op_info[i]["metrics"] is None
  296. else:
  297. assert "size" in op_info[i]["metrics"]["output_queue"]
  298. assert "length" in op_info[i]["metrics"]["output_queue"]
  299. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  300. self.confirm_cpuutil(5, cpu_util_file)
  301. def test_profiling_cifar10_pipeline(self):
  302. """
  303. Test with this common pipeline with Cifar10
  304. Cifar10 -> Map -> Map -> Batch -> Repeat
  305. """
  306. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  307. file_id = file_name_map_rank_id[file_name]
  308. pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
  309. cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
  310. # Create this common pipeline
  311. # Cifar10 -> Map -> Map -> Batch -> Repeat
  312. DATA_DIR_10 = "../data/dataset/testCifar10Data"
  313. data1 = ds.Cifar10Dataset(DATA_DIR_10, num_samples=8000)
  314. type_cast_op = C.TypeCast(mstype.int32)
  315. data1 = data1.map(operations=type_cast_op, input_columns="label")
  316. random_horizontal_op = vision.RandomHorizontalFlip()
  317. data1 = data1.map(operations=random_horizontal_op, input_columns="image")
  318. data1 = data1.batch(32)
  319. data1 = data1.repeat(3)
  320. num_iter = 0
  321. # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
  322. for _ in data1.create_dict_iterator(num_epochs=1):
  323. num_iter += 1
  324. assert num_iter == 750
  325. with open(pipeline_file) as f:
  326. data = json.load(f)
  327. op_info = data["op_info"]
  328. assert len(op_info) == 5
  329. for i in range(5):
  330. # Check for inline ops
  331. if op_info[i]["op_type"] == "RepeatOp":
  332. # Confirm these inline ops do not have metrics information
  333. assert op_info[i]["metrics"] is None
  334. else:
  335. assert "size" in op_info[i]["metrics"]["output_queue"]
  336. assert "length" in op_info[i]["metrics"]["output_queue"]
  337. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  338. self.confirm_cpuutil(5, cpu_util_file)
  339. def test_profiling_seq_pipelines_epochctrl3(self):
  340. """
  341. Test with these 2 sequential pipelines:
  342. 1) Generator -> Batch -> EpochCtrl
  343. 2) Generator -> Batch
  344. Note: This is a simplification of the user scenario to use the same pipeline for training and then evaluation.
  345. """
  346. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  347. file_id = file_name_map_rank_id[file_name]
  348. pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
  349. cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
  350. source = [(np.array([x]),) for x in range(64)]
  351. data1 = ds.GeneratorDataset(source, ["data"])
  352. data1 = data1.batch(32)
  353. # Test A - Call create_dict_iterator with num_epochs>1
  354. num_iter = 0
  355. # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
  356. for _ in data1.create_dict_iterator(num_epochs=2):
  357. num_iter += 1
  358. assert num_iter == 2
  359. # Confirm pipeline file and CPU util file each have 3 ops
  360. self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
  361. self.confirm_cpuutil(3, cpu_util_file)
  362. # Test B - Call create_dict_iterator with num_epochs=1
  363. num_iter = 0
  364. # Note: If create_dict_iterator() is called with num_epochs=1,
  365. # then EpochCtrlOp should not be NOT added to the pipeline
  366. for _ in data1.create_dict_iterator(num_epochs=1):
  367. num_iter += 1
  368. assert num_iter == 2
  369. # Confirm pipeline file and CPU util file each have 2 ops
  370. self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
  371. self.confirm_cpuutil(2, cpu_util_file)
  372. def test_profiling_seq_pipelines_epochctrl2(self):
  373. """
  374. Test with these 2 sequential pipelines:
  375. 1) Generator -> Batch
  376. 2) Generator -> Batch -> EpochCtrl
  377. """
  378. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  379. file_id = file_name_map_rank_id[file_name]
  380. pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
  381. cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
  382. source = [(np.array([x]),) for x in range(64)]
  383. data2 = ds.GeneratorDataset(source, ["data"])
  384. data2 = data2.batch(16)
  385. # Test A - Call create_dict_iterator with num_epochs=1
  386. num_iter = 0
  387. # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
  388. for _ in data2.create_dict_iterator(num_epochs=1):
  389. num_iter += 1
  390. assert num_iter == 4
  391. # Confirm pipeline file and CPU util file each have 2 ops
  392. self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
  393. self.confirm_cpuutil(2, cpu_util_file)
  394. # Test B - Call create_dict_iterator with num_epochs>1
  395. num_iter = 0
  396. # Note: If create_dict_iterator() is called with num_epochs>1,
  397. # then EpochCtrlOp should be added to the pipeline
  398. for _ in data2.create_dict_iterator(num_epochs=2):
  399. num_iter += 1
  400. assert num_iter == 4
  401. # Confirm pipeline file and CPU util file each have 3 ops
  402. self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
  403. self.confirm_cpuutil(3, cpu_util_file)
  404. def test_profiling_seq_pipelines_repeat(self):
  405. """
  406. Test with these 2 sequential pipelines:
  407. 1) Generator -> Batch
  408. 2) Generator -> Batch -> Repeat
  409. """
  410. file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
  411. file_id = file_name_map_rank_id[file_name]
  412. pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
  413. cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
  414. source = [(np.array([x]),) for x in range(64)]
  415. data2 = ds.GeneratorDataset(source, ["data"])
  416. data2 = data2.batch(16)
  417. # Test A - Call create_dict_iterator with 2 ops in pipeline
  418. num_iter = 0
  419. for _ in data2.create_dict_iterator(num_epochs=1):
  420. num_iter += 1
  421. assert num_iter == 4
  422. # Confirm pipeline file and CPU util file each have 2 ops
  423. self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
  424. self.confirm_cpuutil(2, cpu_util_file)
  425. # Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline
  426. data2 = data2.repeat(5)
  427. num_iter = 0
  428. for _ in data2.create_dict_iterator(num_epochs=1):
  429. num_iter += 1
  430. assert num_iter == 20
  431. # Confirm pipeline file and CPU util file each have 3 ops
  432. self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"], pipeline_file)
  433. self.confirm_cpuutil(3, cpu_util_file)