Browse Source

actor runtime message running performance optimizer

tags/v1.3.0
limingqi107 4 years ago
parent
commit
ce68aff167
8 changed files with 49 additions and 12 deletions
  1. +12
    -4
      mindspore/ccsrc/runtime/framework/actor/actor_common.cc
  2. +7
    -0
      mindspore/ccsrc/runtime/framework/actor/kernel_actor.cc
  3. +5
    -1
      mindspore/ccsrc/runtime/framework/actor/kernel_actor.h
  4. +7
    -0
      mindspore/ccsrc/runtime/framework/actor/output_actor.cc
  5. +10
    -2
      mindspore/ccsrc/runtime/framework/actor/output_actor.h
  6. +1
    -1
      mindspore/ccsrc/runtime/framework/graph_scheduler.cc
  7. +3
    -0
      mindspore/core/mindrt/include/actor/actor.h
  8. +4
    -4
      mindspore/core/mindrt/src/actor/actorpolicy.cc

+ 12
- 4
mindspore/ccsrc/runtime/framework/actor/actor_common.cc View File

@@ -17,6 +17,7 @@
#include "runtime/framework/actor/actor_common.h"
#include "backend/session/anf_runtime_algorithm.h"
#include "runtime/framework/device_tensor_store.h"
#include "utils/ms_context.h"

