From: @lixiachen Reviewed-by: @robingrosman,@pandoublefeng Signed-off-by: @pandoublefengpull/14136/MERGE
| @@ -30,7 +30,8 @@ Status CachePool::DoServiceStart() { | |||
| if (!root_.toString().empty()) { | |||
| Path spill = GetSpillPath(); | |||
| RETURN_IF_NOT_OK(spill.CreateDirectories()); | |||
| sm_ = std::make_shared<StorageManager>(spill); | |||
| auto &cs = CacheServer::GetInstance(); | |||
| sm_ = std::make_shared<StorageManager>(spill, cs.GetNumWorkers()); | |||
| RETURN_IF_NOT_OK(sm_->ServiceStart()); | |||
| MS_LOG(INFO) << "CachePool will use disk folder: " << spill.toString(); | |||
| } | |||
| @@ -127,11 +127,23 @@ Status StorageContainer::Insert(const std::vector<ReadableSlice> &buf, off64_t * | |||
| addr_t addr = 0; | |||
| RETURN_IF_NOT_OK(bs_->Alloc(sz, &bspd, &addr)); | |||
| *offset = static_cast<off64_t>(addr); | |||
| // We will do piecewise copy of the data to disk. | |||
| // We will do piecewise copy of the data to a large buffer | |||
| std::string mem; | |||
| try { | |||
| mem.resize(sz); | |||
| CHECK_FAIL_RETURN_UNEXPECTED(mem.capacity() >= sz, "Programming error"); | |||
| } catch (const std::bad_alloc &e) { | |||
| return Status(StatusCode::kMDOutOfMemory); | |||
| } | |||
| WritableSlice all(mem.data(), sz); | |||
| size_t pos = 0; | |||
| for (auto &v : buf) { | |||
| RETURN_IF_NOT_OK(Write(v, addr)); | |||
| addr += v.GetSize(); | |||
| WritableSlice row_data(all, pos); | |||
| RETURN_IF_NOT_OK(WritableSlice::Copy(&row_data, v)); | |||
| pos += v.GetSize(); | |||
| } | |||
| // Write all data to disk at once | |||
| RETURN_IF_NOT_OK(Write(all, addr)); | |||
| return Status::OK(); | |||
| } | |||
| @@ -22,6 +22,7 @@ | |||
| #include "utils/ms_utils.h" | |||
| #include "minddata/dataset/util/log_adapter.h" | |||
| #include "minddata/dataset/util/path.h" | |||
| #include "minddata/dataset/util/random.h" | |||
| #include "minddata/dataset/util/services.h" | |||
| namespace mindspore { | |||
| @@ -37,7 +38,7 @@ std::string StorageManager::ConstructFileName(const std::string &prefix, int32_t | |||
| return (base_name + "." + suffix); | |||
| } | |||
| Status StorageManager::AddOneContainer() { | |||
| Status StorageManager::AddOneContainer(int replaced_container_pos) { | |||
| const std::string kPrefix = "IMG"; | |||
| const std::string kSuffix = "LB"; | |||
| Path container_name = root_ / ConstructFileName(kPrefix, file_id_, kSuffix); | |||
| @@ -45,13 +46,22 @@ Status StorageManager::AddOneContainer() { | |||
| RETURN_IF_NOT_OK(StorageContainer::CreateStorageContainer(&sc, container_name.toString())); | |||
| containers_.push_back(sc); | |||
| file_id_++; | |||
| if (replaced_container_pos >= 0) { | |||
| writable_containers_pool_[replaced_container_pos] = containers_.size() - 1; | |||
| } else { | |||
| writable_containers_pool_.push_back(containers_.size() - 1); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| Status StorageManager::DoServiceStart() { | |||
| containers_.reserve(1000); | |||
| writable_containers_pool_.reserve(pool_size_); | |||
| if (root_.IsDirectory()) { | |||
| RETURN_IF_NOT_OK(AddOneContainer()); | |||
| // create multiple containers and store their index in a pool | |||
| for (int i = 0; i < pool_size_; i++) { | |||
| RETURN_IF_NOT_OK(AddOneContainer()); | |||
| } | |||
| } else { | |||
| RETURN_STATUS_UNEXPECTED("Not a directory"); | |||
| } | |||
| @@ -67,22 +77,25 @@ Status StorageManager::Write(key_type *key, const std::vector<ReadableSlice> &bu | |||
| if (sz == 0) { | |||
| RETURN_STATUS_UNEXPECTED("Unexpected 0 length"); | |||
| } | |||
| auto mt = GetRandomDevice(); | |||
| std::shared_ptr<StorageContainer> cont; | |||
| key_type out_key; | |||
| value_type out_value; | |||
| bool create_new_container = false; | |||
| int old_container_pos = -1; | |||
| size_t last_num_container = -1; | |||
| do { | |||
| SharedLock lock_s(&rw_lock_); | |||
| size_t num_containers = containers_.size(); | |||
| if (create_new_container && (num_containers == last_num_container)) { | |||
| // Upgrade to exclusvie lock. | |||
| if (create_new_container && (num_containers == last_num_container) && (old_container_pos >= 0)) { | |||
| // Upgrade to exclusive lock. | |||
| lock_s.Upgrade(); | |||
| create_new_container = false; | |||
| // Check again if someone has already added a | |||
| // new container after we got the x lock | |||
| if (containers_.size() == num_containers) { | |||
| RETURN_IF_NOT_OK(AddOneContainer()); | |||
| // Create a new container and replace the full container in the pool with the newly created one | |||
| RETURN_IF_NOT_OK(AddOneContainer(old_container_pos)); | |||
| } | |||
| // Refresh how many containers there are. | |||
| num_containers = containers_.size(); | |||
| @@ -92,17 +105,21 @@ Status StorageManager::Write(key_type *key, const std::vector<ReadableSlice> &bu | |||
| if (num_containers == 0) { | |||
| RETURN_STATUS_UNEXPECTED("num_containers is zero"); | |||
| } | |||
| // Go to the last container to insert. | |||
| cont = containers_.at(num_containers - 1); | |||
| // Pick a random container from the writable container pool to insert. | |||
| std::uniform_int_distribution<int> distribution(0, pool_size_ - 1); | |||
| int pos_in_pool = distribution(mt); | |||
| int cont_index = writable_containers_pool_.at(pos_in_pool); | |||
| cont = containers_.at(cont_index); | |||
| off64_t offset; | |||
| Status rc = cont->Insert(buf, &offset); | |||
| if (rc.StatusCode() == StatusCode::kMDBuddySpaceFull) { | |||
| create_new_container = true; | |||
| old_container_pos = pos_in_pool; | |||
| // Remember how many containers we saw. In the next iteration we will do a comparison to see | |||
| // if someone has already created it. | |||
| last_num_container = num_containers; | |||
| } else if (rc.IsOk()) { | |||
| out_value = std::make_pair(num_containers - 1, std::make_pair(offset, sz)); | |||
| out_value = std::make_pair(cont_index, std::make_pair(offset, sz)); | |||
| RETURN_IF_NOT_OK(index_.insert(out_value, &out_key)); | |||
| *key = out_key; | |||
| break; | |||
| @@ -150,11 +167,15 @@ Status StorageManager::DoServiceStop() noexcept { | |||
| } | |||
| } | |||
| containers_.clear(); | |||
| writable_containers_pool_.clear(); | |||
| file_id_ = 0; | |||
| return rc1; | |||
| } | |||
| StorageManager::StorageManager(const Path &root) : root_(root), file_id_(0), index_() {} | |||
| StorageManager::StorageManager(const Path &root) : root_(root), pool_size_(1), file_id_(0), index_() {} | |||
| StorageManager::StorageManager(const Path &root, int pool_size) | |||
| : root_(root), pool_size_(pool_size), file_id_(0), index_() {} | |||
| StorageManager::~StorageManager() { (void)StorageManager::DoServiceStop(); } | |||
| @@ -35,12 +35,23 @@ namespace mindspore { | |||
| namespace dataset { | |||
| class StorageManager : public Service { | |||
| public: | |||
| using storage_index = AutoIndexObj<std::pair<int, std::pair<off_t, size_t>>>; | |||
| // Use these traits for the B+ tree inside the StorageManager | |||
| struct StorageBPlusTreeTraits { | |||
| // This determines the limit of number of keys in a node. | |||
| using slot_type = uint16_t; | |||
| // Number of slots in each leaf of the tree. | |||
| static constexpr slot_type kLeafSlots = 512; | |||
| // Number of slots in each inner node of the tree | |||
| static constexpr slot_type kInnerSlots = 256; | |||
| }; | |||
| using value_type = std::pair<int, std::pair<off_t, size_t>>; | |||
| using storage_index = AutoIndexObj<value_type, std::allocator<value_type>, StorageBPlusTreeTraits>; | |||
| using key_type = storage_index::key_type; | |||
| using value_type = storage_index::value_type; | |||
| explicit StorageManager(const Path &); | |||
| StorageManager(const Path &root, int pool_size); | |||
| ~StorageManager() override; | |||
| StorageManager(const StorageManager &) = delete; | |||
| @@ -63,12 +74,19 @@ class StorageManager : public Service { | |||
| int file_id_; | |||
| RWLock rw_lock_; | |||
| storage_index index_; | |||
| std::vector<int> writable_containers_pool_; | |||
| int pool_size_; | |||
| std::string GetBaseName(const std::string &prefix, int32_t file_id); | |||
| std::string ConstructFileName(const std::string &prefix, int32_t file_id, const std::string &suffix); | |||
| Status AddOneContainer(); | |||
| /// \brief Add a new storage container | |||
| /// The newly-created container is going to be added into a pool of writable containers. | |||
| /// \param replaced_container_pos If provided, will use the newly created container to replace the corresponding old | |||
| /// container in the pool. If not provided, will just append the newly created container to the end of the pool. | |||
| /// \return Status object | |||
| Status AddOneContainer(int replaced_container_pos = -1); | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -17,6 +17,7 @@ | |||
| #define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_AUTO_INDEX_H_ | |||
| #include <atomic> | |||
| #include <functional> | |||
| #include <memory> | |||
| #include <utility> | |||
| #include <vector> | |||
| @@ -30,16 +31,16 @@ namespace dataset { | |||
| /// Use minKey() function to query the min key. | |||
| /// Use maxKey() function to query the max key. | |||
| /// @tparam T | |||
| template <typename T, typename A = std::allocator<T>> | |||
| class AutoIndexObj : public BPlusTree<int64_t, T, A> { | |||
| template <typename V, typename A = std::allocator<V>, typename T = BPlusTreeTraits> | |||
| class AutoIndexObj : public BPlusTree<int64_t, V, A, std::less<int64_t>, T> { | |||
| public: | |||
| using my_tree = BPlusTree<int64_t, T, A>; | |||
| using my_tree = BPlusTree<int64_t, V, A, std::less<int64_t>, T>; | |||
| using key_type = typename my_tree::key_type; | |||
| using value_type = typename my_tree::value_type; | |||
| AutoIndexObj() : my_tree::BPlusTree(), inx_(kMinKey) {} | |||
| explicit AutoIndexObj(const Allocator<T> &alloc) : my_tree::BPlusTree(alloc), inx_(kMinKey) {} | |||
| explicit AutoIndexObj(const Allocator<V> &alloc) : my_tree::BPlusTree(alloc), inx_(kMinKey) {} | |||
| ~AutoIndexObj() = default; | |||