You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

test_profiling.py 24 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592
  1. # Copyright 2020-2021 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ==============================================================================
  15. """
  16. Testing profiling support in DE
  17. """
  18. import json
  19. import os
  20. import numpy as np
  21. import mindspore.common.dtype as mstype
  22. import mindspore.dataset as ds
  23. import mindspore.dataset.transforms.c_transforms as C
  24. import mindspore.dataset.vision.c_transforms as vision
  25. import mindspore._c_dataengine as cde
# Test data locations (relative to the test working directory).
FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"

# add file name to rank id mapping to avoid file writing crash
# (each test writes profiling output files suffixed with its own rank id,
# so concurrently-run tests never collide on the same output file names)
file_name_map_rank_id = {"test_profiling_simple_pipeline": "0",
                         "test_profiling_complex_pipeline": "1",
                         "test_profiling_inline_ops_pipeline1": "2",
                         "test_profiling_inline_ops_pipeline2": "3",
                         "test_profiling_sampling_interval": "4",
                         "test_profiling_basic_pipeline": "5",
                         "test_profiling_cifar10_pipeline": "6",
                         "test_profiling_seq_pipelines_epochctrl3": "7",
                         "test_profiling_seq_pipelines_epochctrl2": "8",
                         "test_profiling_seq_pipelines_repeat": "9"}
