From 6cfad360ec5378d4cd8f1fb6c6b0cb11d33e0e3e Mon Sep 17 00:00:00 2001 From: zengxianglong Date: Fri, 7 May 2021 17:22:42 +0800 Subject: [PATCH] fix memory leak caused by unterminated threads --- mindspore/core/mindrt/include/mindrt.hpp | 3 +++ mindspore/core/mindrt/src/actor/actormgr.cc | 2 ++ mindspore/core/mindrt/src/actor/actormgr.h | 1 + mindspore/core/mindrt/src/actor/actorthread.cc | 10 +++++++++- mindspore/core/mindrt/src/actor/actorthread.h | 2 ++ mindspore/core/mindrt/src/mindrt.cc | 2 ++ mindspore/lite/src/lite_mindrt.cc | 1 + 7 files changed, 20 insertions(+), 1 deletion(-) diff --git a/mindspore/core/mindrt/include/mindrt.hpp b/mindspore/core/mindrt/include/mindrt.hpp index 45513075e7..beadd092d8 100644 --- a/mindspore/core/mindrt/include/mindrt.hpp +++ b/mindspore/core/mindrt/include/mindrt.hpp @@ -33,6 +33,9 @@ struct MindrtAddress { int Initialize(const std::string &tcpUrl, const std::string &tcpUrlAdv = "", const std::string &udpUrl = "", const std::string &udpUrlAdv = "", int threadCount = 0); +// brief terminate the threads for current session +void TerminateCurThreads(int threadCount = 0); + // brief spawn a process to run an actor AID Spawn(ActorReference actor, bool sharedThread = true, bool start = true); diff --git a/mindspore/core/mindrt/src/actor/actormgr.cc b/mindspore/core/mindrt/src/actor/actormgr.cc index 3875f83e10..576a232c31 100644 --- a/mindspore/core/mindrt/src/actor/actormgr.cc +++ b/mindspore/core/mindrt/src/actor/actormgr.cc @@ -100,6 +100,8 @@ void ActorMgr::TerminateAll() { void ActorMgr::Initialize(int threadCount) { threadPool.AddThread(threadCount); } +void ActorMgr::TerminateCurThreads(int threadCount) { threadPool.TerminateThread(threadCount); } + void ActorMgr::Finalize() { this->TerminateAll(); MS_LOG(INFO) << "mindrt Actors finish exiting."; diff --git a/mindspore/core/mindrt/src/actor/actormgr.h b/mindspore/core/mindrt/src/actor/actormgr.h index 1997a724f0..255ecf851b 100644 --- a/mindspore/core/mindrt/src/actor/actormgr.h +++ b/mindspore/core/mindrt/src/actor/actormgr.h @@ -48,6 +48,7 @@ class ActorMgr { void Finalize(); void Initialize(int threadCount); + void TerminateCurThreads(int threadCount); void RemoveActor(const std::string &name); ActorReference GetActor(const AID &id); const std::string GetUrl(const std::string &protocol = "tcp"); diff --git a/mindspore/core/mindrt/src/actor/actorthread.cc b/mindspore/core/mindrt/src/actor/actorthread.cc index 675e9b9752..c65de7a0ed 100644 --- a/mindspore/core/mindrt/src/actor/actorthread.cc +++ b/mindspore/core/mindrt/src/actor/actorthread.cc @@ -59,7 +59,8 @@ ActorThread::ActorThread() : readyActors(), workers() { ActorThread::~ActorThread() {} void ActorThread::AddThread(int threadCount) { std::unique_lock lock(initLock_); - for (int i = 0; i < threadCount; ++i) { + int threadsNeed = threadCount - (workers.size() - threadsInUse_); + for (int i = 0; i < threadsNeed; ++i) { if (workers.size() >= maxThreads_) { MS_LOG(DEBUG) << "threads number in mindrt reach upper limit. maxThreads:" << maxThreads_; break; @@ -68,8 +69,15 @@ void ActorThread::AddThread(int threadCount) { MINDRT_OOM_EXIT(worker) workers.push_back(std::move(worker)); + threadsInUse_ += 1; } } + +void ActorThread::TerminateThread(int threadCount) { + // temp scheme, not actually terminate the threads when current session destructs + threadsInUse_ -= threadCount; +} + void ActorThread::Finalize() { MS_LOG(INFO) << "Actor's threads are exiting."; // terminate all thread; enqueue nullptr actor to terminate; diff --git a/mindspore/core/mindrt/src/actor/actorthread.h b/mindspore/core/mindrt/src/actor/actorthread.h index 7ceadd30db..9c3cd4d406 100644 --- a/mindspore/core/mindrt/src/actor/actorthread.h +++ b/mindspore/core/mindrt/src/actor/actorthread.h @@ -33,6 +33,7 @@ class ActorThread { ~ActorThread(); void Finalize(); void AddThread(int threadCount); + void TerminateThread(int threadCount); void EnqueReadyActor(const std::shared_ptr &actor); private: @@ -46,6 +47,7 @@ class ActorThread { std::list> workers; std::string threadName; + size_t threadsInUse_ = 0; size_t maxThreads_; std::mutex initLock_; }; diff --git a/mindspore/core/mindrt/src/mindrt.cc b/mindspore/core/mindrt/src/mindrt.cc index d9fa2028f8..1077bdcae7 100644 --- a/mindspore/core/mindrt/src/mindrt.cc +++ b/mindspore/core/mindrt/src/mindrt.cc @@ -59,6 +59,8 @@ const MindrtAddress &GetMindrtAddress() { void SetThreadCount(int threadCount) { ActorMgr::GetActorMgrRef()->Initialize(threadCount); } +void TerminateCurThreads(int threadCount) { ActorMgr::GetActorMgrRef()->TerminateCurThreads(threadCount); } + class MindrtExit { public: MindrtExit() { MS_LOG(DEBUG) << "trace: enter MindrtExit()---------"; } diff --git a/mindspore/lite/src/lite_mindrt.cc b/mindspore/lite/src/lite_mindrt.cc index 7750df45ae..33cb7e29e8 100644 --- a/mindspore/lite/src/lite_mindrt.cc +++ b/mindspore/lite/src/lite_mindrt.cc @@ -71,6 +71,7 @@ void MindrtTerminate(std::vector> actor_list) { for (auto actor : actor_list) { mindspore::Terminate(actor->GetAID()); } + mindspore::TerminateCurThreads(1); return; }