From: @jonyguo Reviewed-by: Signed-off-by:tags/v1.1.0
| @@ -26,6 +26,7 @@ | |||||
| #include "minddata/dataset/engine/db_connector.h" | #include "minddata/dataset/engine/db_connector.h" | ||||
| #include "minddata/dataset/engine/opt/pass.h" | #include "minddata/dataset/engine/opt/pass.h" | ||||
| #include "minddata/dataset/kernels/data/data_utils.h" | #include "minddata/dataset/kernels/data/data_utils.h" | ||||
| #include "minddata/dataset/util/status.h" | |||||
| namespace mindspore { | namespace mindspore { | ||||
| namespace dataset { | namespace dataset { | ||||
| @@ -131,6 +132,15 @@ Status BatchOp::operator()() { | |||||
| worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kEOE)))); | worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kEOE)))); | ||||
| RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(epoch_num, batch_num, cnt - epoch_num))); | RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(epoch_num, batch_num, cnt - epoch_num))); | ||||
| RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row)); | RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row)); | ||||
| #if !defined(_WIN32) && !defined(_WIN64) | |||||
| if ((num_workers_ > 1 || batch_map_func_) && GetMemoryUsage() > MAX_MEMORY_USAGE_THRESHOLD) { | |||||
| MS_LOG(WARNING) << "Memory consumption is more than " << MAX_MEMORY_USAGE_THRESHOLD * 100 << "%, " | |||||
| << "which may cause oom error. Please reduce num_parallel_workers size / " | |||||
| << "optimize per_batch_map function / other python data preprocess function to " | |||||
| << "reduce memory usage."; | |||||
| } | |||||
| #endif | |||||
| } // end of eof_handled() == false | } // end of eof_handled() == false | ||||
| RETURN_IF_NOT_OK( | RETURN_IF_NOT_OK( | ||||
| worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kEOF)))); | worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kEOF)))); | ||||
| @@ -15,7 +15,12 @@ | |||||
| */ | */ | ||||
| #include "minddata/dataset/util/status.h" | #include "minddata/dataset/util/status.h" | ||||
| #include <sstream> | #include <sstream> | ||||
| #include <string> | |||||
| #include <memory.h> | |||||
| #include <stdio.h> | |||||
| #include <stdlib.h> | |||||
| #include "utils/ms_utils.h" | #include "utils/ms_utils.h" | ||||
| #include "./securec.h" | |||||
| #ifndef ENABLE_ANDROID | #ifndef ENABLE_ANDROID | ||||
| #include "minddata/dataset/util/task_manager.h" | #include "minddata/dataset/util/task_manager.h" | ||||
| @@ -139,5 +144,55 @@ std::ostream &operator<<(std::ostream &os, const Status &s) { | |||||
| std::string Status::ToString() const { return err_msg_; } | std::string Status::ToString() const { return err_msg_; } | ||||
| StatusCode Status::get_code() const { return code_; } | StatusCode Status::get_code() const { return code_; } | ||||
| #if !defined(_WIN32) && !defined(_WIN64) | |||||
| float GetMemoryUsage() { | |||||
| char buf[128] = {0}; | |||||
| FILE *fd; | |||||
| fd = fopen("/proc/meminfo", "r"); | |||||
| if (fd == nullptr) { | |||||
| MS_LOG(WARNING) << "The meminfo file: /proc/meminfo is opened failed."; | |||||
| return 0.0; | |||||
| } | |||||
| uint32_t status_count = 0; | |||||
| uint64_t mem_total = 0L; | |||||
| uint64_t mem_available = 0L; | |||||
| while (fgets(buf, sizeof(buf), fd)) { | |||||
| if (status_count == 2) { // get MemTotal and MemAvailable yet | |||||
| break; | |||||
| } | |||||
| // get title | |||||
| std::string line(buf); | |||||
| std::string::size_type position = line.find(":"); | |||||
| std::string title = line.substr(0, position); | |||||
| // get the value when MemTotal or MemAvailable | |||||
| if (title == "MemTotal") { | |||||
| std::string::size_type pos1 = line.find_last_of(" "); | |||||
| std::string::size_type pos2 = line.find_last_of(" ", pos1 - 1); | |||||
| mem_total = atol(line.substr(pos2, pos1 - pos2).c_str()); | |||||
| status_count++; | |||||
| } else if (title == "MemAvailable") { | |||||
| std::string::size_type pos1 = line.find_last_of(" "); | |||||
| std::string::size_type pos2 = line.find_last_of(" ", pos1 - 1); | |||||
| mem_available = atol(line.substr(pos2, pos1 - pos2).c_str()); | |||||
| status_count++; | |||||
| } | |||||
| (void)memset_s(buf, sizeof(buf), 0, sizeof(buf)); | |||||
| } | |||||
| fclose(fd); | |||||
| if (status_count != 2 || mem_total == 0 || mem_available > mem_total) { | |||||
| MS_LOG(WARNING) << "Get memory usage failed."; | |||||
| return 0.0; | |||||
| } | |||||
| return (1.0 - static_cast<float>(static_cast<double>(mem_available) / static_cast<double>(mem_total))); | |||||
| } | |||||
| #endif | |||||
| } // namespace dataset | } // namespace dataset | ||||
| } // namespace mindspore | } // namespace mindspore | ||||
| @@ -167,6 +167,12 @@ class Status { | |||||
| StatusCode code_; | StatusCode code_; | ||||
| std::string err_msg_; | std::string err_msg_; | ||||
| }; | }; | ||||
| #if !defined(_WIN32) && !defined(_WIN64) | |||||
// Memory-usage fraction (0.95 == 95%) that callers compare against GetMemoryUsage(); BatchOp logs an
// out-of-memory warning when the measured usage exceeds it.
| const float MAX_MEMORY_USAGE_THRESHOLD = 0.95; | |||||
// Returns the fraction of system memory in use, computed from MemTotal and MemAvailable in
// /proc/meminfo, or 0.0 when that information cannot be obtained. Not available on Windows builds.
| float GetMemoryUsage(); | |||||
| #endif | |||||
| } // namespace dataset | } // namespace dataset | ||||
| } // namespace mindspore | } // namespace mindspore | ||||
| #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_STATUS_H_ | #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_STATUS_H_ | ||||