Browse Source

!21428 mindrt actor manager support inner threadpool

Merge pull request !21428 from yangjie159/mindrt_thread
tags/v1.5.0-rc1
i-robot Gitee 4 years ago
parent
commit
f6b5eabb18
4 changed files with 48 additions and 27 deletions
  1. +5
    -14
      mindspore/ccsrc/runtime/framework/graph_scheduler.cc
  2. +0
    -2
      mindspore/ccsrc/runtime/framework/graph_scheduler.h
  3. +29
    -1
      mindspore/core/mindrt/src/actor/actormgr.cc
  4. +14
    -10
      mindspore/core/mindrt/src/actor/actormgr.h

+ 5
- 14
mindspore/ccsrc/runtime/framework/graph_scheduler.cc View File

@@ -417,10 +417,6 @@ void GraphScheduler::Clear() {
graph_output_to_actor_.clear();
front_node_to_actor_.clear();
copy_actors_.clear();

// Delete the thread pool.
delete thread_pool_;
thread_pool_ = nullptr;
}

void GraphScheduler::Initialize() {
@@ -434,16 +430,15 @@ void GraphScheduler::Initialize() {
}
init_ = true;

auto actorMgr = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actorMgr);
actorMgr->Initialize();

// Create the thread pool of actor runtime and Set the OMP_NUM_THREADS env.
size_t actor_thread_num = 0;
size_t OMP_thread_num = 0;
ComputeThreadNums(&actor_thread_num, &OMP_thread_num);
thread_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num);
MS_EXCEPTION_IF_NULL(thread_pool_);

auto actor_manager = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actor_manager);
actor_manager->Initialize(true, actor_thread_num);

std::string OMP_env = std::to_string(OMP_thread_num);
(void)common::SetEnv("OMP_NUM_THREADS", OMP_env.c_str(), 0);
auto OMP_thread_num_used = common::GetEnv("OMP_NUM_THREADS");
@@ -463,7 +458,6 @@ void GraphScheduler::BuildAndScheduleGlobalActor() {
MS_EXCEPTION_IF_NULL(memory_manager_actor);
memory_manager_aid_ = memory_manager_actor->GetAID();
auto base_actor = static_cast<ActorReference>(memory_manager_actor);
base_actor->set_thread_pool(thread_pool_);
// Bind single thread to response to memory alloc and free quickly.
(void)actorMgr->Spawn(base_actor, false);

@@ -472,7 +466,6 @@ void GraphScheduler::BuildAndScheduleGlobalActor() {
MS_EXCEPTION_IF_NULL(recorder_actor);
recorder_aid_ = &(recorder_actor->GetAID());
auto base_recorder_actor = static_cast<ActorReference>(recorder_actor);
base_recorder_actor->set_thread_pool(thread_pool_);
(void)actorMgr->Spawn(base_recorder_actor, true);

// Create and schedule debug actor.
@@ -487,7 +480,6 @@ void GraphScheduler::BuildAndScheduleGlobalActor() {
MS_EXCEPTION_IF_NULL(debug_actor);
debug_aid_ = &(debug_actor->GetAID());
auto base_debug_actor = static_cast<ActorReference>(debug_actor);
base_debug_actor->set_thread_pool(thread_pool_);
(void)actorMgr->Spawn(base_debug_actor, true);
}
}
@@ -561,7 +553,6 @@ void GraphScheduler::Schedule(const ActorSet *actor_set) {
auto actorMgr = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actorMgr);
for (auto actor : actors) {
actor->set_thread_pool(thread_pool_);
(void)actorMgr->Spawn(actor);
}
}


+ 0
- 2
mindspore/ccsrc/runtime/framework/graph_scheduler.h View File

@@ -332,8 +332,6 @@ class GraphScheduler {
const AID *recorder_aid_{nullptr};
const AID *debug_aid_{nullptr};

ActorThreadPool *thread_pool_{nullptr};

bool init_{false};
};
} // namespace runtime


+ 29
- 1
mindspore/core/mindrt/src/actor/actormgr.cc View File

