| @@ -233,5 +233,32 @@ bool CacheServerHW::numa_enabled() { | |||||
| return false; | return false; | ||||
| #endif | #endif | ||||
| } | } | ||||
| uint64_t CacheServerHW::GetAvailableMemory() { | |||||
| std::ifstream mem_file(kMemInfoFileName); | |||||
| if (mem_file.fail()) { | |||||
| MS_LOG(WARNING) << "Fail to open file: " << kMemInfoFileName; | |||||
| return 0; | |||||
| } | |||||
| std::string line; | |||||
| uint64_t mem_available_in_kb = 0L; | |||||
| while (std::getline(mem_file, line)) { | |||||
| // get title | |||||
| std::string::size_type position = line.find(":"); | |||||
| std::string title = line.substr(0, position); | |||||
| // get the value of MemAvailable | |||||
| if (title == "MemAvailable") { | |||||
| std::string::size_type pos1 = line.find_last_of(" "); | |||||
| std::string::size_type pos2 = line.find_last_of(" ", pos1 - 1); | |||||
| mem_available_in_kb = std::stol(line.substr(pos2, pos1 - pos2)); | |||||
| break; | |||||
| } | |||||
| } | |||||
| mem_file.close(); | |||||
| return mem_available_in_kb * 1024; | |||||
| } | |||||
| } // namespace dataset | } // namespace dataset | ||||
| } // namespace mindspore | } // namespace mindspore | ||||
| @@ -90,8 +90,12 @@ class CacheServerHW { | |||||
| /// \return the size (in bytes) of the physical RAM on the machine. | /// \return the size (in bytes) of the physical RAM on the machine. | ||||
| static int64_t GetTotalSystemMemory(); | static int64_t GetTotalSystemMemory(); | ||||
| /// \brief Get the size (in bytes) of available memory on the machine by reading from file /proc/meminfo. | |||||
| static uint64_t GetAvailableMemory(); | |||||
| private: | private: | ||||
| constexpr static char kSysNodePath[] = "/sys/devices/system/node"; | constexpr static char kSysNodePath[] = "/sys/devices/system/node"; | ||||
| constexpr static char kMemInfoFileName[] = "/proc/meminfo"; | |||||
| int32_t num_cpus_; | int32_t num_cpus_; | ||||
| std::map<numa_id_t, cpu_set_t> numa_cpuset_; | std::map<numa_id_t, cpu_set_t> numa_cpuset_; | ||||
| std::map<numa_id_t, int32_t> numa_cpu_cnt_; | std::map<numa_id_t, int32_t> numa_cpu_cnt_; | ||||
| @@ -63,6 +63,9 @@ class NumaMemoryPool : public MemoryPool { | |||||
| /// \brief Return this pool's memory cap (memory_cap_); unlike CacheServerHW::GetAvailableMemory() it does not read /proc/meminfo | /// \brief Return this pool's memory cap (memory_cap_); unlike CacheServerHW::GetAvailableMemory() it does not read /proc/meminfo | ||||
| int64_t GetAvailableMemory() const { return memory_cap_; } | int64_t GetAvailableMemory() const { return memory_cap_; } | ||||
| /// \brief Return memory_cap_ratio_, the configured or computed memory cap ratio of this pool | |||||
| float GetMemoryCapRatio() const { return memory_cap_ratio_; } | |||||
| private: | private: | ||||
| std::shared_ptr<CacheServerHW> hw_; | std::shared_ptr<CacheServerHW> hw_; | ||||
| float memory_cap_ratio_; | float memory_cap_ratio_; | ||||
| @@ -22,7 +22,12 @@ | |||||
| namespace mindspore { | namespace mindspore { | ||||
| namespace dataset { | namespace dataset { | ||||
| CachePool::CachePool(std::shared_ptr<NumaMemoryPool> mp, const std::string &root) | CachePool::CachePool(std::shared_ptr<NumaMemoryPool> mp, const std::string &root) | ||||
| : mp_(std::move(mp)), root_(root), subfolder_(Services::GetUniqueID()), sm_(nullptr), tree_(nullptr) {} | |||||
| : mp_(std::move(mp)), root_(root), subfolder_(Services::GetUniqueID()), sm_(nullptr), tree_(nullptr) { | |||||
| // Initialize soft memory cap to the current available memory on the machine. | |||||
| soft_mem_limit_ = CacheServerHW::GetAvailableMemory(); | |||||
| temp_mem_usage_ = 0; | |||||
| min_avail_mem_ = CacheServerHW::GetTotalSystemMemory() * (1.0 - mp_->GetMemoryCapRatio()); | |||||
| } | |||||
| Status CachePool::DoServiceStart() { | Status CachePool::DoServiceStart() { | ||||
| tree_ = std::make_shared<data_index>(); | tree_ = std::make_shared<data_index>(); | ||||
| @@ -83,8 +88,23 @@ Status CachePool::Insert(CachePool::key_type key, const std::vector<ReadableSlic | |||||
| sz += v.GetSize(); | sz += v.GetSize(); | ||||
| } | } | ||||
| bl.sz = sz; | bl.sz = sz; | ||||
| rc = mp_->Allocate(sz, reinterpret_cast<void **>(&bl.ptr)); | |||||
| // If the required memory size exceeds the available size, an OOM status is returned. To keep the cache server process | |||||
| // from being killed or crashing the machine, enforce a lower bound on available memory: caching stops once the | |||||
| // remaining available memory would fall below that bound. (The default is 20% of physical RAM) | |||||
| if (soft_mem_limit_ - temp_mem_usage_ - static_cast<uint64_t>(sz) < min_avail_mem_) { | |||||
| MS_LOG(WARNING) << "Memory usage will exceed the upper bound limit of: " << min_avail_mem_ | |||||
| << ". The cache server will not cache any more data."; | |||||
| rc = Status(StatusCode::kMDOutOfMemory, __LINE__, __FILE__); | |||||
| } else { | |||||
| rc = mp_->Allocate(sz, reinterpret_cast<void **>(&bl.ptr)); | |||||
| // Refresh the soft limit and reset the usage counter each time about 100 MB of memory has been used. | |||||
| if (temp_mem_usage_ + sz >= kMemoryCapAdjustInterval) { | |||||
| soft_mem_limit_ = CacheServerHW::GetAvailableMemory(); | |||||
| temp_mem_usage_ = 0; | |||||
| } | |||||
| } | |||||
| if (rc.IsOk()) { | if (rc.IsOk()) { | ||||
| temp_mem_usage_ += sz; | |||||
| // Write down which numa node where we allocate from. It only make sense if the policy is kOnNode. | // Write down which numa node where we allocate from. It only make sense if the policy is kOnNode. | ||||
| if (CacheServerHW::numa_enabled()) { | if (CacheServerHW::numa_enabled()) { | ||||
| auto &cs = CacheServer::GetInstance(); | auto &cs = CacheServer::GetInstance(); | ||||
| @@ -149,6 +149,11 @@ class CachePool : public Service { | |||||
| const std::string subfolder_; | const std::string subfolder_; | ||||
| std::shared_ptr<StorageManager> sm_; | std::shared_ptr<StorageManager> sm_; | ||||
| std::shared_ptr<data_index> tree_; | std::shared_ptr<data_index> tree_; | ||||
std::atomic<uint64_t> soft_mem_limit_;  // available memory on the machine, as of the last refresh
std::atomic<uint64_t> temp_mem_usage_;  // bytes allocated by the cache since soft_mem_limit_ was last refreshed
uint64_t min_avail_mem_;                // lower bound of the available memory
// soft_mem_limit_ is refreshed (and temp_mem_usage_ reset) each time about this many bytes have been allocated.
// Unsigned and 64-bit so it has the same type as the counters it is compared against; static so that it is a
// compile-time constant shared by all instances rather than a per-object const member.
static constexpr uint64_t kMemoryCapAdjustInterval = 100ULL * 1024 * 1024;  // 100 MB
| }; | }; | ||||
| } // namespace dataset | } // namespace dataset | ||||
| } // namespace mindspore | } // namespace mindspore | ||||