You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

test_profiling.py 23 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. # Copyright 2020-2022 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ==============================================================================
  15. """
  16. Testing profiling support in DE
  17. """
  18. import json
  19. import os
  20. import numpy as np
  21. import pytest
  22. import mindspore.common.dtype as mstype
  23. import mindspore.dataset as ds
  24. import mindspore.dataset.transforms.c_transforms as C
  25. import mindspore.dataset.vision.c_transforms as vision
  26. import mindspore._c_dataengine as cde
  27. FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
  28. DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
  29. SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"
  30. @pytest.mark.forked
  31. class TestMinddataProfilingManager:
  32. """
  33. Test MinddataProfilingManager
  34. Note: Use pytest fixture tmp_path to create files within this temporary directory,
  35. which is automatically created for each test and deleted at the end of the test.
  36. """
  37. def setup_class(self):
  38. """
  39. Run once for the class
  40. """
  41. # Get instance pointer for MindData profiling manager
  42. self.md_profiler = cde.GlobalContext.profiling_manager()
  43. def setup_method(self):
  44. """
  45. Run before each test function.
  46. """
  47. # Set the MindData Profiling related environment variables
  48. os.environ['RANK_ID'] = "1"
  49. os.environ['DEVICE_ID'] = "1"
  50. # Initialize MindData profiling manager
  51. self.md_profiler.init()
  52. # Start MindData Profiling
  53. self.md_profiler.start()
  54. def teardown_method(self):
  55. """
  56. Run after each test function.
  57. """
  58. # Disable MindData Profiling related environment variables
  59. del os.environ['RANK_ID']
  60. del os.environ['DEVICE_ID']
  61. def confirm_cpuutil(self, cpu_util_file, num_pipeline_ops):
  62. """
  63. Confirm CPU utilization JSON file with <num_pipeline_ops> in the pipeline
  64. """
  65. with open(cpu_util_file) as file1:
  66. data = json.load(file1)
  67. op_info = data["op_info"]
  68. assert len(op_info) == num_pipeline_ops
  69. def confirm_ops_in_pipeline(self, pipeline_file, num_ops, op_list):
  70. """
  71. Confirm pipeline JSON file with <num_ops> are in the pipeline and the given list of ops
  72. """
  73. with open(pipeline_file) as file1:
  74. data = json.load(file1)
  75. op_info = data["op_info"]
  76. # Confirm ops in pipeline file
  77. assert len(op_info) == num_ops
  78. for i in range(num_ops):
  79. assert op_info[i]["op_type"] in op_list
  80. def confirm_dataset_iterator_file(self, dataset_iterator_file, num_batches):
  81. """
  82. Confirm dataset iterator file exists with the correct number of rows in the file
  83. """
  84. assert os.path.exists(dataset_iterator_file)
  85. actual_num_lines = sum(1 for _ in open(dataset_iterator_file))
  86. # Confirm there are 4 lines for each batch in the dataset iterator file
  87. assert actual_num_lines == 4 * num_batches
  88. def test_profiling_simple_pipeline(self, tmp_path):
  89. """
  90. Generator -> Shuffle -> Batch
  91. """
  92. source = [(np.array([x]),) for x in range(1024)]
  93. data1 = ds.GeneratorDataset(source, ["data"])
  94. data1 = data1.shuffle(64)
  95. data1 = data1.batch(32)
  96. # Check output shape type and dataset size
  97. assert data1.output_shapes() == [[32, 1]]
  98. assert [str(tp) for tp in data1.output_types()] == ["int64"]
  99. assert data1.get_dataset_size() == 32
  100. # Stop MindData Profiling and save output files to tmp_path
  101. self.md_profiler.stop()
  102. self.md_profiler.save(str(tmp_path))
  103. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  104. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  105. dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt"
  106. # Confirm no profiling files are produced (since no MindData pipeline has been executed)
  107. assert os.path.exists(pipeline_file) is False
  108. assert os.path.exists(cpu_util_file) is False
  109. assert os.path.exists(dataset_iterator_file) is False
  110. # Start MindData Profiling
  111. self.md_profiler.start()
  112. # Execute MindData Pipeline
  113. for _ in data1:
  114. pass
  115. # Stop MindData Profiling and save output files to tmp_path
  116. self.md_profiler.stop()
  117. self.md_profiler.save(str(tmp_path))
  118. # Confirm profiling files now exist
  119. assert os.path.exists(pipeline_file) is True
  120. assert os.path.exists(cpu_util_file) is True
  121. assert os.path.exists(dataset_iterator_file) is True
  122. def test_profiling_complex_pipeline(self, tmp_path):
  123. """
  124. Generator -> Map ->
  125. -> Zip
  126. TFReader -> Shuffle ->
  127. """
  128. source = [(np.array([x]),) for x in range(1024)]
  129. data1 = ds.GeneratorDataset(source, ["gen"])
  130. data1 = data1.map(operations=[(lambda x: x + 1)], input_columns=["gen"])
  131. pattern = DATASET_ROOT + "/test.data"
  132. data2 = ds.TFRecordDataset(pattern, SCHEMA_FILE, shuffle=ds.Shuffle.FILES)
  133. data2 = data2.shuffle(4)
  134. data3 = ds.zip((data1, data2))
  135. for _ in data3:
  136. pass
  137. # Stop MindData Profiling and save output files to tmp_path
  138. self.md_profiler.stop()
  139. self.md_profiler.save(str(tmp_path))
  140. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  141. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  142. dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt"
  143. with open(pipeline_file) as f:
  144. data = json.load(f)
  145. op_info = data["op_info"]
  146. assert len(op_info) == 5
  147. for i in range(5):
  148. if op_info[i]["op_type"] != "ZipOp":
  149. assert "size" in op_info[i]["metrics"]["output_queue"]
  150. assert "length" in op_info[i]["metrics"]["output_queue"]
  151. else:
  152. # Note: Zip is an inline op and hence does not have metrics information
  153. assert op_info[i]["metrics"] is None
  154. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  155. self.confirm_cpuutil(cpu_util_file, 5)
  156. # Confirm dataset iterator file content
  157. self.confirm_dataset_iterator_file(dataset_iterator_file, 12)
  158. def test_profiling_inline_ops_pipeline1(self, tmp_path):
  159. """
  160. Test pipeline with inline ops: Concat and EpochCtrl
  161. Generator ->
  162. Concat -> EpochCtrl
  163. Generator ->
  164. """
  165. # In source1 dataset: Number of rows is 3; its values are 0, 1, 2
  166. def source1():
  167. for i in range(3):
  168. yield (np.array([i]),)
  169. # In source2 dataset: Number of rows is 7; its values are 3, 4, 5 ... 9
  170. def source2():
  171. for i in range(3, 10):
  172. yield (np.array([i]),)
  173. data1 = ds.GeneratorDataset(source1, ["col1"])
  174. data2 = ds.GeneratorDataset(source2, ["col1"])
  175. data3 = data1.concat(data2)
  176. num_iter = 0
  177. # Note: set num_epochs=2 in create_tuple_iterator(), so that EpochCtrl op is added to the pipeline
  178. # Here i refers to index, d refers to data element
  179. for i, d in enumerate(data3.create_tuple_iterator(num_epochs=2, output_numpy=True)):
  180. num_iter += 1
  181. t = d
  182. assert i == t[0][0]
  183. assert num_iter == 10
  184. # Stop MindData Profiling and save output files to tmp_path
  185. self.md_profiler.stop()
  186. self.md_profiler.save(str(tmp_path))
  187. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  188. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  189. dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt"
  190. # Confirm pipeline is created with EpochCtrl op
  191. with open(pipeline_file) as f:
  192. data = json.load(f)
  193. op_info = data["op_info"]
  194. assert len(op_info) == 4
  195. for i in range(4):
  196. # Note: The following ops are inline ops: Concat, EpochCtrl
  197. if op_info[i]["op_type"] in ("ConcatOp", "EpochCtrlOp"):
  198. # Confirm these inline ops do not have metrics information
  199. assert op_info[i]["metrics"] is None
  200. else:
  201. assert "size" in op_info[i]["metrics"]["output_queue"]
  202. assert "length" in op_info[i]["metrics"]["output_queue"]
  203. # Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file
  204. self.confirm_cpuutil(cpu_util_file, 4)
  205. # Confirm dataset iterator file content
  206. self.confirm_dataset_iterator_file(dataset_iterator_file, 10)
  207. def test_profiling_inline_ops_pipeline2(self, tmp_path):
  208. """
  209. Test pipeline with many inline ops
  210. Generator -> Rename -> Skip -> Repeat -> Take
  211. """
  212. # In source1 dataset: Number of rows is 10; its values are 0, 1, 2, 3, 4, 5 ... 9
  213. def source1():
  214. for i in range(10):
  215. yield (np.array([i]),)
  216. data1 = ds.GeneratorDataset(source1, ["col1"])
  217. data1 = data1.rename(input_columns=["col1"], output_columns=["newcol1"])
  218. data1 = data1.skip(2)
  219. data1 = data1.repeat(2)
  220. data1 = data1.take(12)
  221. for _ in data1:
  222. pass
  223. # Stop MindData Profiling and save output files to tmp_path
  224. self.md_profiler.stop()
  225. self.md_profiler.save(str(tmp_path))
  226. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  227. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  228. dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt"
  229. with open(pipeline_file) as f:
  230. data = json.load(f)
  231. op_info = data["op_info"]
  232. assert len(op_info) == 5
  233. for i in range(5):
  234. # Check for these inline ops
  235. if op_info[i]["op_type"] in ("RenameOp", "RepeatOp", "SkipOp", "TakeOp"):
  236. # Confirm these inline ops do not have metrics information
  237. assert op_info[i]["metrics"] is None
  238. else:
  239. assert "size" in op_info[i]["metrics"]["output_queue"]
  240. assert "length" in op_info[i]["metrics"]["output_queue"]
  241. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  242. self.confirm_cpuutil(cpu_util_file, 5)
  243. # Confirm dataset iterator file content
  244. self.confirm_dataset_iterator_file(dataset_iterator_file, 12)
  245. def test_profiling_sampling_interval(self, tmp_path):
  246. """
  247. Test non-default monitor sampling interval
  248. """
  249. interval_origin = ds.config.get_monitor_sampling_interval()
  250. ds.config.set_monitor_sampling_interval(30)
  251. interval = ds.config.get_monitor_sampling_interval()
  252. assert interval == 30
  253. source = [(np.array([x]),) for x in range(1024)]
  254. data1 = ds.GeneratorDataset(source, ["data"])
  255. data1 = data1.shuffle(64)
  256. data1 = data1.batch(32)
  257. for _ in data1:
  258. pass
  259. ds.config.set_monitor_sampling_interval(interval_origin)
  260. # Stop MindData Profiling and save output files to tmp_path
  261. self.md_profiler.stop()
  262. self.md_profiler.save(str(tmp_path))
  263. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  264. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  265. dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt"
  266. # Confirm pipeline file and CPU util file each have 3 ops
  267. self.confirm_ops_in_pipeline(pipeline_file, 3, ["GeneratorOp", "BatchOp", "ShuffleOp"])
  268. self.confirm_cpuutil(cpu_util_file, 3)
  269. # Confirm dataset iterator file content
  270. self.confirm_dataset_iterator_file(dataset_iterator_file, 32)
  271. def test_profiling_basic_pipeline(self, tmp_path):
  272. """
  273. Test with this basic pipeline
  274. Generator -> Map -> Batch -> Repeat -> EpochCtrl
  275. """
  276. def source1():
  277. for i in range(8000):
  278. yield (np.array([i]),)
  279. # Create this basic and common pipeline
  280. # Leaf/Source-Op -> Map -> Batch -> Repeat
  281. data1 = ds.GeneratorDataset(source1, ["col1"])
  282. type_cast_op = C.TypeCast(mstype.int32)
  283. data1 = data1.map(operations=type_cast_op, input_columns="col1")
  284. data1 = data1.batch(16)
  285. data1 = data1.repeat(2)
  286. num_iter = 0
  287. # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
  288. for _ in data1.create_dict_iterator(num_epochs=2):
  289. num_iter += 1
  290. assert num_iter == 1000
  291. # Stop MindData Profiling and save output files to tmp_path
  292. self.md_profiler.stop()
  293. self.md_profiler.save(str(tmp_path))
  294. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  295. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  296. dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt"
  297. with open(pipeline_file) as f:
  298. data = json.load(f)
  299. op_info = data["op_info"]
  300. assert len(op_info) == 5
  301. for i in range(5):
  302. # Check for inline ops
  303. if op_info[i]["op_type"] in ("EpochCtrlOp", "RepeatOp"):
  304. # Confirm these inline ops do not have metrics information
  305. assert op_info[i]["metrics"] is None
  306. else:
  307. assert "size" in op_info[i]["metrics"]["output_queue"]
  308. assert "length" in op_info[i]["metrics"]["output_queue"]
  309. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  310. self.confirm_cpuutil(cpu_util_file, 5)
  311. # Confirm dataset iterator file content
  312. self.confirm_dataset_iterator_file(dataset_iterator_file, 1000)
  313. def test_profiling_cifar10_pipeline(self, tmp_path):
  314. """
  315. Test with this common pipeline with Cifar10
  316. Cifar10 -> Map -> Map -> Batch -> Repeat
  317. """
  318. # Create this common pipeline
  319. # Cifar10 -> Map -> Map -> Batch -> Repeat
  320. DATA_DIR_10 = "../data/dataset/testCifar10Data"
  321. data1 = ds.Cifar10Dataset(DATA_DIR_10, num_samples=8000)
  322. type_cast_op = C.TypeCast(mstype.int32)
  323. data1 = data1.map(operations=type_cast_op, input_columns="label")
  324. random_horizontal_op = vision.RandomHorizontalFlip()
  325. data1 = data1.map(operations=random_horizontal_op, input_columns="image")
  326. data1 = data1.batch(32)
  327. data1 = data1.repeat(3)
  328. num_iter = 0
  329. # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
  330. for _ in data1.create_dict_iterator(num_epochs=1):
  331. num_iter += 1
  332. assert num_iter == 750
  333. # Stop MindData Profiling and save output files to tmp_path
  334. self.md_profiler.stop()
  335. self.md_profiler.save(str(tmp_path))
  336. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  337. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  338. dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt"
  339. with open(pipeline_file) as f:
  340. data = json.load(f)
  341. op_info = data["op_info"]
  342. assert len(op_info) == 5
  343. for i in range(5):
  344. # Check for inline ops
  345. if op_info[i]["op_type"] == "RepeatOp":
  346. # Confirm these inline ops do not have metrics information
  347. assert op_info[i]["metrics"] is None
  348. else:
  349. assert "size" in op_info[i]["metrics"]["output_queue"]
  350. assert "length" in op_info[i]["metrics"]["output_queue"]
  351. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  352. self.confirm_cpuutil(cpu_util_file, 5)
  353. # Confirm dataset iterator file content
  354. self.confirm_dataset_iterator_file(dataset_iterator_file, 750)
  355. def test_profiling_seq_pipelines_epochctrl3(self, tmp_path):
  356. """
  357. Test with these 2 sequential pipelines:
  358. 1) Generator -> Batch -> EpochCtrl
  359. 2) Generator -> Batch
  360. Note: This is a simplification of the user scenario to use the same pipeline for training and then evaluation.
  361. """
  362. source = [(np.array([x]),) for x in range(64)]
  363. data1 = ds.GeneratorDataset(source, ["data"])
  364. data1 = data1.batch(32)
  365. # Test A - Call create_dict_iterator with num_epochs>1
  366. num_iter = 0
  367. # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
  368. for _ in data1.create_dict_iterator(num_epochs=2):
  369. num_iter += 1
  370. assert num_iter == 2
  371. # Stop MindData Profiling and save output files to tmp_path
  372. self.md_profiler.stop()
  373. self.md_profiler.save(str(tmp_path))
  374. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  375. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  376. dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt"
  377. # Confirm pipeline file and CPU util file each have 3 ops
  378. self.confirm_ops_in_pipeline(pipeline_file, 3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
  379. self.confirm_cpuutil(cpu_util_file, 3)
  380. # Test B - Call create_dict_iterator with num_epochs=1
  381. # Initialize and Start MindData profiling manager
  382. self.md_profiler.init()
  383. self.md_profiler.start()
  384. num_iter = 0
  385. # Note: If create_dict_iterator() is called with num_epochs=1,
  386. # then EpochCtrlOp should not be NOT added to the pipeline
  387. for _ in data1.create_dict_iterator(num_epochs=1):
  388. num_iter += 1
  389. assert num_iter == 2
  390. # Stop MindData Profiling and save output files to tmp_path
  391. self.md_profiler.stop()
  392. self.md_profiler.save(str(tmp_path))
  393. # Confirm pipeline file and CPU util file each have 2 ops
  394. self.confirm_ops_in_pipeline(pipeline_file, 2, ["GeneratorOp", "BatchOp"])
  395. self.confirm_cpuutil(cpu_util_file, 2)
  396. # Confirm dataset iterator file content
  397. self.confirm_dataset_iterator_file(dataset_iterator_file, 2)
  398. def test_profiling_seq_pipelines_epochctrl2(self, tmp_path):
  399. """
  400. Test with these 2 sequential pipelines:
  401. 1) Generator -> Batch
  402. 2) Generator -> Batch -> EpochCtrl
  403. """
  404. source = [(np.array([x]),) for x in range(64)]
  405. data2 = ds.GeneratorDataset(source, ["data"])
  406. data2 = data2.batch(16)
  407. # Test A - Call create_dict_iterator with num_epochs=1
  408. num_iter = 0
  409. # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
  410. for _ in data2.create_dict_iterator(num_epochs=1):
  411. num_iter += 1
  412. assert num_iter == 4
  413. # Stop MindData Profiling and save output files to tmp_path
  414. self.md_profiler.stop()
  415. self.md_profiler.save(str(tmp_path))
  416. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  417. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  418. dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt"
  419. # Confirm pipeline file and CPU util file each have 2 ops
  420. self.confirm_ops_in_pipeline(pipeline_file, 2, ["GeneratorOp", "BatchOp"])
  421. self.confirm_cpuutil(cpu_util_file, 2)
  422. # Test B - Call create_dict_iterator with num_epochs>1
  423. # Initialize and Start MindData profiling manager
  424. self.md_profiler.init()
  425. self.md_profiler.start()
  426. num_iter = 0
  427. # Note: If create_dict_iterator() is called with num_epochs>1,
  428. # then EpochCtrlOp should be added to the pipeline
  429. for _ in data2.create_dict_iterator(num_epochs=2):
  430. num_iter += 1
  431. assert num_iter == 4
  432. # Stop MindData Profiling and save output files to tmp_path
  433. self.md_profiler.stop()
  434. self.md_profiler.save(str(tmp_path))
  435. # Confirm pipeline file and CPU util file each have 3 ops
  436. self.confirm_ops_in_pipeline(pipeline_file, 3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
  437. self.confirm_cpuutil(cpu_util_file, 3)
  438. # Confirm dataset iterator file content
  439. self.confirm_dataset_iterator_file(dataset_iterator_file, 4)
  440. def test_profiling_seq_pipelines_repeat(self, tmp_path):
  441. """
  442. Test with these 2 sequential pipelines:
  443. 1) Generator -> Batch
  444. 2) Generator -> Batch -> Repeat
  445. """
  446. source = [(np.array([x]),) for x in range(64)]
  447. data2 = ds.GeneratorDataset(source, ["data"])
  448. data2 = data2.batch(16)
  449. # Test A - Call create_dict_iterator with 2 ops in pipeline
  450. num_iter = 0
  451. for _ in data2.create_dict_iterator(num_epochs=1):
  452. num_iter += 1
  453. assert num_iter == 4
  454. # Stop MindData Profiling and save output files to tmp_path
  455. self.md_profiler.stop()
  456. self.md_profiler.save(str(tmp_path))
  457. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  458. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  459. dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt"
  460. # Confirm pipeline file and CPU util file each have 2 ops
  461. self.confirm_ops_in_pipeline(pipeline_file, 2, ["GeneratorOp", "BatchOp"])
  462. self.confirm_cpuutil(cpu_util_file, 2)
  463. # Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline
  464. # Initialize and Start MindData profiling manager
  465. self.md_profiler.init()
  466. self.md_profiler.start()
  467. data2 = data2.repeat(5)
  468. num_iter = 0
  469. for _ in data2.create_dict_iterator(num_epochs=1):
  470. num_iter += 1
  471. assert num_iter == 20
  472. # Stop MindData Profiling and save output files to tmp_path
  473. self.md_profiler.stop()
  474. self.md_profiler.save(str(tmp_path))
  475. # Confirm pipeline file and CPU util file each have 3 ops
  476. self.confirm_ops_in_pipeline(pipeline_file, 3, ["GeneratorOp", "BatchOp", "RepeatOp"])
  477. self.confirm_cpuutil(cpu_util_file, 3)
  478. # Confirm dataset iterator file content
  479. self.confirm_dataset_iterator_file(dataset_iterator_file, 20)