Browse Source

!24011 unified runtime code review

Merge pull request !24011 from limingqi107/bug_fix
tags/v1.6.0
i-robot Gitee 4 years ago
parent
commit
27b4d2a2d4
19 changed files with 178 additions and 97 deletions
  1. +2
    -0
      mindspore/ccsrc/backend/optimizer/mem_reuse/mem_dynamic_allocator.h
  2. +2
    -2
      mindspore/ccsrc/runtime/framework/actor/abstract_actor.cc
  3. +2
    -2
      mindspore/ccsrc/runtime/framework/actor/abstract_actor.h
  4. +3
    -5
      mindspore/ccsrc/runtime/framework/actor/actor_common.cc
  5. +2
    -2
      mindspore/ccsrc/runtime/framework/actor/actor_common.h
  6. +24
    -16
      mindspore/ccsrc/runtime/framework/actor/copy_actor.cc
  7. +11
    -5
      mindspore/ccsrc/runtime/framework/actor/data_prepare_actor.cc
  8. +7
    -9
      mindspore/ccsrc/runtime/framework/actor/data_source_actor.cc
  9. +1
    -1
      mindspore/ccsrc/runtime/framework/actor/data_source_actor.h
  10. +34
    -9
      mindspore/ccsrc/runtime/framework/actor/kernel_actor.cc
  11. +6
    -5
      mindspore/ccsrc/runtime/framework/actor/memory_manager_actor.cc
  12. +4
    -4
      mindspore/ccsrc/runtime/framework/actor/memory_manager_actor.h
  13. +2
    -0
      mindspore/ccsrc/runtime/framework/actor/output_actor.cc
  14. +1
    -1
      mindspore/ccsrc/runtime/framework/actor/recorder_actor.h
  15. +4
    -4
      mindspore/ccsrc/runtime/framework/control_node_parser.h
  16. +1
    -0
      mindspore/ccsrc/runtime/framework/device_tensor_store.h
  17. +64
    -30
      mindspore/ccsrc/runtime/framework/graph_scheduler.cc
  18. +2
    -2
      mindspore/ccsrc/runtime/framework/graph_scheduler.h
  19. +6
    -0
      mindspore/ccsrc/vm/backend.cc

+ 2
- 0
mindspore/ccsrc/backend/optimizer/mem_reuse/mem_dynamic_allocator.h View File

