|
|
|
@@ -1,4 +1,4 @@ |
|
|
|
# Copyright 2020-2021 Huawei Technologies Co., Ltd |
|
|
|
# Copyright 2020-2022 Huawei Technologies Co., Ltd |
|
|
|
# |
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
# you may not use this file except in compliance with the License. |
|
|
|
@@ -69,16 +69,16 @@ class TestMinddataProfilingManager: |
|
|
|
del os.environ['RANK_ID'] |
|
|
|
del os.environ['DEVICE_ID'] |
|
|
|
|
|
|
|
def confirm_cpuutil(self, num_pipeline_ops, cpu_uti_file): |
|
|
|
def confirm_cpuutil(self, cpu_util_file, num_pipeline_ops): |
|
|
|
""" |
|
|
|
Confirm CPU utilization JSON file with <num_pipeline_ops> in the pipeline |
|
|
|
""" |
|
|
|
with open(cpu_uti_file) as file1: |
|
|
|
with open(cpu_util_file) as file1: |
|
|
|
data = json.load(file1) |
|
|
|
op_info = data["op_info"] |
|
|
|
assert len(op_info) == num_pipeline_ops |
|
|
|
|
|
|
|
def confirm_ops_in_pipeline(self, num_ops, op_list, pipeline_file): |
|
|
|
def confirm_ops_in_pipeline(self, pipeline_file, num_ops, op_list): |
|
|
|
""" |
|
|
|
Confirm pipeline JSON file with <num_ops> are in the pipeline and the given list of ops |
|
|
|
""" |
|
|
|
@@ -90,6 +90,15 @@ class TestMinddataProfilingManager: |
|
|
|
for i in range(num_ops): |
|
|
|
assert op_info[i]["op_type"] in op_list |
|
|
|
|
|
|
|
def confirm_dataset_iterator_file(self, dataset_iterator_file, num_batches): |
|
|
|
""" |
|
|
|
Confirm dataset iterator file exists with the correct number of rows in the file |
|
|
|
""" |
|
|
|
assert os.path.exists(dataset_iterator_file) |
|
|
|
actual_num_lines = sum(1 for _ in open(dataset_iterator_file)) |
|
|
|
# Confirm there are 4 lines for each batch in the dataset iterator file |
|
|
|
assert actual_num_lines == 4 * num_batches |
|
|
|
|
|
|
|
def test_profiling_simple_pipeline(self, tmp_path): |
|
|
|
""" |
|
|
|
Generator -> Shuffle -> Batch |
|
|
|
@@ -99,15 +108,12 @@ class TestMinddataProfilingManager: |
|
|
|
data1 = ds.GeneratorDataset(source, ["data"]) |
|
|
|
data1 = data1.shuffle(64) |
|
|
|
data1 = data1.batch(32) |
|
|
|
# try output shape type and dataset size and make sure no profiling file is generated |
|
|
|
# Check output shape type and dataset size |
|
|
|
assert data1.output_shapes() == [[32, 1]] |
|
|
|
assert [str(tp) for tp in data1.output_types()] == ["int64"] |
|
|
|
assert data1.get_dataset_size() == 32 |
|
|
|
|
|
|
|
for _ in data1: |
|
|
|
pass |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
@@ -115,6 +121,22 @@ class TestMinddataProfilingManager: |
|
|
|
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" |
|
|
|
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt" |
|
|
|
|
|
|
|
# Confirm no profiling files are produced (since no MindData pipeline has been executed) |
|
|
|
assert os.path.exists(pipeline_file) is False |
|
|
|
assert os.path.exists(cpu_util_file) is False |
|
|
|
assert os.path.exists(dataset_iterator_file) is False |
|
|
|
|
|
|
|
# Start MindData Profiling |
|
|
|
self.md_profiler.start() |
|
|
|
|
|
|
|
# Execute MindData Pipeline |
|
|
|
for _ in data1: |
|
|
|
pass |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
# Confirm profiling files now exist |
|
|
|
assert os.path.exists(pipeline_file) is True |
|
|
|
assert os.path.exists(cpu_util_file) is True |
|
|
|
@@ -140,12 +162,13 @@ class TestMinddataProfilingManager: |
|
|
|
for _ in data3: |
|
|
|
pass |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" |
|
|
|
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" |
|
|
|
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt" |
|
|
|
|
|
|
|
with open(pipeline_file) as f: |
|
|
|
data = json.load(f) |
|
|
|
@@ -160,7 +183,10 @@ class TestMinddataProfilingManager: |
|
|
|
assert op_info[i]["metrics"] is None |
|
|
|
|
|
|
|
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file |
|
|
|
self.confirm_cpuutil(5, cpu_util_file) |
|
|
|
self.confirm_cpuutil(cpu_util_file, 5) |
|
|
|
|
|
|
|
# Confirm dataset iterator file content |
|
|
|
self.confirm_dataset_iterator_file(dataset_iterator_file, 12) |
|
|
|
|
|
|
|
def test_profiling_inline_ops_pipeline1(self, tmp_path): |
|
|
|
""" |
|
|
|
@@ -194,12 +220,13 @@ class TestMinddataProfilingManager: |
|
|
|
|
|
|
|
assert num_iter == 10 |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" |
|
|
|
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" |
|
|
|
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt" |
|
|
|
|
|
|
|
# Confirm pipeline is created with EpochCtrl op |
|
|
|
with open(pipeline_file) as f: |
|
|
|
@@ -216,7 +243,10 @@ class TestMinddataProfilingManager: |
|
|
|
assert "length" in op_info[i]["metrics"]["output_queue"] |
|
|
|
|
|
|
|
# Confirm CPU util JSON file content, when 4 ops are in the pipeline JSON file |
|
|
|
self.confirm_cpuutil(4, cpu_util_file) |
|
|
|
self.confirm_cpuutil(cpu_util_file, 4) |
|
|
|
|
|
|
|
# Confirm dataset iterator file content |
|
|
|
self.confirm_dataset_iterator_file(dataset_iterator_file, 10) |
|
|
|
|
|
|
|
def test_profiling_inline_ops_pipeline2(self, tmp_path): |
|
|
|
""" |
|
|
|
@@ -238,12 +268,13 @@ class TestMinddataProfilingManager: |
|
|
|
for _ in data1: |
|
|
|
pass |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" |
|
|
|
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" |
|
|
|
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt" |
|
|
|
|
|
|
|
with open(pipeline_file) as f: |
|
|
|
data = json.load(f) |
|
|
|
@@ -259,7 +290,10 @@ class TestMinddataProfilingManager: |
|
|
|
assert "length" in op_info[i]["metrics"]["output_queue"] |
|
|
|
|
|
|
|
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file |
|
|
|
self.confirm_cpuutil(5, cpu_util_file) |
|
|
|
self.confirm_cpuutil(cpu_util_file, 5) |
|
|
|
|
|
|
|
# Confirm dataset iterator file content |
|
|
|
self.confirm_dataset_iterator_file(dataset_iterator_file, 12) |
|
|
|
|
|
|
|
def test_profiling_sampling_interval(self, tmp_path): |
|
|
|
""" |
|
|
|
@@ -281,10 +315,21 @@ class TestMinddataProfilingManager: |
|
|
|
|
|
|
|
ds.config.set_monitor_sampling_interval(interval_origin) |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" |
|
|
|
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" |
|
|
|
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt" |
|
|
|
|
|
|
|
# Confirm pipeline file and CPU util file each have 3 ops |
|
|
|
self.confirm_ops_in_pipeline(pipeline_file, 3, ["GeneratorOp", "BatchOp", "ShuffleOp"]) |
|
|
|
self.confirm_cpuutil(cpu_util_file, 3) |
|
|
|
|
|
|
|
# Confirm dataset iterator file content |
|
|
|
self.confirm_dataset_iterator_file(dataset_iterator_file, 32) |
|
|
|
|
|
|
|
def test_profiling_basic_pipeline(self, tmp_path): |
|
|
|
""" |
|
|
|
Test with this basic pipeline |
|
|
|
@@ -311,12 +356,13 @@ class TestMinddataProfilingManager: |
|
|
|
|
|
|
|
assert num_iter == 1000 |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" |
|
|
|
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" |
|
|
|
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt" |
|
|
|
|
|
|
|
with open(pipeline_file) as f: |
|
|
|
data = json.load(f) |
|
|
|
@@ -332,7 +378,10 @@ class TestMinddataProfilingManager: |
|
|
|
assert "length" in op_info[i]["metrics"]["output_queue"] |
|
|
|
|
|
|
|
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file |
|
|
|
self.confirm_cpuutil(5, cpu_util_file) |
|
|
|
self.confirm_cpuutil(cpu_util_file, 5) |
|
|
|
|
|
|
|
# Confirm dataset iterator file content |
|
|
|
self.confirm_dataset_iterator_file(dataset_iterator_file, 1000) |
|
|
|
|
|
|
|
def test_profiling_cifar10_pipeline(self, tmp_path): |
|
|
|
""" |
|
|
|
@@ -360,12 +409,13 @@ class TestMinddataProfilingManager: |
|
|
|
|
|
|
|
assert num_iter == 750 |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" |
|
|
|
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" |
|
|
|
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt" |
|
|
|
|
|
|
|
with open(pipeline_file) as f: |
|
|
|
data = json.load(f) |
|
|
|
@@ -381,7 +431,10 @@ class TestMinddataProfilingManager: |
|
|
|
assert "length" in op_info[i]["metrics"]["output_queue"] |
|
|
|
|
|
|
|
# Confirm CPU util JSON file content, when 5 ops are in the pipeline JSON file |
|
|
|
self.confirm_cpuutil(5, cpu_util_file) |
|
|
|
self.confirm_cpuutil(cpu_util_file, 5) |
|
|
|
|
|
|
|
# Confirm dataset iterator file content |
|
|
|
self.confirm_dataset_iterator_file(dataset_iterator_file, 750) |
|
|
|
|
|
|
|
def test_profiling_seq_pipelines_epochctrl3(self, tmp_path): |
|
|
|
""" |
|
|
|
@@ -402,16 +455,17 @@ class TestMinddataProfilingManager: |
|
|
|
num_iter += 1 |
|
|
|
assert num_iter == 2 |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" |
|
|
|
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" |
|
|
|
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt" |
|
|
|
|
|
|
|
# Confirm pipeline file and CPU util file each have 3 ops |
|
|
|
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file) |
|
|
|
self.confirm_cpuutil(3, cpu_util_file) |
|
|
|
self.confirm_ops_in_pipeline(pipeline_file, 3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"]) |
|
|
|
self.confirm_cpuutil(cpu_util_file, 3) |
|
|
|
|
|
|
|
# Test B - Call create_dict_iterator with num_epochs=1 |
|
|
|
|
|
|
|
@@ -426,13 +480,16 @@ class TestMinddataProfilingManager: |
|
|
|
num_iter += 1 |
|
|
|
assert num_iter == 2 |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
# Confirm pipeline file and CPU util file each have 2 ops |
|
|
|
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file) |
|
|
|
self.confirm_cpuutil(2, cpu_util_file) |
|
|
|
self.confirm_ops_in_pipeline(pipeline_file, 2, ["GeneratorOp", "BatchOp"]) |
|
|
|
self.confirm_cpuutil(cpu_util_file, 2) |
|
|
|
|
|
|
|
# Confirm dataset iterator file content |
|
|
|
self.confirm_dataset_iterator_file(dataset_iterator_file, 2) |
|
|
|
|
|
|
|
def test_profiling_seq_pipelines_epochctrl2(self, tmp_path): |
|
|
|
""" |
|
|
|
@@ -452,16 +509,17 @@ class TestMinddataProfilingManager: |
|
|
|
num_iter += 1 |
|
|
|
assert num_iter == 4 |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" |
|
|
|
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" |
|
|
|
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt" |
|
|
|
|
|
|
|
# Confirm pipeline file and CPU util file each have 2 ops |
|
|
|
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file) |
|
|
|
self.confirm_cpuutil(2, cpu_util_file) |
|
|
|
self.confirm_ops_in_pipeline(pipeline_file, 2, ["GeneratorOp", "BatchOp"]) |
|
|
|
self.confirm_cpuutil(cpu_util_file, 2) |
|
|
|
|
|
|
|
# Test B - Call create_dict_iterator with num_epochs>1 |
|
|
|
|
|
|
|
@@ -476,13 +534,16 @@ class TestMinddataProfilingManager: |
|
|
|
num_iter += 1 |
|
|
|
assert num_iter == 4 |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
# Confirm pipeline file and CPU util file each have 3 ops |
|
|
|
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"], pipeline_file) |
|
|
|
self.confirm_cpuutil(3, cpu_util_file) |
|
|
|
self.confirm_ops_in_pipeline(pipeline_file, 3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"]) |
|
|
|
self.confirm_cpuutil(cpu_util_file, 3) |
|
|
|
|
|
|
|
# Confirm dataset iterator file content |
|
|
|
self.confirm_dataset_iterator_file(dataset_iterator_file, 4) |
|
|
|
|
|
|
|
def test_profiling_seq_pipelines_repeat(self, tmp_path): |
|
|
|
""" |
|
|
|
@@ -501,16 +562,17 @@ class TestMinddataProfilingManager: |
|
|
|
num_iter += 1 |
|
|
|
assert num_iter == 4 |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
pipeline_file = str(tmp_path) + "/pipeline_profiling_1.json" |
|
|
|
cpu_util_file = str(tmp_path) + "/minddata_cpu_utilization_1.json" |
|
|
|
dataset_iterator_file = str(tmp_path) + "/dataset_iterator_profiling_1.txt" |
|
|
|
|
|
|
|
# Confirm pipeline file and CPU util file each have 2 ops |
|
|
|
self.confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"], pipeline_file) |
|
|
|
self.confirm_cpuutil(2, cpu_util_file) |
|
|
|
self.confirm_ops_in_pipeline(pipeline_file, 2, ["GeneratorOp", "BatchOp"]) |
|
|
|
self.confirm_cpuutil(cpu_util_file, 2) |
|
|
|
|
|
|
|
# Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline |
|
|
|
|
|
|
|
@@ -524,10 +586,13 @@ class TestMinddataProfilingManager: |
|
|
|
num_iter += 1 |
|
|
|
assert num_iter == 20 |
|
|
|
|
|
|
|
# Stop MindData Profiling and save output files to current working directory |
|
|
|
# Stop MindData Profiling and save output files to tmp_path |
|
|
|
self.md_profiler.stop() |
|
|
|
self.md_profiler.save(str(tmp_path)) |
|
|
|
|
|
|
|
# Confirm pipeline file and CPU util file each have 3 ops |
|
|
|
self.confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"], pipeline_file) |
|
|
|
self.confirm_cpuutil(3, cpu_util_file) |
|
|
|
self.confirm_ops_in_pipeline(pipeline_file, 3, ["GeneratorOp", "BatchOp", "RepeatOp"]) |
|
|
|
self.confirm_cpuutil(cpu_util_file, 3) |
|
|
|
|
|
|
|
# Confirm dataset iterator file content |
|
|
|
self.confirm_dataset_iterator_file(dataset_iterator_file, 20) |