diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_pool.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_pool.cc index 47611836d3..d59b88e649 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_pool.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_pool.cc @@ -30,7 +30,8 @@ Status CachePool::DoServiceStart() { if (!root_.toString().empty()) { Path spill = GetSpillPath(); RETURN_IF_NOT_OK(spill.CreateDirectories()); - sm_ = std::make_shared(spill); + auto &cs = CacheServer::GetInstance(); + sm_ = std::make_shared(spill, cs.GetNumWorkers()); RETURN_IF_NOT_OK(sm_->ServiceStart()); MS_LOG(INFO) << "CachePool will use disk folder: " << spill.toString(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/storage_container.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/storage_container.cc index bc1ebce07c..335f360a8a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/storage_container.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/storage_container.cc @@ -127,11 +127,23 @@ Status StorageContainer::Insert(const std::vector &buf, off64_t * addr_t addr = 0; RETURN_IF_NOT_OK(bs_->Alloc(sz, &bspd, &addr)); *offset = static_cast(addr); - // We will do piecewise copy of the data to disk. + // We will do piecewise copy of the data to a large buffer + std::string mem; + try { + mem.resize(sz); + CHECK_FAIL_RETURN_UNEXPECTED(mem.capacity() >= sz, "Programming error"); + } catch (const std::bad_alloc &e) { + return Status(StatusCode::kMDOutOfMemory); + } + WritableSlice all(mem.data(), sz); + size_t pos = 0; for (auto &v : buf) { - RETURN_IF_NOT_OK(Write(v, addr)); - addr += v.GetSize(); + WritableSlice row_data(all, pos); + RETURN_IF_NOT_OK(WritableSlice::Copy(&row_data, v)); + pos += v.GetSize(); } + // Write all data to disk at once + RETURN_IF_NOT_OK(Write(all, addr)); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/storage_manager.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/storage_manager.cc index 26b12ea5f2..941dfb225d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/storage_manager.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/storage_manager.cc @@ -22,6 +22,7 @@ #include "utils/ms_utils.h" #include "minddata/dataset/util/log_adapter.h" #include "minddata/dataset/util/path.h" +#include "minddata/dataset/util/random.h" #include "minddata/dataset/util/services.h" namespace mindspore { @@ -37,7 +38,7 @@ std::string StorageManager::ConstructFileName(const std::string &prefix, int32_t return (base_name + "." + suffix); } -Status StorageManager::AddOneContainer() { +Status StorageManager::AddOneContainer(int replaced_container_pos) { const std::string kPrefix = "IMG"; const std::string kSuffix = "LB"; Path container_name = root_ / ConstructFileName(kPrefix, file_id_, kSuffix); @@ -45,13 +46,22 @@ Status StorageManager::AddOneContainer() { RETURN_IF_NOT_OK(StorageContainer::CreateStorageContainer(&sc, container_name.toString())); containers_.push_back(sc); file_id_++; + if (replaced_container_pos >= 0) { + writable_containers_pool_[replaced_container_pos] = containers_.size() - 1; + } else { + writable_containers_pool_.push_back(containers_.size() - 1); + } return Status::OK(); } Status StorageManager::DoServiceStart() { containers_.reserve(1000); + writable_containers_pool_.reserve(pool_size_); if (root_.IsDirectory()) { - RETURN_IF_NOT_OK(AddOneContainer()); + // create multiple containers and store their index in a pool + for (int i = 0; i < pool_size_; i++) { + RETURN_IF_NOT_OK(AddOneContainer()); + } } else { RETURN_STATUS_UNEXPECTED("Not a directory"); } @@ -67,22 +77,25 @@ Status StorageManager::Write(key_type *key, const std::vector &bu if (sz == 0) { RETURN_STATUS_UNEXPECTED("Unexpected 0 length"); } + auto mt = GetRandomDevice(); std::shared_ptr cont; key_type out_key; value_type out_value; bool create_new_container = false; + int old_container_pos = -1; size_t last_num_container = -1; do { SharedLock lock_s(&rw_lock_); size_t num_containers = containers_.size(); - if (create_new_container && (num_containers == last_num_container)) { - // Upgrade to exclusvie lock. + if (create_new_container && (num_containers == last_num_container) && (old_container_pos >= 0)) { + // Upgrade to exclusive lock. lock_s.Upgrade(); create_new_container = false; // Check again if someone has already added a // new container after we got the x lock if (containers_.size() == num_containers) { - RETURN_IF_NOT_OK(AddOneContainer()); + // Create a new container and replace the full container in the pool with the newly created one + RETURN_IF_NOT_OK(AddOneContainer(old_container_pos)); } // Refresh how many containers there are. num_containers = containers_.size(); @@ -92,17 +105,21 @@ Status StorageManager::Write(key_type *key, const std::vector &bu if (num_containers == 0) { RETURN_STATUS_UNEXPECTED("num_containers is zero"); } - // Go to the last container to insert. - cont = containers_.at(num_containers - 1); + // Pick a random container from the writable container pool to insert. + std::uniform_int_distribution distribution(0, pool_size_ - 1); + int pos_in_pool = distribution(mt); + int cont_index = writable_containers_pool_.at(pos_in_pool); + cont = containers_.at(cont_index); off64_t offset; Status rc = cont->Insert(buf, &offset); if (rc.StatusCode() == StatusCode::kMDBuddySpaceFull) { create_new_container = true; + old_container_pos = pos_in_pool; // Remember how many containers we saw. In the next iteration we will do a comparison to see // if someone has already created it. last_num_container = num_containers; } else if (rc.IsOk()) { - out_value = std::make_pair(num_containers - 1, std::make_pair(offset, sz)); + out_value = std::make_pair(cont_index, std::make_pair(offset, sz)); RETURN_IF_NOT_OK(index_.insert(out_value, &out_key)); *key = out_key; break; @@ -150,11 +167,15 @@ Status StorageManager::DoServiceStop() noexcept { } } containers_.clear(); + writable_containers_pool_.clear(); file_id_ = 0; return rc1; } -StorageManager::StorageManager(const Path &root) : root_(root), file_id_(0), index_() {} +StorageManager::StorageManager(const Path &root) : root_(root), pool_size_(1), file_id_(0), index_() {} + +StorageManager::StorageManager(const Path &root, int pool_size) + : root_(root), pool_size_(pool_size), file_id_(0), index_() {} StorageManager::~StorageManager() { (void)StorageManager::DoServiceStop(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/storage_manager.h b/mindspore/ccsrc/minddata/dataset/engine/cache/storage_manager.h index bd316f7fec..a66eef9273 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/storage_manager.h +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/storage_manager.h @@ -35,12 +35,23 @@ namespace mindspore { namespace dataset { class StorageManager : public Service { public: - using storage_index = AutoIndexObj>>; + // Use these traits for the B+ tree inside the StorageManager + struct StorageBPlusTreeTraits { + // This determines the limit of number of keys in a node. + using slot_type = uint16_t; + // Number of slots in each leaf of the tree. + static constexpr slot_type kLeafSlots = 512; + // Number of slots in each inner node of the tree + static constexpr slot_type kInnerSlots = 256; + }; + using value_type = std::pair>; + using storage_index = AutoIndexObj, StorageBPlusTreeTraits>; using key_type = storage_index::key_type; - using value_type = storage_index::value_type; explicit StorageManager(const Path &); + StorageManager(const Path &root, int pool_size); + ~StorageManager() override; StorageManager(const StorageManager &) = delete; @@ -63,12 +74,19 @@ class StorageManager : public Service { int file_id_; RWLock rw_lock_; storage_index index_; + std::vector writable_containers_pool_; + int pool_size_; std::string GetBaseName(const std::string &prefix, int32_t file_id); std::string ConstructFileName(const std::string &prefix, int32_t file_id, const std::string &suffix); - Status AddOneContainer(); + /// \brief Add a new storage container + /// The newly-created container is going to be added into a pool of writable containers. + /// \param replaced_container_pos If provided, will use the newly created container to replace the corresponding old + /// container in the pool. If not provided, will just append the newly created container to the end of the pool. + /// \return Status object + Status AddOneContainer(int replaced_container_pos = -1); }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/util/auto_index.h b/mindspore/ccsrc/minddata/dataset/util/auto_index.h index a1c3a613e2..886462248b 100644 --- a/mindspore/ccsrc/minddata/dataset/util/auto_index.h +++ b/mindspore/ccsrc/minddata/dataset/util/auto_index.h @@ -17,6 +17,7 @@ #define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_AUTO_INDEX_H_ #include +#include #include #include #include @@ -30,16 +31,16 @@ namespace dataset { /// Use minKey() function to query the min key. /// Use maxKey() function to query the max key. /// @tparam T -template > -class AutoIndexObj : public BPlusTree { +template , typename T = BPlusTreeTraits> +class AutoIndexObj : public BPlusTree, T> { public: - using my_tree = BPlusTree; + using my_tree = BPlusTree, T>; using key_type = typename my_tree::key_type; using value_type = typename my_tree::value_type; AutoIndexObj() : my_tree::BPlusTree(), inx_(kMinKey) {} - explicit AutoIndexObj(const Allocator &alloc) : my_tree::BPlusTree(alloc), inx_(kMinKey) {} + explicit AutoIndexObj(const Allocator &alloc) : my_tree::BPlusTree(alloc), inx_(kMinKey) {} ~AutoIndexObj() = default;