namespace mindspore {
namespace runtime {
@@ -25,17 +26,24 @@ void ComputeThreadNums(size_t *actor_thread_num, size_t *OMP_thread_num) {
MS_EXCEPTION_IF_NULL(OMP_thread_num);
size_t cpu_core_num = std::thread::hardware_concurrency();

const size_t kActorThreadMaxNum = 8;
const size_t kActorThreadMaxNum = 5;
// The MemoryManagerActor binds single thread, and the other actors share one thread at least, so the min num is 2.
const size_t kActorThreadMinNum = 2;
*actor_thread_num = cpu_core_num < kActorThreadMinNum ? kActorThreadMinNum : cpu_core_num;
*actor_thread_num = *actor_thread_num > kActorThreadMaxNum ? kActorThreadMaxNum : *actor_thread_num;
auto context_ptr = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(context_ptr);
// The pyNative mode is the step execution strategy, so only need the kActorThreadMinNum.
if (context_ptr->get_param<bool>(MS_CTX_SAVE_GRAPHS_FLAG) == kPynativeMode) {
*actor_thread_num = kActorThreadMinNum;
} else {
*actor_thread_num = cpu_core_num < kActorThreadMinNum ? kActorThreadMinNum : cpu_core_num;
*actor_thread_num = *actor_thread_num > kActorThreadMaxNum ? kActorThreadMaxNum : *actor_thread_num;
}

const size_t kOMPThreadNumThreshold = 16;
if (cpu_core_num <= kOMPThreadNumThreshold) {
*OMP_thread_num = cpu_core_num;
} else {
*OMP_thread_num = cpu_core_num / (*actor_thread_num);
*OMP_thread_num = cpu_core_num / (*actor_thread_num - 1);
}
}



+ 7
- 0
mindspore/ccsrc/runtime/framework/actor/kernel_actor.cc View File

@@ -25,6 +25,9 @@
namespace mindspore {
namespace runtime {
void KernelActor::Init() {
// Set the number of actor running dependent messages.
running_dependent_msg_num_ = SizeToInt(input_datas_num_ + input_controls_num_);

MS_EXCEPTION_IF_NULL(kernel_);
real_input_num_ = AnfAlgo::GetInputTensorNum(kernel_);
kernel_info_ = static_cast<KernelInfo *>(kernel_->kernel_info());
@@ -132,6 +135,7 @@ void KernelActor::RunOpControlWithInputTensor(AID *input_control, OpContext<Devi
}

void KernelActor::SendMemoryAllocReq(OpContext<DeviceTensor> *context) {
running_dependent_msg_num_ = 1;
Async(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &memory_alloc_list_, device_context_, context,
GetAID());
}
@@ -170,6 +174,7 @@ void KernelActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *context) {
}

void KernelActor::SendDebugReq(OpContext<DeviceTensor> *context) {
running_dependent_msg_num_ = 1;
Async(*debug_aid_, &DebugActor::Debug, kernel_, &launch_info_, device_context_, context, &GetAID());
}

@@ -300,6 +305,8 @@ void KernelActor::PreLaunchKernel(OpContext<DeviceTensor> *) {
}

void KernelActor::PostLaunchKernel(OpContext<DeviceTensor> *context) {
running_dependent_msg_num_ = SizeToInt(input_datas_num_ + input_controls_num_);

// The input is invalid and needs to be erased when finish kernel launch.
EraseInput(context);



+ 5
- 1
mindspore/ccsrc/runtime/framework/actor/kernel_actor.h View File

@@ -55,10 +55,12 @@ class KernelActor : public DebugAwareActor {
recorder_aid_(recorder_aid),
input_datas_num_(0),
input_controls_num_(0),
real_input_num_(0) {}
real_input_num_(0),
running_dependent_msg_num_(1) {}
~KernelActor() override = default;

void Init() override;
bool IsActive(int msg_num) override { return msg_num >= running_dependent_msg_num_ ? true : false; }

// The kernel actor run when receive the input data.
void RunOpData(OpData<DeviceTensor> *input_data, OpContext<DeviceTensor> *context) override;
@@ -121,6 +123,8 @@ class KernelActor : public DebugAwareActor {
size_t input_controls_num_;
// The real input number of kernel launch.
size_t real_input_num_;
// The dependent messages number of actor running.
int running_dependent_msg_num_;

// The dependent input actors.
std::vector<AID> input_data_arrow_aids_;


+ 7
- 0
mindspore/ccsrc/runtime/framework/actor/output_actor.cc View File

@@ -42,6 +42,13 @@ TensorPtr CreateOutputTensor(const AnfNodePtr &output_node, size_t output_index,
}
} // namespace

void OutputActor::Init() {
// Set the number of actor running dependent messages.
if ((!need_loop_count_) && (device_tensor_store_keys_.size() == 1)) {
running_dependent_msg_num_ = SizeToInt(outputs_num_ - device_tensor_store_keys_[kMainBranchID].size());
}
}

void OutputActor::CollectLoopCount(size_t loop_count, OpContext<DeviceTensor> *context) {
MS_EXCEPTION_IF_NULL(context);
if (branch_id_ == kInvalidBranchID) {


+ 10
- 2
mindspore/ccsrc/runtime/framework/actor/output_actor.h View File

@@ -45,7 +45,9 @@ class OutputActor : public OpActor<DeviceTensor> {
current_count_(0),
outputs_num_(outputs_num),
current_outputs_num_(0),
need_loop_count_(need_loop_count) {
need_loop_count_(need_loop_count),
branch_id_(kMainBranchID),
running_dependent_msg_num_(1) {
outputs_.resize(outputs_num);
output_nodes_.resize(outputs_num);
device_contexts_.resize(outputs_num);
@@ -53,6 +55,9 @@ class OutputActor : public OpActor<DeviceTensor> {
}
~OutputActor() override = default;

void Init() override;
bool IsActive(int msg_num) override { return msg_num >= running_dependent_msg_num_ ? true : false; }

// The output actor collects loop count when receive the input control of loop count actor.
void CollectLoopCount(size_t loop_count, OpContext<DeviceTensor> *context);

@@ -79,7 +84,10 @@ class OutputActor : public OpActor<DeviceTensor> {
size_t outputs_num_;
size_t current_outputs_num_;
bool need_loop_count_;
int branch_id_{kMainBranchID};
int branch_id_;

// The dependent messages number of actor running.
int running_dependent_msg_num_;

// Pair<branch_id, <index, node>> points to the dependent device tensor store, branch_id is the output branch id.
// In general, the branch id is 0, which means there is only one output branch in the actor set. When there are


+ 1
- 1
mindspore/ccsrc/runtime/framework/graph_scheduler.cc View File

@@ -372,7 +372,7 @@ void GraphScheduler::Initialize() {
size_t actor_thread_num = 0;
size_t OMP_thread_num = 0;
ComputeThreadNums(&actor_thread_num, &OMP_thread_num);
thread_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num, kThreadWait);
thread_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num, KThreadSpin);
MS_EXCEPTION_IF_NULL(thread_pool_);
std::string OMP_env = std::to_string(OMP_thread_num);
common::SetEnv("OMP_NUM_THREADS", OMP_env.c_str(), 0);


+ 3
- 0
mindspore/core/mindrt/include/actor/actor.h View File

@@ -83,6 +83,9 @@ class ActorBase {

void set_thread_pool(ActorThreadPool *pool) { pool_ = pool; }

// Judge if actor running by the received message number, the default is true.
virtual bool IsActive(int msg_num) { return true; }

protected:
using ActorFunction = std::function<void(const std::unique_ptr<MessageBase> &msg)>;



+ 4
- 4
mindspore/core/mindrt/src/actor/actorpolicy.cc View File

@@ -86,17 +86,17 @@ void ShardedThread::Terminate(const ActorBase *aActor) {
}

int ShardedThread::EnqueMessage(std::unique_ptr<MessageBase> &&msg) {
int result;
mailboxLock.lock();
enqueMailbox->push_back(std::move(msg));
++msgCount;

// true : The actor is running. else the actor will be ready to run.
if (start && ready == false && terminated == false) {
if (start && (ready == false) && (terminated == false) && actor->IsActive(msgCount)) {
ActorMgr::GetActorMgrRef()->SetActorReady(actor);
ready = true;
}
result = ++msgCount;
mailboxLock.unlock();
return result;
return msgCount;
}

void ShardedThread::Notify() {


Loading…
Cancel
Save