@@ -82,12 +82,14 @@ class DynamicMemPoolBestFit {
public:
DynamicMemPoolBestFit() = default;
virtual ~DynamicMemPoolBestFit();
// The main program entry of memory alloc.
DeviceMemPtr AllocTensorMem(size_t size);
// The main program entry of continuous memory alloc.
std::vector<DeviceMemPtr> AllocContinuousTensorMem(size_t total_size, std::vector<size_t> size_list);
// The main program entry of memory free.
void FreeTensorMem(const DeviceMemPtr &device_addr);
// Release the real device memory.
void ReleaseDeviceRes();
// Display the information of memory block and memory buf.


+ 2
- 2
mindspore/ccsrc/runtime/framework/actor/abstract_actor.cc View File

@@ -19,7 +19,7 @@

namespace mindspore {
namespace runtime {
bool AbstractActor::CheckRunningCondition(OpContext<DeviceTensor> *const context) const {
bool AbstractActor::CheckRunningCondition(const OpContext<DeviceTensor> *context) const {
MS_EXCEPTION_IF_NULL(context);
if (input_datas_num_ != 0) {
const auto &data_iter = input_op_datas_.find(context->sequential_num_);
@@ -43,7 +43,7 @@ bool AbstractActor::CheckRunningCondition(OpContext<DeviceTensor> *const context
return true;
}

void AbstractActor::EraseInput(OpContext<DeviceTensor> *const context) {
void AbstractActor::EraseInput(const OpContext<DeviceTensor> *context) {
MS_EXCEPTION_IF_NULL(context);
if (input_datas_num_ != 0) {
auto ret = input_op_datas_.erase(context->sequential_num_);


+ 2
- 2
mindspore/ccsrc/runtime/framework/actor/abstract_actor.h View File

@@ -52,9 +52,9 @@ class AbstractActor : public OpActor<DeviceTensor> {
friend class GraphScheduler;

// Check whether the actor running condition is satisfied.
bool CheckRunningCondition(OpContext<DeviceTensor> *const context) const;
bool CheckRunningCondition(const OpContext<DeviceTensor> *context) const;
// Erase input data and input controls when the actor finishes running.
void EraseInput(OpContext<DeviceTensor> *const context);
void EraseInput(const OpContext<DeviceTensor> *const context);

KernelTransformType type_;



+ 3
- 5
mindspore/ccsrc/runtime/framework/actor/actor_common.cc View File

@@ -140,12 +140,10 @@ bool IsPersistentDeviceTensor(const AnfNodePtr &node) {

bool IsGatherActor(const AnfNodePtr &front_node,
const std::unordered_map<std::string, OpActor<DeviceTensor> *> &actor_name_to_actor) {
MS_EXCEPTION_IF_NULL(front_node);
if (front_node->isa<Parameter>() && (!AnfAlgo::IsParameterWeight(front_node->cast<ParameterPtr>())) &&
front_node->func_graph() != nullptr) {
const auto &func_graph = front_node->func_graph();
if (func_graph != nullptr && actor_name_to_actor.find(func_graph->ToString()) != actor_name_to_actor.end()) {
return true;
}
(front_node->func_graph() != nullptr) && (actor_name_to_actor.count(front_node->func_graph()->ToString()) > 0)) {
return true;
}
return false;
}


+ 2
- 2
mindspore/ccsrc/runtime/framework/actor/actor_common.h View File

@@ -84,14 +84,14 @@ enum class KernelTransformType {

#define SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(strategy, op_context, device_context, kernel_name, alloc_size) \
{ \
std::string message = "Device(id:" + std::to_string(device_context->device_context_key().device_id_) + \
std::string message = "Device(id:" + std::to_string((device_context).device_context_key().device_id_) + \
") memory isn't enough and alloc failed, kernel name: " + kernel_name + \
", alloc size: " + std::to_string(alloc_size) + "B."; \
if (strategy == GraphExecutionStrategy::kStep) { \
MS_LOG(EXCEPTION) << message; \
} \
MS_LOG(ERROR) << message; \
op_context.SetFailed(kFailure); \
(op_context).SetFailed(kFailure); \
return; \
}



+ 24
- 16
mindspore/ccsrc/runtime/framework/actor/copy_actor.cc View File

@@ -21,7 +21,9 @@

namespace mindspore {
namespace runtime {
const size_t kDeviceTensorNum = 1;

const size_t kInputDeviceContextIndex = 0;
const size_t kOutputDeviceContextIndex = 1;

void CopyActor::Init() {
// Check device contexts number.
@@ -29,6 +31,7 @@ void CopyActor::Init() {
MS_LOG(EXCEPTION) << "The device contexts number is wrong.";
}

const size_t kDeviceTensorNum = 1;
input_device_tensor_.resize(kDeviceTensorNum);
output_device_tensor_.resize(kDeviceTensorNum);

@@ -66,13 +69,15 @@ void CopyActor::RunOpControl(AID *const input_control, OpContext<DeviceTensor> *
}

void CopyActor::SendMemoryAllocReq(OpContext<DeviceTensor> *const context) {
Async(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &output_device_tensor_, device_contexts_[1], context,
GetAID());
Async(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &output_device_tensor_,
device_contexts_[kOutputDeviceContextIndex], context, GetAID());
}

void CopyActor::SendMemoryFreeReq(OpContext<DeviceTensor> *const context) {
Async(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &input_device_tensor_, device_contexts_[0], context);
Async(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &output_device_tensor_, device_contexts_[1], context);
Async(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &input_device_tensor_,
device_contexts_[kInputDeviceContextIndex], context);
Async(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &output_device_tensor_,
device_contexts_[kOutputDeviceContextIndex], context);
}

void CopyActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {
@@ -103,26 +108,29 @@ void CopyActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {

void CopyActor::FetchDeviceTensor(OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
MS_EXCEPTION_IF_NULL(device_contexts_[0]);
const auto &input_device_context = device_contexts_[kInputDeviceContextIndex];
const auto &output_device_context = device_contexts_[kOutputDeviceContextIndex];
MS_EXCEPTION_IF_NULL(input_device_context);
MS_EXCEPTION_IF_NULL(output_device_context);

if (device_tensor_store_keys_.size() > 0) {
input_device_tensor_[0] = DeviceTensorStore::GetInstance().Fetch(device_tensor_store_keys_[0].second.get(),
device_contexts_[0]->GetDeviceAddressType());
const auto &device_tensor_store_node = device_tensor_store_keys_[0].second;
MS_EXCEPTION_IF_NULL(device_tensor_store_node);
input_device_tensor_[0] = DeviceTensorStore::GetInstance().Fetch(device_tensor_store_node.get(),
input_device_context->GetDeviceAddressType());
if (input_device_tensor_[0] == nullptr) {
std::string error_info =
GetAID().Name() +
" get device tensor store failed: " + device_tensor_store_keys_[0].second->fullname_with_scope() +
", device type:" + std::to_string(static_cast<int>(device_contexts_[0]->GetDeviceAddressType()));
GetAID().Name() + " get device tensor store failed: " + device_tensor_store_node->fullname_with_scope() +
", device type:" + std::to_string(static_cast<int>(input_device_context->GetDeviceAddressType()));
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
}

output_device_tensor_[0] = DeviceTensorStore::GetInstance().Fetch(device_tensor_store_keys_[0].second.get(),
device_contexts_[1]->GetDeviceAddressType());
output_device_tensor_[0] = DeviceTensorStore::GetInstance().Fetch(device_tensor_store_node.get(),
output_device_context->GetDeviceAddressType());
if (output_device_tensor_[0] == nullptr) {
std::string error_info =
GetAID().Name() +
" get device tensor store failed: " + device_tensor_store_keys_[0].second->fullname_with_scope() +
", device type:" + std::to_string(static_cast<int>(device_contexts_[1]->GetDeviceAddressType()));
GetAID().Name() + " get device tensor store failed: " + device_tensor_store_node->fullname_with_scope() +
", device type:" + std::to_string(static_cast<int>(output_device_context->GetDeviceAddressType()));
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
}
} else {


+ 11
- 5
mindspore/ccsrc/runtime/framework/actor/data_prepare_actor.cc View File

@@ -39,7 +39,7 @@ void SyncTensorData(const TensorPtr &host_tensor, const DeviceTensorPtr &device_

if ((device_tensor->GetPtr() == nullptr) &&
(!device_context->AllocateMemory(device_tensor.get(), device_tensor->GetSize()))) {
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(strategy, (*context), device_context, node->fullname_with_scope(),
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(strategy, *context, *device_context, node->fullname_with_scope(),
device_tensor->GetSize());
}

@@ -59,6 +59,10 @@ void SyncTensorData(const TensorPtr &host_tensor, const DeviceTensorPtr &device_
void FetchContinuousMemoryInfo(const CNodePtr &node, std::vector<DeviceTensorPtr> *const addr_list,
std::vector<size_t> *const size_list, size_t *const total_size, bool is_input) {
MS_EXCEPTION_IF_NULL(node);
MS_EXCEPTION_IF_NULL(addr_list);
MS_EXCEPTION_IF_NULL(size_list);
MS_EXCEPTION_IF_NULL(total_size);

const auto &kernel_mod = AnfAlgo::GetKernelMod(node);
MS_EXCEPTION_IF_NULL(kernel_mod);
(*addr_list).clear();
@@ -150,7 +154,7 @@ void DataPrepareActor::SendDebugReq(OpContext<DeviceTensor> *const context) {
graph_compiler_info_->device_contexts_, context, &GetAID());
}

void DataPrepareActor::OnDebugFinish(OpContext<DeviceTensor> *context) {
void DataPrepareActor::OnDebugFinish(OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
if (continuous_memory_alloc_list_list_.size() > 0) {
SendMemoryAllocReq(context);
@@ -378,7 +382,7 @@ void DataPrepareActor::PrepareDataForValueNode(const ValueNodePtr &node, const D
MS_LOG(INFO) << "Prepare device data for value node: " << node->fullname_with_scope();

if (!device_context->AllocateMemory(device_tensor.get(), device_tensor->GetSize())) {
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(strategy_, (*context), device_context, node->fullname_with_scope(),
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(strategy_, *context, *device_context, node->fullname_with_scope(),
device_tensor->GetSize());
}

@@ -406,11 +410,11 @@ void DataPrepareActor::PrepareDataForWeightNode(const AnfNodePtr &backend_node,
}

auto device_tensor = AnfAlgo::GetMutableOutputAddr(backend_node, 0, false);
MS_EXCEPTION_IF_NULL(device_tensor);
auto host_tensor_address = std::dynamic_pointer_cast<DeviceTensor>(tensor->device_address());
// Use the device address of host tensor to set device tensor.
if (host_tensor_address != device_tensor) {
if (host_tensor_address == nullptr) {
MS_EXCEPTION_IF_NULL(device_tensor);
host_tensor_address = device_context->CreateDeviceAddress(nullptr, device_tensor->GetSize(),
device_tensor->format(), device_tensor->type_id());
tensor->set_device_address(host_tensor_address);
@@ -427,6 +431,7 @@ void DataPrepareActor::PrepareDataForWeightNode(const AnfNodePtr &backend_node,
}

// If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
MS_EXCEPTION_IF_NULL(host_tensor_address);
if (host_tensor_address->GetPtr() == nullptr) {
MS_LOG(INFO) << "Prepare device data for weight node:" << backend_node->fullname_with_scope()
<< ", device type:" << host_tensor_address->DeviceType();
@@ -444,7 +449,7 @@ void DataPrepareActor::PrepareDataForWeightNode(const AnfNodePtr &backend_node,
MS_EXCEPTION_IF_NULL(another_device_context);
if ((another_device_tensor->GetPtr() == nullptr) &&
(!another_device_context->AllocateMemory(another_device_tensor.get(), another_device_tensor->GetSize()))) {
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(strategy_, (*context), another_device_context,
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(strategy_, *context, *another_device_context,
backend_node->fullname_with_scope(),
another_device_tensor->GetSize());
}
@@ -471,6 +476,7 @@ void DataPrepareActor::PrepareDataForControlWeightNode(
auto device_tensors = DeviceTensorStore::GetInstance().Fetch(front_node.get());
bool need_update_device_tensor_store = (device_tensors.size() == 0) ? true : false;
for (auto &device_tensor : device_tensors) {
MS_EXCEPTION_IF_NULL(device_tensor);
if (device_tensor->GetPtr() == nullptr) {
need_update_device_tensor_store = true;
break;


+ 7
- 9
mindspore/ccsrc/runtime/framework/actor/data_source_actor.cc View File

@@ -23,9 +23,6 @@
#include "mindrt/include/async/async.h"
#include "common/trans.h"
#include "utils/log_adapter.h"
#ifdef ENABLE_DUMP_IR
#include "debug/rdr/running_data_recorder.h"
#endif

namespace mindspore {
namespace runtime {
@@ -150,6 +147,7 @@ void DeviceQueueDataSourceActor::SendMemoryFreeReq(OpContext<DeviceTensor> *cons

void DeviceQueueDataSourceActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
MS_EXCEPTION_IF_NULL(data_kernel_);
MS_EXCEPTION_IF_NULL(device_contexts_[0]);
if (buffers_.size() == 0) {
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The data queue is empty.");
@@ -157,7 +155,11 @@ void DeviceQueueDataSourceActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *co

// Construct outputs of data kernel launching.
auto &device_tensors = buffers_.back();
if (launch_info_.outputs_.size() != device_tensors.size()) {
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The outputs number is not equal to the device tensors number.");
}
for (size_t i = 0; i < device_tensors.size(); ++i) {
MS_EXCEPTION_IF_NULL(launch_info_.outputs_[i]);
MS_EXCEPTION_IF_NULL(device_tensors[i]);
launch_info_.outputs_[i]->addr = device_tensors[i]->GetMutablePtr();
launch_info_.outputs_[i]->size = device_tensors[i]->GetSize();
@@ -168,16 +170,10 @@ void DeviceQueueDataSourceActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *co
auto ret = device_contexts_[0]->LaunchKernel(data_kernel_, launch_info_.inputs_, launch_info_.workspaces_,
launch_info_.outputs_);
if (!ret) {
#ifdef ENABLE_DUMP_IR
mindspore::RDR::TriggerAll();
#endif
std::string error_info = "Launch kernel failed: " + data_kernel_->fullname_with_scope();
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
}
} catch (const std::exception &e) {
#ifdef ENABLE_DUMP_IR
mindspore::RDR::TriggerAll();
#endif
MsException::Instance().SetException();
std::string error_info = "Launch kernel exception: " + data_kernel_->fullname_with_scope();
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
@@ -216,6 +212,7 @@ void DeviceQueueDataSourceActor::SendResult(OpContext<DeviceTensor> *const conte

void DeviceQueueDataSourceActor::SendRecorderInfo(OpContext<DeviceTensor> *const context) {
if (recorder_aid_ != nullptr) {
MS_EXCEPTION_IF_NULL(data_kernel_);
Async(*recorder_aid_, &RecorderActor::RecordInfo, data_kernel_->fullname_with_scope(), &launch_info_,
device_contexts_[0], context);
}
@@ -315,6 +312,7 @@ void HostQueueDataSourceActor::SendResult(OpContext<DeviceTensor> *const context
}

size_t HostQueueDataSourceActor::FetchNodePosition(const AnfNodePtr &data_node) const {
MS_EXCEPTION_IF_NULL(data_node);
const auto &iter = data_node_position_map_.find(data_node);
if (iter == data_node_position_map_.end()) {
MS_LOG(EXCEPTION) << "Data node: " << data_node->fullname_with_scope() << " is not exist.";


+ 1
- 1
mindspore/ccsrc/runtime/framework/actor/data_source_actor.h View File

@@ -132,7 +132,7 @@ class HostQueueDataSourceActor : public DataSourceActor {

size_t FetchNodePosition(const AnfNodePtr &node) const override;
AnfNodePtr FetchNode(size_t node_position) const;
std::vector<AnfNodePtr> &data_nodes() { return data_nodes_; }
const std::vector<AnfNodePtr> &data_nodes() const { return data_nodes_; }

protected:
void FillDataBuffer() override;


+ 34
- 9
mindspore/ccsrc/runtime/framework/actor/kernel_actor.cc View File

@@ -81,6 +81,8 @@ void KernelActor::Init() {

void KernelActor::RunOpData(OpData<DeviceTensor> *const input_data, OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
MS_EXCEPTION_IF_NULL(device_contexts_[0]);

auto &sequential_num = context->sequential_num_;
(void)input_op_datas_[sequential_num].emplace_back(input_data);
if (input_data->data_ == nullptr) {
@@ -107,6 +109,8 @@ void KernelActor::RunOpData(OpData<DeviceTensor> *const input_data, OpContext<De

void KernelActor::RunOpControl(AID *const input_control, OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
MS_EXCEPTION_IF_NULL(device_contexts_[0]);

auto &sequential_num = context->sequential_num_;
(void)input_op_controls_[sequential_num].emplace_back(input_control);
// When all the inputs are collected, then allocate memory and callback launch.
@@ -145,8 +149,10 @@ void KernelActor::RunOpControlWithInputTensor(AID *const input_control, OpContex
}

namespace {
void AllocateMemory(const std::vector<DeviceTensor *> &alloc_list, const DeviceContext *device_context) {
void AllocateMemory(const std::vector<DeviceTensor *> &alloc_list, const DeviceContext *device_context,
OpContext<DeviceTensor> *const context, const std::string &actor_name) {
MS_EXCEPTION_IF_NULL(device_context);
MS_EXCEPTION_IF_NULL(context);

for (auto &device_tensor : alloc_list) {
MS_EXCEPTION_IF_NULL(device_tensor);
@@ -155,10 +161,8 @@ void AllocateMemory(const std::vector<DeviceTensor *> &alloc_list, const DeviceC
}
// Allocate memory through the device context.
if (!device_context->AllocateMemory(device_tensor, device_tensor->GetSize())) {
std::string error_info =
"Device(id:" + std::to_string(device_context->device_context_key().device_id_) +
") memory isn't enough and alloc failed, alloc size: " + std::to_string(device_tensor->GetSize());
MS_LOG(EXCEPTION) << error_info;
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kStep, *context, *device_context, actor_name,
device_tensor->GetSize());
}
}
}
@@ -189,7 +193,7 @@ void KernelActor::SendMemoryAllocReq(OpContext<DeviceTensor> *const context) {
Async(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &memory_alloc_list_, device_contexts_[0], context,
GetAID());
} else {
AllocateMemory(memory_alloc_list_, device_contexts_[0]);
AllocateMemory(memory_alloc_list_, device_contexts_[0], context, GetAID().Name());
}
}

@@ -248,8 +252,9 @@ void KernelActor::PushInputDeviceTensor(const std::vector<TensorPtr> *input_tens
}

for (size_t input_index = 0; input_index < input_tensors->size(); input_index++) {
const auto &device_tensor =
std::dynamic_pointer_cast<DeviceTensor>((*input_tensors)[input_index]->device_address());
const auto &input_tensor = (*input_tensors)[input_index];
MS_EXCEPTION_IF_NULL(input_tensor);
const auto &device_tensor = std::dynamic_pointer_cast<DeviceTensor>(input_tensor->device_address());
if (device_tensor != nullptr) {
input_device_tensors_[input_index] = device_tensor.get();
memory_free_list_[input_index] = device_tensor.get();
@@ -260,6 +265,8 @@ void KernelActor::PushInputDeviceTensor(const std::vector<TensorPtr> *input_tens
void KernelActor::CopyInputDeviceTensor(const OpData<DeviceTensor> *input_data,
OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(input_data);
MS_EXCEPTION_IF_NULL(context);
MS_EXCEPTION_IF_NULL(device_contexts_[0]);
if ((input_data->data_ == nullptr) ||
(input_data->data_->DeviceType() == device_contexts_[0]->GetDeviceAddressType())) {
return;
@@ -277,7 +284,7 @@ void KernelActor::CopyInputDeviceTensor(const OpData<DeviceTensor> *input_data,
if (copy_input_device_tensors_[input_data->index_]->GetPtr() == nullptr) {
if (!device_contexts_[0]->AllocateMemory(copy_input_device_tensors_[input_data->index_].get(),
copy_input_device_tensors_[input_data->index_]->GetSize())) {
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kPipeline, (*context), device_contexts_[0],
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kPipeline, *context, *(device_contexts_[0]),
GetAID().Name(),
copy_input_device_tensors_[input_data->index_]->GetSize());
}
@@ -301,6 +308,10 @@ void KernelActor::FetchInputDeviceTensor(OpContext<DeviceTensor> *const context)
if (data_iter != input_op_datas_.end()) {
for (auto &input_data : data_iter->second) {
MS_EXCEPTION_IF_NULL(input_data);
if (IntToSize(input_data->index_) >= input_device_tensors_.size()) {
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), "The input index is out of range.");
}

if (input_device_tensors_[input_data->index_] != input_data->data_) {
input_device_tensors_[input_data->index_] = input_data->data_;
memory_free_list_[input_data->index_] = input_data->data_;
@@ -318,6 +329,10 @@ void KernelActor::FetchInputDeviceTensor(OpContext<DeviceTensor> *const context)
", device type:" + std::to_string(static_cast<int>(device_contexts_[0]->GetDeviceAddressType()));
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
}

if (device_tensor_store_key.first >= input_device_tensors_.size()) {
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), "The input index is out of range.");
}
if (input_device_tensors_[device_tensor_store_key.first] != device_tensor) {
input_device_tensors_[device_tensor_store_key.first] = device_tensor;
memory_free_list_[device_tensor_store_key.first] = device_tensor;
@@ -332,8 +347,13 @@ void KernelActor::FetchOutputDeviceTensor() {
MS_EXCEPTION_IF_NULL(kernel_mod);
const auto &output_size_list = kernel_mod->GetOutputSizeList();

if (output_addresses.size() != output_size_list.size()) {
MS_LOG(EXCEPTION) << "The outputs number is not equal.";
}

for (size_t i = 0; i < output_addresses.size(); ++i) {
auto output_address = output_addresses[i].get();
MS_EXCEPTION_IF_NULL(output_address);
if (output_size_list[i] != output_address->GetSize()) {
// The size of output address may be changed in dynamic shape scenario.
output_address->SetSize(output_size_list[i]);
@@ -347,6 +367,7 @@ void KernelActor::FetchOutputDeviceTensor() {

// Update output data.
for (auto &output_data : output_data_by_output_index_[i]) {
MS_EXCEPTION_IF_NULL(output_data);
output_data->data_ = output_address;
}
}
@@ -356,18 +377,21 @@ void KernelActor::FetchOutputDeviceTensor() {
void KernelActor::PreLaunchKernel(OpContext<DeviceTensor> *) {
for (size_t i = 0; i < input_device_tensors_.size(); ++i) {
MS_EXCEPTION_IF_NULL(input_device_tensors_[i]);
MS_EXCEPTION_IF_NULL(launch_info_.inputs_[i]);
launch_info_.inputs_[i]->addr = input_device_tensors_[i]->GetMutablePtr();
launch_info_.inputs_[i]->size = input_device_tensors_[i]->GetSize();
}

for (size_t i = 0; i < output_device_tensors_.size(); ++i) {
MS_EXCEPTION_IF_NULL(output_device_tensors_[i]);
MS_EXCEPTION_IF_NULL(launch_info_.outputs_[i]);
launch_info_.outputs_[i]->addr = output_device_tensors_[i]->GetMutablePtr();
launch_info_.outputs_[i]->size = output_device_tensors_[i]->GetSize();
}

for (size_t i = 0; i < workspace_device_tensors_.size(); ++i) {
MS_EXCEPTION_IF_NULL(workspace_device_tensors_[i]);
MS_EXCEPTION_IF_NULL(launch_info_.workspaces_[i]);
launch_info_.workspaces_[i]->addr = workspace_device_tensors_[i]->GetMutablePtr();
launch_info_.workspaces_[i]->size = workspace_device_tensors_[i]->GetSize();
}
@@ -391,6 +415,7 @@ void KernelActor::PostLaunchKernel(OpContext<DeviceTensor> *const context) {

void KernelActor::SendOutput(OpContext<DeviceTensor> *const context) const {
MS_EXCEPTION_IF_NULL(context);
MS_EXCEPTION_IF_NULL(kernel_);
if (strategy_ == GraphExecutionStrategy::kStep) {
return;
}


+ 6
- 5
mindspore/ccsrc/runtime/framework/actor/memory_manager_actor.cc View File

@@ -24,7 +24,7 @@ namespace mindspore {
namespace runtime {
void MemoryManagerActor::AllocateMemory(const std::vector<DeviceTensor *> *alloc_list,
const DeviceContext *device_context, OpContext<DeviceTensor> *const op_context,
const AID from_aid) {
const AID &from_aid) {
MS_EXCEPTION_IF_NULL(alloc_list);
MS_EXCEPTION_IF_NULL(device_context);
MS_EXCEPTION_IF_NULL(op_context);
@@ -49,7 +49,7 @@ void MemoryManagerActor::AllocateContinuousMemory(const std::vector<std::vector<
const std::vector<std::vector<size_t>> *size_list_list,
const std::vector<size_t> *total_size_list,
const std::vector<const DeviceContext *> *device_contexts,
OpContext<DeviceTensor> *const op_context, const AID from_aid) {
OpContext<DeviceTensor> *const op_context, const AID &from_aid) {
MS_EXCEPTION_IF_NULL(alloc_list_list);
MS_EXCEPTION_IF_NULL(size_list_list);
MS_EXCEPTION_IF_NULL(total_size_list);
@@ -67,6 +67,7 @@ void MemoryManagerActor::AllocateContinuousMemory(const std::vector<std::vector<
auto &size_list = (*size_list_list)[i];
auto &total_size = (*total_size_list)[i];
auto &device_context = (*device_contexts)[i];
MS_EXCEPTION_IF_NULL(device_context);
// Allocate memory through the device context.
if (!device_context->AllocateContinuousMemory(alloc_list, total_size, size_list)) {
SetOpContextMemoryAllocFail(from_aid.Name(), device_context, total_size, op_context);
@@ -80,7 +81,7 @@ void MemoryManagerActor::AllocateContinuousMemory(const std::vector<std::vector<

void MemoryManagerActor::AllocateBatchMemory(const std::vector<DeviceTensor *> *alloc_list,
const std::vector<const DeviceContext *> *device_contexts,
OpContext<DeviceTensor> *const op_context, const AID from_aid) {
OpContext<DeviceTensor> *const op_context, const AID &from_aid) {
MS_EXCEPTION_IF_NULL(alloc_list);
MS_EXCEPTION_IF_NULL(device_contexts);
MS_EXCEPTION_IF_NULL(op_context);
@@ -161,7 +162,7 @@ void MemoryManagerActor::FreeBatchMemory(const std::vector<DeviceTensor *> *free
}
}

void MemoryManagerActor::Wait(OpContext<DeviceTensor> *const op_context, const AID from_aid) {
void MemoryManagerActor::Wait(OpContext<DeviceTensor> *const op_context, const AID &from_aid) {
// Call back to the from actor to process.
Async(from_aid, &MemoryAwareActor::OnMemoryAllocFinish, op_context);
}
@@ -178,7 +179,7 @@ void MemoryManagerActor::SetOpContextMemoryAllocFail(const std::string &kernel_n
if (mem_alloc_failed_step_ids_.find(step_id) == mem_alloc_failed_step_ids_.end()) {
mem_alloc_failed_step_ids_.clear();
(void)mem_alloc_failed_step_ids_.insert(step_id);
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kPipeline, (*op_context), device_context,
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kPipeline, *op_context, *device_context,
kernel_name, alloc_size);
}
}


+ 4
- 4
mindspore/ccsrc/runtime/framework/actor/memory_manager_actor.h View File

@@ -38,18 +38,18 @@ class MemoryManagerActor : public ActorBase {

// The process entry of memory alloc.
void AllocateMemory(const std::vector<DeviceTensor *> *alloc_list, const DeviceContext *device_context,
OpContext<DeviceTensor> *const op_context, const AID from_aid);
OpContext<DeviceTensor> *const op_context, const AID &from_aid);
// The process entry of continuous memory alloc, the size of alloc_list_list, size_list_list, total_size_list and
// device_contexts must be equal.
void AllocateContinuousMemory(const std::vector<std::vector<DeviceTensorPtr>> *alloc_list_list,
const std::vector<std::vector<size_t>> *size_list_list,
const std::vector<size_t> *total_size_list,
const std::vector<const DeviceContext *> *device_contexts,
OpContext<DeviceTensor> *const op_context, const AID from_aid);
OpContext<DeviceTensor> *const op_context, const AID &from_aid);
// The device_contexts come from different devices; the size of device_contexts must be equal to that of alloc_list.
void AllocateBatchMemory(const std::vector<DeviceTensor *> *alloc_list,
const std::vector<const DeviceContext *> *device_contexts,
OpContext<DeviceTensor> *const op_context, const AID from_aid);
OpContext<DeviceTensor> *const op_context, const AID &from_aid);

// The process entry of memory free.
void FreeMemory(const std::vector<DeviceTensor *> *free_list, const DeviceContext *device_context,
@@ -60,7 +60,7 @@ class MemoryManagerActor : public ActorBase {
OpContext<DeviceTensor> *const op_context);

// Wait for the MemoryManagerActor to finish running all current messages.
void Wait(OpContext<DeviceTensor> *const op_context, const AID from_aid);
void Wait(OpContext<DeviceTensor> *const op_context, const AID &from_aid);

private:
// When device memory allocation fails, print an error log and set the op context to failed status.


+ 2
- 0
mindspore/ccsrc/runtime/framework/actor/output_actor.cc View File

@@ -119,12 +119,14 @@ void OutputActor::CollectOutput(const AnfNodePtr &output_node, size_t output_ind
}

auto tensor = CreateOutputTensor(output_node, output_index, output_position);
MS_EXCEPTION_IF_NULL(tensor);
tensor->set_need_release_device_mem(true);
outputs_[output_position] = tensor;
current_outputs_num_++;

// Save the output nodes to clear the device tensor in the running end.
output_nodes_[output_position] = KernelWithIndex(output_node, output_index);

// There is no loop count actor in step mode, so CollectLoopCount needs to be called here to replace the old output device tensors.
if (!need_loop_count_ && (current_outputs_num_ + device_tensor_store_keys_.size() == outputs_num_)) {
CollectLoopCount(++current_count_, context);


+ 1
- 1
mindspore/ccsrc/runtime/framework/actor/recorder_actor.h View File

@@ -29,7 +29,7 @@ namespace runtime {
using mindspore::device::DeviceContext;
using mindspore::kernel::KernelLaunchInfo;

// The recorder actor is used to record kernel info for RDR module.
// The recorder actor is used to record kernel info.
class RecorderActor : public ActorBase {
public:
RecorderActor() : ActorBase("RecorderActor") {}


+ 4
- 4
mindspore/ccsrc/runtime/framework/control_node_parser.h View File

@@ -90,10 +90,10 @@ class ControlNodeParser {
void Parse(const std::vector<AnfNodePtr> &control_nodes, const std::vector<KernelGraphPtr> &graphs,
const std::vector<DeviceContext *> &device_contexts, const FuncGraphPtr &root_graph);

std::vector<AnfNodePtr> &control_node_parameters() { return control_node_parameters_; }
FrontToBackendNodeWithContext &front_to_backend_parameters() { return front_to_backend_parameters_; }
HostParameterToWeight &host_parameter_to_weights() { return host_parameter_to_weights_; }
NodeWithDeviceContext &front_value_nodes() { return front_value_nodes_; }
const std::vector<AnfNodePtr> &control_node_parameters() const { return control_node_parameters_; }
const FrontToBackendNodeWithContext &front_to_backend_parameters() const { return front_to_backend_parameters_; }
const HostParameterToWeight &host_parameter_to_weights() const { return host_parameter_to_weights_; }
const NodeWithDeviceContext &front_value_nodes() const { return front_value_nodes_; }

// Get the output of funcgraph. Usually there is only one output node, but in control flow there are
// multiple branch outputs, so there will be multiple output nodes.


+ 1
- 0
mindspore/ccsrc/runtime/framework/device_tensor_store.h View File

@@ -82,6 +82,7 @@ class DeviceTensorStore {
const auto &iter = device_tensors_.find(key);
if (iter != device_tensors_.end()) {
for (const auto &device_tensor : iter->second) {
MS_EXCEPTION_IF_NULL(device_tensor);
if (device_tensor->DeviceType() == value_type) {
return device_tensor.get();
}


+ 64
- 30
mindspore/ccsrc/runtime/framework/graph_scheduler.cc View File

@@ -154,16 +154,20 @@ void IntHandler(int, siginfo_t *, void *) {
#endif
} // namespace

void GraphScheduler::Clear(const ActorInfo &actor_info, const std::vector<KernelGraphPtr> &graphs) {
void GraphScheduler::Clear(const ActorInfo &actor_info, const std::vector<KernelGraphPtr> &graphs) noexcept {
// Terminate the actors of actor info.
if (actors_.count(actor_info) > 0) {
auto actorMgr = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actorMgr);
auto actor_manager = ActorMgr::GetActorMgrRef();
if (actor_manager == nullptr) {
MS_LOG(ERROR) << "Actor manager is not exist.";
return;
}
auto actor_set = actors_[actor_info];
auto base_actors = CollectActors(actor_set.get());
for (auto &base_actor : base_actors) {
MS_EXCEPTION_IF_NULL(base_actor);
(void)actor_name_to_actor_.erase(base_actor->GetAID().Name());
actorMgr->Terminate(base_actor->GetAID());
actor_manager->Terminate(base_actor->GetAID());
}
}

@@ -178,9 +182,9 @@ void GraphScheduler::Clear(const ActorInfo &actor_info, const std::vector<Kernel

void GraphScheduler::Clear() {
// Terminate all actors.
auto actorMgr = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actorMgr);
actorMgr->Finalize();
auto actor_manager = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actor_manager);
actor_manager->Finalize();

// Clear the member of DeviceTensorStore.
DeviceTensorStore::GetInstance().Clear();
@@ -229,8 +233,8 @@ void GraphScheduler::Initialize() {
}

void GraphScheduler::BuildAndScheduleGlobalActor() {
auto actorMgr = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actorMgr);
auto actor_manager = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actor_manager);

// Create and schedule memory manager actor.
auto memory_manager_actor = std::make_shared<MemoryManagerActor>();
@@ -238,14 +242,14 @@ void GraphScheduler::BuildAndScheduleGlobalActor() {
memory_manager_aid_ = memory_manager_actor->GetAID();
auto base_actor = static_cast<ActorReference>(memory_manager_actor);
// Bind a single thread to respond to memory alloc and free quickly.
(void)actorMgr->Spawn(base_actor, false);
(void)actor_manager->Spawn(base_actor, false);

// Create and schedule recorder actor.
auto recorder_actor = std::make_shared<RecorderActor>();
MS_EXCEPTION_IF_NULL(recorder_actor);
recorder_aid_ = &(recorder_actor->GetAID());
auto base_recorder_actor = static_cast<ActorReference>(recorder_actor);
(void)actorMgr->Spawn(base_recorder_actor, true);
(void)actor_manager->Spawn(base_recorder_actor, true);

// Create and schedule debug actor.
#ifndef ENABLE_SECURITY
@@ -262,7 +266,7 @@ void GraphScheduler::BuildAndScheduleGlobalActor() {
MS_EXCEPTION_IF_NULL(debug_actor);
debug_aid_ = &(debug_actor->GetAID());
auto base_debug_actor = static_cast<ActorReference>(debug_actor);
(void)actorMgr->Spawn(base_debug_actor, true);
(void)actor_manager->Spawn(base_debug_actor, true);
}
#endif
}
@@ -278,6 +282,7 @@ ActorSet *GraphScheduler::Transform(const GraphCompilerInfo &graph_compiler_info

PersistDeviceTensor(graph_compiler_info);
const auto &actor_set = Build(graph_compiler_info);
MS_EXCEPTION_IF_NULL(actor_set);
CacheGraphOutputToActor(graph_compiler_info);
Link(actor_set.get(), graph_compiler_info);
// The copy actors are built in the link, so need push into the actor set after link.
@@ -303,10 +308,10 @@ void GraphScheduler::Schedule(const ActorSet *actor_set) {
MS_EXCEPTION_IF_NULL(actor_set);
auto actors = CollectActors(actor_set);
// Schedule actors.
auto actorMgr = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actorMgr);
auto actor_manager = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actor_manager);
for (auto actor : actors) {
(void)actorMgr->Spawn(actor);
(void)actor_manager->Spawn(actor);
}
}

@@ -435,6 +440,7 @@ void GraphScheduler::Link(ActorSet *actor_set, const GraphCompilerInfo &graph_co
MS_EXCEPTION_IF_NULL(graph);
auto execution_order = graph->execution_order();
for (auto &kernel : execution_order) {
MS_EXCEPTION_IF_NULL(kernel);
if (AnfAlgo::IsCommunicationOp(kernel)) {
(void)communication_nodes.emplace_back(kernel);
}
@@ -538,12 +544,13 @@ std::vector<DataSourceActorPtr> GraphScheduler::BuildDataSourceActor(const Graph
}
}

MS_EXCEPTION_IF_NULL(graph_compiler_info.control_node_parser_);
const auto &front_to_backend_parameter = graph_compiler_info.control_node_parser_->front_to_backend_parameters_;

// Initialize the parameters in the control node: first get all the front parameters in the control node, then find
// the corresponding backend parameter from the map, and insert it into the host data source actor.
std::vector<AnfNodePtr> control_node_parameters = graph_compiler_info.control_node_parser_->control_node_parameters();
for (const auto parameter : control_node_parameters) {
const auto &control_node_parameters = graph_compiler_info.control_node_parser_->control_node_parameters();
for (const auto &parameter : control_node_parameters) {
if (IsPersistentDeviceTensor(parameter)) {
continue;
}
@@ -593,6 +600,7 @@ std::vector<KernelActorPtr> GraphScheduler::BuildKernelActor(const GraphCompiler
}

for (auto &kernel : execution_order) {
MS_EXCEPTION_IF_NULL(kernel);
if (IsKernelActor(kernel, graph_compiler_info.strategy_) && (!IsSkippedKernelActor(kernel))) {
auto kernel_actor = std::make_shared<KernelActor>(kernel->fullname_with_scope(), kernel, device_context,
memory_manager_aid_, debug_aid_, recorder_aid_, strategy);
@@ -707,9 +715,11 @@ std::vector<KernelActorPtr> GraphScheduler::BuildNoInputKernelActor(const ActorS
// In general, all no input nodes belong to the root funcgraph, and the corresponding gather actor should be
// empty. In control flow, the control arrow of the no input node in the sub funcgraph should be sent by the
// gather actor and should not be placed in the no input list.
MS_EXCEPTION_IF_NULL(kernel_actor->kernel_);
const auto &graph = kernel_actor->kernel_->func_graph();
if (graph != nullptr) {
const auto &kernel_graph = dynamic_cast<KernelGraph *>(graph.get());
MS_EXCEPTION_IF_NULL(kernel_graph);
const auto func_graph = kernel_graph->GetFuncGraph();
if (func_graph != nullptr && FetchActor(func_graph->ToString()) != nullptr) {
continue;
@@ -878,6 +888,8 @@ void GraphScheduler::LinkDataArrow(KernelActor *const to_actor, const GraphCompi
MS_EXCEPTION_IF_NULL(graph);

auto from_kernel = from_kernel_with_output_idx.first;
MS_EXCEPTION_IF_NULL(from_kernel);
MS_EXCEPTION_IF_NULL(graph_compiler_info.control_node_parser_);
if (from_kernel->isa<Parameter>() && graph_compiler_info.control_node_parser_->IsCallInputKernelGraph(graph)) {
const auto &kernel_with_index = GetFrontNodeByKernelGraph(from_kernel, graph);
const auto &real_front_node_with_index =
@@ -1048,10 +1060,10 @@ void GraphScheduler::LinkDataArrowForHostDSActor(AbstractActor *const from_actor
if (from_kernel_with_output_idx.first != nullptr) {
// Get the position of from kernel in the data source actor.
auto position = host_ds_actor->FetchNodePosition(from_kernel_with_output_idx.first);
real_from_kernel_with_output_idx.first = host_ds_actor->data_nodes_[position];
real_from_kernel_with_output_idx.first = host_ds_actor->FetchNode(position);
real_from_kernel_with_output_idx.second = from_kernel_with_output_idx.second;
} else {
real_from_kernel_with_output_idx.first = host_ds_actor->data_nodes_[from_kernel_with_output_idx.second];
real_from_kernel_with_output_idx.first = host_ds_actor->FetchNode(from_kernel_with_output_idx.second);
real_from_kernel_with_output_idx.second = 0;
}

@@ -1072,13 +1084,14 @@ void GraphScheduler::LinkDataArrowForKernelActor(AbstractActor *const from_actor
real_from_kernel_with_output_idx.first = kernel_actor->kernel_;
}

// Update the from kernel info by the real node info.
MS_EXCEPTION_IF_NULL(from_kernel);
if (IsSkippedKernelActor(from_kernel)) {
real_from_kernel_with_output_idx = AnfAlgo::GetPrevNodeOutput(from_kernel, 0);
MS_EXCEPTION_IF_NULL(real_from_kernel_with_output_idx.first);
LinkControlArrowBySkippedNode(to_actor, from_kernel);

// Update the from kernel info by the real node info.
MS_EXCEPTION_IF_NULL(to_kernel_with_input_idx.first);
MS_LOG(INFO) << "Link data arrow for inplace node, aggregate node: "
<< to_kernel_with_input_idx.first->fullname_with_scope()
<< ", aggregate input index: " << to_kernel_with_input_idx.second
@@ -1162,6 +1175,7 @@ void GraphScheduler::LinkControlArrowByAutoMonad(KernelActor *to_actor, const An
const KernelGraphPtr &graph) {
MS_EXCEPTION_IF_NULL(to_actor);
MS_EXCEPTION_IF_NULL(from_node);
MS_EXCEPTION_IF_NULL(graph);
// Find the real input node, including the monad node and the make tuple node.
const std::vector<PrimitivePtr> return_types = {prim::kPrimDepend, prim::kPrimUpdateState, prim::kPrimLoad,
prim::kPrimMakeTuple};
@@ -1258,12 +1272,18 @@ void GraphScheduler::LinkControlArrowBySendRecvNodes(const KernelGraphPtr &graph
MS_EXCEPTION_IF_NULL(graph);
for (auto &from_iter : graph->allreduce_from_send_recv_pairs()) {
auto to_allreduce_node = from_iter.first;
MS_LOG(INFO) << "Link control arrow for to_allreduce_node: " << to_allreduce_node->fullname_with_scope();
auto from_send_node = from_iter.second.first;
auto from_recv_node = from_iter.second.second;
MS_EXCEPTION_IF_NULL(to_allreduce_node);
MS_EXCEPTION_IF_NULL(from_send_node);
MS_EXCEPTION_IF_NULL(from_recv_node);
MS_LOG(INFO) << "Link control arrow for to_allreduce_node: " << to_allreduce_node->fullname_with_scope();
auto to_allreduce_actor = dynamic_cast<KernelActor *>(FetchActor(to_allreduce_node->fullname_with_scope()));
auto from_send_actor = dynamic_cast<KernelActor *>(FetchActor(from_send_node->fullname_with_scope()));
auto from_recv_actor = dynamic_cast<KernelActor *>(FetchActor(from_recv_node->fullname_with_scope()));
MS_EXCEPTION_IF_NULL(to_allreduce_actor);
MS_EXCEPTION_IF_NULL(from_send_actor);
MS_EXCEPTION_IF_NULL(from_recv_actor);

// inputs of to_allreduce_actor --> from_send_actor
for (auto &input_aid : to_allreduce_actor->input_data_arrow_aids_) {
@@ -1285,12 +1305,18 @@ void GraphScheduler::LinkControlArrowBySendRecvNodes(const KernelGraphPtr &graph

for (auto &to_iter : graph->allreduce_to_send_recv_pairs()) {
auto from_allreduce_node = to_iter.first;
MS_LOG(INFO) << "Link control arrow for from_allreduce_node: " << from_allreduce_node->fullname_with_scope();
auto to_send_node = to_iter.second.first;
auto to_recv_node = to_iter.second.second;
MS_EXCEPTION_IF_NULL(from_allreduce_node);
MS_EXCEPTION_IF_NULL(to_send_node);
MS_EXCEPTION_IF_NULL(to_recv_node);
MS_LOG(INFO) << "Link control arrow for from_allreduce_node: " << from_allreduce_node->fullname_with_scope();
auto from_allreduce_actor = dynamic_cast<KernelActor *>(FetchActor(from_allreduce_node->fullname_with_scope()));
auto to_send_actor = dynamic_cast<KernelActor *>(FetchActor(to_send_node->fullname_with_scope()));
auto to_recv_actor = dynamic_cast<KernelActor *>(FetchActor(to_recv_node->fullname_with_scope()));
MS_EXCEPTION_IF_NULL(from_allreduce_actor);
MS_EXCEPTION_IF_NULL(to_send_actor);
MS_EXCEPTION_IF_NULL(to_recv_actor);

// from_allreduce_actor --> to_send_actor
(void)from_allreduce_actor->output_control_arrows_.emplace_back(to_send_actor->GetAID());
@@ -1363,6 +1389,7 @@ void GraphScheduler::LinkControlArrowByCommunicationNode(const std::vector<CNode
// Ensure all actors execute orderly to optimize the execution performance in the multi device scenario currently.
// Using the multi stream to optimize the performance in the future.
for (auto &graph : graph_compiler_info.graphs_) {
MS_EXCEPTION_IF_NULL(graph);
auto &execution_order = graph->execution_order();
for (size_t i = 1; i < execution_order.size(); ++i) {
auto from_actor = dynamic_cast<KernelActor *>(FetchActor(execution_order[i - 1]->fullname_with_scope()));
@@ -1403,6 +1430,7 @@ void GraphScheduler::LinkControlArrowForDataPrepareActor(DataPrepareActor *data_
void GraphScheduler::LinkControlArrowForLoopCountActor(LoopCountActor *loop_count_actor, const ActorSet *actor_set,
const ControlNodeParserPtr &parser) {
MS_EXCEPTION_IF_NULL(actor_set);
MS_EXCEPTION_IF_NULL(parser);
// There is no loop count actor in step mode.
if (loop_count_actor == nullptr) {
return;
@@ -1453,10 +1481,9 @@ void GraphScheduler::LinkOutputResultArrowForOutputActor(OutputActor *to_actor,

MS_EXCEPTION_IF_NULL(to_actor);

size_t number = 0;
for (const auto &graph : graph_compiler_info.graphs_) {
for (size_t i = 0; i < graph_compiler_info.graphs_.size(); ++i) {
const auto &graph = graph_compiler_info.graphs_[i];
MS_EXCEPTION_IF_NULL(graph);
++number;
auto outputs = AnfAlgo::GetAllOutputWithIndex(graph->output());
std::set<std::vector<size_t>> unique_output_positions;
std::set<KernelWithIndex> unique_outputs;
@@ -1481,7 +1508,10 @@ void GraphScheduler::LinkOutputResultArrowForOutputActor(OutputActor *to_actor,
}
(void)unique_output_positions.insert(iter->second);
for (auto &output_position : iter->second) {
to_actor->device_contexts_[output_position] = graph_compiler_info.device_contexts_[number - 1];
if (output_position >= to_actor->device_contexts_.size()) {
MS_LOG(EXCEPTION) << "The output position is out of range.";
}
to_actor->device_contexts_[output_position] = graph_compiler_info.device_contexts_[i];
// The device tensor of the graph output needs to be taken over by the host tensor, so set the max reference count.
UpdateRefCount(output_with_index.first, output_with_index.second, true);

@@ -2255,12 +2285,12 @@ void GraphScheduler::PersistDeviceTensor(const GraphCompilerInfo &graph_compiler
}

// The sub front nodes share the device tensor store with the root front node.
auto front_node = sub_front_node;
if (graph_compiler_info.control_node_parser_ != nullptr) {
front_node = graph_compiler_info.control_node_parser_->FetchRootGraphFrontNodeBySubFrontNode(sub_front_node);
}
MS_EXCEPTION_IF_NULL(graph_compiler_info.control_node_parser_);
auto front_node = graph_compiler_info.control_node_parser_->FetchRootGraphFrontNodeBySubFrontNode(sub_front_node);
MS_EXCEPTION_IF_NULL(front_node);
MS_LOG(DEBUG) << "Graph id:" << graph->graph_id() << ", sub front node:" << sub_front_node->DebugString()
<< ", root front node:" << front_node->DebugString();

auto device_tensor = AnfAlgo::GetMutableOutputAddr(input_node, 0, false);
MS_EXCEPTION_IF_NULL(device_tensor);
if (IsPersistentDeviceTensor(input_node)) {
@@ -2500,6 +2530,8 @@ void GraphScheduler::DumpDataPrepareActor(const DataPrepareActor *actor, std::of

ofs << "\t\tcontinuous_memory_nodes:" << actor->continuous_memory_nodes_.size() << "\n ";
for (const auto &iter : actor->continuous_memory_nodes_) {
MS_EXCEPTION_IF_NULL(iter.first.first);
MS_EXCEPTION_IF_NULL(iter.first.second);
ofs << "\t\t\tnode_name:" << iter.first.first->fullname_with_scope()
<< "\tdevice_context:" << iter.first.second->device_context_key().ToString()
<< "\tis_input_need:" << iter.second.first << "\tis_output_need:" << iter.second.second << "\n";
@@ -2514,6 +2546,7 @@ void GraphScheduler::DumpDSActor(const DataSourceActor *actor, std::ofstream &of
if (actor->type_ == KernelTransformType::kDeviceDataSourceActor) {
// Dump the member info of device queue data source actor.
const auto &device_queue_ds_actor = dynamic_cast<const DeviceQueueDataSourceActor *>(actor);
MS_EXCEPTION_IF_NULL(device_queue_ds_actor);
const auto &data_kernel = device_queue_ds_actor->data_kernel_;
MS_EXCEPTION_IF_NULL(data_kernel);
ofs << "\t\tdata_kernel_name:" << data_kernel->fullname_with_scope()
@@ -2528,6 +2561,7 @@ void GraphScheduler::DumpDSActor(const DataSourceActor *actor, std::ofstream &of
} else if (actor->type_ == KernelTransformType::kHostDataSourceActor) {
// Dump the member info of host queue data source actor.
const auto &host_queue_ds_actor = dynamic_cast<const HostQueueDataSourceActor *>(actor);
MS_EXCEPTION_IF_NULL(host_queue_ds_actor);
ofs << "\t\tdata_nodes:" << host_queue_ds_actor->data_nodes_.size() << "\n";
for (size_t i = 0; i < host_queue_ds_actor->data_nodes_.size(); ++i) {
const auto &data_node = host_queue_ds_actor->data_nodes_[i];


+ 2
- 2
mindspore/ccsrc/runtime/framework/graph_scheduler.h View File

@@ -80,7 +80,7 @@ using ActorSetPtr = std::shared_ptr<ActorSet>;

class GraphScheduler {
public:
static GraphScheduler &GetInstance() {
static GraphScheduler &GetInstance() noexcept {
static GraphScheduler instance;
return instance;
}
@@ -91,7 +91,7 @@ class GraphScheduler {

// Clear the members.
void Clear();
void Clear(const ActorInfo &actor_info, const std::vector<KernelGraphPtr> &graphs);
void Clear(const ActorInfo &actor_info, const std::vector<KernelGraphPtr> &graphs) noexcept;

// Transform graph to actor DAG, contains build and link.
ActorSet *Transform(const GraphCompilerInfo &graph_compiler_info);


+ 6
- 0
mindspore/ccsrc/vm/backend.cc View File

@@ -919,12 +919,18 @@ void MindRTBackend::ConstructOutputs(const AnfNodePtr &output_node,
if (output_abstract->isa<abstract::AbstractTuple>()) {
VectorRef output_tuple;
for (size_t i = 0; i < outputs_num; ++i) {
if (*output_position >= output_tensors.size()) {
MS_LOG(EXCEPTION) << "The output position is out of range: " << *output_position;
}
output_tuple.emplace_back(std::move(output_tensors[*output_position]));
++(*output_position);
}
outputs->emplace_back(std::move(output_tuple));
} else {
for (size_t i = 0; i < outputs_num; ++i) {
if (*output_position >= output_tensors.size()) {
MS_LOG(EXCEPTION) << "The output position is out of range: " << *output_position;
}
outputs->emplace_back(std::move(output_tensors[*output_position]));
++(*output_position);
}


Loading…
Cancel
Save