From a71faeb01a38ea4f70efb02ab5a076275f928301 Mon Sep 17 00:00:00 2001 From: ling Date: Wed, 21 Apr 2021 09:25:32 +0800 Subject: [PATCH] mindrt thread init : singleton -> append --- .../core/mindrt/src/actor/actorthread.cc | 30 +++++++++++++++++-- mindspore/core/mindrt/src/actor/actorthread.h | 3 ++ mindspore/core/mindrt/src/litebus.cc | 11 ++----- mindspore/lite/src/lite_mindrt.cc | 2 +- 4 files changed, 34 insertions(+), 12 deletions(-) diff --git a/mindspore/core/mindrt/src/actor/actorthread.cc b/mindspore/core/mindrt/src/actor/actorthread.cc index fcb1479b4e..5ba34a7929 100644 --- a/mindspore/core/mindrt/src/actor/actorthread.cc +++ b/mindspore/core/mindrt/src/actor/actorthread.cc @@ -15,32 +15,58 @@ */ #include "actor/actorthread.h" +#ifdef __WIN32__ +#include +#else +#include +#endif #include #include #include namespace mindspore { constexpr int MAXTHREADNAMELEN = 12; + +size_t GetMaxThreadCount() { + size_t max_num; +#ifdef __WIN32__ + SYSTEM_INFO sys_info; + GetSystemInfo(&sys_info); + max_num = sys_info.dwNumberOfProcessors; +#else + max_num = sysconf(_SC_NPROCESSORS_ONLN); +#endif + return max_num; +} + ActorThread::ActorThread() : readyActors(), workers() { readyActors.clear(); workers.clear(); - char *envThreadName = getenv("LITEBUS_THREAD_NAME"); + char *envThreadName = getenv("MINDRT_THREAD_NAME"); if (envThreadName != nullptr) { threadName = envThreadName; if (threadName.size() > MAXTHREADNAMELEN) { threadName.resize(MAXTHREADNAMELEN); } } else { - threadName = "HARES_LB_ACT"; + threadName = "HARES_MINDRT_ACT"; } + + maxThreads_ = GetMaxThreadCount(); } ActorThread::~ActorThread() {} void ActorThread::AddThread(int threadCount) { + std::unique_lock lock(initLock_); for (int i = 0; i < threadCount; ++i) { + if (workers.size() >= maxThreads_) { + MS_LOG(DEBUG) << "threads number in mindrt reach upper limit. maxThreads:" << maxThreads_; + break; + } std::unique_ptr worker(new (std::nothrow) std::thread(&ActorThread::Run, this)); BUS_OOM_EXIT(worker); + workers.push_back(std::move(worker)); } } diff --git a/mindspore/core/mindrt/src/actor/actorthread.h b/mindspore/core/mindrt/src/actor/actorthread.h index 556a109c49..7ceadd30db 100644 --- a/mindspore/core/mindrt/src/actor/actorthread.h +++ b/mindspore/core/mindrt/src/actor/actorthread.h @@ -45,6 +45,9 @@ class ActorThread { std::list> workers; std::string threadName; + + size_t maxThreads_; + std::mutex initLock_; }; }; // end of namespace mindspore diff --git a/mindspore/core/mindrt/src/litebus.cc b/mindspore/core/mindrt/src/litebus.cc index a7761bbe33..dc7e7fbbd4 100644 --- a/mindspore/core/mindrt/src/litebus.cc +++ b/mindspore/core/mindrt/src/litebus.cc @@ -83,15 +83,8 @@ int InitializeImp(const std::string &tcpUrl, const std::string &tcpUrlAdv, const int Initialize(const std::string &tcpUrl, const std::string &tcpUrlAdv, const std::string &udpUrl, const std::string &udpUrlAdv, int threadCount) { - static std::atomic_bool initLitebusStatus(false); - bool inite = false; - if (initLitebusStatus.compare_exchange_strong(inite, true) == false) { - ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus has been initialized"); - return BUS_OK; - } - - int result = BUS_OK; - result = InitializeImp(tcpUrl, tcpUrlAdv, udpUrl, udpUrlAdv, threadCount); + /* support repeat initialize */ + int result = InitializeImp(tcpUrl, tcpUrlAdv, udpUrl, udpUrlAdv, threadCount); static LiteBusExit busExit; return result; diff --git a/mindspore/lite/src/lite_mindrt.cc b/mindspore/lite/src/lite_mindrt.cc index 2f5f057411..16b74c18ca 100644 --- a/mindspore/lite/src/lite_mindrt.cc +++ b/mindspore/lite/src/lite_mindrt.cc @@ -65,7 +65,7 @@ void LiteOpActor::SetOutputData(OpContext *context) { } } -int MindrtInit() { return mindspore::Initialize("tcp://127.0.0.1:8080", "", "", "", 2); } +int MindrtInit() { return mindspore::Initialize("tcp://127.0.0.1:8080", "", "", "", 1); } void MindrtTerminate(std::vector> actor_list) { for (auto actor : actor_list) {