Browse Source

add the continuous memory alloc of communication kernel for actor runtime

pull/15600/head
limingqi107 4 years ago
parent
commit
fba1dd8f2f
14 changed files with 179 additions and 29 deletions
  1. +6
    -0
      mindspore/ccsrc/backend/session/kernel_graph.h
  2. +3
    -2
      mindspore/ccsrc/pipeline/jit/pipeline.cc
  3. +1
    -0
      mindspore/ccsrc/runtime/device/gpu/gpu_memory_manager.cc
  4. +1
    -0
      mindspore/ccsrc/runtime/device/memory_manager.cc
  5. +15
    -6
      mindspore/ccsrc/runtime/framework/actor/data_source_actor.cc
  6. +19
    -4
      mindspore/ccsrc/runtime/framework/actor/kernel_actor.cc
  7. +7
    -2
      mindspore/ccsrc/runtime/framework/actor/loop_count_actor.cc
  8. +6
    -0
      mindspore/ccsrc/runtime/framework/graph_compiler.cc
  9. +84
    -0
      mindspore/ccsrc/runtime/framework/graph_scheduler.cc
  10. +1
    -0
      mindspore/ccsrc/runtime/framework/graph_scheduler.h
  11. +1
    -1
      mindspore/ccsrc/runtime/hardware/device_context.h
  12. +26
    -13
      mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.cc
  13. +7
    -1
      mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.h
  14. +2
    -0
      mindspore/ccsrc/vm/backend.cc

+ 6
- 0
mindspore/ccsrc/backend/session/kernel_graph.h View File

@@ -292,6 +292,9 @@ class KernelGraph : public FuncGraph {
// set flag to indicate whether has multi-call.
void set_subgraph_multi_call(bool flag) { has_subgraph_multicall_ = flag; }

bool is_all_nop_node() const { return is_all_nop_node_; }
void set_is_all_nop_node(bool is_all_nop_node) { is_all_nop_node_ = is_all_nop_node; }

private:
// remove value node form graph
bool RemoveValueNodeFromGraph(const ValueNodePtr &value_node);
@@ -381,6 +384,9 @@ class KernelGraph : public FuncGraph {
// Number of labels. This is also the 'batch_num' for DavinciModel,
// It should be 1 if no labels used for control flow.
uint32_t label_num_ = 1;

// If all the nodes of graph is the nop node.
bool is_all_nop_node_{false};
};
} // namespace session
using KernelGraphPtr = std::shared_ptr<session::KernelGraph>;


+ 3
- 2
mindspore/ccsrc/pipeline/jit/pipeline.cc View File

