From: @ms_yan Reviewed-by: @heleiwang, @liucunwei Signed-off-by: @liucunwei pull/14268/MERGE
| @@ -50,6 +50,8 @@ PYBIND_REGISTER(ConfigManager, 0, ([](const py::module *m) { | |||||
| .def("set_auto_worker_config", &ConfigManager::set_auto_worker_config_) | .def("set_auto_worker_config", &ConfigManager::set_auto_worker_config_) | ||||
| .def("set_callback_timeout", &ConfigManager::set_callback_timeout) | .def("set_callback_timeout", &ConfigManager::set_callback_timeout) | ||||
| .def("set_monitor_sampling_interval", &ConfigManager::set_monitor_sampling_interval) | .def("set_monitor_sampling_interval", &ConfigManager::set_monitor_sampling_interval) | ||||
| .def("stop_dataset_profiler", &ConfigManager::stop_dataset_profiler) | |||||
| .def("get_profiler_file_status", &ConfigManager::get_profiler_file_status) | |||||
| .def("set_num_parallel_workers", &ConfigManager::set_num_parallel_workers) | .def("set_num_parallel_workers", &ConfigManager::set_num_parallel_workers) | ||||
| .def("set_op_connector_size", &ConfigManager::set_op_connector_size) | .def("set_op_connector_size", &ConfigManager::set_op_connector_size) | ||||
| .def("set_rows_per_buffer", &ConfigManager::set_rows_per_buffer) | .def("set_rows_per_buffer", &ConfigManager::set_rows_per_buffer) | ||||
| @@ -39,6 +39,8 @@ ConfigManager::ConfigManager() | |||||
| seed_(kCfgDefaultSeed), | seed_(kCfgDefaultSeed), | ||||
| numa_enable_(false), | numa_enable_(false), | ||||
| monitor_sampling_interval_(kCfgMonitorSamplingInterval), | monitor_sampling_interval_(kCfgMonitorSamplingInterval), | ||||
| stop_profiler_(false), | |||||
| file_ready_(true), | |||||
| callback_timout_(kCfgCallbackTimeout), | callback_timout_(kCfgCallbackTimeout), | ||||
| cache_host_(kCfgDefaultCacheHost), | cache_host_(kCfgDefaultCacheHost), | ||||
| cache_port_(kCfgDefaultCachePort), | cache_port_(kCfgDefaultCachePort), | ||||
| @@ -139,6 +141,10 @@ void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; } | |||||
| void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; } | void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; } | ||||
| void ConfigManager::stop_dataset_profiler(bool stop_profiler) { stop_profiler_ = stop_profiler; } | |||||
| void ConfigManager::set_profiler_file_status(bool file_ready) { file_ready_ = file_ready; } | |||||
| void ConfigManager::set_callback_timeout(uint32_t timeout) { callback_timout_ = timeout; } | void ConfigManager::set_callback_timeout(uint32_t timeout) { callback_timout_ = timeout; } | ||||
| void ConfigManager::set_cache_host(std::string cache_host) { cache_host_ = std::move(cache_host); } | void ConfigManager::set_cache_host(std::string cache_host) { cache_host_ = std::move(cache_host); } | ||||
| @@ -16,6 +16,7 @@ | |||||
| #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_ | #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_ | ||||
| #define MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_ | #define MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_ | ||||
| #include <atomic> | |||||
| #include <ostream> | #include <ostream> | ||||
| #include <sstream> | #include <sstream> | ||||
| #include <string> | #include <string> | ||||
| @@ -178,6 +179,22 @@ class ConfigManager { | |||||
| // @return The interval of monitor sampling | // @return The interval of monitor sampling | ||||
| int32_t monitor_sampling_interval() const { return monitor_sampling_interval_; } | int32_t monitor_sampling_interval() const { return monitor_sampling_interval_; } | ||||
| // setter function | |||||
| // @param stop_profiler - True to ask the dataset profiler to stop sampling | |||||
| void stop_dataset_profiler(bool stop_profiler); | |||||
| // getter function | |||||
| // @return Whether the dataset profiler has been asked to stop | |||||
| bool stop_profiler_status() const { return stop_profiler_; } | |||||
| // setter function | |||||
| // @param file_ready - Whether the profiler file has been generated | |||||
| void set_profiler_file_status(bool file_ready); | |||||
| // getter function | |||||
| // @return Whether the profiler file has been generated | |||||
| bool get_profiler_file_status() const { return file_ready_; } | |||||
| // setter function | // setter function | ||||
| // @param auto_num_workers - whether assign threads to each op automatically | // @param auto_num_workers - whether assign threads to each op automatically | ||||
| void set_auto_num_workers(bool auto_num_workers) { auto_num_workers_ = auto_num_workers; } | void set_auto_num_workers(bool auto_num_workers) { auto_num_workers_ = auto_num_workers; } | ||||
| @@ -223,6 +240,8 @@ class ConfigManager { | |||||
| int32_t rank_id_; | int32_t rank_id_; | ||||
| uint32_t seed_; | uint32_t seed_; | ||||
| uint32_t monitor_sampling_interval_; | uint32_t monitor_sampling_interval_; | ||||
| std::atomic_bool stop_profiler_; | |||||
| std::atomic_bool file_ready_; | |||||
| uint32_t callback_timout_; | uint32_t callback_timout_; | ||||
| std::string cache_host_; | std::string cache_host_; | ||||
| int32_t cache_port_; | int32_t cache_port_; | ||||
| @@ -113,7 +113,7 @@ Status DeviceCpu::ParseRunningProcess(const std::string &str) { | |||||
| Status DeviceCpu::Collect(ExecutionTree *tree) { | Status DeviceCpu::Collect(ExecutionTree *tree) { | ||||
| std::ifstream file("/proc/stat"); | std::ifstream file("/proc/stat"); | ||||
| if (!file.is_open()) { | if (!file.is_open()) { | ||||
| MS_LOG(WARNING) << "Open CPU file failed when collect CPU information"; | |||||
| MS_LOG(INFO) << "Open CPU file failed when collect CPU information"; | |||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| bool first_line = true; | bool first_line = true; | ||||
| @@ -214,7 +214,7 @@ Status OperatorCpu::ParseCpuInfo(int32_t op_id, int64_t thread_id, | |||||
| std::ifstream file(stat_path); | std::ifstream file(stat_path); | ||||
| if (!file.is_open()) { | if (!file.is_open()) { | ||||
| MS_LOG(WARNING) << "Open CPU file failed when collect CPU information"; | |||||
| MS_LOG(INFO) << "Open CPU file failed when collect CPU information"; | |||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| std::string str; | std::string str; | ||||
| @@ -236,7 +236,7 @@ Status OperatorCpu::ParseCpuInfo(int32_t op_id, int64_t thread_id, | |||||
| Status OperatorCpu::GetTotalCpuTime(uint64_t *total_stat) { | Status OperatorCpu::GetTotalCpuTime(uint64_t *total_stat) { | ||||
| std::ifstream file("/proc/stat"); | std::ifstream file("/proc/stat"); | ||||
| if (!file.is_open()) { | if (!file.is_open()) { | ||||
| MS_LOG(WARNING) << "Open CPU file failed when collect CPU information"; | |||||
| MS_LOG(INFO) << "Open CPU file failed when collect CPU information"; | |||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| std::string str; | std::string str; | ||||
| @@ -443,7 +443,7 @@ Status ProcessCpu::ParseCpuInfo() { | |||||
| std::ifstream file(stat_path); | std::ifstream file(stat_path); | ||||
| if (!file.is_open()) { | if (!file.is_open()) { | ||||
| MS_LOG(WARNING) << "Open CPU file failed when collect CPU information"; | |||||
| MS_LOG(INFO) << "Open CPU file failed when collect CPU information"; | |||||
| continue; | continue; | ||||
| } | } | ||||
| std::string str; | std::string str; | ||||
| @@ -479,7 +479,7 @@ Status ProcessCpu::ParseCpuInfo() { | |||||
| Status ProcessCpu::GetTotalCpuTime(uint64_t *total_stat) { | Status ProcessCpu::GetTotalCpuTime(uint64_t *total_stat) { | ||||
| std::ifstream file("/proc/stat"); | std::ifstream file("/proc/stat"); | ||||
| if (!file.is_open()) { | if (!file.is_open()) { | ||||
| MS_LOG(WARNING) << "Open CPU file failed when collect CPU information"; | |||||
| MS_LOG(INFO) << "Open CPU file failed when collect CPU information"; | |||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| std::string str; | std::string str; | ||||
| @@ -29,11 +29,13 @@ Monitor::Monitor(ExecutionTree *tree) : tree_(tree) { | |||||
| Status Monitor::operator()() { | Status Monitor::operator()() { | ||||
| // Register this thread with TaskManager to receive proper interrupt signal. | // Register this thread with TaskManager to receive proper interrupt signal. | ||||
| TaskManager::FindMe()->Post(); | TaskManager::FindMe()->Post(); | ||||
| std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager(); | |||||
| cfg->set_profiler_file_status(false); | |||||
| // Keep sampling if | // Keep sampling if | ||||
| // 1) Monitor Task is not interrupted by TaskManager AND | // 1) Monitor Task is not interrupted by TaskManager AND | ||||
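| // 2) Iterator has not received EOF | // 2) Iterator has not received EOF | ||||
| // AND 3) the dataset profiler has not been asked to stop (stop_profiler_status() is false) | |||||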
| while (!this_thread::is_interrupted() && !(tree_->isFinished())) { | |||||
| while (!this_thread::is_interrupted() && !(tree_->isFinished()) && !(cfg->stop_profiler_status())) { | |||||
| if (tree_->IsEpochEnd()) { | if (tree_->IsEpochEnd()) { | ||||
| RETURN_IF_NOT_OK(tree_->GetProfilingManager()->SaveProfilingData()); | RETURN_IF_NOT_OK(tree_->GetProfilingManager()->SaveProfilingData()); | ||||
| tree_->SetExecuting(); | tree_->SetExecuting(); | ||||
| @@ -48,6 +50,8 @@ Status Monitor::operator()() { | |||||
| RETURN_IF_NOT_OK(tree_->GetProfilingManager()->Analyze()); | RETURN_IF_NOT_OK(tree_->GetProfilingManager()->Analyze()); | ||||
| RETURN_IF_NOT_OK(tree_->GetProfilingManager()->SaveProfilingData()); | RETURN_IF_NOT_OK(tree_->GetProfilingManager()->SaveProfilingData()); | ||||
| RETURN_IF_NOT_OK(tree_->GetProfilingManager()->ChangeFileMode()); | RETURN_IF_NOT_OK(tree_->GetProfilingManager()->ChangeFileMode()); | ||||
| cfg->set_profiler_file_status(true); | |||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| @@ -18,8 +18,10 @@ configuration parameters, and read a configuration file. | |||||
| """ | """ | ||||
| import os | import os | ||||
| import random | import random | ||||
| import time | |||||
| import numpy | import numpy | ||||
| import mindspore._c_dataengine as cde | import mindspore._c_dataengine as cde | ||||
| from mindspore import log as logger | |||||
| __all__ = ['set_seed', 'get_seed', 'set_prefetch_size', 'get_prefetch_size', 'set_num_parallel_workers', | __all__ = ['set_seed', 'get_seed', 'set_prefetch_size', 'get_prefetch_size', 'set_num_parallel_workers', | ||||
| 'get_num_parallel_workers', 'set_monitor_sampling_interval', 'get_monitor_sampling_interval', 'load', | 'get_num_parallel_workers', 'set_monitor_sampling_interval', 'get_monitor_sampling_interval', 'load', | ||||
| @@ -357,3 +359,17 @@ def load(file): | |||||
| >>> ds.config.load(config_file) | >>> ds.config.load(config_file) | ||||
| """ | """ | ||||
| _config.load(file) | _config.load(file) | ||||
| def _stop_dataset_profiler(): | |||||
| """ | |||||
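| Ask the dataset profiler to stop and block until its profiler file has been generated. | |||||
| Returns immediately if the profiler file is already marked as generated. | |||||
| Returns: | |||||
| None. The call returns only once the profiler file has been generated. | |||||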
| """ | |||||
| while not _config.get_profiler_file_status(): | |||||
| _config.stop_dataset_profiler(True) | |||||
| logger.warning("Profiling: waiting for dataset part profiling stop.") | |||||
| time.sleep(1) | |||||
| @@ -23,6 +23,7 @@ from enum import Enum | |||||
| from mindspore import log as logger, context | from mindspore import log as logger, context | ||||
| from mindspore.communication.management import GlobalComm, release, get_rank | from mindspore.communication.management import GlobalComm, release, get_rank | ||||
| import mindspore._c_expression as c_expression | import mindspore._c_expression as c_expression | ||||
| from mindspore.dataset.core.config import _stop_dataset_profiler | |||||
| from mindspore.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \ | from mindspore.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \ | ||||
| ProfilerIOException, ProfilerException, ProfilerRawFileException | ProfilerIOException, ProfilerException, ProfilerRawFileException | ||||
| from mindspore.profiler.common.util import get_file_names, fwrite_format | from mindspore.profiler.common.util import get_file_names, fwrite_format | ||||
| @@ -189,6 +190,7 @@ class Profiler: | |||||
| Collect and analyse performance data, called after training or during training. The example shows above. | Collect and analyse performance data, called after training or during training. The example shows above. | ||||
| """ | """ | ||||
| self._cpu_profiler.stop() | self._cpu_profiler.stop() | ||||
| _stop_dataset_profiler() | |||||
| if self._device_target and self._device_target == "GPU": | if self._device_target and self._device_target == "GPU": | ||||
| self._gpu_analyse() | self._gpu_analyse() | ||||