From: @probiotics_53 Reviewed-by: @zhang_xue_tong,@zhanghaibo5 Signed-off-by: @zhang_xue_tongpull/16049/MERGE
| @@ -33,6 +33,9 @@ struct MindrtAddress { | |||||
| int Initialize(const std::string &tcpUrl, const std::string &tcpUrlAdv = "", const std::string &udpUrl = "", | int Initialize(const std::string &tcpUrl, const std::string &tcpUrlAdv = "", const std::string &udpUrl = "", | ||||
| const std::string &udpUrlAdv = "", int threadCount = 0); | const std::string &udpUrlAdv = "", int threadCount = 0); | ||||
| // brief terminate the threads for current session | |||||
| void TerminateCurThreads(int threadCount = 0); | |||||
| // brief spawn a process to run an actor | // brief spawn a process to run an actor | ||||
| AID Spawn(ActorReference actor, bool sharedThread = true, bool start = true); | AID Spawn(ActorReference actor, bool sharedThread = true, bool start = true); | ||||
| @@ -100,6 +100,8 @@ void ActorMgr::TerminateAll() { | |||||
| void ActorMgr::Initialize(int threadCount) { threadPool.AddThread(threadCount); } | void ActorMgr::Initialize(int threadCount) { threadPool.AddThread(threadCount); } | ||||
| void ActorMgr::TerminateCurThreads(int threadCount) { threadPool.TerminateThread(threadCount); } | |||||
| void ActorMgr::Finalize() { | void ActorMgr::Finalize() { | ||||
| this->TerminateAll(); | this->TerminateAll(); | ||||
| MS_LOG(INFO) << "mindrt Actors finish exiting."; | MS_LOG(INFO) << "mindrt Actors finish exiting."; | ||||
| @@ -48,6 +48,7 @@ class ActorMgr { | |||||
| void Finalize(); | void Finalize(); | ||||
| void Initialize(int threadCount); | void Initialize(int threadCount); | ||||
| void TerminateCurThreads(int threadCount); | |||||
| void RemoveActor(const std::string &name); | void RemoveActor(const std::string &name); | ||||
| ActorReference GetActor(const AID &id); | ActorReference GetActor(const AID &id); | ||||
| const std::string GetUrl(const std::string &protocol = "tcp"); | const std::string GetUrl(const std::string &protocol = "tcp"); | ||||
| @@ -59,7 +59,8 @@ ActorThread::ActorThread() : readyActors(), workers() { | |||||
| ActorThread::~ActorThread() {} | ActorThread::~ActorThread() {} | ||||
| void ActorThread::AddThread(int threadCount) { | void ActorThread::AddThread(int threadCount) { | ||||
| std::unique_lock<std::mutex> lock(initLock_); | std::unique_lock<std::mutex> lock(initLock_); | ||||
| for (int i = 0; i < threadCount; ++i) { | |||||
| int threadsNeed = threadCount - (workers.size() - threadsInUse_); | |||||
| for (int i = 0; i < threadsNeed; ++i) { | |||||
| if (workers.size() >= maxThreads_) { | if (workers.size() >= maxThreads_) { | ||||
| MS_LOG(DEBUG) << "threads number in mindrt reach upper limit. maxThreads:" << maxThreads_; | MS_LOG(DEBUG) << "threads number in mindrt reach upper limit. maxThreads:" << maxThreads_; | ||||
| break; | break; | ||||
| @@ -68,8 +69,15 @@ void ActorThread::AddThread(int threadCount) { | |||||
| MINDRT_OOM_EXIT(worker) | MINDRT_OOM_EXIT(worker) | ||||
| workers.push_back(std::move(worker)); | workers.push_back(std::move(worker)); | ||||
| threadsInUse_ += 1; | |||||
| } | } | ||||
| } | } | ||||
| void ActorThread::TerminateThread(int threadCount) { | |||||
| // temp scheme, not actually terminate the threads when current session destructs | |||||
| threadsInUse_ -= threadCount; | |||||
| } | |||||
| void ActorThread::Finalize() { | void ActorThread::Finalize() { | ||||
| MS_LOG(INFO) << "Actor's threads are exiting."; | MS_LOG(INFO) << "Actor's threads are exiting."; | ||||
| // terminate all thread; enqueue nullptr actor to terminate; | // terminate all thread; enqueue nullptr actor to terminate; | ||||
| @@ -33,6 +33,7 @@ class ActorThread { | |||||
| ~ActorThread(); | ~ActorThread(); | ||||
| void Finalize(); | void Finalize(); | ||||
| void AddThread(int threadCount); | void AddThread(int threadCount); | ||||
| void TerminateThread(int threadCount); | |||||
| void EnqueReadyActor(const std::shared_ptr<ActorBase> &actor); | void EnqueReadyActor(const std::shared_ptr<ActorBase> &actor); | ||||
| private: | private: | ||||
| @@ -46,6 +47,7 @@ class ActorThread { | |||||
| std::list<std::unique_ptr<std::thread>> workers; | std::list<std::unique_ptr<std::thread>> workers; | ||||
| std::string threadName; | std::string threadName; | ||||
| size_t threadsInUse_ = 0; | |||||
| size_t maxThreads_; | size_t maxThreads_; | ||||
| std::mutex initLock_; | std::mutex initLock_; | ||||
| }; | }; | ||||
| @@ -59,6 +59,8 @@ const MindrtAddress &GetMindrtAddress() { | |||||
| void SetThreadCount(int threadCount) { ActorMgr::GetActorMgrRef()->Initialize(threadCount); } | void SetThreadCount(int threadCount) { ActorMgr::GetActorMgrRef()->Initialize(threadCount); } | ||||
| void TerminateCurThreads(int threadCount) { ActorMgr::GetActorMgrRef()->TerminateCurThreads(threadCount); } | |||||
| class MindrtExit { | class MindrtExit { | ||||
| public: | public: | ||||
| MindrtExit() { MS_LOG(DEBUG) << "trace: enter MindrtExit()---------"; } | MindrtExit() { MS_LOG(DEBUG) << "trace: enter MindrtExit()---------"; } | ||||
| @@ -71,6 +71,7 @@ void MindrtTerminate(std::vector<std::shared_ptr<LiteOpActor>> actor_list) { | |||||
| for (auto actor : actor_list) { | for (auto actor : actor_list) { | ||||
| mindspore::Terminate(actor->GetAID()); | mindspore::Terminate(actor->GetAID()); | ||||
| } | } | ||||
| mindspore::TerminateCurThreads(1); | |||||
| return; | return; | ||||
| } | } | ||||