From: @robingrosman Reviewed-by: Signed-off-by:tags/v1.2.0-rc1
| @@ -156,7 +156,8 @@ Status MapOp::operator()() { | |||
| } | |||
| // The operator class just starts off threads by calling the tree_ function | |||
| rc = tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1), NameWithID()); | |||
| rc = | |||
| tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1), NameWithID(), id()); | |||
| // Synchronize with TaskManager | |||
| TaskManager::FindMe()->Post(); | |||
| RETURN_IF_NOT_OK(rc); | |||
| @@ -109,5 +109,7 @@ Status ConnectorSize::Init(const std::string &dir_path, const std::string &devic | |||
| file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString(); | |||
| return Status::OK(); | |||
| } | |||
| Status ConnectorSize::Analyze() { return Status::OK(); } | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -31,7 +31,7 @@ class ExecutionTree; | |||
| // Connector size sampling samples the output connector size of each op in the pipeline. | |||
| // It support JSON serialization for external usage. | |||
| class ConnectorSize : public Sampling { | |||
| // Connecto size sampling data is stored as a 2D vector | |||
| // Connector size sampling data is stored as a 2D vector | |||
| // op_0 ... op_m | |||
| // sample_0 size_0_0 ... size_m_0 | |||
| // ... ... ... ... | |||
| @@ -58,12 +58,14 @@ class ConnectorSize : public Sampling { | |||
| Status Init(const std::string &dir_path, const std::string &device_id) override; | |||
| // Parse op infomation and transform to json format | |||
| // Parse op information and transform to json format | |||
| json ParseOpInfo(const DatasetOp &node, const std::vector<int32_t> &size); | |||
| // Change file mode after save throughput data | |||
| Status ChangeFileMode() { return Status::OK(); } | |||
| Status Analyze() override; | |||
| private: | |||
| ExecutionTree *tree_ = nullptr; // ExecutionTree pointer | |||
| ConnectorSizeSampleTable sample_table_; // Dataset structure to store all samples of connector size sampling | |||
| @@ -150,5 +150,7 @@ Status ConnectorThroughput::ChangeFileMode() { | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| Status ConnectorThroughput::Analyze() { return Status::OK(); } | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -74,6 +74,8 @@ class ConnectorThroughput : public Sampling { | |||
| Status ChangeFileMode() override; | |||
| Status Analyze() override; | |||
| private: | |||
| ExecutionTree *tree_ = nullptr; // ExecutionTree pointer | |||
| int64_t max_rows_; | |||
| @@ -135,6 +135,27 @@ Status DeviceCpu::Collect(ExecutionTree *tree) { | |||
| first_collect_ = false; | |||
| return Status::OK(); | |||
| } | |||
| Status DeviceCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) { | |||
| *name = std::string("device_info"); | |||
| int total_samples = cpu_util_.size(); | |||
| int sum = 0; | |||
| // Only analyze the middle half of the samples | |||
| // Starting and ending may be impacted by startup or ending pipeline activities | |||
| int start_analyze = total_samples / 4; | |||
| int end_analyze = total_samples - start_analyze; | |||
| for (int i = start_analyze; i < end_analyze; i++) { | |||
| sum += cpu_util_[i].user_utilization_; | |||
| sum += cpu_util_[i].sys_utilization_; | |||
| } | |||
| // Note device utilization is already in range of 0-1, so don't | |||
| // need to divide by number of CPUS | |||
| if ((end_analyze - start_analyze) > 0) { | |||
| *utilization = sum / (end_analyze - start_analyze); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| Status DeviceCpu::SaveToFile(const std::string &file_path) { | |||
| Path path = Path(file_path); | |||
| @@ -236,6 +257,8 @@ Status OperatorCpu::Collect(ExecutionTree *tree) { | |||
| if (first_collect_) { | |||
| for (auto iter = tree->begin(); iter != tree->end(); ++iter) { | |||
| id_count++; | |||
| op_name[iter->id()] = iter->NameWithID(); | |||
| op_parallel_workers[iter->id()] = iter->num_workers(); | |||
| } | |||
| #if defined(USING_LINUX) | |||
| cpu_processor_num = get_nprocs_conf(); | |||
| @@ -327,6 +350,37 @@ Status OperatorCpu::Collect(ExecutionTree *tree) { | |||
| return Status::OK(); | |||
| } | |||
| Status OperatorCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) { | |||
| int total_samples = cpu_op_util_.size(); | |||
| // Only analyze the middle half of the samples | |||
| // Starting and ending may be impacted by startup or ending pipeline activities | |||
| int start_analyze = total_samples / 4; | |||
| int end_analyze = total_samples - start_analyze; | |||
| double op_util; | |||
| *utilization = 0; | |||
| // start loop from 0 was as don't want to analyze op -1 | |||
| for (auto op_id = 0; op_id < id_count; op_id++) { | |||
| int sum = 0; | |||
| int index = op_id + 1; | |||
| for (int i = start_analyze; i < end_analyze; i++) { | |||
| sum += cpu_op_util_[i][index].user_utilization_; | |||
| sum += cpu_op_util_[i][index].sys_utilization_; | |||
| } | |||
| if ((end_analyze - start_analyze) > 0) { | |||
| op_util = 1.0 * sum * cpu_processor_num / (op_parallel_workers[op_id] * (end_analyze - start_analyze)); | |||
| } | |||
| if (op_util > *utilization) { | |||
| *utilization = op_util; | |||
| *name = op_name[op_id]; | |||
| } | |||
| extra_message->append(op_name[op_id] + " utiliization per thread: " + std::to_string(op_util) + "% (" + | |||
| std::to_string(op_parallel_workers[op_id]) + " parallel_workers); "); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| Status OperatorCpu::SaveToFile(const std::string &file_path) { | |||
| Path path = Path(file_path); | |||
| json output; | |||
| @@ -453,6 +507,26 @@ Status ProcessCpu::Collect(ExecutionTree *tree) { | |||
| return Status::OK(); | |||
| } | |||
| Status ProcessCpu::Analyze(std::string *name, double *utilization, std::string *extra_message) { | |||
| *name = std::string("process_info"); | |||
| int total_samples = process_util_.size(); | |||
| int sum = 0; | |||
| // Only analyze the middle half of the samples | |||
| // Starting and ending may be impacted by startup or ending pipeline activities | |||
| int start_analyze = total_samples / 4; | |||
| int end_analyze = total_samples - start_analyze; | |||
| for (int i = start_analyze; i < end_analyze; i++) { | |||
| sum += process_util_[i].user_utilization_; | |||
| sum += process_util_[i].sys_utilization_; | |||
| } | |||
| if ((end_analyze - start_analyze) > 0) { | |||
| *utilization = sum / (end_analyze - start_analyze); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| Status ProcessCpu::SaveToFile(const std::string &file_path) { | |||
| Path path = Path(file_path); | |||
| json output; | |||
| @@ -529,6 +603,37 @@ Status CpuSampling::SaveSamplingItervalToFile() { | |||
| return Status::OK(); | |||
| } | |||
| // Analyze profiling data and output warning messages | |||
| Status CpuSampling::Analyze() { | |||
| std::string name; | |||
| double utilization = 0; | |||
| // Keep track of specific information returned by differentn CPU sampling types | |||
| double total_utilization = 0; | |||
| double max_op_utilization = 0; | |||
| std::string max_op_name; | |||
| std::string detailed_op_cpu_message; | |||
| // Save cpu information to json file | |||
| for (auto cpu : cpu_) { | |||
| std::string extra_message; | |||
| RETURN_IF_NOT_OK(cpu->Analyze(&name, &utilization, &extra_message)); | |||
| if (name == "device_info") { | |||
| total_utilization = utilization; | |||
| } else if (name != "process_info") { | |||
| max_op_utilization = utilization; | |||
| max_op_name = name; | |||
| detailed_op_cpu_message = extra_message; | |||
| } | |||
| } | |||
| if ((total_utilization < 90) && (max_op_utilization > 80)) { | |||
| MS_LOG(WARNING) << "Operator " << max_op_name << " is using " << max_op_utilization << "% CPU per thread. " | |||
| << "This operator may benefit from increasing num_parallel_workers." | |||
| << "Full Operator CPU utiliization for all operators: " << detailed_op_cpu_message << std::endl; | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| // Save profiling data to file | |||
| Status CpuSampling::SaveToFile() { | |||
| // Save time stamp to json file | |||
| @@ -71,6 +71,7 @@ class BaseCpu { | |||
| // Collect CPU information | |||
| virtual Status Collect(ExecutionTree *tree) = 0; | |||
| virtual Status SaveToFile(const std::string &file_path) = 0; | |||
| virtual Status Analyze(std::string *name, double *utilization, std::string *extra_message) = 0; | |||
| protected: | |||
| std::vector<CpuUtil> cpu_util_; | |||
| @@ -90,6 +91,7 @@ class DeviceCpu : public BaseCpu { | |||
| ~DeviceCpu() = default; | |||
| Status Collect(ExecutionTree *tree) override; | |||
| Status SaveToFile(const std::string &file_path) override; | |||
| Status Analyze(std::string *name, double *utilization, std::string *extra_message) override; | |||
| private: | |||
| // Get CPU information, include use/sys/idle/io utilization | |||
| @@ -115,6 +117,11 @@ class OperatorCpu : public BaseCpu { | |||
| ~OperatorCpu() = default; | |||
| Status Collect(ExecutionTree *tree) override; | |||
| Status SaveToFile(const std::string &file_path) override; | |||
| // Analyze will output the name of the metric, the avg utiliization of highest | |||
| // object within the class and any extra message that would be useful for the user. | |||
| // The Higher level CPUSampling class will combine information from different classes | |||
| // to decide if warning should be output. | |||
| Status Analyze(std::string *name, double *utilization, std::string *extra_message) override; | |||
| private: | |||
| // Get cpu information, include use/sys/idle/io utilization | |||
| @@ -131,6 +138,8 @@ class OperatorCpu : public BaseCpu { | |||
| // Store the id and its corresponding threads. | |||
| std::unordered_map<int32_t, std::vector<pid_t>> op_thread; | |||
| std::unordered_map<int32_t, std::string> op_name; | |||
| std::unordered_map<int32_t, int32_t> op_parallel_workers; | |||
| std::unordered_map<int32_t, std::unordered_map<int64_t, CpuOpStat>> pre_op_stat_; | |||
| uint64_t pre_total_stat_; | |||
| int32_t id_count = 0; | |||
| @@ -143,6 +152,7 @@ class ProcessCpu : public BaseCpu { | |||
| ~ProcessCpu() = default; | |||
| Status Collect(ExecutionTree *tree) override; | |||
| Status SaveToFile(const std::string &file_path) override; | |||
| Status Analyze(std::string *name, double *utilization, std::string *extra_message) override; | |||
| private: | |||
| // Get CPU information, include use/sys/idle/io utilization | |||
| @@ -183,6 +193,9 @@ class CpuSampling : public Sampling { | |||
| // Change file mode after save CPU data | |||
| Status ChangeFileMode() override; | |||
| // Analyze sampling data and print message to log | |||
| Status Analyze() override; | |||
| private: | |||
| Status CollectTimeStamp(); | |||
| @@ -45,6 +45,7 @@ Status Monitor::operator()() { | |||
| } | |||
| // Output all profiling data upon request. | |||
| RETURN_IF_NOT_OK(tree_->GetProfilingManager()->Analyze()); | |||
| RETURN_IF_NOT_OK(tree_->GetProfilingManager()->SaveProfilingData()); | |||
| RETURN_IF_NOT_OK(tree_->GetProfilingManager()->ChangeFileMode()); | |||
| return Status::OK(); | |||
| @@ -157,6 +157,16 @@ Status ProfilingManager::SaveProfilingData() { | |||
| MS_LOG(INFO) << "Save profiling data end."; | |||
| return Status::OK(); | |||
| } | |||
| Status ProfilingManager::Analyze() { | |||
| if (!IsProfilingEnable()) { | |||
| return Status::OK(); | |||
| } | |||
| MS_LOG(INFO) << "Start to analyze profiling data."; | |||
| for (auto node : sampling_nodes_) { | |||
| RETURN_IF_NOT_OK(node.second->Analyze()); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| Status ProfilingManager::ChangeFileMode() { | |||
| if (!IsProfilingEnable()) { | |||
| @@ -65,6 +65,7 @@ class Sampling : public Profiling { | |||
| // Sampling action function. This function will be invoked by performance monitor thread. | |||
| virtual Status Sample() = 0; | |||
| // virtual Status TestPrint() = 0; | |||
| virtual Status Analyze() = 0; | |||
| virtual ~Sampling() = default; | |||
| }; | |||
| @@ -118,6 +119,9 @@ class ProfilingManager { | |||
| Status ChangeFileMode(); | |||
| // Analyze profile data and print warning messages | |||
| Status Analyze(); | |||
| private: | |||
| std::unique_ptr<Monitor> perf_monitor_; | |||
| bool enabled_; | |||