diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt index f225daa330..90aefa8870 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/CMakeLists.txt @@ -33,6 +33,13 @@ if (ENABLE_CACHE) storage_manager.cc storage_container.cc) + if (ENABLE_ASAN) + target_compile_options(engine-cache-server PRIVATE -fsanitize=address) + target_compile_options(engine-cache-server PRIVATE -fno-omit-frame-pointer) + target_compile_options(engine-cache-server PRIVATE -ggdb) + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_LINKER_FLAGS_DEBUG} -fno-omit-frame-pointer -fsanitize=address") + endif() + add_executable(cache_server cache_main.cc) if (ENABLE_GPU) target_link_libraries(cache_server diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_server.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_server.cc index f20cfbe586..dc4b68cb56 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_server.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_server.cc @@ -37,12 +37,10 @@ void CacheServerGreeterImpl::Shutdown() { // Always shutdown the completion queue after the server. if (cq_) { cq_->Shutdown(); - // We need to drain the queue. All the tag is coming from - // the Services pool which will be shutdown as well. So we - // ignore the tag. void *tag; bool success; while (cq_->Next(&tag, &success)) { + delete reinterpret_cast(tag); } } } @@ -93,30 +91,41 @@ Status CacheServerGreeterImpl::HandleRequest(int32_t worker_id) { // and inject them into the grpc queue. CacheServerRequest *p; // Get a free tag from my free list. - RETURN_IF_NOT_OK(CacheServer::GetFreeRequestTag(worker_id, &p)); + RETURN_IF_NOT_OK(CacheServer::GetFreeRequestTag(&p)); RETURN_IF_NOT_OK((*p)(&svc_, cq_.get())); do { auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(1); // Set a timeout for one second. Check for interrupt if we need to do early exit. auto r = cq_->AsyncNext(&tag, &success, deadline); if (r == grpc_impl::CompletionQueue::NextStatus::GOT_EVENT) { + auto rq = static_cast(tag); if (success) { - auto rq = static_cast(tag); - RETURN_IF_NOT_OK((*rq)(&svc_, cq_.get())); + if (rq->st_ == CacheServerRequest::STATE::PROCESS) { + RETURN_IF_NOT_OK((*rq)(&svc_, cq_.get())); + } else if (rq->st_ == CacheServerRequest::STATE::FINISH) { + MS_LOG(DEBUG) << *rq << " Finished."; + if (rq->type_ == BaseRequest::RequestType::kStopService) { + // For cache_admin --stop, ProcessRequest is just acknowledging we receive the request. Now + // we call the real function. + auto &cs = CacheServer::GetInstance(); + cs.GlobalShutdown(); + } + RETURN_IF_NOT_OK(CacheServer::ReturnRequestTag(rq)); + } + } else { + RETURN_IF_NOT_OK(CacheServer::ReturnRequestTag(rq)); } } else if (r == grpc_impl::CompletionQueue::NextStatus::TIMEOUT) { // If we are interrupted, exit. Otherwise wait again. - RETURN_IF_INTERRUPTED(); } else { // Queue is drained. break; } - } while (true); + } while (!this_thread::is_interrupted()); return Status::OK(); } Status CacheServerRequest::operator()(CacheServerGreeter::AsyncService *svc, grpc::ServerCompletionQueue *cq) { - auto myQID = getQid(); if (st_ == STATE::CREATE) { st_ = STATE::PROCESS; svc->RequestCacheServerRequest(&ctx_, &rq_, &responder_, cq, cq, this); @@ -129,7 +138,7 @@ Status CacheServerRequest::operator()(CacheServerGreeter::AsyncService *svc, grp // We can round robin, use the qid or even use the worker id. We will use the free list queue // where the current request comes from. CacheServerRequest *next_rq; - RETURN_IF_NOT_OK(CacheServer::GetFreeRequestTag(myQID, &next_rq)); + RETURN_IF_NOT_OK(CacheServer::GetFreeRequestTag(&next_rq)); RETURN_IF_NOT_OK((*next_rq)(svc, cq)); // Now we continue with the current request. // First thing we need to extract the type from the incoming request. @@ -144,25 +153,13 @@ Status CacheServerRequest::operator()(CacheServerGreeter::AsyncService *svc, grp type_ == BaseRequest::RequestType::kStopService || type_ == BaseRequest::RequestType::kAllocateSharedBlock || type_ == BaseRequest::RequestType::kFreeSharedBlock) { cs.ProcessRequest(this); - // For cache_admin --stop, ProcessRequest is just acknowledging we receive the request. Now - // we call the real function. - if (type_ == BaseRequest::RequestType::kStopService) { - cs.GlobalShutdown(); - return Status(StatusCode::kInterrupted); - } else if (rc_.IsInterrupted()) { - return rc_; - } + // WARNING. After we call ProcessRequest, the memory of 'this' is being recycled by ReturnRequestTag + // asynchronously. Further access of 'this' is unpredictable. } else { - // When the number of grpc workers is the same as the server workers, we will use this queue id - // and push to the corresponding queue. - bool random = cs.GetNumWorkers() != cs.GetNumGrpcWorkers(); - worker_id_t worker_id = random ? cs.GetRandomWorker() : myQID; - RETURN_IF_NOT_OK(cs.PushRequest(worker_id, this)); + RETURN_IF_NOT_OK(cs.PushRequest(cs.GetRandomWorker(), this)); } } else if (st_ == STATE::FINISH) { - MS_LOG(DEBUG) << *this << " Finished."; - // Return back to the free list. - RETURN_IF_NOT_OK(CacheServer::ReturnRequestTag(this)); + // We don't have logic here but moved to the caller. } return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_server.h b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_server.h index 4f8daad1bc..2e79b4fe82 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_server.h +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_grpc_server.h @@ -38,12 +38,10 @@ class CacheServerRequest : public BaseRequest { public: friend class CacheServer; friend class CacheService; + friend class CacheServerGreeterImpl; enum class STATE : int8_t { CREATE = 1, PROCESS = 2, FINISH = 3 }; - explicit CacheServerRequest(int32_t queue_id) - : BaseRequest::BaseRequest(BaseRequest::RequestType::kRequestUnknown), - qid_(queue_id), - st_(STATE::CREATE), - responder_(&ctx_) {} + CacheServerRequest() + : BaseRequest::BaseRequest(BaseRequest::RequestType::kRequestUnknown), st_(STATE::CREATE), responder_(&ctx_) {} ~CacheServerRequest() override = default; @@ -58,12 +56,7 @@ class CacheServerRequest : public BaseRequest { /// \param out void Print(std::ostream &out) const override; - /// \brief Getter of the queue id - /// \return The queue where the request should go to - int32_t getQid() const { return qid_; } - private: - int32_t qid_; Status rc_; STATE st_; grpc::ServerContext ctx_; diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_hw.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_hw.cc index 0568d6ad81..cb53541962 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_hw.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_hw.cc @@ -115,8 +115,8 @@ Status CacheServerHW::GetNumaNodeInfo() { const char kCpuList[] = "cpulist"; auto r = std::regex("[0-9]*-[0-9]*"); for (Path p : numa_nodes_) { - auto node_dir = p.Basename().data(); - numa_id_t numa_node = strtol(node_dir + strlen(kNodeName), nullptr, 10); + auto node_dir = p.Basename(); + numa_id_t numa_node = strtol(node_dir.data() + strlen(kNodeName), nullptr, 10); Path f = p / kCpuList; std::ifstream fs(f.toString()); CHECK_FAIL_RETURN_UNEXPECTED(!fs.fail(), "Fail to open file: " + f.toString()); diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_numa.h b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_numa.h index a9352c4826..836e0c1bcb 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_numa.h +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_numa.h @@ -28,116 +28,6 @@ namespace mindspore { namespace dataset { -/// \brief An allocator but for a particular numa node. -template -class NumaAllocator { - public: - explicit NumaAllocator(numa_id_t node_id, CachePoolPolicy policy) - : policy_(policy), numa_enabled_(false), node_id_(node_id) { -#ifdef NUMA_ENABLED - numa_enabled_ = numa_available() != -1; -#endif - } - ~NumaAllocator() = default; - - template - explicit NumaAllocator(NumaAllocator const &rhs) - : policy_(rhs.policy_), numa_enabled_(rhs.numa_enabled_), node_id_(rhs.node_id_) {} - - template - bool operator==(Allocator const &rhs) const { - return node_id_ == rhs.node_id_; - } - - template - bool operator!=(Allocator const &rhs) const { - return node_id_ != rhs.node_id_; - } - - template - friend class NumaAllocator; - - using value_type = T; - using pointer = T *; - using const_pointer = const T *; - using reference = T &; - using const_reference = const T &; - using size_type = uint64_t; - using difference_type = std::ptrdiff_t; - - template - struct rebind { - using other = Allocator; - }; - - using propagate_on_container_copy_assignment = std::true_type; - using propagate_on_container_move_assignment = std::true_type; - using propagate_on_container_swap = std::true_type; - - /// Allocate memory on this node only. Return nullptr if no memory on this numa node. - /// \note. This version will not throw if we can't allocate memory from this node. - /// User must check if the pointer returned is null or not. - pointer allocate(std::size_t n) noexcept { - auto sz = n * sizeof(T); - void *p = nullptr; -#ifdef NUMA_ENABLED - if (numa_enabled_) { - switch (policy_) { - case kPreferred: - numa_set_preferred(node_id_); - p = numa_alloc(sz); - break; - case kLocal: - p = numa_alloc_local(sz); - break; - case kInterleave: - p = numa_alloc_interleaved(sz); - break; - case kOnNode: - p = numa_alloc_onnode(sz, node_id_); - break; - case kNone: - default: - p = numa_alloc(sz); - break; - } - } else { - p = malloc(sz); - } -#else - p = malloc(sz); -#endif - return reinterpret_cast(p); - } - - /// Free a memory allocated on this node. - void deallocate(pointer p, std::size_t n) noexcept { -#ifdef NUMA_ENABLED - if (numa_enabled_) { - numa_free(p, n * sizeof(T)); - } else { - free(p); - } -#else - free(p); -#endif - } - - /// \brief Allow one to change to another numa node - void SetNodeId(numa_id_t node_id) { node_id_ = node_id; } - - /// \brif Getter for node_id; - numa_id_t GetNodeId() const { return node_id_; } - - /// \brief Getter for policy - CachePoolPolicy GetPolicy() const { return policy_; } - - private: - CachePoolPolicy policy_; - bool numa_enabled_; - numa_id_t node_id_; -}; - /// \brief A NumaMemoryPool is like a CircularPool but all the arenas have already been allocated /// and each one comes from a numa socket. Memory is allocated using OnNode policy. That is, /// it is solely comes from one particular numa node, and is not interleaved. diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_request.h b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_request.h index fb922d012a..7edbe415f8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_request.h +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_request.h @@ -91,6 +91,7 @@ class BaseRequest { friend class CacheClientRequestTag; friend class CacheClient; friend class CacheService; + friend class CacheServerGreeterImpl; /// \brief Base class of a cache server request /// \param type Type of the request diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc index c5b0f06a9b..652d4033e4 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.cc @@ -76,34 +76,7 @@ Status CacheServer::DoServiceStart() { // But technically they don't have to be the same. num_grpc_workers_ = num_workers_; MS_LOG(DEBUG) << "Number of gprc workers is set to " << num_grpc_workers_; - // For the grpc completion queue to work, we need to allocate some - // tags which in our case are instances of CacheServerQuest. - // They got recycled and we will allocate them in advance and push - // them into some free list. We need more (two or three times) the - // size of the cache_q. While each worker is working on a CacheSerRequest, - // we need some extra running injecting in the the qrpc completion queue. - const int32_t kMultiplier = 2; - int ratio = num_workers_ / num_grpc_workers_; - if (num_workers_ % num_grpc_workers_) ++ratio; - const int32_t free_list_capacity = kMultiplier * (kQueCapacity + 1) * ratio; - free_list_ = std::make_shared>(); - free_list_->Init(num_grpc_workers_, free_list_capacity); - tag_.reserve(num_grpc_workers_); - // Now we populate all free list. Round robin the free list among the numa nodes. - for (auto m = 0; m < num_grpc_workers_; ++m) { - NumaAllocator alloc(m % num_numa_nodes, CachePoolPolicy::kPreferred); - // Ideally we allocate all the free list in one malloc. But we will allocate one segment - // at a time so that we can change the numa policy easily per grpc worker. - auto my_tag = std::make_unique>>(alloc); - // Allocate the tag and assign it the current queue - RETURN_IF_NOT_OK(my_tag->allocate(free_list_capacity, m)); - for (int i = 0; i < free_list_capacity; ++i) { - RETURN_IF_NOT_OK(free_list_->operator[](m)->Add((*my_tag)[i])); - } - tag_.push_back(std::move(my_tag)); - } RETURN_IF_NOT_OK(cache_q_->Register(&vg_)); - RETURN_IF_NOT_OK(free_list_->Register(&vg_)); // Start the comm layer try { comm_layer_ = std::make_shared(port_); @@ -396,14 +369,10 @@ Status CacheServer::FastCacheRow(CacheRequest *rq, CacheReply *reply) { Status CacheServer::BatchFetch(const std::shared_ptr &fbb, WritableSlice *out) { RETURN_UNEXPECTED_IF_NULL(out); - int32_t numQ = GetNumGrpcWorkers(); - auto rng = GetRandomDevice(); - std::uniform_int_distribution distribution(0, numQ - 1); - int32_t qID = distribution(rng); auto p = flatbuffers::GetRoot(fbb->GetBufferPointer()); const auto num_elements = p->rows()->size(); auto connection_id = p->connection_id(); - auto batch_wait = std::make_unique(num_elements); + auto batch_wait = std::make_shared(num_elements); int64_t data_offset = (num_elements + 1) * sizeof(int64_t); auto *offset_array = reinterpret_cast(out->GetMutablePointer()); offset_array[0] = data_offset; @@ -423,7 +392,7 @@ Status CacheServer::BatchFetch(const std::shared_ptrtype_ = BaseRequest::RequestType::kInternalFetchRow; cache_rq->st_ = CacheServerRequest::STATE::PROCESS; @@ -719,10 +688,6 @@ Status CacheServer::ConnectReset(CacheRequest *rq) { Status CacheServer::BatchCacheRows(CacheRequest *rq) { CHECK_FAIL_RETURN_UNEXPECTED(rq->buf_data().size() == 3, "Expect three pieces of data"); - int32_t numQ = GetNumGrpcWorkers(); - auto rng = GetRandomDevice(); - std::uniform_int_distribution distribution(0, numQ - 1); - int32_t qID = distribution(rng); try { auto &cookie = rq->buf_data(0); auto connection_id = rq->connection_id(); @@ -733,7 +698,7 @@ Status CacheServer::BatchCacheRows(CacheRequest *rq) { offset_addr = strtoll(rq->buf_data(1).data(), nullptr, 10); auto p = reinterpret_cast(reinterpret_cast(base) + offset_addr); num_elem = strtol(rq->buf_data(2).data(), nullptr, 10); - auto batch_wait = std::make_unique(num_elem); + auto batch_wait = std::make_shared(num_elem); // Get a set of free request and push into the queues. for (auto i = 0; i < num_elem; ++i) { auto start = reinterpret_cast(p); @@ -743,7 +708,7 @@ Status CacheServer::BatchCacheRows(CacheRequest *rq) { p += msg->data_sz()->Get(k); } CacheServerRequest *cache_rq; - RETURN_IF_NOT_OK(GetFreeRequestTag(qID++ % numQ, &cache_rq)); + RETURN_IF_NOT_OK(GetFreeRequestTag(&cache_rq)); // Fill in details. cache_rq->type_ = BaseRequest::RequestType::kInternalCacheRow; cache_rq->st_ = CacheServerRequest::STATE::PROCESS; @@ -787,7 +752,11 @@ Status CacheServer::ProcessRequest(CacheServerRequest *cache_req) { try { int64_t addr = strtol(rq.buf_data(3).data(), nullptr, 10); auto *bw = reinterpret_cast(addr); - RETURN_IF_NOT_OK(bw->Set(std::move(cache_req->rc_))); + // Check if the object is still around. + auto bwObj = bw->GetBatchWait(); + if (bwObj.lock()) { + RETURN_IF_NOT_OK(bw->Set(std::move(cache_req->rc_))); + } } catch (const std::exception &e) { RETURN_STATUS_UNEXPECTED(e.what()); } @@ -820,7 +789,11 @@ Status CacheServer::ProcessRequest(CacheServerRequest *cache_req) { try { int64_t addr = strtol(rq.buf_data(1).data(), nullptr, 10); auto *bw = reinterpret_cast(addr); - RETURN_IF_NOT_OK(bw->Set(std::move(cache_req->rc_))); + // Check if the object is still around. + auto bwObj = bw->GetBatchWait(); + if (bwObj.lock()) { + RETURN_IF_NOT_OK(bw->Set(std::move(cache_req->rc_))); + } } catch (const std::exception &e) { RETURN_STATUS_UNEXPECTED(e.what()); } @@ -918,7 +891,7 @@ Status CacheServer::ProcessRequest(CacheServerRequest *cache_req) { cache_req->st_ = CacheServerRequest::STATE::FINISH; // We will re-tag the request back to the grpc queue. Once it comes back from the client, // the CacheServerRequest, i.e. the pointer cache_req, will be free - if (!internal_request) { + if (!internal_request && !global_shutdown_) { cache_req->responder_.Finish(reply, grpc::Status::OK, cache_req); } else { // We can free up the request now. @@ -994,28 +967,26 @@ Status CacheServer::Run(int msg_qid) { // note that after we have sent the initial status using the msg_qid, parent process will exit and // remove it. So we can't use it again. RETURN_IF_NOT_OK(vg_.join_all(Task::WaitFlag::kBlocking)); + // Shutdown the grpc queue. No longer accept any new comer. + comm_layer_->Shutdown(); + // The next thing to do drop all the caches. + RETURN_IF_NOT_OK(ServiceStop()); return Status::OK(); } -Status CacheServer::GetFreeRequestTag(int32_t queue_id, CacheServerRequest **q) { +Status CacheServer::GetFreeRequestTag(CacheServerRequest **q) { RETURN_UNEXPECTED_IF_NULL(q); - CacheServer &cs = CacheServer::GetInstance(); - CacheServerRequest *p; - RETURN_IF_NOT_OK(cs.free_list_->operator[](queue_id)->PopFront(&p)); + auto *p = new (std::nothrow) CacheServerRequest(); + if (p == nullptr) { + return Status(StatusCode::kOutOfMemory, __LINE__, __FILE__); + } *q = p; return Status::OK(); } Status CacheServer::ReturnRequestTag(CacheServerRequest *p) { RETURN_UNEXPECTED_IF_NULL(p); - int32_t myQID = p->getQid(); - // Free any memory from the protobufs - p->~CacheServerRequest(); - // Re-initialize the memory - new (p) CacheServerRequest(myQID); - // Now we return it back to free list. - CacheServer &cs = CacheServer::GetInstance(); - RETURN_IF_NOT_OK(cs.free_list_->operator[](myQID)->Add(p)); + delete p; return Status::OK(); } @@ -1134,22 +1105,10 @@ void CacheServer::GlobalShutdown() { bool expected = false; if (global_shutdown_.compare_exchange_strong(expected, true)) { MS_LOG(WARNING) << "Shutting down server."; - // Shutdown the grpc queue. No longer accept any new comer. - // The threads we spawn to work on the grpc queue will exit themselves once - // they notice the queue has been shutdown. - comm_layer_->Shutdown(); - // Now we interrupt any threads that are waiting on cache_q_ + // Interrupt all the threads and queues. We will leave the shutdown + // of the comm layer after we have joined all the threads and will + // be done by the master thread. vg_.interrupt_all(); - // The next thing to do drop all the caches. - UniqueLock lck(&rwLock_); - for (auto it = all_caches_.begin(); it != all_caches_.end();) { - auto id = it->first; - MS_LOG(WARNING) << "Dropping cache with connection id " << std::to_string(id); - // Wait for all outstanding work to be finished. - auto &cs = it->second; - UniqueLock cs_lock(&cs->rw_lock_); - it = all_caches_.erase(it); - } } } diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.h b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.h index 0c25c51260..c99e29ea54 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.h +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_server.h @@ -165,7 +165,7 @@ class CacheServer : public Service { /// \brief Get a free tag /// \param q[in] pointer to a pointer to a CacheServerRequest /// \return Status object - static Status GetFreeRequestTag(int32_t queue_id, CacheServerRequest **q); + static Status GetFreeRequestTag(CacheServerRequest **q); /// \brief Return a tag to the free list /// \param p[in] pointer to already finished CacheServerRequest tag @@ -232,8 +232,6 @@ class CacheServer : public Service { cache_index all_caches_; std::set active_sessions_; std::shared_ptr> cache_q_; - std::shared_ptr> free_list_; - std::vector>>> tag_; std::shared_ptr comm_layer_; TaskGroup vg_; int32_t num_workers_; @@ -359,13 +357,15 @@ class CacheServer : public Service { /// So we will let the server thread return the free tag immediately but the put /// the return code in this following structure. GRPC thread must wait until all /// the rc come back. - class BatchWait { + class BatchWait : public std::enable_shared_from_this { public: explicit BatchWait(int n) : expected_(n), num_back_(0) { expected_ = n; rc_lists_.reserve(expected_); } + std::weak_ptr GetBatchWait() { return weak_from_this(); } + Status Set(Status rc) { CHECK_FAIL_RETURN_UNEXPECTED(expected_ > num_back_, "Programming error"); std::unique_lock lck(mux_);