| @@ -49,7 +49,7 @@ class DatasetInitKernel : public GpuKernel { | |||
| std::vector<size_t> workspace_size_list_; | |||
| // The capacity of buffer Q. | |||
| size_t buffer_q_capacity_{1}; | |||
| size_t buffer_q_capacity_{2}; | |||
| }; | |||
| MS_REG_GPU_KERNEL(InitDataSetQueue, DatasetInitKernel) | |||
| @@ -42,16 +42,30 @@ if (ENABLE_CACHE) | |||
| storage_container.cc) | |||
| add_executable(cache_server cache_main.cc) | |||
| target_link_libraries(cache_server | |||
| engine-cache-server | |||
| _c_dataengine | |||
| _c_mindrecord | |||
| mindspore::protobuf | |||
| mindspore::grpc++ | |||
| mindspore_gvar | |||
| ${PYTHON_LIBRARIES} | |||
| ${SECUREC_LIBRARY} | |||
| pthread) | |||
| if (ENABLE_GPU) | |||
| target_link_libraries(cache_server | |||
| engine-cache-server | |||
| _c_dataengine | |||
| _c_mindrecord | |||
| mindspore::protobuf | |||
| mindspore::grpc++ | |||
| mindspore_gvar | |||
| ${CUDNN_LIBRARY_PATH} | |||
| ${PYTHON_LIBRARIES} | |||
| ${SECUREC_LIBRARY} | |||
| pthread) | |||
| else() | |||
| target_link_libraries(cache_server | |||
| engine-cache-server | |||
| _c_dataengine | |||
| _c_mindrecord | |||
| mindspore::protobuf | |||
| mindspore::grpc++ | |||
| mindspore_gvar | |||
| ${PYTHON_LIBRARIES} | |||
| ${SECUREC_LIBRARY} | |||
| pthread) | |||
| endif() | |||
| if (USE_GLOG) | |||
| target_link_libraries(cache_server mindspore::glog) | |||
| @@ -91,7 +91,7 @@ Status DeviceQueueOp::operator()() { | |||
| #endif | |||
| } else if (device_type_ == DeviceType::GPU) { | |||
| #ifdef ENABLE_GPUQUE | |||
| RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(&pool_)); | |||
| RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(&pool_, -1, 1024, false, true)); | |||
| RETURN_IF_NOT_OK(SendDataToGPU()); | |||
| #endif | |||
| } else if (device_type_ == DeviceType::CPU) { | |||
| @@ -235,14 +235,43 @@ std::ostream &operator<<(std::ostream &os, const ArenaImpl &s) { | |||
| Status Arena::Init() { | |||
| try { | |||
| int64_t sz = size_in_MB_ * 1048576L; | |||
| #ifdef ENABLE_GPUQUE | |||
| if (is_cuda_malloc_) { | |||
| auto ret = cudaHostAlloc(&ptr_, sz, cudaHostAllocDefault); | |||
| if (ret != cudaSuccess) { | |||
| MS_LOG(ERROR) << "cudaHostAlloc failed, ret[" << static_cast<int>(ret) << "], " << cudaGetErrorString(ret); | |||
| return Status(StatusCode::kOutOfMemory); | |||
| } | |||
| impl_ = std::make_unique<ArenaImpl>(ptr_, sz); | |||
| } else { | |||
| RETURN_IF_NOT_OK(DeMalloc(sz, &ptr_, false)); | |||
| impl_ = std::make_unique<ArenaImpl>(ptr_, sz); | |||
| } | |||
| #else | |||
| RETURN_IF_NOT_OK(DeMalloc(sz, &ptr_, false)); | |||
| impl_ = std::make_unique<ArenaImpl>(ptr_, sz); | |||
| #endif | |||
| } catch (std::bad_alloc &e) { | |||
| return Status(StatusCode::kOutOfMemory); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| #ifdef ENABLE_GPUQUE | |||
| Arena::Arena(size_t val_in_MB, bool is_cuda_malloc) | |||
| : ptr_(nullptr), size_in_MB_(val_in_MB), is_cuda_malloc_(is_cuda_malloc) {} | |||
| Status Arena::CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB, bool is_cuda_malloc) { | |||
| RETURN_UNEXPECTED_IF_NULL(p_ba); | |||
| auto ba = new (std::nothrow) Arena(val_in_MB, is_cuda_malloc); | |||
| if (ba == nullptr) { | |||
| return Status(StatusCode::kOutOfMemory); | |||
| } | |||
| (*p_ba).reset(ba); | |||
| RETURN_IF_NOT_OK(ba->Init()); | |||
| return Status::OK(); | |||
| } | |||
| #else | |||
| Arena::Arena(size_t val_in_MB) : ptr_(nullptr), size_in_MB_(val_in_MB) {} | |||
| Status Arena::CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB) { | |||
| @@ -255,5 +284,6 @@ Status Arena::CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB) { | |||
| RETURN_IF_NOT_OK(ba->Init()); | |||
| return Status::OK(); | |||
| } | |||
| #endif | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -22,6 +22,9 @@ | |||
| #include "minddata/dataset/util/allocator.h" | |||
| #include "minddata/dataset/util/memory_pool.h" | |||
| #include "minddata/dataset/util/treap.h" | |||
| #ifdef ENABLE_GPUQUE | |||
| #include <cuda_runtime_api.h> | |||
| #endif | |||
| #define ARENA_LOG_BLK_SZ (6u) | |||
| #define ARENA_BLK_SZ (static_cast<uint16_t>(1u << ARENA_LOG_BLK_SZ)) | |||
| @@ -105,10 +108,18 @@ class Arena : public MemoryPool { | |||
| Arena(const Arena &) = delete; | |||
| Arena &operator=(const Arena &) = delete; | |||
| ~Arena() override { | |||
| #ifdef ENABLE_GPUQUE | |||
| if (is_cuda_malloc_) { | |||
| if (ptr_) { | |||
| (void)cudaFreeHost(ptr_); | |||
| } | |||
| } | |||
| #else | |||
| if (ptr_ != nullptr) { | |||
| free(ptr_); | |||
| } | |||
| ptr_ = nullptr; | |||
| #endif | |||
| } | |||
| /// As a derived class of MemoryPool, we have to implement the following. | |||
| @@ -140,16 +151,27 @@ class Arena : public MemoryPool { | |||
| return os; | |||
| } | |||
| #ifdef ENABLE_GPUQUE | |||
| /// The only method to create an arena. | |||
| static Status CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB = 4096, bool is_cuda_malloc = false); | |||
| #else | |||
| /// The only method to create an arena. | |||
| static Status CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB = 4096); | |||
| #endif | |||
| protected: | |||
| mutable std::mutex mux_; | |||
| std::unique_ptr<ArenaImpl> impl_; | |||
| void *ptr_; | |||
| size_t size_in_MB_; | |||
| #ifdef ENABLE_GPUQUE | |||
| bool is_cuda_malloc_; | |||
| explicit Arena(size_t val_in_MB = 4096, bool is_cuda_malloc = false); | |||
| #else | |||
| explicit Arena(size_t val_in_MB = 4096); | |||
| #endif | |||
| Status Init(); | |||
| }; | |||
| @@ -27,7 +27,11 @@ namespace dataset { | |||
| Status CircularPool::AddOneArena() { | |||
| Status rc; | |||
| std::shared_ptr<Arena> b; | |||
| #ifdef ENABLE_GPUQUE | |||
| RETURN_IF_NOT_OK(Arena::CreateArena(&b, arena_size_, is_cuda_malloc_)); | |||
| #else | |||
| RETURN_IF_NOT_OK(Arena::CreateArena(&b, arena_size_)); | |||
| #endif | |||
| tail_ = b.get(); | |||
| cur_size_in_mb_ += arena_size_; | |||
| mem_segments_.push_back(std::move(b)); | |||
| @@ -194,12 +198,43 @@ int CircularPool::PercentFree() const { | |||
| } | |||
| } | |||
| #ifdef ENABLE_GPUQUE | |||
| CircularPool::CircularPool(int max_size_in_gb, int arena_size, bool is_cuda_malloc) | |||
| : unlimited_(max_size_in_gb <= 0), | |||
| max_size_in_mb_(unlimited_ ? std::numeric_limits<int32_t>::max() : max_size_in_gb * 1024), | |||
| arena_size_(arena_size), | |||
| is_cuda_malloc_(is_cuda_malloc), | |||
| cur_size_in_mb_(0) {} | |||
| #else | |||
| CircularPool::CircularPool(int max_size_in_gb, int arena_size) | |||
| : unlimited_(max_size_in_gb <= 0), | |||
| max_size_in_mb_(unlimited_ ? std::numeric_limits<int32_t>::max() : max_size_in_gb * 1024), | |||
| arena_size_(arena_size), | |||
| cur_size_in_mb_(0) {} | |||
| #endif | |||
| #ifdef ENABLE_GPUQUE | |||
| Status CircularPool::CreateCircularPool(std::shared_ptr<MemoryPool> *out_pool, int max_size_in_gb, int arena_size, | |||
| bool createOneArena, bool is_cuda_malloc) { | |||
| Status rc; | |||
| if (out_pool == nullptr) { | |||
| RETURN_STATUS_UNEXPECTED("pPool is null"); | |||
| } | |||
| auto pool = new (std::nothrow) CircularPool(max_size_in_gb, arena_size, is_cuda_malloc); | |||
| if (pool == nullptr) { | |||
| return Status(StatusCode::kOutOfMemory); | |||
| } | |||
| if (createOneArena) { | |||
| rc = pool->AddOneArena(); | |||
| } | |||
| if (rc.IsOk()) { | |||
| (*out_pool).reset(pool); | |||
| } else { | |||
| delete pool; | |||
| } | |||
| return rc; | |||
| } | |||
| #else | |||
| Status CircularPool::CreateCircularPool(std::shared_ptr<MemoryPool> *out_pool, int max_size_in_gb, int arena_size, | |||
| bool createOneArena) { | |||
| Status rc; | |||
| @@ -220,6 +255,7 @@ Status CircularPool::CreateCircularPool(std::shared_ptr<MemoryPool> *out_pool, i | |||
| } | |||
| return rc; | |||
| } | |||
| #endif | |||
| CircularPool::~CircularPool() = default; | |||
| } // namespace dataset | |||
| @@ -85,8 +85,13 @@ class CircularPool : public MemoryPool { | |||
| return os; | |||
| } | |||
| #ifdef ENABLE_GPUQUE | |||
| static Status CreateCircularPool(std::shared_ptr<MemoryPool> *out_pool, int max_size_in_gb = -1, | |||
| int arena_size = 4096, bool create_one_arena = false, bool is_cuda_malloc = false); | |||
| #else | |||
| static Status CreateCircularPool(std::shared_ptr<MemoryPool> *out_pool, int max_size_in_gb = -1, | |||
| int arena_size = 4096, bool create_one_arena = false); | |||
| #endif | |||
| private: | |||
| ListOfArenas mem_segments_; | |||
| @@ -96,9 +101,16 @@ class CircularPool : public MemoryPool { | |||
| int arena_size_; | |||
| int cur_size_in_mb_; | |||
| RWLock rw_lock_; | |||
| #ifdef ENABLE_GPU | |||
| bool is_cuda_malloc_; | |||
| // We can take negative or 0 as input which means unlimited. | |||
| CircularPool(int max_size_in_gb, int arena_size, bool is_cuda_malloc); | |||
| #else | |||
| // We can take negative or 0 as input which means unlimited. | |||
| CircularPool(int max_size_in_gb, int arena_size); | |||
| #endif | |||
| Status AddOneArena(); | |||
| }; | |||