@@ -46,6 +46,30 @@ ActorMgr::ActorMgr() : actors(), procotols(), urls() {

ActorMgr::~ActorMgr() {}

void ActorMgr::Initialize(bool use_inner_pool, size_t thread_num) {
bool expected = false;
if (!initialized_.compare_exchange_strong(expected, true)) {
MS_LOG(DEBUG) << "Actor Manager has been initialized before";
return;
}
// create inner thread pool only when specified use_inner_pool
if (use_inner_pool) {
inner_pool_ = ActorThreadPool::CreateThreadPool(thread_num);
}
}

void ActorMgr::SetActorReady(const ActorReference &actor) const {
// use inner thread pool or actor thread pool created externally
// priority to use actor thread pool
ActorThreadPool *pool = actor->pool_ ? actor->pool_ : inner_pool_;
if (pool == nullptr) {
MS_LOG(ERROR) << "ThreadPool is nullptr, " << actor->pool_ << ", " << inner_pool_
<< ", actor: " << actor->GetAID().Name();
return;
}
pool->PushActorToQueue(actor.get());
}

const std::string ActorMgr::GetUrl(const std::string &protocol) {
auto it = procotols.find(protocol);
if (it != procotols.end()) {
@@ -109,6 +133,10 @@ void ActorMgr::Finalize() {
MS_LOG(INFO) << "finalize IOMgr=" << mgrIt->first.c_str();
mgrIt->second->Finish();
}

// delete actor thread pool if use_inner_pool
delete inner_pool_;
inner_pool_ = nullptr;
MS_LOG(INFO) << "mindrt IOMGRS finish exiting.";
}

@@ -171,7 +199,7 @@ int ActorMgr::Send(const AID &to, std::unique_ptr<MessageBase> &&msg, bool remot
}
}

AID ActorMgr::Spawn(ActorReference &actor, bool shareThread, bool start) {
AID ActorMgr::Spawn(const ActorReference &actor, bool shareThread, bool start) {
actorsMutex.lock();
if (actors.find(actor->GetAID().Name()) != actors.end()) {
actorsMutex.unlock();


+ 14
- 10
mindspore/core/mindrt/src/actor/actormgr.h View File

@@ -17,6 +17,7 @@
#ifndef MINDSPORE_CORE_MINDRT_SRC_ACTOR_ACTORMGR_H
#define MINDSPORE_CORE_MINDRT_SRC_ACTOR_ACTORMGR_H

#include <atomic>
#include <set>
#include <utility>
#include <map>
@@ -51,28 +52,24 @@ class ActorMgr {
~ActorMgr();

void Finalize();
void Initialize() {}
// initialize actor manager resource, do not create inner thread pool by default
void Initialize(bool use_inner_pool = false, size_t thread_num = 1);

void RemoveActor(const std::string &name);
ActorBase *GetActor(const AID &id);
const std::string GetUrl(const std::string &protocol = "tcp");
void AddUrl(const std::string &protocol, const std::string &url);
void AddIOMgr(const std::string &protocol, const std::shared_ptr<IOMgr> &ioMgr);
int Send(const AID &to, std::unique_ptr<MessageBase> &&msg, bool remoteLink = false, bool isExactNotRemote = false);
AID Spawn(ActorReference &actor, bool shareThread = true, bool start = true);
AID Spawn(const ActorReference &actor, bool shareThread = true, bool start = true);
void Terminate(const AID &id);
void TerminateAll();
void Wait(const AID &pid);
inline const std::string &GetDelegate() const { return delegate; }

inline void SetDelegate(const std::string &d) { delegate = d; }
inline void SetActorReady(std::shared_ptr<ActorBase> &actor) const {
auto pool = actor->pool_;
if (pool == nullptr) {
MS_LOG(ERROR) << "ThreadPool is nullptr, actor: " << actor->GetAID().Name();
return;
}
pool->PushActorToQueue(actor.get());
}

void SetActorReady(const ActorReference &actor) const;
void SetActorStatus(const AID &pid, bool start);

private:
@@ -83,6 +80,13 @@ class ActorMgr {
return false;
}
}
// in order to avoid being initialized many times
std::atomic_bool initialized_{false};

// actor manager support running on inner thread pool,
// or running on other thread pool created independently externally
ActorThreadPool *inner_pool_{nullptr};

// Map of all local spawned and running processes.
std::map<std::string, ActorReference> actors;
#ifndef MS_COMPILE_IOS


Loading…
Cancel
Save