Merge pull request !22851 from cathwong/ckw_mon_seq_pipelines_fix
@@ -92,11 +92,15 @@ Status ConnectorSize::SaveToFile() {
   // Discard the content of the file when opening.
   std::ofstream os(file_path_, std::ios::trunc);
   os << output;
   os.close();
   return Status::OK();
 }

 Status ConnectorSize::Init(const std::string &dir_path, const std::string &device_id) {
   file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString();
+  Path path = Path(file_path_);
+  // Remove the file if it exists (from prior profiling usage)
+  RETURN_IF_NOT_OK(path.Remove());
   return Status::OK();
 }
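Note: the point of this change (here and in the identical ConnectorThroughput hunk below) is that each profiling run starts from a clean file, so a shorter second pipeline can no longer inherit stale ops written by a longer prior run. A minimal Python sketch of the same remove-on-init / truncate-on-save pattern (the class and method names are illustrative, not the MindData API):

import json
import os


class ConnectorSizeSketch:
    """Illustrative analogue of ConnectorSize: remove stale output on init, truncate on save."""

    def init(self, dir_path, device_id):
        self.file_path = os.path.join(dir_path, "pipeline_profiling_" + device_id + ".json")
        # Remove the file if it exists (from prior profiling usage)
        if os.path.exists(self.file_path):
            os.remove(self.file_path)

    def save_to_file(self, output):
        # Discard any previous content when opening ("w" truncates, like std::ios::trunc)
        with open(self.file_path, "w") as f:
            json.dump(output, f)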
@@ -125,11 +125,15 @@ Status ConnectorThroughput::SaveToFile() {
   // Discard the content of the file when opening.
   std::ofstream os(file_path_, std::ios::trunc);
   os << output;
   os.close();
   return Status::OK();
 }

 Status ConnectorThroughput::Init(const std::string &dir_path, const std::string &device_id) {
   file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString();
+  Path path = Path(file_path_);
+  // Remove the file if it exists (from prior profiling usage)
+  RETURN_IF_NOT_OK(path.Remove());
   return Status::OK();
 }
@@ -287,10 +287,8 @@ class MinddataProfilingAnalyzer:
         if metrics and metrics['output_queue']:
             queue_size = metrics['output_queue']['size']
             queue_length = metrics['output_queue']['length']
-            if queue_length == 0:
-                raise ValueError("The input queue can not be None.")
             queue_average_size = round(sum(queue_size) / len(queue_size), 2) if queue_size else -1
-            queue_utilization_pct = round(100 * queue_average_size / queue_length, 2)
+            queue_utilization_pct = round(100 * queue_average_size / queue_length, 2) if queue_length else -1
             # Compute percentage of time queue is empty
             empty_count = 0
             for q_size in queue_size:
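Note: instead of raising ValueError when queue_length is 0, the analyzer now degrades to the same -1 sentinel already used for queue_average_size. A standalone sketch of the guarded computation (the function name is illustrative, and the empty-percentage step is inferred from the surrounding loop, which counts samples where the queue size is 0):

def summarize_queue(queue_size, queue_length):
    """Compute average size, utilization %, and empty %; -1 marks 'not computable'."""
    queue_average_size = round(sum(queue_size) / len(queue_size), 2) if queue_size else -1
    queue_utilization_pct = round(100 * queue_average_size / queue_length, 2) if queue_length else -1
    # Percentage of sampled points at which the queue was empty
    empty_count = sum(1 for q_size in queue_size if q_size == 0)
    queue_empty_freq_pct = round(100 * empty_count / len(queue_size), 2) if queue_size else -1
    return queue_average_size, queue_utilization_pct, queue_empty_freq_pct


# e.g. summarize_queue([0, 2, 4], 4) -> (2.0, 50.0, 33.33)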
@@ -61,7 +61,7 @@ def delete_profiling_files():
 def confirm_cpuutil(num_pipeline_ops):
     """
-    Confirm CPU utilization JSON file when <num_pipeline_ops> are in the pipeline
+    Confirm CPU utilization JSON file with <num_pipeline_ops> ops in the pipeline
     """
     with open(CPU_UTIL_FILE) as file1:
         data = json.load(file1)
@@ -70,6 +70,19 @@ def confirm_cpuutil(num_pipeline_ops):
     assert len(op_info) == num_pipeline_ops + 1

+
+def confirm_ops_in_pipeline(num_ops, op_list):
+    """
+    Confirm pipeline JSON file with <num_ops> ops in the pipeline, each from the given list of ops
+    """
+    with open(PIPELINE_FILE) as file1:
+        data = json.load(file1)
+    op_info = data["op_info"]
+    # Confirm ops in pipeline file
+    assert len(op_info) == num_ops
+    for i in range(num_ops):
+        assert op_info[i]["op_type"] in op_list
+
+
 def test_profiling_simple_pipeline():
     """
     Generator -> Shuffle -> Batch
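Note: confirm_ops_in_pipeline is a parameterized replacement for the per-count helpers deleted in the next hunk; callers pass the expected op count and the allowed op types. Typical usage, taken from the call sites later in this diff:

confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])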
@@ -397,26 +410,6 @@ def test_profiling_cifar10_pipeline():
     delete_profiling_files()


-def confirm_3ops_in_pipeline():
-    with open(PIPELINE_FILE) as file1:
-        data = json.load(file1)
-    op_info = data["op_info"]
-    # Confirm 3 ops in pipeline file
-    assert len(op_info) == 3
-    for i in range(3):
-        assert op_info[i]["op_type"] in ("GeneratorOp", "BatchOp", "EpochCtrlOp")
-
-
-def confirm_2ops_in_pipeline():
-    with open(PIPELINE_FILE) as file1:
-        data = json.load(file1)
-    op_info = data["op_info"]
-    # Confirm 2 ops in pipeline file
-    assert len(op_info) == 2
-    for i in range(2):
-        assert op_info[i]["op_type"] in ("GeneratorOp", "BatchOp")
-
-
 def test_profiling_seq_pipelines_epochctrl3():
     """
     Test with these 2 sequential pipelines:
@@ -438,7 +431,8 @@ def test_profiling_seq_pipelines_epochctrl3():
             num_iter += 1
         assert num_iter == 2

-        confirm_3ops_in_pipeline()
+        # Confirm pipeline file and CPU util file each have 3 ops
+        confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])
         confirm_cpuutil(3)

         # Test B - Call create_dict_iterator with num_epochs=1
@@ -449,10 +443,8 @@ def test_profiling_seq_pipelines_epochctrl3():
             num_iter += 1
         assert num_iter == 2

-        # confirm_2ops_in_pipeline()
-        # MD BUG: Confirm pipeline file is not changed and wrongly still has 3 ops
-        confirm_3ops_in_pipeline()
-        # Confirm CPU util file has correct number of ops
+        # Confirm pipeline file and CPU util file each have 2 ops
+        confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
         confirm_cpuutil(2)

     except Exception as error:
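Note: before the C++ Init() fix above, Test B had to assert the buggy behavior (a stale 3-op pipeline file left over from Test A); now each iterator regenerates the file, so the test can assert the true op count. The shape of the two-iterator pattern, assuming `data1` is a MindSpore dataset built as Generator -> Batch (sketch only, not the full test):

# Test A - num_epochs=2 inserts an EpochCtrl op: expect 3 ops in the pipeline file
for _ in data1.create_dict_iterator(num_epochs=2):
    pass
confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"])

# Test B - num_epochs=1 inserts no EpochCtrl op: expect 2 ops in the regenerated file
for _ in data1.create_dict_iterator(num_epochs=1):
    pass
confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])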
@@ -483,7 +475,8 @@ def test_profiling_seq_pipelines_epochctrl2():
             num_iter += 1
         assert num_iter == 4

-        confirm_2ops_in_pipeline()
+        # Confirm pipeline file and CPU util file each have 2 ops
+        confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
         confirm_cpuutil(2)

         # Test B - Call create_dict_iterator with num_epochs>1
| # Test B - Call create_dict_iterator with num_epochs>1 | |||
| @@ -494,10 +487,8 @@ def test_profiling_seq_pipelines_epochctrl2(): | |||
| num_iter += 1 | |||
| assert num_iter == 4 | |||
| # confirm_3ops_in_pipeline() | |||
| # MD BUG: Confirm pipeline file is not changed and wrongly still has 2 ops | |||
| confirm_2ops_in_pipeline() | |||
| # Confirm CPU util file has correct number of ops | |||
| # Confirm pipeline file and CPU util file each have 3 ops | |||
| confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "EpochCtrlOp"]) | |||
| confirm_cpuutil(3) | |||
| except Exception as error: | |||
@@ -527,7 +518,8 @@ def test_profiling_seq_pipelines_repeat():
             num_iter += 1
         assert num_iter == 4

-        confirm_2ops_in_pipeline()
+        # Confirm pipeline file and CPU util file each have 2 ops
+        confirm_ops_in_pipeline(2, ["GeneratorOp", "BatchOp"])
         confirm_cpuutil(2)

         # Test B - Add repeat op to pipeline. Call create_dict_iterator with 3 ops in pipeline
@@ -537,10 +529,8 @@ def test_profiling_seq_pipelines_repeat():
             num_iter += 1
         assert num_iter == 20

-        # confirm_3ops_in_pipeline()
-        # MD BUG: Confirm pipeline file is not changed and wrongly still has 2 ops
-        confirm_2ops_in_pipeline()
-        # Confirm CPU util file has correct number of ops
+        # Confirm pipeline file and CPU util file each have 3 ops
+        confirm_ops_in_pipeline(3, ["GeneratorOp", "BatchOp", "RepeatOp"])
         confirm_cpuutil(3)

     except Exception as error:
@@ -42,22 +42,12 @@ class TestMinddataProfilingAnalyzer():
         self._SUMMARY_CSV_FILE = "./minddata_pipeline_summary_7.csv"
         self._ANALYZE_FILE_PATH = "./"

         # These are the minimum subset of expected keys (in alphabetical order) in the MindData Analyzer summary output
-        # This is the set of keys for success case
         self._EXPECTED_SUMMARY_KEYS_SUCCESS = \
             ['avg_cpu_pct', 'avg_cpu_pct_per_worker', 'children_ids', 'num_workers', 'op_ids', 'op_names',
              'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
              'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']

-        # This is the set of keys for the case which omits the keys for composite computation of more than one raw file.
-        # This is used for the invalid user case in which the number of ops in the pipeline file does not match
-        # the number of ops in the CPU utilization file.
-        self._EXPECTED_SUMMARY_KEYS_OMIT_COMPOSITE = \
-            ['avg_cpu_pct', 'children_ids', 'num_workers', 'op_ids', 'op_names',
-             'parent_id', 'per_batch_time', 'per_pipeline_time', 'per_push_queue_time', 'pipeline_ops',
-             'queue_average_size', 'queue_empty_freq_pct', 'queue_utilization_pct']

     def setup_method(self):
         """
@@ -269,13 +259,10 @@ class TestMinddataProfilingAnalyzer():
         md_summary_dict = md_analyzer.analyze()

         # Verify MindData Profiling Analyze Summary output
-        # Use self._EXPECTED_SUMMARY_KEYS_OMIT_COMPOSITE, since composite keys are not produced, since there is a mismatch
-        # between the 4 ops in the stale pipeline file versus the 3 ops in the recreated cpu util file
-        self.verify_md_summary(md_summary_dict, self._EXPECTED_SUMMARY_KEYS_OMIT_COMPOSITE)
+        self.verify_md_summary(md_summary_dict, self._EXPECTED_SUMMARY_KEYS_SUCCESS)

-        # Confirm pipeline data wrongly contains info for 4 ops
-        assert md_summary_dict["pipeline_ops"] == ["EpochCtrl(id=0)", "Batch(id=1)", "Map(id=2)",
-                                                   "Generator(id=3)"]
+        # Confirm pipeline data contains info for 3 ops
+        assert md_summary_dict["pipeline_ops"] == ["Batch(id=0)", "Map(id=1)", "Generator(id=2)"]

-        # Verify CPU util data contains info for only 3 ops
+        # Verify CPU util data contains info for 3 ops
         assert len(md_summary_dict["avg_cpu_pct"]) == 3
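Note: the assertions above depend on pipeline_ops and the per-op metrics staying aligned after the fix; a minimal consistency check in the same spirit (illustrative only, not part of the test suite):

def verify_op_counts(summary_dict, expected_num_ops):
    # The pipeline op list and the per-op CPU data must describe the same number of ops
    assert len(summary_dict["pipeline_ops"]) == expected_num_ops
    assert len(summary_dict["avg_cpu_pct"]) == expected_num_ops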