class TestMinddataProfilingManager:
    """
    Test MinddataProfilingManager.

    Each test builds a dataset pipeline, iterates it while profiling is
    active, then verifies the profiling output files (pipeline JSON, CPU
    utilization JSON, dataset iterator file) that the profiler saves.
    """

    def setup_class(self):
        """
        Run once for the class.
        """
        # Get instance pointer for MindData profiling manager
        self.md_profiler = cde.GlobalContext.profiling_manager()
        # Base names for the profiling output files; each test appends
        # "_<rank id>" plus an extension (see setup_method).
        self._PIPELINE_FILE = "./pipeline_profiling"
        self._CPU_UTIL_FILE = "./minddata_cpu_utilization"
        self._DATASET_ITERATOR_FILE = "./dataset_iterator_profiling"

    def setup_method(self):
        """
        Run before each test function.
        """
        # Derive the current test's name from pytest's environment variable,
        # then map it to its unique rank id (see file_name_map_rank_id).
        file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
        file_id = file_name_map_rank_id[file_name]
        pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
        cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
        dataset_iterator_file = self._DATASET_ITERATOR_FILE + "_" + file_id + ".txt"
        # Confirm MindData Profiling files do not yet exist
        assert os.path.exists(pipeline_file) is False
        assert os.path.exists(cpu_util_file) is False
        assert os.path.exists(dataset_iterator_file) is False
        # Set the MindData Profiling related environment variables
        os.environ['RANK_ID'] = file_id
        os.environ['DEVICE_ID'] = file_id
        # Initialize MindData profiling manager
        self.md_profiler.init()
        # Start MindData Profiling
        self.md_profiler.start()

    def teardown_method(self):
        """
        Run after each test function.
        """
        # Recompute the same per-test file names used in setup_method.
        file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
        file_id = file_name_map_rank_id[file_name]
        pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
        cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
        dataset_iterator_file = self._DATASET_ITERATOR_FILE + "_" + file_id + ".txt"
        # Delete MindData profiling files generated from the test.
        os.remove(pipeline_file)
        os.remove(cpu_util_file)
        os.remove(dataset_iterator_file)
        # Disable MindData Profiling related environment variables
        del os.environ['RANK_ID']
        del os.environ['DEVICE_ID']

    def confirm_cpuutil(self, num_pipeline_ops, cpu_uti_file):
        """
        Confirm CPU utilization JSON file with <num_pipeline_ops> in the pipeline.
        """
        with open(cpu_uti_file) as file1:
            data = json.load(file1)
        op_info = data["op_info"]
        # One op_info entry is expected per pipeline op.
        assert len(op_info) == num_pipeline_ops

    def confirm_ops_in_pipeline(self, num_ops, op_list, pipeline_file):
        """
        Confirm pipeline JSON file with <num_ops> are in the pipeline and the given list of ops.
        """
        with open(pipeline_file) as file1:
            data = json.load(file1)
        op_info = data["op_info"]
        # Confirm ops in pipeline file
        assert len(op_info) == num_ops
        for i in range(num_ops):
            assert op_info[i]["op_type"] in op_list

    def test_profiling_simple_pipeline(self):
        """
        Generator -> Shuffle -> Batch
        """
        file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
        file_id = file_name_map_rank_id[file_name]
        pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
        cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"
        dataset_iterator_file = self._DATASET_ITERATOR_FILE + "_" + file_id + ".txt"

        source = [(np.array([x]),) for x in range(1024)]
        data1 = ds.GeneratorDataset(source, ["data"])
        data1 = data1.shuffle(64)
        data1 = data1.batch(32)
        # try output shape type and dataset size and make sure no profiling file is generated
        assert data1.output_shapes() == [[32, 1]]
        assert [str(tp) for tp in data1.output_types()] == ["int64"]
        assert data1.get_dataset_size() == 32
        # Confirm profiling files do not (yet) exist
        assert os.path.exists(pipeline_file) is False
        assert os.path.exists(cpu_util_file) is False
        assert os.path.exists(dataset_iterator_file) is False

        for _ in data1:
            pass

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

        # Confirm profiling files now exist
        assert os.path.exists(pipeline_file) is True
        assert os.path.exists(cpu_util_file) is True
        assert os.path.exists(dataset_iterator_file) is True

    def test_profiling_complex_pipeline(self):
        """
        Generator -> Map ->
                             -> Zip
        TFReader  -> Shuffle ->
        """
        file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
        file_id = file_name_map_rank_id[file_name]
        pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
        cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"

        source = [(np.array([x]),) for x in range(1024)]
        data1 = ds.GeneratorDataset(source, ["gen"])
        data1 = data1.map(operations=[(lambda x: x + 1)], input_columns=["gen"])
        pattern = DATASET_ROOT + "/test.data"
        data2 = ds.TFRecordDataset(pattern, SCHEMA_FILE, shuffle=ds.Shuffle.FILES)
        data2 = data2.shuffle(4)
        data3 = ds.zip((data1, data2))

        for _ in data3:
            pass

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

        with open(pipeline_file) as f:
            data = json.load(f)
        op_info = data["op_info"]
        # 5 ops expected: Generator, Map, TFReader, Shuffle, Zip
        assert len(op_info) == 5
        for i in range(5):
            if op_info[i]["op_type"] != "ZipOp":
                assert "size" in op_info[i]["metrics"]["output_queue"]
                assert "length" in op_info[i]["metrics"]["output_queue"]
            else:
                # Note: Zip is an inline op and hence does not have metrics information
                assert op_info[i]["metrics"] is None

        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        self.confirm_cpuutil(5, cpu_util_file)

    def test_profiling_inline_ops_pipeline1(self):
        """
        Test pipeline with inline ops: Concat and EpochCtrl
        Generator ->
                     Concat -> EpochCtrl
        Generator ->
        """
        file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
        file_id = file_name_map_rank_id[file_name]
        pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
        cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"

        # In source1 dataset: Number of rows is 3; its values are 0, 1, 2
        def source1():
            for i in range(3):
                yield (np.array([i]),)

        # In source2 dataset: Number of rows is 7; its values are 3, 4, 5 ... 9
        def source2():
            for i in range(3, 10):
                yield (np.array([i]),)

        data1 = ds.GeneratorDataset(source1, ["col1"])
        data2 = ds.GeneratorDataset(source2, ["col1"])
        data3 = data1.concat(data2)

        num_iter = 0
        # Note: set num_epochs=2 in create_tuple_iterator(), so that EpochCtrl op is added to the pipeline
        # Here i refers to index, d refers to data element
        for i, d in enumerate(data3.create_tuple_iterator(num_epochs=2, output_numpy=True)):
            num_iter += 1
            t = d
            assert i == t[0][0]
        assert num_iter == 10

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

        # Confirm pipeline is created with EpochCtrl op
        with open(pipeline_file) as f:
            data = json.load(f)
        op_info = data["op_info"]
        # 4 ops expected: Generator x2, Concat, EpochCtrl
        assert len(op_info) == 4
        for i in range(4):
            # Note: The following ops are inline ops: Concat, EpochCtrl
            if op_info[i]["op_type"] in ("ConcatOp", "EpochCtrlOp"):
                # Confirm these inline ops do not have metrics information
                assert op_info[i]["metrics"] is None
            else:
                assert "size" in op_info[i]["metrics"]["output_queue"]
                assert "length" in op_info[i]["metrics"]["output_queue"]

        # Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file
        self.confirm_cpuutil(4, cpu_util_file)

    def test_profiling_inline_ops_pipeline2(self):
        """
        Test pipeline with many inline ops
        Generator -> Rename -> Skip -> Repeat -> Take
        """
        file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
        file_id = file_name_map_rank_id[file_name]
        pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
        cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"

        # In source1 dataset: Number of rows is 10; its values are 0, 1, 2, 3, 4, 5 ... 9
        def source1():
            for i in range(10):
                yield (np.array([i]),)

        data1 = ds.GeneratorDataset(source1, ["col1"])
        data1 = data1.rename(input_columns=["col1"], output_columns=["newcol1"])
        data1 = data1.skip(2)
        data1 = data1.repeat(2)
        data1 = data1.take(12)

        for _ in data1:
            pass

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

        with open(pipeline_file) as f:
            data = json.load(f)
        op_info = data["op_info"]
        # 5 ops expected: Generator, Rename, Skip, Repeat, Take
        assert len(op_info) == 5
        for i in range(5):
            # Check for these inline ops
            if op_info[i]["op_type"] in ("RenameOp", "RepeatOp", "SkipOp", "TakeOp"):
                # Confirm these inline ops do not have metrics information
                assert op_info[i]["metrics"] is None
            else:
                assert "size" in op_info[i]["metrics"]["output_queue"]
                assert "length" in op_info[i]["metrics"]["output_queue"]

        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        self.confirm_cpuutil(5, cpu_util_file)

    def test_profiling_sampling_interval(self):
        """
        Test non-default monitor sampling interval
        """
        # Save the original interval so it can be restored after the test.
        interval_origin = ds.config.get_monitor_sampling_interval()

        ds.config.set_monitor_sampling_interval(30)
        interval = ds.config.get_monitor_sampling_interval()
        assert interval == 30

        source = [(np.array([x]),) for x in range(1024)]
        data1 = ds.GeneratorDataset(source, ["data"])
        data1 = data1.shuffle(64)
        data1 = data1.batch(32)

        for _ in data1:
            pass

        # Restore the original sampling interval.
        ds.config.set_monitor_sampling_interval(interval_origin)

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

    def test_profiling_basic_pipeline(self):
        """
        Test with this basic pipeline
        Generator -> Map -> Batch -> Repeat -> EpochCtrl
        """
        file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
        file_id = file_name_map_rank_id[file_name]
        pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
        cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"

        def source1():
            for i in range(8000):
                yield (np.array([i]),)

        # Create this basic and common pipeline
        # Leaf/Source-Op -> Map -> Batch -> Repeat
        data1 = ds.GeneratorDataset(source1, ["col1"])
        type_cast_op = C.TypeCast(mstype.int32)
        data1 = data1.map(operations=type_cast_op, input_columns="col1")
        data1 = data1.batch(16)
        data1 = data1.repeat(2)

        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=2):
            num_iter += 1
        # 8000 rows / batch 16 * repeat 2 = 1000 batches per epoch
        assert num_iter == 1000

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

        with open(pipeline_file) as f:
            data = json.load(f)
        op_info = data["op_info"]
        # 5 ops expected: Generator, Map, Batch, Repeat, EpochCtrl
        assert len(op_info) == 5
        for i in range(5):
            # Check for inline ops
            if op_info[i]["op_type"] in ("EpochCtrlOp", "RepeatOp"):
                # Confirm these inline ops do not have metrics information
                assert op_info[i]["metrics"] is None
            else:
                assert "size" in op_info[i]["metrics"]["output_queue"]
                assert "length" in op_info[i]["metrics"]["output_queue"]

        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        self.confirm_cpuutil(5, cpu_util_file)

    def test_profiling_cifar10_pipeline(self):
        """
        Test with this common pipeline with Cifar10
        Cifar10 -> Map -> Map -> Batch -> Repeat
        """
        file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
        file_id = file_name_map_rank_id[file_name]
        pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
        cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"

        # Create this common pipeline
        # Cifar10 -> Map -> Map -> Batch -> Repeat
        DATA_DIR_10 = "../data/dataset/testCifar10Data"
        data1 = ds.Cifar10Dataset(DATA_DIR_10, num_samples=8000)
        type_cast_op = C.TypeCast(mstype.int32)
        data1 = data1.map(operations=type_cast_op, input_columns="label")
        random_horizontal_op = vision.RandomHorizontalFlip()
        data1 = data1.map(operations=random_horizontal_op, input_columns="image")
        data1 = data1.batch(32)
        data1 = data1.repeat(3)

        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=1):
            num_iter += 1
        # 8000 samples / batch 32 * repeat 3 = 750 batches
        assert num_iter == 750

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

        with open(pipeline_file) as f:
            data = json.load(f)
        op_info = data["op_info"]
        # 5 ops expected: Cifar10, Map x2, Batch, Repeat
        assert len(op_info) == 5
        for i in range(5):
            # Check for inline ops
            if op_info[i]["op_type"] == "RepeatOp":
                # Confirm these inline ops do not have metrics information
                assert op_info[i]["metrics"] is None
            else:
                assert "size" in op_info[i]["metrics"]["output_queue"]
                assert "length" in op_info[i]["metrics"]["output_queue"]

        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        self.confirm_cpuutil(5, cpu_util_file)

    def test_profiling_seq_pipelines_epochctrl3(self):
        """
        Test with these 2 sequential pipelines:
        1) Generator -> Batch -> EpochCtrl
        2) Generator -> Batch
        Note: This is a simplification of the user scenario to use the same pipeline for training and then evaluation.
        """
        file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
        file_id = file_name_map_rank_id[file_name]
        pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
        cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"

        source = [(np.array([x]),) for x in range(64)]
        data1 = ds.GeneratorDataset(source, ["data"])
        data1 = data1.batch(32)

        # Test A - Call create_dict_iterator with num_epochs>1
        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=2):
            num_iter += 1
        assert num_iter == 2

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

        # Confirm pipeline file and CPU util file each have 3 ops
        self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
        self.confirm_cpuutil(3, cpu_util_file)

        # Test B - Call create_dict_iterator with num_epochs=1
        # Initialize and Start MindData profiling manager
        self.md_profiler.init()
        self.md_profiler.start()

        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs=1,
        # then EpochCtrlOp should not be NOT added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 2

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

        # Confirm pipeline file and CPU util file each have 2 ops
        self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
        self.confirm_cpuutil(2, cpu_util_file)

    def test_profiling_seq_pipelines_epochctrl2(self):
        """
        Test with these 2 sequential pipelines:
        1) Generator -> Batch
        2) Generator -> Batch -> EpochCtrl
        """
        file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
        file_id = file_name_map_rank_id[file_name]
        pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
        cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"

        source = [(np.array([x]),) for x in range(64)]
        data2 = ds.GeneratorDataset(source, ["data"])
        data2 = data2.batch(16)

        # Test A - Call create_dict_iterator with num_epochs=1
        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
        for _ in data2.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 4

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

        # Confirm pipeline file and CPU util file each have 2 ops
        self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
        self.confirm_cpuutil(2, cpu_util_file)

        # Test B - Call create_dict_iterator with num_epochs>1
        # Initialize and Start MindData profiling manager
        self.md_profiler.init()
        self.md_profiler.start()

        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs>1,
        # then EpochCtrlOp should be added to the pipeline
        for _ in data2.create_dict_iterator(num_epochs=2):
            num_iter += 1
        assert num_iter == 4

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

        # Confirm pipeline file and CPU util file each have 3 ops
        self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
        self.confirm_cpuutil(3, cpu_util_file)

    def test_profiling_seq_pipelines_repeat(self):
        """
        Test with these 2 sequential pipelines:
        1) Generator -> Batch
        2) Generator -> Batch -> Repeat
        """
        file_name = os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
        file_id = file_name_map_rank_id[file_name]
        pipeline_file = self._PIPELINE_FILE + "_" + file_id + ".json"
        cpu_util_file = self._CPU_UTIL_FILE + "_" + file_id + ".json"

        source = [(np.array([x]),) for x in range(64)]
        data2 = ds.GeneratorDataset(source, ["data"])
        data2 = data2.batch(16)

        # Test A - Call create_dict_iterator with 2 ops in pipeline
        num_iter = 0
        for _ in data2.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 4

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

        # Confirm pipeline file and CPU util file each have 2 ops
        self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
        self.confirm_cpuutil(2, cpu_util_file)

        # Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline
        # Initialize and Start MindData profiling manager
        self.md_profiler.init()
        self.md_profiler.start()

        data2 = data2.repeat(5)
        num_iter = 0
        for _ in data2.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 20

        # Stop MindData Profiling and save output files to current working directory
        self.md_profiler.stop()
        self.md_profiler.save('./')

        # Confirm pipeline file and CPU util file each have 3 ops
        self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"], pipeline_file)
        self.confirm_cpuutil(3, cpu_util_file)