@@ -987,12 +987,13 @@ bool InitExecDatasetVm(const std::string &queue_name, int64_t size, int64_t batc
// AbstractNone indicates there is no output for this apply node.
auto abstract_none = std::make_shared<abstract::AbstractNone>();
app_init->set_abstract(abstract_none);
// Before the graph compiling, need reset the iter num.
ConfigManager::GetInstance().ResetIterNum();

auto backend = compile::CreateBackend();
MS_EXCEPTION_IF_NULL(backend);
// The data set graph compiling and running of mindRT.
if (compile::IsMindRTUsed()) {
ConfigManager::GetInstance().set_iter_num(size);
const auto &mindrt_backend = std::dynamic_pointer_cast<compile::MindRTBackend>(backend);
MS_EXCEPTION_IF_NULL(mindrt_backend);
auto graph_id = mindrt_backend->CompileGraphs(func_graph);
@@ -1000,13 +1001,13 @@ bool InitExecDatasetVm(const std::string &queue_name, int64_t size, int64_t batc
if (need_run) {
(void)mindrt_backend->RunGraph(graph_id, args);
}
ConfigManager::GetInstance().set_iter_num(size);
return true;
}

auto convert_fn = backend->convert_fn();
MS_EXCEPTION_IF_NULL(convert_fn);
// Convert CNodeList to LinConvertResult.
ConfigManager::GetInstance().set_iter_num(1);
auto segment = std::make_shared<GraphSegment>(std::vector<AnfNodePtr>{app_init}, false);
auto runner = convert_fn(segment, "");
if (MsContext::GetInstance()->get_param<int>(MS_CTX_EXECUTION_MODE) != kPynativeMode) {


+ 1
- 0
mindspore/ccsrc/runtime/device/gpu/gpu_memory_manager.cc View File

@@ -61,6 +61,7 @@ bool GPUMemoryManager::MallocContinuousMemFromMemPool(const DeviceAddressPtrList
FreeMemFromMemPool(old_addr);
}
addr_list[i]->ptr_ = new_addr;
addr_list[i]->size_ = size_list[i];
addr_list[i]->from_mem_pool_ = true;
}
if (need_sync_stream) {


+ 1
- 0
mindspore/ccsrc/runtime/device/memory_manager.cc View File

@@ -214,6 +214,7 @@ bool MemoryManager::MallocContinuousMemFromMemPool(const DeviceAddressPtrList ad
MS_EXCEPTION_IF_NULL(device_ptr_list[i]);
MS_EXCEPTION_IF_NULL(addr_list[i]);
addr_list[i]->ptr_ = device_ptr_list[i];
addr_list[i]->size_ = size_list[i];
addr_list[i]->from_mem_pool_ = true;
}
return true;


+ 15
- 6
mindspore/ccsrc/runtime/framework/actor/data_source_actor.cc View File

@@ -27,9 +27,12 @@ void DataSourceActor::FetchData(OpContext<DeviceTensor> *context) {
MS_LOG(INFO) << "Data source actor(" << GetAID().Name() << ") fetches data.";
MS_EXCEPTION_IF_NULL(context);
if (buffers_.size() == buffer_capacity_) {
// Send output to trigger computing and free memory.
SendOutput(context);
// Note that FreeMemory must be before SendOutput, because SendOutput will trigger AllocateMemory of the next actor
// and the actor is asynchronous execution. So it is necessary to ensure that FreeMemory of the current actor is
// before AllocateMemory of the next actor. One is to reuse the memory more fully, the other is to ensure the
// execution order and avoid the illegal memory timing problem.
FreeMemory(context);
SendOutput(context);
buffers_.pop();
return;
}
@@ -110,9 +113,12 @@ void DeviceQueueDataSourceActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *co
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
}

// Send output to trigger computing and free memory.
SendOutput(context);
// Note that FreeMemory must be in front of SendOutput, because SendOutput will trigger AllocateMemory of the next
// actor and the actor is asynchronous execution. So it is necessary to ensure that FreeMemory of the current actor
// is in front of AllocateMemory of the next actor. One is to reuse the memory more fully, the other is to ensure
// the execution order and avoid the illegal memory timing problem.
FreeMemory(context);
SendOutput(context);
buffers_.pop();
}

@@ -156,9 +162,12 @@ void HostQueueDataSourceActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *cont
}
}

// Send output to trigger computing and free memory.
SendOutput(context);
// Note that FreeMemory must be in front of SendOutput, because SendOutput will trigger AllocateMemory of the next
// actor and the actor is asynchronous execution. So it is necessary to ensure that FreeMemory of the current actor
// is in front of AllocateMemory of the next actor. One is to reuse the memory more fully, the other is to ensure
// the execution order and avoid the illegal memory timing problem.
FreeMemory(context);
SendOutput(context);
buffers_.pop();
}
} // namespace runtime


+ 19
- 4
mindspore/ccsrc/runtime/framework/actor/kernel_actor.cc View File

@@ -74,9 +74,16 @@ void KernelActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *context) {
std::string error_info = "Launch kernel failed: " + kernel_->ToString();
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
}
SendOutput(context);
FreeMemory(context);
// The input is invalid and needs to be erased when finish kernel launch.
EraseInput(context);

// Note that FreeMemory must be in front of SendOutput, because SendOutput will trigger AllocateMemory of the next
// actor and the actor is asynchronous execution. So it is necessary to ensure that FreeMemory of the current actor
// is in front of AllocateMemory of the next actor. One is to reuse the memory more fully, the other is to ensure
// the execution order and avoid the illegal memory timing problem.
FreeMemory(context);
SendOutput(context);
}

