Browse Source

!4511 [MS][LITE]support parallel executor

Merge pull request !4511 from zhaizhiqiang/master
tags/v0.7.0-beta
mindspore-ci-bot Gitee 5 years ago
parent
commit
5270b33374
12 changed files with 287 additions and 22 deletions
  1. +3
    -2
      mindspore/lite/src/executor.h
  2. +3
    -3
      mindspore/lite/src/kernel_registry.cc
  3. +1
    -0
      mindspore/lite/src/kernel_registry.h
  4. +7
    -3
      mindspore/lite/src/lite_session.cc
  5. +3
    -1
      mindspore/lite/src/lite_session.h
  6. +111
    -0
      mindspore/lite/src/runtime/parallel_executor.cc
  7. +53
    -0
      mindspore/lite/src/runtime/parallel_executor.h
  8. +4
    -4
      mindspore/lite/src/runtime/runtime_api.cc
  9. +3
    -6
      mindspore/lite/src/runtime/thread_pool.cc
  10. +2
    -1
      mindspore/lite/src/runtime/thread_pool.h
  11. +1
    -0
      mindspore/lite/test/CMakeLists.txt
  12. +96
    -2
      mindspore/lite/test/ut/src/infer_test.cc

+ 3
- 2
mindspore/lite/src/executor.h View File

