You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; can include dashes ('-'); and can be up to 35 characters long.

test_profiling.py 20 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. # Copyright 2020-2021 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ==============================================================================
  15. """
  16. Testing profiling support in DE
  17. """
  18. import json
  19. import os
  20. import numpy as np
  21. import pytest
  22. import mindspore.common.dtype as mstype
  23. import mindspore.dataset as ds
  24. import mindspore.dataset.transforms.c_transforms as C
  25. import mindspore.dataset.vision.c_transforms as vision
  26. import mindspore._c_dataengine as cde
# Paths to the shared TFRecord test dataset used by the TFRecord-based pipelines below.
FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"
@pytest.mark.forked
class TestMinddataProfilingManager:
    """
    Test MinddataProfilingManager.

    Note: Use pytest fixture tmp_path to create files within this temporary directory,
    which is automatically created for each test and deleted at the end of the test.
    Each test is run in a forked subprocess (pytest.mark.forked) so the global
    profiling state of one test cannot leak into another.
    """
    def setup_class(self):
        """
        Run once for the class.
        """
        # NOTE(review): pytest passes the class object itself to setup_class,
        # so this assignment creates a class-level attribute shared by all tests.
        # Get instance pointer for MindData profiling manager
        self.md_profiler = cde.GlobalContext.profiling_manager()
    def setup_method(self):
        """
        Run before each test function.
        """
        # Set the MindData Profiling related environment variables
        # (profiling output file names embed RANK_ID/DEVICE_ID, e.g. *_profiling_1.json)
        os.environ['RANK_ID'] = "1"
        os.environ['DEVICE_ID'] = "1"
        # Initialize MindData profiling manager
        self.md_profiler.init()
        # Start MindData Profiling
        self.md_profiler.start()
  54. def teardown_method(self):
  55. """
  56. Run after each test function.
  57. """
  58. # Disable MindData Profiling related environment variables
  59. del os.environ['RANK_ID']
  60. del os.environ['DEVICE_ID']
  61. def confirm_cpuutil(self, num_pipeline_ops, cpu_uti_file):
  62. """
  63. Confirm CPU utilization JSON file with <num_pipeline_ops> in the pipeline
  64. """
  65. with open(cpu_uti_file) as file1:
  66. data = json.load(file1)
  67. op_info = data["op_info"]
  68. assert len(op_info) == num_pipeline_ops
  69. def confirm_ops_in_pipeline(self, num_ops, op_list, pipeline_file):
  70. """
  71. Confirm pipeline JSON file with <num_ops> are in the pipeline and the given list of ops
  72. """
  73. with open(pipeline_file) as file1:
  74. data = json.load(file1)
  75. op_info = data["op_info"]
  76. # Confirm ops in pipeline file
  77. assert len(op_info) == num_ops
  78. for i in range(num_ops):
  79. assert op_info[i]["op_type"] in op_list
  80. def test_profiling_simple_pipeline(self, tmp_path):
  81. """
  82. Generator -> Shuffle -> Batch
  83. """
  84. source = [(np.array([x]),) for x in range(1024)]
  85. data1 = ds.GeneratorDataset(source, ["data"])
  86. data1 = data1.shuffle(64)
  87. data1 = data1.batch(32)
  88. # try output shape type and dataset size and make sure no profiling file is generated
  89. assert data1.output_shapes() == [[32, 1]]
  90. assert [str(tp) for tp in data1.output_types()] == ["int64"]
  91. assert data1.get_dataset_size() == 32
  92. for _ in data1:
  93. pass
  94. # Stop MindData Profiling and save output files to current working directory
  95. self.md_profiler.stop()
  96. self.md_profiler.save(str(tmp_path))
  97. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  98. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  99. dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt"
  100. # Confirm profiling files now exist
  101. assert os.path.exists(pipeline_file) is True
  102. assert os.path.exists(cpu_util_file) is True
  103. assert os.path.exists(dataset_iterator_file) is True
    def test_profiling_complex_pipeline(self, tmp_path):
        """
        Generator -> Map ->
                            -> Zip
        TFReader -> Shuffle ->

        Confirms the pipeline profiling JSON reports 5 ops, with metrics
        for every op except the inline ZipOp, and that the CPU utilization
        file agrees on the op count.
        """
        source = [(np.array([x]),) for x in range(1024)]
        data1 = ds.GeneratorDataset(source, ["gen"])
        data1 = data1.map(operations=[(lambda x: x + 1)], input_columns=["gen"])
        pattern = DATASET_ROOT + "/test.data"
        data2 = ds.TFRecordDataset(pattern, SCHEMA_FILE, shuffle=ds.Shuffle.FILES)
        data2 = data2.shuffle(4)
        data3 = ds.zip((data1, data2))
        # Drain the zipped pipeline so profiling samples are collected
        for _ in data3:
            pass
        # Stop MindData Profiling and save output files to the test's temporary directory
        self.md_profiler.stop()
        self.md_profiler.save(str(tmp_path))
        pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
        cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
        with open(pipeline_file) as f:
            data = json.load(f)
            op_info = data["op_info"]
            assert len(op_info) == 5
            for i in range(5):
                if op_info[i]["op_type"] != "ZipOp":
                    assert "size" in op_info[i]["metrics"]["output_queue"]
                    assert "length" in op_info[i]["metrics"]["output_queue"]
                else:
                    # Note: Zip is an inline op and hence does not have metrics information
                    assert op_info[i]["metrics"] is None
        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        self.confirm_cpuutil(5, cpu_util_file)
    def test_profiling_inline_ops_pipeline1(self, tmp_path):
        """
        Test pipeline with inline ops: Concat and EpochCtrl
        Generator ->
                     Concat -> EpochCtrl
        Generator ->
        """
        # In source1 dataset: Number of rows is 3; its values are 0, 1, 2
        def source1():
            for i in range(3):
                yield (np.array([i]),)

        # In source2 dataset: Number of rows is 7; its values are 3, 4, 5 ... 9
        def source2():
            for i in range(3, 10):
                yield (np.array([i]),)

        data1 = ds.GeneratorDataset(source1, ["col1"])
        data2 = ds.GeneratorDataset(source2, ["col1"])
        data3 = data1.concat(data2)
        num_iter = 0
        # Note: set num_epochs=2 in create_tuple_iterator(), so that EpochCtrl op is added to the pipeline
        # Here i refers to index, d refers to data element
        for i, d in enumerate(data3.create_tuple_iterator(num_epochs=2, output_numpy=True)):
            num_iter += 1
            t = d
            # Concatenated values are 0..9, so each row's value equals its index
            assert i == t[0][0]
        # 3 rows + 7 rows = 10 rows per epoch
        assert num_iter == 10
        # Stop MindData Profiling and save output files to the test's temporary directory
        self.md_profiler.stop()
        self.md_profiler.save(str(tmp_path))
        pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
        cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
        # Confirm pipeline is created with EpochCtrl op
        with open(pipeline_file) as f:
            data = json.load(f)
            op_info = data["op_info"]
            assert len(op_info) == 4
            for i in range(4):
                # Note: The following ops are inline ops: Concat, EpochCtrl
                if op_info[i]["op_type"] in ("ConcatOp", "EpochCtrlOp"):
                    # Confirm these inline ops do not have metrics information
                    assert op_info[i]["metrics"] is None
                else:
                    assert "size" in op_info[i]["metrics"]["output_queue"]
                    assert "length" in op_info[i]["metrics"]["output_queue"]
        # Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file
        self.confirm_cpuutil(4, cpu_util_file)
  183. def test_profiling_inline_ops_pipeline2(self, tmp_path):
  184. """
  185. Test pipeline with many inline ops
  186. Generator -> Rename -> Skip -> Repeat -> Take
  187. """
  188. # In source1 dataset: Number of rows is 10; its values are 0, 1, 2, 3, 4, 5 ... 9
  189. def source1():
  190. for i in range(10):
  191. yield (np.array([i]),)
  192. data1 = ds.GeneratorDataset(source1, ["col1"])
  193. data1 = data1.rename(input_columns=["col1"], output_columns=["newcol1"])
  194. data1 = data1.skip(2)
  195. data1 = data1.repeat(2)
  196. data1 = data1.take(12)
  197. for _ in data1:
  198. pass
  199. # Stop MindData Profiling and save output files to current working directory
  200. self.md_profiler.stop()
  201. self.md_profiler.save(str(tmp_path))
  202. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  203. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  204. with open(pipeline_file) as f:
  205. data = json.load(f)
  206. op_info = data["op_info"]
  207. assert len(op_info) == 5
  208. for i in range(5):
  209. # Check for these inline ops
  210. if op_info[i]["op_type"] in ("RenameOp", "RepeatOp", "SkipOp", "TakeOp"):
  211. # Confirm these inline ops do not have metrics information
  212. assert op_info[i]["metrics"] is None
  213. else:
  214. assert "size" in op_info[i]["metrics"]["output_queue"]
  215. assert "length" in op_info[i]["metrics"]["output_queue"]
  216. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  217. self.confirm_cpuutil(5, cpu_util_file)
  218. def test_profiling_sampling_interval(self, tmp_path):
  219. """
  220. Test non-default monitor sampling interval
  221. """
  222. interval_origin = ds.config.get_monitor_sampling_interval()
  223. ds.config.set_monitor_sampling_interval(30)
  224. interval = ds.config.get_monitor_sampling_interval()
  225. assert interval == 30
  226. source = [(np.array([x]),) for x in range(1024)]
  227. data1 = ds.GeneratorDataset(source, ["data"])
  228. data1 = data1.shuffle(64)
  229. data1 = data1.batch(32)
  230. for _ in data1:
  231. pass
  232. ds.config.set_monitor_sampling_interval(interval_origin)
  233. # Stop MindData Profiling and save output files to current working directory
  234. self.md_profiler.stop()
  235. self.md_profiler.save(str(tmp_path))
    def test_profiling_basic_pipeline(self, tmp_path):
        """
        Test with this basic pipeline
        Generator -> Map -> Batch -> Repeat -> EpochCtrl
        """
        def source1():
            for i in range(8000):
                yield (np.array([i]),)

        # Create this basic and common pipeline
        # Leaf/Source-Op -> Map -> Batch -> Repeat
        data1 = ds.GeneratorDataset(source1, ["col1"])
        type_cast_op = C.TypeCast(mstype.int32)
        data1 = data1.map(operations=type_cast_op, input_columns="col1")
        data1 = data1.batch(16)
        data1 = data1.repeat(2)
        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=2):
            num_iter += 1
        # 8000 rows / batch 16 = 500 batches; repeat 2 -> 1000 iterations per epoch
        assert num_iter == 1000
        # Stop MindData Profiling and save output files to the test's temporary directory
        self.md_profiler.stop()
        self.md_profiler.save(str(tmp_path))
        pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
        cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
        with open(pipeline_file) as f:
            data = json.load(f)
            op_info = data["op_info"]
            assert len(op_info) == 5
            for i in range(5):
                # Check for inline ops
                if op_info[i]["op_type"] in ("EpochCtrlOp", "RepeatOp"):
                    # Confirm these inline ops do not have metrics information
                    assert op_info[i]["metrics"] is None
                else:
                    assert "size" in op_info[i]["metrics"]["output_queue"]
                    assert "length" in op_info[i]["metrics"]["output_queue"]
        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        self.confirm_cpuutil(5, cpu_util_file)
    def test_profiling_cifar10_pipeline(self, tmp_path):
        """
        Test with this common pipeline with Cifar10
        Cifar10 -> Map -> Map -> Batch -> Repeat
        """
        # Create this common pipeline
        # Cifar10 -> Map -> Map -> Batch -> Repeat
        DATA_DIR_10 = "../data/dataset/testCifar10Data"
        data1 = ds.Cifar10Dataset(DATA_DIR_10, num_samples=8000)
        type_cast_op = C.TypeCast(mstype.int32)
        data1 = data1.map(operations=type_cast_op, input_columns="label")
        random_horizontal_op = vision.RandomHorizontalFlip()
        data1 = data1.map(operations=random_horizontal_op, input_columns="image")
        data1 = data1.batch(32)
        data1 = data1.repeat(3)
        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=1):
            num_iter += 1
        # 8000 samples / batch 32 = 250 batches; repeat 3 -> 750 iterations
        assert num_iter == 750
        # Stop MindData Profiling and save output files to the test's temporary directory
        self.md_profiler.stop()
        self.md_profiler.save(str(tmp_path))
        pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
        cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
        with open(pipeline_file) as f:
            data = json.load(f)
            op_info = data["op_info"]
            assert len(op_info) == 5
            for i in range(5):
                # Check for inline ops
                if op_info[i]["op_type"] == "RepeatOp":
                    # Confirm these inline ops do not have metrics information
                    assert op_info[i]["metrics"] is None
                else:
                    assert "size" in op_info[i]["metrics"]["output_queue"]
                    assert "length" in op_info[i]["metrics"]["output_queue"]
        # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
        self.confirm_cpuutil(5, cpu_util_file)
    def test_profiling_seq_pipelines_epochctrl3(self, tmp_path):
        """
        Test with these 2 sequential pipelines:
        1) Generator -> Batch -> EpochCtrl
        2) Generator -> Batch
        Note: This is a simplification of the user scenario to use the same pipeline for training and then evaluation.
        """
        source = [(np.array([x]),) for x in range(64)]
        data1 = ds.GeneratorDataset(source, ["data"])
        data1 = data1.batch(32)
        # Test A - Call create_dict_iterator with num_epochs>1
        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=2):
            num_iter += 1
        # 64 rows / batch 32 = 2 batches
        assert num_iter == 2
        # Stop MindData Profiling and save output files to the test's temporary directory
        self.md_profiler.stop()
        self.md_profiler.save(str(tmp_path))
        pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
        cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
        # Confirm pipeline file and CPU util file each have 3 ops
        self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
        self.confirm_cpuutil(3, cpu_util_file)
        # Test B - Call create_dict_iterator with num_epochs=1
        # Initialize and Start MindData profiling manager
        self.md_profiler.init()
        self.md_profiler.start()
        num_iter = 0
        # Note: If create_dict_iterator() is called with num_epochs=1,
        # then EpochCtrlOp should NOT be added to the pipeline
        for _ in data1.create_dict_iterator(num_epochs=1):
            num_iter += 1
        assert num_iter == 2
        # Stop MindData Profiling and save output files (overwriting the files from Test A)
        self.md_profiler.stop()
        self.md_profiler.save(str(tmp_path))
        # Confirm pipeline file and CPU util file each have 2 ops
        self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
        self.confirm_cpuutil(2, cpu_util_file)
  354. def test_profiling_seq_pipelines_epochctrl2(self, tmp_path):
  355. """
  356. Test with these 2 sequential pipelines:
  357. 1) Generator -> Batch
  358. 2) Generator -> Batch -> EpochCtrl
  359. """
  360. source = [(np.array([x]),) for x in range(64)]
  361. data2 = ds.GeneratorDataset(source, ["data"])
  362. data2 = data2.batch(16)
  363. # Test A - Call create_dict_iterator with num_epochs=1
  364. num_iter = 0
  365. # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
  366. for _ in data2.create_dict_iterator(num_epochs=1):
  367. num_iter += 1
  368. assert num_iter == 4
  369. # Stop MindData Profiling and save output files to current working directory
  370. self.md_profiler.stop()
  371. self.md_profiler.save(str(tmp_path))
  372. pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
  373. cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
  374. # Confirm pipeline file and CPU util file each have 2 ops
  375. self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
  376. self.confirm_cpuutil(2, cpu_util_file)
  377. # Test B - Call create_dict_iterator with num_epochs>1
  378. # Initialize and Start MindData profiling manager
  379. self.md_profiler.init()
  380. self.md_profiler.start()
  381. num_iter = 0
  382. # Note: If create_dict_iterator() is called with num_epochs>1,
  383. # then EpochCtrlOp should be added to the pipeline
  384. for _ in data2.create_dict_iterator(num_epochs=2):
  385. num_iter += 1
  386. assert num_iter == 4
  387. # Stop MindData Profiling and save output files to current working directory
  388. self.md_profiler.stop()
  389. self.md_profiler.save(str(tmp_path))
  390. # Confirm pipeline file and CPU util file each have 3 ops
  391. self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file)
  392. self.confirm_cpuutil(3, cpu_util_file)
    def test_profiling_seq_pipelines_repeat(self, tmp_path):
        """
        Test with these 2 sequential pipelines:
        1) Generator -> Batch
        2) Generator -> Batch -> Repeat
        """
        source = [(np.array([x]),) for x in range(64)]
        data2 = ds.GeneratorDataset(source, ["data"])
        data2 = data2.batch(16)
        # Test A - Call create_dict_iterator with 2 ops in pipeline
        num_iter = 0
        for _ in data2.create_dict_iterator(num_epochs=1):
            num_iter += 1
        # 64 rows / batch 16 = 4 batches
        assert num_iter == 4
        # Stop MindData Profiling and save output files to the test's temporary directory
        self.md_profiler.stop()
        self.md_profiler.save(str(tmp_path))
        pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json"
        cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json"
        # Confirm pipeline file and CPU util file each have 2 ops
        self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file)
        self.confirm_cpuutil(2, cpu_util_file)
        # Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline
        # Initialize and Start MindData profiling manager
        self.md_profiler.init()
        self.md_profiler.start()
        data2 = data2.repeat(5)
        num_iter = 0
        for _ in data2.create_dict_iterator(num_epochs=1):
            num_iter += 1
        # 4 batches repeated 5 times -> 20 iterations
        assert num_iter == 20
        # Stop MindData Profiling and save output files (overwriting the files from Test A)
        self.md_profiler.stop()
        self.md_profiler.save(str(tmp_path))
        # Confirm pipeline file and CPU util file each have 3 ops
        self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"], pipeline_file)
        self.confirm_cpuutil(3, cpu_util_file)