Merge pull request !4998 from JesseKLee/mem_pooltags/v1.0.0
| @@ -18,8 +18,7 @@ | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| CachedSharedMemoryArena::CachedSharedMemoryArena(int32_t port, size_t val_in_GB) | |||
| : Arena::Arena(val_in_GB * 1024), port_(port), shmid_(-1) {} | |||
| : ptr_(nullptr), val_in_GB_(val_in_GB), port_(port), shmid_(-1) {} | |||
| CachedSharedMemoryArena::~CachedSharedMemoryArena() { | |||
| #if CACHE_LOCAL_CLIENT | |||
| if (this->ptr_ != nullptr && this->ptr_ != reinterpret_cast<void *>(-1)) { | |||
| @@ -54,18 +53,18 @@ Status CachedSharedMemoryArena::CreateArena(std::unique_ptr<CachedSharedMemoryAr | |||
| RETURN_STATUS_UNEXPECTED(errMsg); | |||
| } | |||
| auto access_mode = S_IRUSR | S_IWUSR | S_IROTH | S_IWOTH | S_IRGRP | S_IWGRP; | |||
| ba->shmid_ = shmget(shm_key, ba->size_in_bytes_, IPC_CREAT | IPC_EXCL | access_mode); | |||
| // Value is in GB. Convert into bytes. | |||
| int64_t sz = val_in_GB * 1073741824L; | |||
| ba->shmid_ = shmget(shm_key, sz, IPC_CREAT | IPC_EXCL | access_mode); | |||
| if (ba->shmid_) { | |||
| ba->ptr_ = shmat(ba->shmid_, nullptr, 0); | |||
| if (ba->ptr_ == reinterpret_cast<void *>(-1)) { | |||
| RETURN_STATUS_UNEXPECTED("Shared memory attach failed. Errno " + std::to_string(errno)); | |||
| } | |||
| ba->impl_ = std::make_unique<ArenaImpl>(ba->ptr_, sz); | |||
| } else { | |||
| RETURN_STATUS_UNEXPECTED("Shared memory creation failed. Errno " + std::to_string(errno)); | |||
| } | |||
| uint64_t num_blks = ba->size_in_bytes_ / ARENA_BLK_SZ; | |||
| MS_LOG(DEBUG) << "Size of memory pool is " << num_blks << ", number of blocks of size is " << ARENA_BLK_SZ << "."; | |||
| ba->tr_.Insert(0, num_blks); | |||
| #endif | |||
| return Status::OK(); | |||
| } | |||
| @@ -17,14 +17,18 @@ | |||
| #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_ARENA_H_ | |||
| #include <memory> | |||
| #include <mutex> | |||
| #include <string> | |||
| #include "minddata/dataset/util/arena.h" | |||
| #include "minddata/dataset/engine/cache/cache_common.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| /// This is a derived class of Arena but resides in shared memory | |||
| class CachedSharedMemoryArena : public Arena { | |||
| class CachedSharedMemoryArena : public MemoryPool { | |||
| public: | |||
| // Disable copy and assignment constructor | |||
| CachedSharedMemoryArena(const CachedSharedMemoryArena &) = delete; | |||
| CachedSharedMemoryArena &operator=(const CachedSharedMemoryArena &) = delete; | |||
| ~CachedSharedMemoryArena() override; | |||
| /// \brief Create an Arena in shared memory | |||
| /// \param[out] p_ba Pointer to a unique_ptr | |||
| @@ -39,11 +43,41 @@ class CachedSharedMemoryArena : public Arena { | |||
| /// in the client. So instead we will return an address relative | |||
| /// to the base address of the shared memory where we attach to. | |||
| /// \return Base address of the shared memory. | |||
| const void *SharedMemoryBaseAddr() const { return this->ptr_; } | |||
| const void *SharedMemoryBaseAddr() const { return impl_->get_base_addr(); } | |||
| /// As a derived class of MemoryPool, we have to implement the following | |||
| /// But we simply transfer the call to the implementation class | |||
| Status Allocate(size_t size, void **pVoid) override { | |||
| std::unique_lock<std::mutex> lock(mux_); | |||
| return impl_->Allocate(size, pVoid); | |||
| } | |||
| Status Reallocate(void **pVoid, size_t old_sz, size_t new_sz) override { | |||
| std::unique_lock<std::mutex> lock(mux_); | |||
| return impl_->Reallocate(pVoid, old_sz, new_sz); | |||
| } | |||
| void Deallocate(void *pVoid) override { | |||
| std::unique_lock<std::mutex> lock(mux_); | |||
| impl_->Deallocate(pVoid); | |||
| } | |||
| uint64_t get_max_size() const override { return impl_->get_max_size(); } | |||
| int PercentFree() const override { | |||
| std::unique_lock<std::mutex> lock(mux_); | |||
| return impl_->PercentFree(); | |||
| } | |||
| /// \brief Dump the memory allocation block. | |||
| friend std::ostream &operator<<(std::ostream &os, const CachedSharedMemoryArena &s) { | |||
| os << *(s.impl_); | |||
| return os; | |||
| } | |||
| private: | |||
| mutable std::mutex mux_; | |||
| void *ptr_; | |||
| int32_t val_in_GB_; | |||
| int32_t port_; | |||
| int shmid_; | |||
| std::unique_ptr<ArenaImpl> impl_; | |||
| /// Private constructor. Not to be called directly. | |||
| CachedSharedMemoryArena(int32_t port, size_t val_in_GB); | |||
| }; | |||
| @@ -40,6 +40,7 @@ class Allocator { | |||
| using reference = T &; | |||
| using const_reference = const T &; | |||
| using size_type = uint64_t; | |||
| using difference_type = std::ptrdiff_t; | |||
| template <typename U> | |||
| struct rebind { | |||
| @@ -86,8 +87,30 @@ class Allocator { | |||
| private: | |||
| std::shared_ptr<MemoryPool> pool_; | |||
| }; | |||
| /// \brief It is a wrapper of unique_ptr with a custom allocator and acts like std::lock_guard such that the memory will | |||
| /// be released when the object goes out of scope | |||
| /// \brief It is a wrapper of unique_ptr with a custom Allocator class defined above | |||
| template <typename T, typename... Args> | |||
| Status MakeUnique(std::unique_ptr<T[], std::function<void(T *)>> *out, Allocator<T> alloc, size_t n, Args &&... args) { | |||
| RETURN_UNEXPECTED_IF_NULL(out); | |||
| CHECK_FAIL_RETURN_UNEXPECTED(n > 0, "size must be positive"); | |||
| T *data = alloc.allocate(n); | |||
| if (!std::is_arithmetic<T>::value) { | |||
| for (auto i = 0; i < n; i++) { | |||
| std::allocator_traits<Allocator<T>>::construct(alloc, &(data[i]), std::forward<Args>(args)...); | |||
| } | |||
| } | |||
| auto deleter = [](T *p, Allocator<T> f_alloc, size_t f_n) { | |||
| if (!std::is_arithmetic<T>::value && std::is_destructible<T>::value) { | |||
| for (auto i = 0; i < f_n; ++i) { | |||
| std::allocator_traits<Allocator<T>>::destroy(f_alloc, &p[i]); | |||
| } | |||
| } | |||
| f_alloc.deallocate(p, f_n); | |||
| }; | |||
| *out = std::unique_ptr<T[], std::function<void(T *)>>(data, std::bind(deleter, std::placeholders::_1, alloc, n)); | |||
| return Status::OK(); | |||
| } | |||
| /// \brief It is a wrapper of the above custom unique_ptr with some additional methods | |||
| /// \tparam T The type of object to be allocated | |||
| /// \tparam C Allocator. Default to std::allocator | |||
| template <typename T, typename C = std::allocator<T>> | |||
| @@ -113,14 +136,7 @@ class MemGuard { | |||
| /// \brief Explicitly deallocate the memory if allocated | |||
| void deallocate() { | |||
| if (ptr_) { | |||
| auto *p = ptr_.release(); | |||
| if (!std::is_arithmetic<T>::value && std::is_destructible<T>::value) { | |||
| for (auto i = 0; i < n_; ++i) { | |||
| p[i].~T(); | |||
| } | |||
| } | |||
| alloc_.deallocate(p, n_); | |||
| n_ = 0; | |||
| ptr_.reset(); | |||
| } | |||
| } | |||
| /// \brief Allocate memory (with emplace feature). Previous one will be released. If size is 0, no new memory is | |||
| @@ -129,24 +145,9 @@ class MemGuard { | |||
| /// \tparam Args Extra arguments pass to the constructor of T | |||
| template <typename... Args> | |||
| Status allocate(size_t n, Args &&... args) noexcept { | |||
| try { | |||
| deallocate(); | |||
| if (n > 0) { | |||
| T *data = alloc_.allocate(n); | |||
| if (!std::is_arithmetic<T>::value) { | |||
| for (auto i = 0; i < n; i++) { | |||
| std::allocator_traits<C>::construct(alloc_, &(data[i]), std::forward<Args>(args)...); | |||
| } | |||
| } | |||
| ptr_ = std::unique_ptr<T[]>(data); | |||
| n_ = n; | |||
| } | |||
| } catch (const std::bad_alloc &e) { | |||
| return Status(StatusCode::kOutOfMemory); | |||
| } catch (std::exception &e) { | |||
| RETURN_STATUS_UNEXPECTED(e.what()); | |||
| } | |||
| return Status::OK(); | |||
| deallocate(); | |||
| n_ = n; | |||
| return MakeUnique(&ptr_, alloc_, n, std::forward<Args>(args)...); | |||
| } | |||
| ~MemGuard() noexcept { deallocate(); } | |||
| /// \brief Getter function | |||
| @@ -170,7 +171,7 @@ class MemGuard { | |||
| private: | |||
| size_t n_; | |||
| allocator alloc_; | |||
| std::unique_ptr<T[]> ptr_; | |||
| std::unique_ptr<T[], std::function<void(T *)>> ptr_; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -33,21 +33,19 @@ struct MemHdr { | |||
| *hdr = *tmp; | |||
| } | |||
| }; | |||
| Status Arena::Init() { | |||
| RETURN_IF_NOT_OK(DeMalloc(size_in_MB_ * 1048576L, &ptr_, false)); | |||
| ArenaImpl::ArenaImpl(void *ptr, size_t sz) : size_in_bytes_(sz), ptr_(ptr) { | |||
| // Divide the memory into blocks. Ignore the last partial block. | |||
| uint64_t num_blks = size_in_bytes_ / ARENA_BLK_SZ; | |||
| MS_LOG(DEBUG) << "Size of memory pool is " << num_blks << ", number of blocks of size is " << ARENA_BLK_SZ << "."; | |||
| tr_.Insert(0, num_blks); | |||
| return Status::OK(); | |||
| } | |||
| Status Arena::Allocate(size_t n, void **p) { | |||
| Status ArenaImpl::Allocate(size_t n, void **p) { | |||
| if (n == 0) { | |||
| *p = nullptr; | |||
| return Status::OK(); | |||
| } | |||
| std::unique_lock<std::mutex> lck(mux_); | |||
| // Round up n to 1K block | |||
| uint64_t req_size = static_cast<uint64_t>(n) + ARENA_WALL_OVERHEAD_SZ; | |||
| if (req_size > this->get_max_size()) { | |||
| @@ -64,7 +62,6 @@ Status Arena::Allocate(size_t n, void **p) { | |||
| if (size > reqBlk) { | |||
| tr_.Insert(addr + reqBlk, size - reqBlk); | |||
| } | |||
| lck.unlock(); | |||
| char *q = static_cast<char *>(ptr_) + addr * ARENA_BLK_SZ; | |||
| MemHdr::setHdr(q, addr, reqBlk); | |||
| *p = get_user_addr(q); | |||
| @@ -74,14 +71,24 @@ Status Arena::Allocate(size_t n, void **p) { | |||
| return Status::OK(); | |||
| } | |||
| void Arena::Deallocate(void *p) { | |||
| std::pair<std::pair<uint64_t, uint64_t>, bool> ArenaImpl::FindPrevBlk(uint64_t addr) { | |||
| for (auto &it : tr_) { | |||
| if (it.key + it.priority == addr) { | |||
| return std::make_pair(std::make_pair(it.key, it.priority), true); | |||
| } else if (it.key > addr) { | |||
| break; | |||
| } | |||
| } | |||
| return std::make_pair(std::make_pair(0, 0), false); | |||
| } | |||
| void ArenaImpl::Deallocate(void *p) { | |||
| auto *q = get_base_addr(p); | |||
| MemHdr hdr(0, 0); | |||
| MemHdr::getHdr(q, &hdr); | |||
| MS_ASSERT(hdr.sig == 0xDEADBEEF); | |||
| // We are going to insert a free block back to the treap. But first, check if we can combine | |||
| // with the free blocks before and after to form a bigger block. | |||
| std::unique_lock<std::mutex> lck(mux_); | |||
| // Query if we have a free block after us. | |||
| auto nextBlk = tr_.Search(hdr.addr + hdr.blk_size); | |||
| if (nextBlk.second) { | |||
| @@ -101,7 +108,61 @@ void Arena::Deallocate(void *p) { | |||
| tr_.Insert(hdr.addr, hdr.blk_size); | |||
| } | |||
| Status Arena::Reallocate(void **pp, size_t old_sz, size_t new_sz) { | |||
| bool ArenaImpl::BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz) { | |||
| uint64_t size = old_sz; | |||
| // The logic is very much identical to Deallocate. We will see if we can combine with the blocks before and after. | |||
| auto next_blk = tr_.Search(*addr + old_sz); | |||
| if (next_blk.second) { | |||
| size += next_blk.first.priority; | |||
| if (size >= new_sz) { | |||
| // In this case, we can just enlarge the block without doing any moving. | |||
| tr_.DeleteKey(next_blk.first.key); | |||
| // Return unused back to the tree. | |||
| if (size > new_sz) { | |||
| tr_.Insert(*addr + new_sz, size - new_sz); | |||
| } | |||
| } | |||
| return true; | |||
| } | |||
| // If we still get here, we have to look at the block before us. | |||
| auto result = FindPrevBlk(*addr); | |||
| if (result.second) { | |||
| // We can combine with this block together with the next block (if any) | |||
| size += result.first.second; | |||
| *addr = result.first.first; | |||
| if (size >= new_sz) { | |||
| // We can combine with this block together with the next block (if any) | |||
| tr_.DeleteKey(*addr); | |||
| if (next_blk.second) { | |||
| tr_.DeleteKey(next_blk.first.key); | |||
| } | |||
| // Return unused back to the tree. | |||
| if (size > new_sz) { | |||
| tr_.Insert(*addr + new_sz, size - new_sz); | |||
| } | |||
| return true; | |||
| } | |||
| } | |||
| return false; | |||
| } | |||
| Status ArenaImpl::FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz) { | |||
| MS_ASSERT(pp); | |||
| MS_ASSERT(*pp); | |||
| void *p = nullptr; | |||
| void *q = *pp; | |||
| RETURN_IF_NOT_OK(Allocate(new_sz, &p)); | |||
| errno_t err = memmove_s(p, new_sz, q, old_sz); | |||
| if (err) { | |||
| RETURN_STATUS_UNEXPECTED("Error from memmove: " + std::to_string(err)); | |||
| } | |||
| *pp = p; | |||
| // Free the old one. | |||
| Deallocate(q); | |||
| return Status::OK(); | |||
| } | |||
| Status ArenaImpl::Reallocate(void **pp, size_t old_sz, size_t new_sz) { | |||
| MS_ASSERT(pp); | |||
| MS_ASSERT(*pp); | |||
| uint64_t actual_size = static_cast<uint64_t>(new_sz) + ARENA_WALL_OVERHEAD_SZ; | |||
| @@ -114,7 +175,6 @@ Status Arena::Reallocate(void **pp, size_t old_sz, size_t new_sz) { | |||
| MemHdr hdr(0, 0); | |||
| MemHdr::getHdr(oldHdr, &hdr); | |||
| MS_ASSERT(hdr.sig == 0xDEADBEEF); | |||
| std::unique_lock<std::mutex> lck(mux_); | |||
| if (hdr.blk_size > req_blk) { | |||
| // Refresh the header with the new smaller size. | |||
| MemHdr::setHdr(oldHdr, hdr.addr, req_blk); | |||
| @@ -142,42 +202,12 @@ Status Arena::Reallocate(void **pp, size_t old_sz, size_t new_sz) { | |||
| *pp = get_user_addr(newHdr); | |||
| return Status::OK(); | |||
| } | |||
| // If we reach here, allocate a new block and simply move the content from the old to the new place. | |||
| // Unlock since allocate will grab the lock again. | |||
| lck.unlock(); | |||
| return FreeAndAlloc(pp, old_sz, new_sz); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| std::ostream &operator<<(std::ostream &os, const Arena &s) { | |||
| for (auto &it : s.tr_) { | |||
| os << "Address : " << it.key << ". Size : " << it.priority << "\n"; | |||
| } | |||
| return os; | |||
| } | |||
| Arena::Arena(size_t val_in_MB) : ptr_(nullptr), size_in_MB_(val_in_MB), size_in_bytes_(val_in_MB * 1048576L) {} | |||
| Status Arena::CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB) { | |||
| if (p_ba == nullptr) { | |||
| RETURN_STATUS_UNEXPECTED("p_ba is null"); | |||
| } | |||
| Status rc; | |||
| auto ba = new (std::nothrow) Arena(val_in_MB); | |||
| if (ba == nullptr) { | |||
| return Status(StatusCode::kOutOfMemory); | |||
| } | |||
| rc = ba->Init(); | |||
| if (rc.IsOk()) { | |||
| (*p_ba).reset(ba); | |||
| } else { | |||
| delete ba; | |||
| } | |||
| return rc; | |||
| } | |||
| int Arena::PercentFree() const { | |||
| int ArenaImpl::PercentFree() const { | |||
| uint64_t sz = 0; | |||
| for (auto &it : tr_) { | |||
| sz += it.priority; | |||
| @@ -186,70 +216,42 @@ int Arena::PercentFree() const { | |||
| return static_cast<int>(ratio * 100.0); | |||
| } | |||
| uint64_t Arena::get_max_size() const { return (size_in_bytes_ - ARENA_WALL_OVERHEAD_SZ); } | |||
| std::pair<std::pair<uint64_t, uint64_t>, bool> Arena::FindPrevBlk(uint64_t addr) { | |||
| for (auto &it : tr_) { | |||
| if (it.key + it.priority == addr) { | |||
| return std::make_pair(std::make_pair(it.key, it.priority), true); | |||
| } else if (it.key > addr) { | |||
| break; | |||
| } | |||
| uint64_t ArenaImpl::SizeToBlk(uint64_t sz) { | |||
| uint64_t req_blk = sz / ARENA_BLK_SZ; | |||
| if (sz % ARENA_BLK_SZ) { | |||
| ++req_blk; | |||
| } | |||
| return std::make_pair(std::make_pair(0, 0), false); | |||
| return req_blk; | |||
| } | |||
| bool Arena::BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz) { | |||
| uint64_t size = old_sz; | |||
| // The logic is very much identical to Deallocate. We will see if we can combine with the blocks before and after. | |||
| auto next_blk = tr_.Search(*addr + old_sz); | |||
| if (next_blk.second) { | |||
| size += next_blk.first.priority; | |||
| if (size >= new_sz) { | |||
| // In this case, we can just enlarge the block without doing any moving. | |||
| tr_.DeleteKey(next_blk.first.key); | |||
| // Return unused back to the tree. | |||
| if (size > new_sz) { | |||
| tr_.Insert(*addr + new_sz, size - new_sz); | |||
| } | |||
| } | |||
| return true; | |||
| std::ostream &operator<<(std::ostream &os, const ArenaImpl &s) { | |||
| for (auto &it : s.tr_) { | |||
| os << "Address : " << it.key << ". Size : " << it.priority << "\n"; | |||
| } | |||
| // If we still get here, we have to look at the block before us. | |||
| auto result = FindPrevBlk(*addr); | |||
| if (result.second) { | |||
| // We can combine with this block together with the next block (if any) | |||
| size += result.first.second; | |||
| *addr = result.first.first; | |||
| if (size >= new_sz) { | |||
| // We can combine with this block together with the next block (if any) | |||
| tr_.DeleteKey(*addr); | |||
| if (next_blk.second) { | |||
| tr_.DeleteKey(next_blk.first.key); | |||
| } | |||
| // Return unused back to the tree. | |||
| if (size > new_sz) { | |||
| tr_.Insert(*addr + new_sz, size - new_sz); | |||
| } | |||
| return true; | |||
| } | |||
| return os; | |||
| } | |||
| Status Arena::Init() { | |||
| try { | |||
| auto sz = size_in_MB_ * 1048576L; | |||
| mem_ = std::make_unique<uint8_t[]>(sz); | |||
| impl_ = std::make_unique<ArenaImpl>(mem_.get(), sz); | |||
| } catch (std::bad_alloc &e) { | |||
| return Status(StatusCode::kOutOfMemory); | |||
| } | |||
| return false; | |||
| return Status::OK(); | |||
| } | |||
| Status Arena::FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz) { | |||
| MS_ASSERT(pp); | |||
| MS_ASSERT(*pp); | |||
| void *p = nullptr; | |||
| void *q = *pp; | |||
| RETURN_IF_NOT_OK(Allocate(new_sz, &p)); | |||
| errno_t err = memmove_s(p, new_sz, q, old_sz); | |||
| if (err) { | |||
| RETURN_STATUS_UNEXPECTED("Error from memmove: " + std::to_string(err)); | |||
| Arena::Arena(size_t val_in_MB) : size_in_MB_(val_in_MB) {} | |||
| Status Arena::CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB) { | |||
| RETURN_UNEXPECTED_IF_NULL(p_ba); | |||
| auto ba = new (std::nothrow) Arena(val_in_MB); | |||
| if (ba == nullptr) { | |||
| return Status(StatusCode::kOutOfMemory); | |||
| } | |||
| *pp = p; | |||
| // Free the old one. | |||
| Deallocate(q); | |||
| (*p_ba).reset(ba); | |||
| RETURN_IF_NOT_OK(ba->Init()); | |||
| return Status::OK(); | |||
| } | |||
| } // namespace dataset | |||
| @@ -41,63 +41,111 @@ namespace dataset { | |||
| /// | |||
| /// When a block of memory is freed. It is joined with the blocks before and after (if they are available) to | |||
| /// form a bigger block. | |||
| class Arena : public MemoryPool { | |||
| public: | |||
| Arena(const Arena &) = delete; | |||
| Arena &operator=(const Arena &) = delete; | |||
| ~Arena() override { | |||
| if (ptr_ != nullptr) { | |||
| free(ptr_); | |||
| ptr_ = nullptr; | |||
| } | |||
| } | |||
| /// At the lowest level, we don't really care where the memory is coming from. | |||
| /// This allows other class to make use of Arena method and override the origin of the | |||
| /// memory, say from some unix shared memory instead. | |||
| /// \note Implementation class is not thread safe. Caller needs to ensure proper serialization | |||
| class ArenaImpl { | |||
| public: | |||
| /// Constructor | |||
| /// \param ptr The start of the memory address | |||
| /// \param sz Size of the memory block we manage | |||
| ArenaImpl(void *ptr, size_t sz); | |||
| ~ArenaImpl() { ptr_ = nullptr; } | |||
| /// \brief Allocate a sub block | |||
| /// \param n Size requested | |||
| /// \param p pointer to where the result is stored | |||
| /// \return Status object. | |||
| Status Allocate(size_t n, void **p); | |||
| /// \brief Enlarge or shrink a sub block | |||
| /// \param old_sz Original size | |||
| /// \param new_sz New size | |||
| /// \return Status object | |||
| Status Reallocate(void **, size_t old_sz, size_t new_sz); | |||
| /// \brief Free a sub block | |||
| /// \param Address of the block to be freed. | |||
| void Deallocate(void *); | |||
| /// \brief Calculate % free of the memory | |||
| /// \return Percent free | |||
| int PercentFree() const; | |||
| /// \brief What is the maximum we can support in allocate. | |||
| /// \return Max value | |||
| uint64_t get_max_size() const { return (size_in_bytes_ - ARENA_WALL_OVERHEAD_SZ); } | |||
| /// \brief Get the start of the address. Read only | |||
| /// \return Start of the address block | |||
| const void *get_base_addr() const { return ptr_; } | |||
| Status Allocate(size_t n, void **p) override; | |||
| static uint64_t SizeToBlk(uint64_t sz); | |||
| friend std::ostream &operator<<(std::ostream &os, const ArenaImpl &s); | |||
| Status Reallocate(void **, size_t old_sz, size_t new_sz) override; | |||
| private: | |||
| size_t size_in_bytes_; | |||
| Treap<uint64_t, uint64_t> tr_; | |||
| void *ptr_; | |||
| void Deallocate(void *) override; | |||
| void *get_user_addr(void *base_addr) const { return reinterpret_cast<char *>(base_addr) + ARENA_WALL_OVERHEAD_SZ; } | |||
| void *get_base_addr(void *user_addr) const { return reinterpret_cast<char *>(user_addr) - ARENA_WALL_OVERHEAD_SZ; } | |||
| std::pair<std::pair<uint64_t, uint64_t>, bool> FindPrevBlk(uint64_t addr); | |||
| bool BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz); | |||
| Status FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz); | |||
| }; | |||
| uint64_t get_max_size() const override; | |||
| /// \brief This version of Arena allocates from private memory | |||
| class Arena : public MemoryPool { | |||
| public: | |||
| // Disable copy and assignment constructor | |||
| Arena(const Arena &) = delete; | |||
| Arena &operator=(const Arena &) = delete; | |||
| ~Arena() override = default; | |||
| static uint64_t SizeToBlk(uint64_t sz) { | |||
| uint64_t req_blk = sz / ARENA_BLK_SZ; | |||
| if (sz % ARENA_BLK_SZ) { | |||
| ++req_blk; | |||
| } | |||
| return req_blk; | |||
| /// As a derived class of MemoryPool, we have to implement the following. | |||
| /// But we simply transfer the call to the implementation class | |||
| Status Allocate(size_t size, void **pVoid) override { | |||
| std::unique_lock<std::mutex> lock(mux_); | |||
| return impl_->Allocate(size, pVoid); | |||
| } | |||
| Status Reallocate(void **pVoid, size_t old_sz, size_t new_sz) override { | |||
| std::unique_lock<std::mutex> lock(mux_); | |||
| return impl_->Reallocate(pVoid, old_sz, new_sz); | |||
| } | |||
| void Deallocate(void *pVoid) override { | |||
| std::unique_lock<std::mutex> lock(mux_); | |||
| impl_->Deallocate(pVoid); | |||
| } | |||
| uint64_t get_max_size() const override { return impl_->get_max_size(); } | |||
| int PercentFree() const override { | |||
| std::unique_lock<std::mutex> lock(mux_); | |||
| return impl_->PercentFree(); | |||
| } | |||
| int PercentFree() const override; | |||
| const void *get_base_addr() const { return ptr_; } | |||
| /// \return Return the start of the memory block | |||
| const void *get_base_addr() const { return impl_->get_base_addr(); } | |||
| friend std::ostream &operator<<(std::ostream &os, const Arena &s); | |||
| /// \brief Dump the memory allocation block. | |||
| friend std::ostream &operator<<(std::ostream &os, const Arena &s) { | |||
| os << *(s.impl_); | |||
| return os; | |||
| } | |||
| /// The only method to create an arena. | |||
| static Status CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB = 4096); | |||
| protected: | |||
| std::mutex mux_; | |||
| Treap<uint64_t, uint64_t> tr_; | |||
| void *ptr_; | |||
| mutable std::mutex mux_; | |||
| std::unique_ptr<ArenaImpl> impl_; | |||
| std::unique_ptr<uint8_t[]> mem_; | |||
| size_t size_in_MB_; | |||
| size_t size_in_bytes_; | |||
| explicit Arena(size_t val_in_MB = 4096); | |||
| std::pair<std::pair<uint64_t, uint64_t>, bool> FindPrevBlk(uint64_t addr); | |||
| Status Init(); | |||
| bool BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz); | |||
| Status FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz); | |||
| void *get_user_addr(void *base_addr) const { return reinterpret_cast<char *>(base_addr) + ARENA_WALL_OVERHEAD_SZ; } | |||
| void *get_base_addr(void *user_addr) const { return reinterpret_cast<char *>(user_addr) - ARENA_WALL_OVERHEAD_SZ; } | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -47,10 +47,16 @@ Status BuddySpace::Init() { | |||
| size_t offset_1 = sizeof(rel_addr_t) * num_lvl_; | |||
| size_t offset_2 = sizeof(int) * num_lvl_ + offset_1; | |||
| size_t offset_3 = sizeof(char) * BitLeftShift(1, num_lvl_ - 3) + offset_2; | |||
| RETURN_IF_NOT_OK(DeMalloc(offset_3, &ptr_, true)); | |||
| hint_ = reinterpret_cast<rel_addr_t *>(ptr_); | |||
| count_ = reinterpret_cast<int *>((reinterpret_cast<char *>(ptr_) + offset_1)); | |||
| map_ = reinterpret_cast<char *>(ptr_) + offset_2; | |||
| try { | |||
| mem_ = std::make_unique<uint8_t[]>(offset_3); | |||
| } catch (const std::bad_alloc &e) { | |||
| return Status(StatusCode::kOutOfMemory); | |||
| } | |||
| (void)memset_s(mem_.get(), offset_3, 0, offset_3); | |||
| auto ptr = mem_.get(); | |||
| hint_ = reinterpret_cast<rel_addr_t *>(ptr); | |||
| count_ = reinterpret_cast<int *>((reinterpret_cast<char *>(ptr) + offset_1)); | |||
| map_ = reinterpret_cast<char *>(ptr) + offset_2; | |||
| count_[num_lvl_ - 1] = 1; | |||
| map_[0] = BitOr(MORE_BIT, num_lvl_ - 3); | |||
| return Status::OK(); | |||
| @@ -352,19 +358,9 @@ int BuddySpace::PercentFree() const { | |||
| } | |||
| BuddySpace::BuddySpace(int log_min, int num_lvl) | |||
| : hint_(nullptr), | |||
| count_(nullptr), | |||
| map_(nullptr), | |||
| log_min_(log_min), | |||
| num_lvl_(num_lvl), | |||
| min_(0), | |||
| max_(0), | |||
| ptr_(nullptr) {} | |||
| : hint_(nullptr), count_(nullptr), map_(nullptr), log_min_(log_min), num_lvl_(num_lvl), min_(0), max_(0) {} | |||
| BuddySpace::~BuddySpace() { | |||
| if (ptr_ != nullptr) { | |||
| free(ptr_); | |||
| } | |||
| hint_ = nullptr; | |||
| count_ = nullptr; | |||
| map_ = nullptr; | |||
| @@ -94,7 +94,7 @@ class BuddySpace { | |||
| int num_lvl_; | |||
| uint64_t min_; | |||
| uint64_t max_; | |||
| void *ptr_; | |||
| std::unique_ptr<uint8_t[]> mem_; | |||
| std::mutex mutex_; | |||
| explicit BuddySpace(int log_min = 15, int num_lvl = 18); | |||
| @@ -33,18 +33,6 @@ | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| template <typename T> | |||
| struct is_shared_ptr : public std::false_type {}; | |||
| template <typename T> | |||
| struct is_shared_ptr<std::shared_ptr<T>> : public std::true_type {}; | |||
| template <typename T> | |||
| struct is_unique_ptr : public std::false_type {}; | |||
| template <typename T> | |||
| struct is_unique_ptr<std::unique_ptr<T>> : public std::true_type {}; | |||
| // A simple thread safe queue using a fixed size array | |||
| template <typename T> | |||
| class Queue { | |||
| @@ -55,44 +43,25 @@ class Queue { | |||
| using reference = T &; | |||
| using const_reference = const T &; | |||
| void Init() { | |||
| if (sz_ > 0) { | |||
| // We allocate a block of memory and then call the default constructor for each slot. Maybe simpler to call | |||
| // new[] but we want to control where the memory is allocated from. | |||
| arr_ = alloc_.allocate(sz_); | |||
| for (uint64_t i = 0; i < sz_; i++) { | |||
| std::allocator_traits<Allocator<T>>::construct(alloc_, &(arr_[i])); | |||
| } | |||
| } | |||
| } | |||
| explicit Queue(int sz) | |||
| : sz_(sz), | |||
| arr_(nullptr), | |||
| head_(0), | |||
| tail_(0), | |||
| my_name_(Services::GetUniqueID()), | |||
| alloc_(Services::GetInstance().GetServiceMemPool()) { | |||
| Init(); | |||
| MS_LOG(DEBUG) << "Create Q with uuid " << my_name_ << " of size " << sz_ << "."; | |||
| } | |||
| virtual ~Queue() { | |||
| ResetQue(); | |||
| if (arr_) { | |||
| // Simply free the pointer. Since there is nothing in the queue. We don't want to invoke the destructor | |||
| // of T in each slot. | |||
| alloc_.deallocate(arr_); | |||
| arr_ = nullptr; | |||
| : sz_(sz), arr_(Services::GetAllocator<T>()), head_(0), tail_(0), my_name_(Services::GetUniqueID()) { | |||
| Status rc = arr_.allocate(sz); | |||
| if (rc.IsError()) { | |||
| MS_LOG(ERROR) << "Fail to create a queue."; | |||
| std::terminate(); | |||
| } else { | |||
| MS_LOG(DEBUG) << "Create Q with uuid " << my_name_ << " of size " << sz_ << "."; | |||
| } | |||
| } | |||
| int size() const { | |||
| int v = tail_ - head_; | |||
| virtual ~Queue() { ResetQue(); } | |||
| size_t size() const { | |||
| size_t v = tail_ - head_; | |||
| return (v >= 0) ? v : 0; | |||
| } | |||
| int capacity() const { return sz_; } | |||
| size_t capacity() const { return sz_; } | |||
| bool empty() const { return head_ == tail_; } | |||
| @@ -104,8 +73,8 @@ class Queue { | |||
| // Block when full | |||
| Status rc = full_cv_.Wait(&_lock, [this]() -> bool { return (size() != capacity()); }); | |||
| if (rc.IsOk()) { | |||
| uint32_t k = tail_++ % sz_; | |||
| arr_[k] = ele; | |||
| auto k = tail_++ % sz_; | |||
| *(arr_[k]) = ele; | |||
| empty_cv_.NotifyAll(); | |||
| _lock.unlock(); | |||
| } else { | |||
| @@ -119,8 +88,8 @@ class Queue { | |||
| // Block when full | |||
| Status rc = full_cv_.Wait(&_lock, [this]() -> bool { return (size() != capacity()); }); | |||
| if (rc.IsOk()) { | |||
| uint32_t k = tail_++ % sz_; | |||
| arr_[k] = std::forward<T>(ele); | |||
| auto k = tail_++ % sz_; | |||
| *(arr_[k]) = std::forward<T>(ele); | |||
| empty_cv_.NotifyAll(); | |||
| _lock.unlock(); | |||
| } else { | |||
| @@ -135,8 +104,8 @@ class Queue { | |||
| // Block when full | |||
| Status rc = full_cv_.Wait(&_lock, [this]() -> bool { return (size() != capacity()); }); | |||
| if (rc.IsOk()) { | |||
| uint32_t k = tail_++ % sz_; | |||
| new (&(arr_[k])) T(std::forward<Ts>(args)...); | |||
| auto k = tail_++ % sz_; | |||
| new (arr_[k]) T(std::forward<Ts>(args)...); | |||
| empty_cv_.NotifyAll(); | |||
| _lock.unlock(); | |||
| } else { | |||
| @@ -151,20 +120,8 @@ class Queue { | |||
| // Block when empty | |||
| Status rc = empty_cv_.Wait(&_lock, [this]() -> bool { return !empty(); }); | |||
| if (rc.IsOk()) { | |||
| uint32_t k = head_++ % sz_; | |||
| *p = std::move(arr_[k]); | |||
| if (std::is_destructible<T>::value) { | |||
| // std::move above only changes arr_[k] from rvalue to lvalue. | |||
| // The real implementation of move constructor depends on T. | |||
| // It may be compiler generated or user defined. But either case | |||
| // the result of arr_[k] is still a valid object of type T, and | |||
| // we will not keep any extra copy in the queue. | |||
| arr_[k].~T(); | |||
| // For gcc 9, an extra fix is needed here to clear the memory content | |||
| // of arr_[k] because this slot can be reused by another Add which can | |||
| // do another std::move. We have seen SEGV here in this case. | |||
| std::allocator_traits<Allocator<T>>::construct(alloc_, &(arr_[k])); | |||
| } | |||
| auto k = head_++ % sz_; | |||
| *p = std::move(*(arr_[k])); | |||
| full_cv_.NotifyAll(); | |||
| _lock.unlock(); | |||
| } else { | |||
| @@ -175,15 +132,15 @@ class Queue { | |||
| void ResetQue() noexcept { | |||
| std::unique_lock<std::mutex> _lock(mux_); | |||
| // If there are elements in the queue, invoke its destructor one by one. | |||
| if (!empty() && std::is_destructible<T>::value) { | |||
| for (uint64_t i = head_; i < tail_; i++) { | |||
| uint32_t k = i % sz_; | |||
| arr_[k].~T(); | |||
| } | |||
| } | |||
| for (uint64_t i = 0; i < sz_; i++) { | |||
| std::allocator_traits<Allocator<T>>::construct(alloc_, &(arr_[i])); | |||
| // If there are elements in the queue, drain them. We won't call PopFront directly | |||
| // because we have got the lock already. We will deadlock if we call PopFront | |||
| for (auto i = head_; i < tail_; ++i) { | |||
| auto k = i % sz_; | |||
| auto val = std::move(*(arr_[k])); | |||
| // Let val go out of scope and its destructor will be invoked automatically. | |||
| // But our compiler may complain val is not in use. So let's do some useless | |||
| // stuff. | |||
| MS_LOG(DEBUG) << "Address of val: " << &val; | |||
| } | |||
| empty_cv_.ResetIntrpState(); | |||
| full_cv_.ResetIntrpState(); | |||
| @@ -202,15 +159,14 @@ class Queue { | |||
| } | |||
| private: | |||
| uint64_t sz_; | |||
| pointer arr_; | |||
| uint64_t head_; | |||
| uint64_t tail_; | |||
| size_t sz_; | |||
| MemGuard<T, Allocator<T>> arr_; | |||
| size_t head_; | |||
| size_t tail_; | |||
| std::string my_name_; | |||
| std::mutex mux_; | |||
| CondVar empty_cv_; | |||
| CondVar full_cv_; | |||
| Allocator<T> alloc_; | |||
| }; | |||
| // A container of queues with [] operator accessors. Basically this is a wrapper over of a vector of queues | |||
| @@ -237,7 +193,7 @@ class QueueList { | |||
| return Status::OK(); | |||
| } | |||
| int size() const { return queue_list_.size(); } | |||
| auto size() const { return queue_list_.size(); } | |||
| std::unique_ptr<Queue<T>> &operator[](const int index) { return queue_list_[index]; } | |||
| @@ -15,7 +15,9 @@ | |||
| */ | |||
| #include <string> | |||
| #include "minddata/dataset/util/allocator.h" | |||
| #include "minddata/dataset/util/arena.h" | |||
| #include "minddata/dataset/util/system_pool.h" | |||
| #include "common/common.h" | |||
| #include "utils/log_adapter.h" | |||
| @@ -27,11 +29,10 @@ class MindDataTestArena : public UT::Common { | |||
| }; | |||
| TEST_F(MindDataTestArena, TestALLFunction) { | |||
| TEST_F(MindDataTestArena, Test1) { | |||
| std::shared_ptr<Arena> mp; | |||
| Status rc = Arena::CreateArena(&mp); | |||
| ASSERT_TRUE(rc.IsOk()); | |||
| std::shared_ptr<Arena> arena = std::dynamic_pointer_cast<Arena>(mp); | |||
| std::vector<void *> v; | |||
| srand(time(NULL)); | |||
| @@ -46,3 +47,25 @@ TEST_F(MindDataTestArena, TestALLFunction) { | |||
| } | |||
| MS_LOG(DEBUG) << *mp; | |||
| } | |||
| TEST_F(MindDataTestArena, Test2) { | |||
| std::shared_ptr<Arena> arena; | |||
| Status rc = Arena::CreateArena(&arena); | |||
| std::shared_ptr<MemoryPool> mp = std::static_pointer_cast<MemoryPool>(arena); | |||
| auto alloc = Allocator<int>(mp); | |||
| ASSERT_TRUE(rc.IsOk()); | |||
| std::vector<int, Allocator<int>> v(alloc); | |||
| v.reserve(1000); | |||
| for (auto i = 0; i < 1000; ++i) { | |||
| v.push_back(i); | |||
| } | |||
| // Test copy | |||
| std::vector<int, Allocator<int>> w(v, SystemPool::GetAllocator<int>()); | |||
| auto val = w.at(10); | |||
| EXPECT_EQ(val, 10); | |||
| // Test move | |||
| std::vector<int, Allocator<int>> s(std::move(v), SystemPool::GetAllocator<int>()); | |||
| val = s.at(100); | |||
| EXPECT_EQ(val, 100); | |||
| EXPECT_EQ(v.size(), 0); | |||
| } | |||
| @@ -50,6 +50,13 @@ class RefCount { | |||
| v_ = o.v_; | |||
| return *this; | |||
| } | |||
| RefCount(RefCount &&o) : v_(std::move(o.v_)) {} | |||
| RefCount &operator=(RefCount &&o) { | |||
| if (&o != this) { | |||
| v_ = std::move(o.v_); | |||
| } | |||
| return *this; | |||
| } | |||
| std::shared_ptr<int> v_; | |||
| }; | |||
| @@ -148,8 +155,9 @@ TEST_F(MindDataTestQueue, Test4) { test4(); } | |||
| TEST_F(MindDataTestQueue, Test5) { | |||
| test4(); | |||
| // Assume we have run Test4. The destructor of the RefCount should be called 4 times. | |||
| // One for a. One for b. One for line 125 when we pop. One for the stale element in the queue. | |||
| ASSERT_EQ(gRefCountDestructorCalled, 4); | |||
| // One for a. One for b. One for the stale element in the queue. 3 more for | |||
| // the one in the queue (but they are empty). | |||
| ASSERT_EQ(gRefCountDestructorCalled, 6); | |||
| } | |||
| TEST_F(MindDataTestQueue, Test6) { | |||
| @@ -169,70 +177,3 @@ TEST_F(MindDataTestQueue, Test6) { | |||
| MS_LOG(INFO) << "Popped value " << *pepped_value << " from queue index " << chosen_queue_index; | |||
| ASSERT_EQ(*pepped_value, 99); | |||
| } | |||
| using namespace std::chrono; | |||
| template <typename QueueType, typename PayloadType> | |||
| void Perf(int n, int p, std::string name) { | |||
| auto payload = std::vector<PayloadType>(n, PayloadType(p)); | |||
| auto queue = QueueType(n); | |||
| auto t0 = high_resolution_clock::now(); | |||
| auto check = 0; | |||
| for (int i = 0; i < queue.capacity(); i++) { | |||
| queue.Add(PayloadType(p)); | |||
| } | |||
| check = queue.size(); | |||
| for (int i = 0; i < queue.capacity(); i++) { | |||
| queue.PopFront(&payload[i]); | |||
| } | |||
| auto t1 = high_resolution_clock::now(); | |||
| std::cout << name << " queue filled size: " << queue.size() << " " << check << std::endl; | |||
| auto t2 = high_resolution_clock::now(); | |||
| for (int i = 0; i < queue.capacity(); i++) { | |||
| queue.Add(PayloadType(p)); | |||
| } | |||
| check = queue.size(); | |||
| for (int i = 0; i < queue.capacity(); i++) { | |||
| queue.PopFront(&payload[i]); | |||
| } | |||
| auto t3 = high_resolution_clock::now(); | |||
| auto d = duration_cast<milliseconds>(t3 - t2 + t1 - t0).count(); | |||
| std::cout << name << " queue emptied size: " << queue.size() << " " << check << std::endl; | |||
| std::cout << name << " " | |||
| << " ran in " << d << "ms" << std::endl; | |||
| } | |||
| template <typename QueueType, typename PayloadType> | |||
| void Fuzz(int n, int p, std::string name) { | |||
| std::mt19937 gen(1); | |||
| auto payload = std::vector<PayloadType>(n, PayloadType(p)); | |||
| auto queue = QueueType(n); | |||
| auto dist = std::uniform_int_distribution<int>(0, 2); | |||
| std::cout << "###" << std::endl; | |||
| for (auto i = 0; i < n; i++) { | |||
| auto v = dist(gen); | |||
| if (v == 0 && queue.size() < n - 1) { | |||
| queue.Add(std::move(payload[i])); | |||
| } | |||
| if (v == 1 && queue.size() > 0) { | |||
| queue.PopFront(&payload[i]); | |||
| } else { | |||
| queue.Reset(); | |||
| } | |||
| } | |||
| std::cout << name << " fuzz ran " << queue.size() << std::endl; | |||
| } | |||
| TEST_F(MindDataTestQueue, TestPerf) { | |||
| try { | |||
| int kSz = 1000000; | |||
| // std::cout << "enter size" << std::endl; | |||
| // std::cin >> kSz; | |||
| Perf<Queue<std::vector<int>>, std::vector<int>>(kSz, 1, "old queue, vector of size 1"); | |||
| } catch (const std::exception &e) { | |||
| std::cout << e.what() << std::endl; | |||
| } | |||
| std::cout << "Test Reset" << std::endl; | |||
| std::cout << "Enter fuzz size" << std::endl; | |||
| int fs = 1000; | |||
| // std::cin >> fs; | |||
| Fuzz<Queue<std::vector<int>>, std::vector<int>>(fs, 1, "New queue"); | |||
| } | |||