@@ -26,10 +26,11 @@ namespace mindspore::lite {
class Executor {
public:
Executor() = default;
virtual ~Executor() = default;

int Prepare(std::vector<kernel::LiteKernel *> &kernels) { return 0; }
virtual int Prepare(std::vector<kernel::LiteKernel *> &kernels) { return 0; }

int Run(std::vector<tensor::Tensor *> &in_tensors, std::vector<tensor::Tensor *> &out_tensors,
virtual int Run(std::vector<tensor::Tensor *> &in_tensors, std::vector<tensor::Tensor *> &out_tensors,
std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator = nullptr,
const session::KernelCallBack &before = nullptr, const session::KernelCallBack &after = nullptr);



+ 3
- 3
mindspore/lite/src/kernel_registry.cc View File

@@ -38,12 +38,12 @@ KernelRegistry::KernelRegistry() {
data_type_length_ = kNumberTypeEnd - kNumberTypeBegin + 1;
op_type_length_ = PrimitiveType_MAX - PrimitiveType_MIN + 1;
// malloc an array contain creator functions of kernel
auto total_len = device_type_length_ * data_type_length_ * op_type_length_;
creator_arrays_ = (kernel::KernelCreator *)malloc(total_len * sizeof(kernel::KernelCreator));
array_size_ = device_type_length_ * data_type_length_ * op_type_length_;
creator_arrays_ = (kernel::KernelCreator *)malloc(array_size_ * sizeof(kernel::KernelCreator));
if (creator_arrays_ == nullptr) {
MS_LOG(ERROR) << "malloc creator_arrays_ failed.";
} else {
for (int i = 0; i < total_len; ++i) {
for (int i = 0; i < array_size_; ++i) {
creator_arrays_[i] = nullptr;
}
}


+ 1
- 0
mindspore/lite/src/kernel_registry.h View File

@@ -42,6 +42,7 @@ class KernelRegistry {

protected:
kernel::KernelCreator *creator_arrays_ = nullptr;
size_t array_size_;
int device_type_length_;
int data_type_length_;
int op_type_length_;


+ 7
- 3
mindspore/lite/src/lite_session.cc View File

@@ -208,6 +208,7 @@ int LiteSession::CompileGraph(Model *model) {
return ret;
}

executor->Prepare(this->kernels_);
return RET_OK;
}

@@ -219,11 +220,10 @@ int LiteSession::RunGraph(const session::KernelCallBack &before, const session::
MS_EXCEPTION_IF_NULL(this->context_);
SetMaxWokerNum(context_->thread_num_);
context_->running_ = true;
Executor executor;
if (before == nullptr && after == nullptr) {
return executor.Run(this->inputs_, this->outputs_, this->kernels_, this->context_->allocator.get());
return executor->Run(this->inputs_, this->outputs_, this->kernels_, this->context_->allocator.get());
} else {
return executor.Run(this->inputs_, this->outputs_, this->kernels_, this->context_->allocator.get(), before, after);
return executor->Run(this->inputs_, this->outputs_, this->kernels_, this->context_->allocator.get(), before, after);
}
}

@@ -251,6 +251,8 @@ int LiteSession::Init(Context *context) {
opencl_runtime->Init();
}
#endif
executor = new Executor();
MS_EXCEPTION_IF_NULL(executor);
return RET_OK;
}

@@ -289,6 +291,8 @@ LiteSession::~LiteSession() {
delete kernel;
}
delete this->context_;
delete this->executor;
this->executor = nullptr;
}

std::vector<mindspore::tensor::MSTensor *> LiteSession::GetInputsByName(const std::string &name) const {


+ 3
- 1
mindspore/lite/src/lite_session.h View File

@@ -27,6 +27,7 @@
#include "include/context.h"
#include "src/lite_kernel.h"
#include "schema/model_generated.h"
#include "src/executor.h"

namespace mindspore {
namespace lite {
@@ -36,7 +37,7 @@ class LiteSession : public session::LiteSession {

~LiteSession() override;

int Init(Context *context);
virtual int Init(Context *context);

void BindThread(bool if_bind) override;

@@ -82,6 +83,7 @@ class LiteSession : public session::LiteSession {
std::unordered_map<std::string, std::vector<mindspore::tensor::MSTensor *>> input_map_;
// graph output node name -- output tensors
std::unordered_map<std::string, std::vector<mindspore::tensor::MSTensor *>> output_map_;
Executor *executor = nullptr;
};
} // namespace lite
} // namespace mindspore


+ 111
- 0
mindspore/lite/src/runtime/parallel_executor.cc View File

@@ -0,0 +1,111 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "src/runtime/parallel_executor.h"
using mindspore::predict::ThreadPool;
using mindspore::predict::TvmEnv;
#define MAX_THREAD_NUM 8
namespace mindspore::lite {
// Releases the thread pool created in Prepare(); `pool` is nulled so a double
// destruction path cannot double-delete.
ParallelExecutor::~ParallelExecutor() {
  delete pool;
  pool = nullptr;
}

// Creates the worker pool and seeds the dependency counters.
// refCount[kernel] = number of producer kernels whose output `kernel` still
// waits on; a kernel becomes runnable once its counter reaches zero.
int ParallelExecutor::Prepare(std::vector<mindspore::kernel::LiteKernel *> &kernels) {
  delete pool;  // tolerate repeated Prepare() calls without leaking the old pool
  pool = new ThreadPool();
  pool->ConfigThreadPool(NO_BIND, MAX_THREAD_NUM);
  for (mindspore::kernel::LiteKernel *kernel : kernels) {
    // Fix: seed with the number of *producers* (in_kernels). The original used
    // out_kernels().size(), which is decremented once per completed producer in
    // Run() — seeding with the consumer count made graph *outputs* "ready"
    // before their inputs had run on any multi-node graph.
    refCount[kernel] = kernel->in_kernels().size();
  }
  // Fix: the function is declared `int` but previously fell off the end without
  // a return statement (undefined behavior).
  return RET_OK;
}

// Moves every kernel whose dependency counter reached zero from refCount into
// readyKernels, and sizes the per-kernel result slots for the coming wave.
void ParallelExecutor::PrepareReadyKernels(const std::vector<mindspore::kernel::LiteKernel *> &kernels) {
  for (auto iter = refCount.begin(); iter != refCount.end();) {
    if (iter->second == 0) {
      readyKernels.emplace_back(iter->first);
      iter = refCount.erase(iter);
    } else {
      ++iter;
    }
  }
  // Fix: assign (not resize) so stale error codes from a previous wave cannot
  // leak into this wave's error check in Run().
  results.assign(readyKernels.size(), RET_OK);
}

// Thread-pool worker: runs the ready kernel at `index`, records its return code
// via SetResult(), then releases the ref counts of the input tensors it
// consumed. Always returns 0 so the pool keeps draining the task list; real
// errors travel through the results vector.
static int RunKernel(int index, TvmEnv *env, void *data) {
  ParallelExecutor *executor = reinterpret_cast<ParallelExecutor *>(data);
  auto kernel = executor->GetReadyKernel(index);
  auto ret = kernel->Run();
  executor->SetResult(index, ret);
  if (0 != ret) {
    MS_LOG(ERROR) << "run kernel failed, name: " << kernel->name();
    return 0;
  }

  // The kernel consumed its inputs; drop one reference on each producer's
  // output tensors (model-output kernels keep their tensors alive).
  for (auto input_kernel : kernel->in_kernels()) {
    MS_ASSERT(input_kernel != nullptr);
    if (input_kernel->is_model_output()) {
      continue;
    }
    ret = input_kernel->DecOutTensorRefCount();
    if (0 != ret) {
      MS_LOG(WARNING) << "DecOutTensorRefCount for kernel" << kernel->name() << " failed";
    }
  }
  return 0;
}

// Runs the graph wave-by-wave: launch every currently-ready kernel on the
// pool, abort on the first kernel error, then decrement the dependency
// counters of the completed kernels' consumers and collect the next wave.
// Returns RET_OK on success, RET_ERROR on any failure.
int ParallelExecutor::Run(std::vector<tensor::Tensor *> &in_tensors, std::vector<tensor::Tensor *> &out_tensors,
                          std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator,
                          const session::KernelCallBack &before, const session::KernelCallBack &after) {
  MS_ASSERT(nullptr != allocator);
  for (auto &inTensor : in_tensors) {
    if (inTensor == nullptr) {
      MS_LOG(ERROR) << "Graph input tensor is nullptr";
      return RET_ERROR;
    }
    if (inTensor->GetFormat() != schema::Format_NHWC) {
      MS_LOG(ERROR) << "Model input tensor should be NHWC";
      return RET_ERROR;
    }
  }
  kernel::LiteKernelUtil::InitTensorRefCount(kernels);

  PrepareReadyKernels(kernels);
  while (!readyKernels.empty()) {
    pool->LaunchWork(RunKernel, this, readyKernels.size());

    // Any nonzero per-kernel result aborts the whole run.
    if (std::find_if(results.begin(), results.end(), [](const int &ret) { return (ret != 0); }) != results.end()) {
      return RET_ERROR;
    }
    // Retire this wave: one more producer of each consumer has finished.
    for (auto completedKernel : readyKernels) {
      for (auto out : completedKernel->out_kernels()) {
        auto iter = refCount.find(out);
        if (iter == refCount.end()) {
          continue;
        }
        if (iter->second > 0) {
          --(iter->second);
        }
        // Fix: do NOT erase zero-count entries here. PrepareReadyKernels() is
        // the single place that moves zero-count entries into readyKernels;
        // erasing them here dropped those kernels entirely, so the loop ended
        // early with part of the graph never executed.
      }
    }
    readyKernels.clear();
    PrepareReadyKernels(kernels);
  }

  return RET_OK;
}

}  // namespace mindspore::lite

+ 53
- 0
mindspore/lite/src/runtime/parallel_executor.h View File

@@ -0,0 +1,53 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef MINDSPORE_LITE_PARALLEL_EXECUTOR_H_
#define MINDSPORE_LITE_PARALLEL_EXECUTOR_H_

#include <vector>
#include <unordered_map>
#include "src/runtime/allocator.h"
#include "src/lite_kernel.h"
#include "include/lite_session.h"
#include "src/executor.h"
#include "src/runtime/thread_pool.h"

namespace mindspore::lite {
// Executor that runs kernels wave-by-wave on a private thread pool: every
// kernel whose dependencies are satisfied is launched together, then the
// dependency counters of its consumers are updated and the next wave is
// collected. Owns the thread pool it creates in Prepare().
class ParallelExecutor : public Executor {
 public:
  ParallelExecutor() = default;
  ~ParallelExecutor() override;

  // Creates the thread pool and seeds the per-kernel dependency counters.
  int Prepare(std::vector<kernel::LiteKernel *> &kernels) override;

  // Runs the whole graph; returns RET_OK on success, RET_ERROR otherwise.
  int Run(std::vector<tensor::Tensor *> &in_tensors, std::vector<tensor::Tensor *> &out_tensors,
          std::vector<kernel::LiteKernel *> &kernels, Allocator *allocator = nullptr,
          const session::KernelCallBack &before = nullptr, const session::KernelCallBack &after = nullptr) override;
  // Accessors used by the static thread-pool worker in parallel_executor.cc.
  inline kernel::LiteKernel *GetReadyKernel(const int index) { return readyKernels.at(index); }
  inline void SetResult(const int index, const int result) { results.at(index) = result; }

 private:
  // Moves zero-count kernels from refCount into readyKernels and resets results.
  void PrepareReadyKernels(const std::vector<kernel::LiteKernel *> &kernels);

 private:
  // Fix: initialize to nullptr so destroying a never-Prepared executor is
  // well-defined (the member was previously left indeterminate, making the
  // destructor's `delete pool` undefined behavior in that case).
  predict::ThreadPool *pool = nullptr;
  // kernel -> number of still-unfinished dependencies.
  std::unordered_map<kernel::LiteKernel *, size_t> refCount;
  // Kernels runnable in the current wave.
  std::vector<kernel::LiteKernel *> readyKernels;
  // Per-ready-kernel return codes written by the worker threads.
  std::vector<int> results;
};

}  // namespace mindspore::lite
#endif

+ 4
- 4
mindspore/lite/src/runtime/runtime_api.cc View File

@@ -54,7 +54,7 @@ int LiteBackendFreeWorkspace(int deviceType, int deviceId, void *ptr) {
}

void SetMaxWokerNum(int num) {
auto p = mindspore::predict::ThreadPool::GetInstance();
auto p = mindspore::predict::GlobalThreadPool();
if (p == nullptr) {
MS_LOG(ERROR) << "Get thread pool instance failed";
return;
@@ -67,7 +67,7 @@ void SetMaxWokerNum(int num) {
}

void ConfigThreadPool(int mode, int nthreads) {
auto p = mindspore::predict::ThreadPool::GetInstance();
auto p = mindspore::predict::GlobalThreadPool();
if (p == nullptr) {
MS_LOG(ERROR) << "Get thread pool instance failed";
return;
@@ -76,7 +76,7 @@ void ConfigThreadPool(int mode, int nthreads) {
}

int LiteBackendParallelLaunch(FTVMParallelLambda flambda, void *cdata, int num_task) {
auto p = mindspore::predict::ThreadPool::GetInstance();
auto p = mindspore::predict::GlobalThreadPool();
if (p == nullptr) {
MS_LOG(ERROR) << "Get thread pool instance failed";
return -1;
@@ -89,7 +89,7 @@ int LiteBackendParallelLaunch(FTVMParallelLambda flambda, void *cdata, int num_t
}

void DoAllThreadBind(bool ifBind, int mode) {
auto p = mindspore::predict::ThreadPool::GetInstance();
auto p = mindspore::predict::GlobalThreadPool();
if (p == nullptr) {
MS_LOG(ERROR) << "Get thread pool instance failed";
return;


+ 3
- 6
mindspore/lite/src/runtime/thread_pool.cc View File

@@ -32,6 +32,9 @@ constexpr int kBigMidCpuNum = 4;
constexpr int kDefaultThreadNum = 1;
static unsigned int kDefaultMaxThreadNums = 8;
static unsigned int localMaxThreadNums = 1;
static ThreadPool globalThreadPool;

ThreadPool *GlobalThreadPool() { return &globalThreadPool; }

bool LiteQueue::Enqueue(ThreadPoolTask *task) {
const int tailIndex = tail.load(std::memory_order_relaxed);
@@ -425,11 +428,6 @@ void ThreadPool::ConfigThreadPool(int mode, int numThreads) {

void ThreadPool::ConfigMaxThreadNum(unsigned int num) { localMaxThreadNums = num; }

ThreadPool *ThreadPool::GetInstance() {
static ThreadPool instance;
return &instance;
}

ThreadPool::~ThreadPool() {
curThreadRunNums = static_cast<int>(threadList.size() + 1);
exitRun = true;
@@ -446,4 +444,3 @@ ThreadPool::~ThreadPool() {
}
} // namespace predict
} // namespace mindspore


+ 2
- 1
mindspore/lite/src/runtime/thread_pool.h View File

@@ -87,7 +87,6 @@ class ThreadPool {
public:
ThreadPool() = default;
~ThreadPool();
static ThreadPool *GetInstance();
bool LaunchWork(WorkFun worker, void *cdata, int numTask);
void ConfigThreadPool(int mode, int numThreads);
void ConfigMaxThreadNum(unsigned int num);
@@ -119,6 +118,8 @@ class ThreadPool {
std::unique_ptr<LiteThreadBind> threadBind{nullptr};
std::vector<std::pair<int, std::pair<bool, int>>> errorInfo{};
};

ThreadPool* GlobalThreadPool();
} // namespace predict
} // namespace mindspore



+ 1
- 0
mindspore/lite/test/CMakeLists.txt View File

@@ -169,6 +169,7 @@ set(TEST_LITE_SRC
${LITE_DIR}/src/runtime/runtime_api.cc
${LITE_DIR}/src/runtime/thread_pool.cc
${LITE_DIR}/src/runtime/workspace_pool.cc
${LITE_DIR}/src/runtime/parallel_executor.cc
${LITE_DIR}/src/ir/tensor.cc
${LITE_DIR}/src/ir/primitive_t_value.cc
${LITE_DIR}/src/context.cc


+ 96
- 2
mindspore/lite/test/ut/src/infer_test.cc View File

@@ -23,6 +23,8 @@
#include "include/context.h"
#include "include/errorcode.h"
#include "mindspore/core/utils/log_adapter.h"
#include "src/lite_session.h"
#include "src/runtime/parallel_executor.h"

namespace mindspore {
class InferTest : public mindspore::CommonTest {
@@ -147,10 +149,11 @@ TEST_F(InferTest, TestConvNode) {
//===================================================
ASSERT_EQ(output_size, outTensor->Size());
for (size_t i = 0; i < outTensor->ElementsNum(); i++) {
ASSERT_LE((output_data[i]- outData[i]), 0.001);
ASSERT_LE((output_data[i] - outData[i]), 0.001);
}
MS_LOG(INFO) << "Passed";
}

TEST_F(InferTest, TestAddNode) {
auto meta_graph = std::make_shared<schema::MetaGraphT>();
meta_graph->name = "graph";
@@ -203,7 +206,7 @@ TEST_F(InferTest, TestAddNode) {
content = nullptr;
auto context = new lite::Context;
context->cpu_bind_mode_ = lite::NO_BIND;
context->device_ctx_.type = lite::DT_GPU;
context->device_ctx_.type = lite::DT_CPU;
context->thread_num_ = 4;
auto session = session::LiteSession::CreateSession(context);
ASSERT_NE(nullptr, session);
@@ -243,6 +246,97 @@ TEST_F(InferTest, TestAddNode) {
MS_LOG(INFO) << "Passed";
}

// Test-only LiteSession subclass that swaps the default sequential Executor
// for a ParallelExecutor after standard session initialization.
class SessionWithParallelExecutor : public lite::LiteSession {
 public:
  // Initializes the base session, then replaces its executor.
  // Fix: propagate the base-class Init result instead of discarding it and
  // unconditionally returning 0.
  int Init(lite::Context *context) {
    auto ret = lite::LiteSession::Init(context);
    delete this->executor;
    this->executor = new mindspore::lite::ParallelExecutor();
    return ret;
  }
};

// Builds a minimal single-Add graph in flatbuffers, runs it through a session
// whose executor has been swapped for ParallelExecutor, and checks the output
// tensor's metadata. NOTE(review): only output shape/type are verified here,
// not the computed values (inputs are left uninitialized).
TEST_F(InferTest, TestParallelExecutor) {
  auto meta_graph = std::make_shared<schema::MetaGraphT>();
  meta_graph->name = "graph";

  // Single Add node consuming tensors 0 and 1, producing tensor 2.
  auto node = std::make_unique<schema::CNodeT>();
  node->inputIndex = {0, 1};
  node->outputIndex = {2};
  node->primitive = std::make_unique<schema::PrimitiveT>();
  node->primitive->value.type = schema::PrimitiveType_Add;
  auto primitive = new schema::AddT;
  node->primitive->value.value = primitive;
  node->name = "Add";
  meta_graph->nodes.emplace_back(std::move(node));
  meta_graph->inputIndex = {0, 1};
  meta_graph->outputIndex = {2};

  // First graph input: 1x28x28x3 float32, NHWC.
  auto input0 = std::make_unique<schema::TensorT>();
  input0->nodeType = schema::NodeType::NodeType_ValueNode;
  input0->format = schema::Format_NHWC;
  input0->dataType = TypeId::kNumberTypeFloat32;
  input0->dims = {1, 28, 28, 3};
  input0->offset = -1;
  meta_graph->allTensors.emplace_back(std::move(input0));

  // Second graph input, same shape; named "weight" but carries no data and is
  // fed at runtime like the first input.
  auto weight = std::make_unique<schema::TensorT>();
  weight->nodeType = schema::NodeType::NodeType_ValueNode;
  weight->format = schema::Format_NHWC;
  weight->dataType = TypeId::kNumberTypeFloat32;
  weight->dims = {1, 28, 28, 3};

  weight->offset = -1;
  meta_graph->allTensors.emplace_back(std::move(weight));

  // Output tensor: dims left empty, to be inferred during CompileGraph.
  auto output = std::make_unique<schema::TensorT>();
  output->nodeType = schema::NodeType::NodeType_Parameter;
  output->format = schema::Format_NHWC;
  output->dataType = TypeId::kNumberTypeFloat32;
  output->offset = -1;
  meta_graph->allTensors.emplace_back(std::move(output));

  // Serialize the graph, then re-import it the way a client application would.
  flatbuffers::FlatBufferBuilder builder(1024);
  auto offset = schema::MetaGraph::Pack(builder, meta_graph.get());
  builder.Finish(offset);
  size_t size = builder.GetSize();
  const char *content = reinterpret_cast<char *>(builder.GetBufferPointer());

  auto model = lite::Model::Import(content, size);
  ASSERT_NE(nullptr, model);
  meta_graph.reset();
  content = nullptr;
  // NOTE(review): `context` and `session` are never deleted — leaked here, as
  // in the sibling tests of this file.
  auto context = new lite::Context;
  context->cpu_bind_mode_ = lite::NO_BIND;
  context->device_ctx_.type = lite::DT_CPU;
  context->thread_num_ = 4;
  auto session = new SessionWithParallelExecutor();
  session->Init(context);
  ASSERT_NE(nullptr, session);
  auto ret = session->CompileGraph(model);
  ASSERT_EQ(lite::RET_OK, ret);
  auto inputs = session->GetInputs();
  ASSERT_EQ(inputs.size(), 2);
  auto inTensor = inputs.front();
  ASSERT_NE(nullptr, inTensor);
  // MutableData() allocates the input buffers; contents stay uninitialized.
  (void)inTensor->MutableData();
  auto inTensor1 = inputs.back();
  ASSERT_NE(nullptr, inTensor1);
  (void)inTensor1->MutableData();
  ret = session->RunGraph();
  ASSERT_EQ(lite::RET_OK, ret);
  auto outputs = session->GetOutputs();
  ASSERT_EQ(outputs.size(), 1);
  ASSERT_EQ(outputs.begin()->second.size(), 1);
  auto outTensor = outputs.begin()->second.front();
  ASSERT_NE(nullptr, outTensor);
  ASSERT_EQ(28 * 28 * 3, outTensor->ElementsNum());
  ASSERT_EQ(TypeId::kNumberTypeFloat32, outTensor->data_type());
  auto *outData = reinterpret_cast<float *>(outTensor->MutableData());
  ASSERT_NE(nullptr, outData);
  MS_LOG(INFO) << "Passed";
}

TEST_F(InferTest, TestModel) {
auto buf = new char *[1];
size_t model_size;


Loading…
Cancel
Save