You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

test_profiling.py 18 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ==============================================================================
  15. """
  16. Testing profiling support in DE
  17. """
  18. import json
  19. import os
  20. import numpy as np
  21. import mindspore.common.dtype as mstype
  22. import mindspore.dataset as ds
  23. import mindspore.dataset.transforms.c_transforms as C
  24. import mindspore.dataset.vision.c_transforms as vision
# Input test data for the TFRecord-based pipelines
FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"
# Profiling output files, written to the current directory since
# MINDDATA_PROFILING_DIR is set to '.' (the "_1" suffix presumably comes from
# DEVICE_ID/RANK_ID being set to '1' — verify against the profiler implementation)
PIPELINE_FILE = "./pipeline_profiling_1.json"
CPU_UTIL_FILE = "./minddata_cpu_utilization_1.json"
DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_1.txt"
  31. def set_profiling_env_var():
  32. """
  33. Set the MindData Profiling environment variables
  34. """
  35. os.environ['PROFILING_MODE'] = 'true'
  36. os.environ['MINDDATA_PROFILING_DIR'] = '.'
  37. os.environ['DEVICE_ID'] = '1'
  38. os.environ['RANK_ID'] = '1'
  39. def delete_profiling_files():
  40. """
  41. Delete the MindData profiling files generated from the test.
  42. Also disable the MindData Profiling environment variables.
  43. """
  44. # Delete MindData profiling files
  45. os.remove(PIPELINE_FILE)
  46. os.remove(CPU_UTIL_FILE)
  47. os.remove(DATASET_ITERATOR_FILE)
  48. # Disable MindData Profiling environment variables
  49. del os.environ['PROFILING_MODE']
  50. del os.environ['MINDDATA_PROFILING_DIR']
  51. del os.environ['DEVICE_ID']
  52. del os.environ['RANK_ID']
  53. def confirm_cpuutil(num_pipeline_ops):
  54. """
  55. Confirm CPU utilization JSON file with <num_pipeline_ops> in the pipeline
  56. """
  57. with open(CPU_UTIL_FILE) as file1:
  58. data = json.load(file1)
  59. op_info = data["op_info"]
  60. # Confirm <num_pipeline_ops>+1 ops in CPU util file (including op_id=-1 for monitor thread)
  61. assert len(op_info) == num_pipeline_ops + 1
  62. def confirm_ops_in_pipeline(num_ops, op_list):
  63. """
  64. Confirm pipeline JSON file with <num_ops> are in the pipeline and the given list of ops
  65. """
  66. with open(PIPELINE_FILE) as file1:
  67. data = json.load(file1)
  68. op_info = data["op_info"]
  69. # Confirm ops in pipeline file
  70. assert len(op_info) == num_ops
  71. for i in range(num_ops):
  72. assert op_info[i]["op_type"] in op_list
  73. def test_profiling_simple_pipeline():
  74. """
  75. Generator -> Shuffle -> Batch
  76. """
  77. set_profiling_env_var()
  78. source = [(np.array([x]),) for x in range(1024)]
  79. data1 = ds.GeneratorDataset(source, ["data"])
  80. data1 = data1.shuffle(64)
  81. data1 = data1.batch(32)
  82. # try output shape type and dataset size and make sure no profiling file is generated
  83. assert data1.output_shapes() == [[32, 1]]
  84. assert [str(tp) for tp in data1.output_types()] == ["int64"]
  85. assert data1.get_dataset_size() == 32
  86. # Confirm profiling files do not (yet) exist
  87. assert os.path.exists(PIPELINE_FILE) is False
  88. assert os.path.exists(CPU_UTIL_FILE) is False
  89. assert os.path.exists(DATASET_ITERATOR_FILE) is False
  90. try:
  91. for _ in data1:
  92. pass
  93. # Confirm profiling files now exist
  94. assert os.path.exists(PIPELINE_FILE) is True
  95. assert os.path.exists(CPU_UTIL_FILE) is True
  96. assert os.path.exists(DATASET_ITERATOR_FILE) is True
  97. except Exception as error:
  98. delete_profiling_files()
  99. raise error
  100. else:
  101. delete_profiling_files()
  102. def test_profiling_complex_pipeline():
  103. """
  104. Generator -> Map ->
  105. -> Zip
  106. TFReader -> Shuffle ->
  107. """
  108. set_profiling_env_var()
  109. source = [(np.array([x]),) for x in range(1024)]
  110. data1 = ds.GeneratorDataset(source, ["gen"])
  111. data1 = data1.map(operations=[(lambda x: x + 1)], input_columns=["gen"])
  112. pattern = DATASET_ROOT + "/test.data"
  113. data2 = ds.TFRecordDataset(pattern, SCHEMA_FILE, shuffle=ds.Shuffle.FILES)
  114. data2 = data2.shuffle(4)
  115. data3 = ds.zip((data1, data2))
  116. try:
  117. for _ in data3:
  118. pass
  119. with open(PIPELINE_FILE) as f:
  120. data = json.load(f)
  121. op_info = data["op_info"]
  122. assert len(op_info) == 5
  123. for i in range(5):
  124. if op_info[i]["op_type"] != "ZipOp":
  125. assert "size" in op_info[i]["metrics"]["output_queue"]
  126. assert "length" in op_info[i]["metrics"]["output_queue"]
  127. assert "throughput" in op_info[i]["metrics"]["output_queue"]
  128. else:
  129. # Note: Zip is an inline op and hence does not have metrics information
  130. assert op_info[i]["metrics"] is None
  131. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  132. confirm_cpuutil(5)
  133. except Exception as error:
  134. delete_profiling_files()
  135. raise error
  136. else:
  137. delete_profiling_files()
  138. def test_profiling_inline_ops_pipeline1():
  139. """
  140. Test pipeline with inline ops: Concat and EpochCtrl
  141. Generator ->
  142. Concat -> EpochCtrl
  143. Generator ->
  144. """
  145. set_profiling_env_var()
  146. # In source1 dataset: Number of rows is 3; its values are 0, 1, 2
  147. def source1():
  148. for i in range(3):
  149. yield (np.array([i]),)
  150. # In source2 dataset: Number of rows is 7; its values are 3, 4, 5 ... 9
  151. def source2():
  152. for i in range(3, 10):
  153. yield (np.array([i]),)
  154. data1 = ds.GeneratorDataset(source1, ["col1"])
  155. data2 = ds.GeneratorDataset(source2, ["col1"])
  156. data3 = data1.concat(data2)
  157. try:
  158. num_iter = 0
  159. # Note: set num_epochs=2 in create_tuple_iterator(), so that EpochCtrl op is added to the pipeline
  160. # Here i refers to index, d refers to data element
  161. for i, d in enumerate(data3.create_tuple_iterator(num_epochs=2, output_numpy=True)):
  162. num_iter += 1
  163. t = d
  164. assert i == t[0][0]
  165. assert num_iter == 10
  166. # Confirm pipeline is created with EpochCtrl op
  167. with open(PIPELINE_FILE) as f:
  168. data = json.load(f)
  169. op_info = data["op_info"]
  170. assert len(op_info) == 4
  171. for i in range(4):
  172. # Note: The following ops are inline ops: Concat, EpochCtrl
  173. if op_info[i]["op_type"] in ("ConcatOp", "EpochCtrlOp"):
  174. # Confirm these inline ops do not have metrics information
  175. assert op_info[i]["metrics"] is None
  176. else:
  177. assert "size" in op_info[i]["metrics"]["output_queue"]
  178. assert "length" in op_info[i]["metrics"]["output_queue"]
  179. assert "throughput" in op_info[i]["metrics"]["output_queue"]
  180. # Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file
  181. confirm_cpuutil(4)
  182. except Exception as error:
  183. delete_profiling_files()
  184. raise error
  185. else:
  186. delete_profiling_files()
  187. def test_profiling_inline_ops_pipeline2():
  188. """
  189. Test pipeline with many inline ops
  190. Generator -> Rename -> Skip -> Repeat -> Take
  191. """
  192. set_profiling_env_var()
  193. # In source1 dataset: Number of rows is 10; its values are 0, 1, 2, 3, 4, 5 ... 9
  194. def source1():
  195. for i in range(10):
  196. yield (np.array([i]),)
  197. data1 = ds.GeneratorDataset(source1, ["col1"])
  198. data1 = data1.rename(input_columns=["col1"], output_columns=["newcol1"])
  199. data1 = data1.skip(2)
  200. data1 = data1.repeat(2)
  201. data1 = data1.take(12)
  202. try:
  203. for _ in data1:
  204. pass
  205. with open(PIPELINE_FILE) as f:
  206. data = json.load(f)
  207. op_info = data["op_info"]
  208. assert len(op_info) == 5
  209. for i in range(5):
  210. # Check for these inline ops
  211. if op_info[i]["op_type"] in ("RenameOp", "RepeatOp", "SkipOp", "TakeOp"):
  212. # Confirm these inline ops do not have metrics information
  213. assert op_info[i]["metrics"] is None
  214. else:
  215. assert "size" in op_info[i]["metrics"]["output_queue"]
  216. assert "length" in op_info[i]["metrics"]["output_queue"]
  217. assert "throughput" in op_info[i]["metrics"]["output_queue"]
  218. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  219. confirm_cpuutil(5)
  220. except Exception as error:
  221. delete_profiling_files()
  222. raise error
  223. else:
  224. delete_profiling_files()
  225. def test_profiling_sampling_interval():
  226. """
  227. Test non-default monitor sampling interval
  228. """
  229. set_profiling_env_var()
  230. interval_origin = ds.config.get_monitor_sampling_interval()
  231. ds.config.set_monitor_sampling_interval(30)
  232. interval = ds.config.get_monitor_sampling_interval()
  233. assert interval == 30
  234. source = [(np.array([x]),) for x in range(1024)]
  235. data1 = ds.GeneratorDataset(source, ["data"])
  236. data1 = data1.shuffle(64)
  237. data1 = data1.batch(32)
  238. try:
  239. for _ in data1:
  240. pass
  241. except Exception as error:
  242. ds.config.set_monitor_sampling_interval(interval_origin)
  243. delete_profiling_files()
  244. raise error
  245. else:
  246. ds.config.set_monitor_sampling_interval(interval_origin)
  247. delete_profiling_files()
  248. def test_profiling_basic_pipeline():
  249. """
  250. Test with this basic pipeline
  251. Generator -> Map -> Batch -> Repeat -> EpochCtrl
  252. """
  253. set_profiling_env_var()
  254. def source1():
  255. for i in range(8000):
  256. yield (np.array([i]),)
  257. # Create this basic and common pipeline
  258. # Leaf/Source-Op -> Map -> Batch -> Repeat
  259. data1 = ds.GeneratorDataset(source1, ["col1"])
  260. type_cast_op = C.TypeCast(mstype.int32)
  261. data1 = data1.map(operations=type_cast_op, input_columns="col1")
  262. data1 = data1.batch(16)
  263. data1 = data1.repeat(2)
  264. try:
  265. num_iter = 0
  266. # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
  267. for _ in data1.create_dict_iterator(num_epochs=2):
  268. num_iter += 1
  269. assert num_iter == 1000
  270. with open(PIPELINE_FILE) as f:
  271. data = json.load(f)
  272. op_info = data["op_info"]
  273. assert len(op_info) == 5
  274. for i in range(5):
  275. # Check for inline ops
  276. if op_info[i]["op_type"] in ("EpochCtrlOp", "RepeatOp"):
  277. # Confirm these inline ops do not have metrics information
  278. assert op_info[i]["metrics"] is None
  279. else:
  280. assert "size" in op_info[i]["metrics"]["output_queue"]
  281. assert "length" in op_info[i]["metrics"]["output_queue"]
  282. assert "throughput" in op_info[i]["metrics"]["output_queue"]
  283. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  284. confirm_cpuutil(5)
  285. except Exception as error:
  286. delete_profiling_files()
  287. raise error
  288. else:
  289. delete_profiling_files()
  290. def test_profiling_cifar10_pipeline():
  291. """
  292. Test with this common pipeline with Cifar10
  293. Cifar10 -> Map -> Map -> Batch -> Repeat
  294. """
  295. set_profiling_env_var()
  296. # Create this common pipeline
  297. # Cifar10 -> Map -> Map -> Batch -> Repeat
  298. DATA_DIR_10 = "../data/dataset/testCifar10Data"
  299. data1 = ds.Cifar10Dataset(DATA_DIR_10, num_samples=8000)
  300. type_cast_op = C.TypeCast(mstype.int32)
  301. data1 = data1.map(operations=type_cast_op, input_columns="label")
  302. random_horizontal_op = vision.RandomHorizontalFlip()
  303. data1 = data1.map(operations=random_horizontal_op, input_columns="image")
  304. data1 = data1.batch(32)
  305. data1 = data1.repeat(3)
  306. try:
  307. num_iter = 0
  308. # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
  309. for _ in data1.create_dict_iterator(num_epochs=1):
  310. num_iter += 1
  311. assert num_iter == 750
  312. with open(PIPELINE_FILE) as f:
  313. data = json.load(f)
  314. op_info = data["op_info"]
  315. assert len(op_info) == 5
  316. for i in range(5):
  317. # Check for inline ops
  318. if op_info[i]["op_type"] == "RepeatOp":
  319. # Confirm these inline ops do not have metrics information
  320. assert op_info[i]["metrics"] is None
  321. else:
  322. assert "size" in op_info[i]["metrics"]["output_queue"]
  323. assert "length" in op_info[i]["metrics"]["output_queue"]
  324. assert "throughput" in op_info[i]["metrics"]["output_queue"]
  325. # Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file
  326. confirm_cpuutil(5)
  327. except Exception as error:
  328. delete_profiling_files()
  329. raise error
  330. else:
  331. delete_profiling_files()
  332. def test_profiling_seq_pipelines_epochctrl3():
  333. """
  334. Test with these 2 sequential pipelines:
  335. 1) Generator -> Batch -> EpochCtrl
  336. 2) Generator -> Batch
  337. Note: This is a simplification of the user scenario to use the same pipeline for training and then evaluation.
  338. """
  339. set_profiling_env_var()
  340. source = [(np.array([x]),) for x in range(64)]
  341. data1 = ds.GeneratorDataset(source, ["data"])
  342. data1 = data1.batch(32)
  343. try:
  344. # Test A - Call create_dict_iterator with num_epochs>1
  345. num_iter = 0
  346. # Note: If create_dict_iterator() is called with num_epochs>1, then EpochCtrlOp is added to the pipeline
  347. for _ in data1.create_dict_iterator(num_epochs=2):
  348. num_iter += 1
  349. assert num_iter == 2
  350. # Confirm pipeline file and CPU util file each have 3 ops
  351. confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
  352. confirm_cpuutil(3)
  353. # Test B - Call create_dict_iterator with num_epochs=1
  354. num_iter = 0
  355. # Note: If create_dict_iterator() is called with num_epochs=1,
  356. # then EpochCtrlOp should not be NOT added to the pipeline
  357. for _ in data1.create_dict_iterator(num_epochs=1):
  358. num_iter += 1
  359. assert num_iter == 2
  360. # Confirm pipeline file and CPU util file each have 2 ops
  361. confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
  362. confirm_cpuutil(2)
  363. except Exception as error:
  364. delete_profiling_files()
  365. raise error
  366. else:
  367. delete_profiling_files()
  368. def test_profiling_seq_pipelines_epochctrl2():
  369. """
  370. Test with these 2 sequential pipelines:
  371. 1) Generator -> Batch
  372. 2) Generator -> Batch -> EpochCtrl
  373. """
  374. set_profiling_env_var()
  375. source = [(np.array([x]),) for x in range(64)]
  376. data2 = ds.GeneratorDataset(source, ["data"])
  377. data2 = data2.batch(16)
  378. try:
  379. # Test A - Call create_dict_iterator with num_epochs=1
  380. num_iter = 0
  381. # Note: If create_dict_iterator() is called with num_epochs=1, then EpochCtrlOp is NOT added to the pipeline
  382. for _ in data2.create_dict_iterator(num_epochs=1):
  383. num_iter += 1
  384. assert num_iter == 4
  385. # Confirm pipeline file and CPU util file each have 2 ops
  386. confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
  387. confirm_cpuutil(2)
  388. # Test B - Call create_dict_iterator with num_epochs>1
  389. num_iter = 0
  390. # Note: If create_dict_iterator() is called with num_epochs>1,
  391. # then EpochCtrlOp should be added to the pipeline
  392. for _ in data2.create_dict_iterator(num_epochs=2):
  393. num_iter += 1
  394. assert num_iter == 4
  395. # Confirm pipeline file and CPU util file each have 3 ops
  396. confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
  397. confirm_cpuutil(3)
  398. except Exception as error:
  399. delete_profiling_files()
  400. raise error
  401. else:
  402. delete_profiling_files()
  403. def test_profiling_seq_pipelines_repeat():
  404. """
  405. Test with these 2 sequential pipelines:
  406. 1) Generator -> Batch
  407. 2) Generator -> Batch -> Repeat
  408. """
  409. set_profiling_env_var()
  410. source = [(np.array([x]),) for x in range(64)]
  411. data2 = ds.GeneratorDataset(source, ["data"])
  412. data2 = data2.batch(16)
  413. try:
  414. # Test A - Call create_dict_iterator with 2 ops in pipeline
  415. num_iter = 0
  416. for _ in data2.create_dict_iterator(num_epochs=1):
  417. num_iter += 1
  418. assert num_iter == 4
  419. # Confirm pipeline file and CPU util file each have 2 ops
  420. confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
  421. confirm_cpuutil(2)
  422. # Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline
  423. data2 = data2.repeat(5)
  424. num_iter = 0
  425. for _ in data2.create_dict_iterator(num_epochs=1):
  426. num_iter += 1
  427. assert num_iter == 20
  428. # Confirm pipeline file and CPU util file each have 3 ops
  429. confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"])
  430. confirm_cpuutil(3)
  431. except Exception as error:
  432. delete_profiling_files()
  433. raise error
  434. else:
  435. delete_profiling_files()
  436. if __name__ == "__main__":
  437. test_profiling_simple_pipeline()
  438. test_profiling_complex_pipeline()
  439. test_profiling_inline_ops_pipeline1()
  440. test_profiling_inline_ops_pipeline2()
  441. test_profiling_sampling_interval()
  442. test_profiling_basic_pipeline()
  443. test_profiling_cifar10_pipeline()
  444. test_profiling_seq_pipelines_epochctrl3()
  445. test_profiling_seq_pipelines_epochctrl2()
  446. test_profiling_seq_pipelines_repeat()