bool KernelActor::CheckLaunchCondition(OpContext<DeviceTensor> *context) const {
@@ -188,11 +195,19 @@ void KernelActor::SendOutput(OpContext<DeviceTensor> *context) const {
void KernelActor::EraseInput(OpContext<DeviceTensor> *context) {
MS_EXCEPTION_IF_NULL(context);
if (input_datas_num_ != 0) {
(void)input_op_datas_.erase(context->sequential_num_);
auto ret = input_op_datas_.erase(context->sequential_num_);
if (ret == 0) {
std::string error_info = "Erase input data failed: " + GetAID().Name();
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
}
}

if (input_controls_num_ != 0) {
(void)input_op_controls_.erase(context->sequential_num_);
auto ret = input_op_controls_.erase(context->sequential_num_);
if (ret == 0) {
std::string error_info = "Erase input controls failed: " + GetAID().Name();
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
}
}
}



+ 7
- 2
mindspore/ccsrc/runtime/framework/actor/loop_count_actor.cc View File

@@ -27,9 +27,14 @@ void LoopCountActor::RunOpControl(AID *input_control, OpContext<DeviceTensor> *c
auto sequential_num = context->sequential_num_;
input_op_controls_[sequential_num].emplace_back(input_control);
if (input_op_controls_[sequential_num].size() == input_controls_num_) {
auto ret = input_op_controls_.erase(sequential_num);
if (ret == 0) {
std::string error_info = "Erase input controls failed: " + GetAID().Name();
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
}

current_count_++;
(void)input_op_controls_.erase(sequential_num);
MS_LOG(INFO) << "Loop count actor(" << GetAID().Name() << ") runs op control, loop count: " << loop_count_
MS_LOG(INFO) << "Loop count actor(" << GetAID().Name() << ") running, loop count: " << loop_count_
<< ", current count: " << current_count_;
if (current_count_ == loop_count_) {
current_count_ = 0;


+ 6
- 0
mindspore/ccsrc/runtime/framework/graph_compiler.cc View File

@@ -22,6 +22,7 @@
#include "common/trans.h"
#include "utils/convert_utils.h"
#include "ir/tensor.h"
#include "backend/optimizer/common/helper.h"

namespace mindspore {
namespace runtime {
@@ -226,6 +227,8 @@ GraphId GraphCompiler::CompileGraphImpl(const KernelGraphPtr &graph) const {
// Create device address for all anf nodes of graph.
CreateDeviceAddress(graph);

graph->set_is_all_nop_node(opt::IsAllNopNode(graph.get()));

return graph->graph_id();
}

@@ -257,6 +260,9 @@ GraphId GraphCompiler::CompileGraph(session::OpRunInfo *op_run_info, const Graph

// Create device address for all anf nodes of graph.
CreateDeviceAddress(graph);

graph->set_is_all_nop_node(opt::IsAllNopNode(graph.get()));

// Transform graph to actor DAG, contains build and link.
GraphScheduler::GetInstance().Transform({graph}, {device_context_}, input_tensors, nullptr,
GraphExecutionStrategy::kStep);


+ 84
- 0
mindspore/ccsrc/runtime/framework/graph_scheduler.cc View File

@@ -283,6 +283,82 @@ BaseRef CreateOutputTensors(const AnfNodePtr &output_node, const KernelGraphPtr

return CreateOutputTensor(item_with_index, graph, input_tensors);
}

// Allocate one end-to-end (continuous) device memory block covering all inputs of 'kernel'.
// Communication kernels need their inputs laid out contiguously to optimize communication
// performance; allocation is skipped when every input already has a device pointer.
// Raises (MS_LOG(EXCEPTION)) if the continuous allocation fails.
void AllocateContinuousMemoryForInput(const AnfNodePtr &kernel, const DeviceContext *device_context,
                                      bool is_all_nop_node) {
  MS_EXCEPTION_IF_NULL(kernel);
  MS_EXCEPTION_IF_NULL(device_context);
  bool is_need_alloc_memory = false;
  size_t total_size = 0;
  std::vector<size_t> size_list;
  std::vector<DeviceTensorPtr> addr_list;

  const auto &kernel_mod = AnfAlgo::GetKernelMod(kernel);
  MS_EXCEPTION_IF_NULL(kernel_mod);
  // Fix typo: 'intput_sizes' -> 'input_sizes'.
  const auto &input_sizes = kernel_mod->GetInputSizeList();
  for (size_t i = 0; i < input_sizes.size(); ++i) {
    // Graph may be all nop nodes and not remove nop node, so in that case this can not skip the
    // nop node when fetching the previous node's output address (visit_nop_node = false).
    const bool visit_nop_node = !is_all_nop_node;
    auto device_tensor = AnfAlgo::GetPrevNodeMutableOutputAddr(kernel, i, visit_nop_node);
    MS_EXCEPTION_IF_NULL(device_tensor);
    // In the scene of communication op and computing op parallel multi stream, the input address of communication op
    // can't be reused, so set the max reference count.
    device_tensor->set_ref_count(SIZE_MAX);
    device_tensor->ResetRefCountUsed();

    // Any input still lacking a device pointer triggers one continuous allocation for all inputs.
    if (device_tensor->GetPtr() == nullptr) {
      is_need_alloc_memory = true;
    }
    total_size += input_sizes[i];
    size_list.emplace_back(input_sizes[i]);
    addr_list.emplace_back(device_tensor);
  }

  if (is_need_alloc_memory) {
    auto ret = device_context->AllocateContinuousMemory(addr_list, total_size, size_list);
    if (!ret) {
      MS_LOG(EXCEPTION) << "Malloc device memory failed.";
    }
  }
}

void AllocateContinuousMemoryForOutput(const AnfNodePtr &kernel, const DeviceContext *device_context) {
MS_EXCEPTION_IF_NULL(kernel);
MS_EXCEPTION_IF_NULL(device_context);
bool is_need_alloc_memory = false;
size_t total_size = 0;
std::vector<size_t> size_list;
std::vector<DeviceTensorPtr> addr_list;

const auto &kernel_mod = AnfAlgo::GetKernelMod(kernel);
MS_EXCEPTION_IF_NULL(kernel_mod);
const auto &output_sizes = kernel_mod->GetOutputSizeList();
for (size_t i = 0; i < output_sizes.size(); ++i) {
const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(kernel, i, false);
MS_EXCEPTION_IF_NULL(device_tensor);
// One time application for continuous memory, so set the max reference count.
device_tensor->set_ref_count(SIZE_MAX);
device_tensor->ResetRefCountUsed();

if (device_tensor->GetPtr() == nullptr) {
is_need_alloc_memory = true;
}
total_size += output_sizes[i];
size_list.emplace_back(output_sizes[i]);
addr_list.emplace_back(device_tensor);
}

if (is_need_alloc_memory) {
auto ret = device_context->AllocateContinuousMemory(addr_list, total_size, size_list);
if (!ret) {
MS_LOG(EXCEPTION) << "Malloc device memory failed.";
}
}
}
} // namespace

void GraphScheduler::Initialize() {
@@ -409,6 +485,14 @@ void GraphScheduler::PrepareRun(const KernelGraphPtr &graph, const std::vector<T
MS_LOG(INFO) << "Create node output: " << output_node->fullname_with_scope();
outputs->emplace_back(CreateOutputTensors(output_node, graph, *input_tensors));
}

// 4.Prepare the continuous memory for communication kernel.
for (const auto &kernel : graph->execution_order()) {
if (AnfAlgo::IsCommunicationOp(kernel)) {
AllocateContinuousMemoryForInput(kernel, device_context, graph->is_all_nop_node());
AllocateContinuousMemoryForOutput(kernel, device_context);
}
}
}

bool GraphScheduler::Run(const ActorSet *actor_set, GraphExecutionStrategy strategy) {


+ 1
- 0
mindspore/ccsrc/runtime/framework/graph_scheduler.h View File

@@ -82,6 +82,7 @@ class GraphScheduler {
// 1. Prepare the data of device tensor store(such as weights and value nodes of graph).
// 2. Prepare the data of host tensor queue(such as non weighted parameters of graph).
// 3. Prepare the output tensor of graph.
// 4.Prepare the continuous memory for communication kernel.
void PrepareRun(const KernelGraphPtr &graph, const std::vector<TensorPtr> *input_tensors, VectorRef *const &outputs);

// The processing entry of actors running.


+ 1
- 1
mindspore/ccsrc/runtime/hardware/device_context.h View File

@@ -58,7 +58,7 @@ class DeviceContext {
// Allocate continuous device memory end to end into 'addr_list'.
// Communication operators may need continuous memory for input and output
// to optimize the communication performance.
virtual bool AllocateContinuousMemory(const std::vector<DeviceAddress *> &addr_list, size_t total_size,
virtual bool AllocateContinuousMemory(const std::vector<DeviceAddressPtr> &addr_list, size_t total_size,
const std::vector<size_t> &size_list) const {
return true;
}


+ 26
- 13
mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.cc View File

@@ -34,6 +34,8 @@
namespace mindspore {
namespace device {
namespace gpu {
static thread_local bool cur_thread_device_inited{false};

bool GPUDeviceContext::Initialize() {
if (initialized_ == true) {
CHECK_OP_RET_WITH_EXCEPT(CudaDriver::SetDevice(UintToInt(device_context_key_.device_id_)),
@@ -130,6 +132,9 @@ void GPUDeviceContext::Destroy() {

bool GPUDeviceContext::AllocateMemory(DeviceAddress *const &address, size_t size) const {
MS_EXCEPTION_IF_NULL(address);
if (!BindDeviceToCurrentThread()) {
return false;
}
auto device_ptr = mem_manager_->MallocMemFromMemPool(size);
if (!device_ptr) {
return false;
@@ -147,22 +152,12 @@ void GPUDeviceContext::FreeMemory(DeviceAddress *const &address) const {
address->ptr_ = nullptr;
}

bool GPUDeviceContext::AllocateContinuousMemory(const std::vector<DeviceAddress *> &addr_list, size_t total_size,
bool GPUDeviceContext::AllocateContinuousMemory(const std::vector<DeviceAddressPtr> &addr_list, size_t total_size,
const std::vector<size_t> &size_list) const {
auto device_ptr_list = mem_manager_->MallocContinuousMemFromMemPool(total_size, size_list);
if (device_ptr_list.size() == 0) {
if (!BindDeviceToCurrentThread()) {
return false;
}
if (addr_list.size() != device_ptr_list.size()) {
MS_LOG(EXCEPTION) << "The size of device list is not equal to the size of address list.";
}
for (size_t i = 0; i < addr_list.size(); i++) {
MS_EXCEPTION_IF_NULL(device_ptr_list[i]);
MS_EXCEPTION_IF_NULL(addr_list[i]);
addr_list[i]->ptr_ = device_ptr_list[i];
addr_list[i]->from_mem_pool_ = true;
}
return true;
return mem_manager_->MallocContinuousMemFromMemPool(addr_list, total_size, size_list);
}

DeviceAddressPtr GPUDeviceContext::CreateDeviceAddress(void *device_ptr, size_t device_size, const string &format,
@@ -265,6 +260,10 @@ bool GPUDeviceContext::LaunchKernel(KernelMod *kernel_mod, const std::vector<Add
const std::vector<AddressPtr> &workspace,
const std::vector<AddressPtr> &outputs) const {
MS_EXCEPTION_IF_NULL(kernel_mod);
if (!BindDeviceToCurrentThread()) {
return false;
}
std::lock_guard<std::mutex> locker(launch_mutex_);
return kernel_mod->Launch(inputs, workspace, outputs, streams_.front());
}

@@ -275,6 +274,20 @@ bool GPUDeviceContext::SyncStream(size_t stream_id) const {
return GPUDeviceManager::GetInstance().SyncStream(streams_[stream_id]);
}

// Bind the GPU device to the calling thread exactly once; later calls on the same
// thread are no-ops via the thread-local 'cur_thread_device_inited' flag.
// Returns false (and logs an error) when setting the CUDA device fails.
bool GPUDeviceContext::BindDeviceToCurrentThread() const {
  if (!cur_thread_device_inited) {
    if (!CudaDriver::SetDevice(UintToInt(device_context_key_.device_id_))) {
      MS_LOG(ERROR) << "Failed to set device id: " << device_context_key_.device_id_;
      return false;
    }
    cur_thread_device_inited = true;
  }
  return true;
}

MS_REGISTER_DEVICE(kGPUDevice, GPUDeviceContext);
} // namespace gpu
} // namespace device


+ 7
- 1
mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.h View File

@@ -41,7 +41,7 @@ class GPUDeviceContext : public DeviceContext {

bool AllocateMemory(DeviceAddress *const &address, size_t size) const override;
void FreeMemory(DeviceAddress *const &address) const override;
bool AllocateContinuousMemory(const std::vector<DeviceAddress *> &addr_list, size_t total_size,
bool AllocateContinuousMemory(const std::vector<DeviceAddressPtr> &addr_list, size_t total_size,
const std::vector<size_t> &size_list) const override;

DeviceAddressPtr CreateDeviceAddress(void *device_ptr, size_t device_size, const string &format,
@@ -73,6 +73,12 @@ class GPUDeviceContext : public DeviceContext {
// Update Graph Dynamic Shape Attr.
void UpdateGraphDynamicShapeAttr(const NotNull<KernelGraphPtr> &graph) const;

bool BindDeviceToCurrentThread() const;

// The cublas handle is not thread safety specifically, it is not recommended that multiple threads access the same
// cublas handle at the same time, so need the launch mutex when multiple threads launch the cublas kernels.
mutable std::mutex launch_mutex_;

std::shared_ptr<MemoryManager> mem_manager_;
std::vector<void *> streams_;
bool initialized_;


+ 2
- 0
mindspore/ccsrc/vm/backend.cc View File

@@ -30,6 +30,7 @@
#include "runtime/hardware/device_context_manager.h"
#include "runtime/framework/graph_compiler.h"
#include "runtime/framework/graph_scheduler.h"
#include "utils/scoped_long_running.h"
#ifdef ENABLE_GE
#include "utils/callbacks_ge.h"
#endif
@@ -345,6 +346,7 @@ VectorRef MindRTBackend::RunGraph(GraphId graph_id, const VectorRef &args) {
MS_EXCEPTION_IF_NULL(actor_set);

// Run actor DAG.
mindspore::ScopedLongRunning long_running;
VectorRef outputs;
runtime::GraphScheduler::GetInstance().PrepareRun(kernel_graph, &inputs, &outputs);
if (!runtime::GraphScheduler::GetInstance().Run(actor_set)) {


Loading…
Cancel
Save