Browse Source

Unified runtime: add data prepare actor and optimize code

tags/v1.5.0-rc1
limingqi107 4 years ago
parent
commit
859aa42b41
19 changed files with 1028 additions and 841 deletions
  1. +0
    -1
      mindspore/ccsrc/pipeline/jit/action.cc
  2. +46
    -2
      mindspore/ccsrc/runtime/framework/actor/actor_common.cc
  3. +25
    -0
      mindspore/ccsrc/runtime/framework/actor/actor_common.h
  4. +543
    -0
      mindspore/ccsrc/runtime/framework/actor/data_prepare_actor.cc
  5. +121
    -0
      mindspore/ccsrc/runtime/framework/actor/data_prepare_actor.h
  6. +7
    -0
      mindspore/ccsrc/runtime/framework/actor/data_source_actor.cc
  7. +2
    -0
      mindspore/ccsrc/runtime/framework/actor/data_source_actor.h
  8. +3
    -5
      mindspore/ccsrc/runtime/framework/actor/kernel_actor.cc
  9. +18
    -91
      mindspore/ccsrc/runtime/framework/actor/loop_count_actor.cc
  10. +3
    -17
      mindspore/ccsrc/runtime/framework/actor/loop_count_actor.h
  11. +6
    -12
      mindspore/ccsrc/runtime/framework/actor/memory_manager_actor.cc
  12. +4
    -1
      mindspore/ccsrc/runtime/framework/control_node_parser.h
  13. +3
    -3
      mindspore/ccsrc/runtime/framework/device_tensor_store.h
  14. +3
    -0
      mindspore/ccsrc/runtime/framework/graph_compiler.cc
  15. +51
    -0
      mindspore/ccsrc/runtime/framework/graph_compiler.h
  16. +150
    -604
      mindspore/ccsrc/runtime/framework/graph_scheduler.cc
  17. +22
    -77
      mindspore/ccsrc/runtime/framework/graph_scheduler.h
  18. +10
    -1
      mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.cc
  19. +11
    -27
      mindspore/ccsrc/vm/backend.cc

+ 0
- 1
mindspore/ccsrc/pipeline/jit/action.cc View File

@@ -757,7 +757,6 @@ bool ExecuteAction(const ResourcePtr &res) {
MS_LOG(EXCEPTION) << "Execute args error";
}
std::string backend = MsContext::GetInstance()->backend_policy();

// The graph running of mindRT.
if ((backend == kMsConvert) && MsContext::GetInstance()->get_param<bool>(MS_CTX_ENABLE_MINDRT)) {
ExecuteActionForMindRT(res);


+ 46
- 2
mindspore/ccsrc/runtime/framework/actor/actor_common.cc View File

@@ -16,7 +16,6 @@

#include "runtime/framework/actor/actor_common.h"
#include "runtime/framework/device_tensor_store.h"
#include "backend/session/anf_runtime_algorithm.h"
#include "utils/ms_context.h"

namespace mindspore {
@@ -83,7 +82,7 @@ bool IsHostQueueDSActor(const AnfNodePtr &node, const KernelGraphPtr &graph,
bool is_host = ((front_node == nullptr) || host_parameters.empty() ||
find(host_parameters.begin(), host_parameters.end(), front_node) != host_parameters.end());

// Judge whether node is internal parameter.
// Judge whether node is internal parameter.
const auto &internal_front_node = graph->GetFrontNodeByInternalParameter(node);
if (internal_front_node.first == nullptr && is_host) {
return true;
@@ -174,5 +173,50 @@ bool Copy(const DeviceTensor *dst_device_tensor, const DeviceTensor *src_device_
return false;
}
}

// Bump (or pin) the original reference count of a device tensor, then refresh
// its working ref count from the original value.
void UpdateRefCount(DeviceTensor *const device_tensor, bool is_max_ref_count) {
  MS_EXCEPTION_IF_NULL(device_tensor);
  // Pinning to SIZE_MAX marks the tensor as never releasable by ref counting.
  if (!is_max_ref_count) {
    device_tensor->IncreaseOriginalRefCount();
  } else {
    device_tensor->set_original_ref_count(SIZE_MAX);
  }
  device_tensor->ResetRefCount();
}

// Convenience overload: update the ref count of the device tensor bound to
// the given output index of `node`.
void UpdateRefCount(const AnfNodePtr &node, size_t output_idx, bool is_max_ref_count) {
  MS_EXCEPTION_IF_NULL(node);
  const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, output_idx, false);
  UpdateRefCount(device_tensor.get(), is_max_ref_count);
}

// Map a backend node back to its corresponding front (source graph) node.
AnfNodePtr FetchFrontNodeByBackendNode(const AnfNodePtr &backend_node, const KernelGraphPtr &graph) {
MS_EXCEPTION_IF_NULL(backend_node);
MS_EXCEPTION_IF_NULL(graph);

// Internal parameter ---> front node.
auto front_node_with_index = graph->GetFrontNodeByInternalParameter(backend_node);
if (front_node_with_index.first != nullptr) {
return front_node_with_index.first;
}

auto front_node = graph->GetFrontAnfByBackendAnf(backend_node);
// PyNative forward graph does not have a front node, so use the backend node instead.
if (front_node == nullptr) {
front_node = backend_node;
}
return front_node;
}

// Map a graph output (backend node + output index) to its front node with index.
KernelWithIndex FetchFrontNodeWithIndexByGraphOutput(const KernelWithIndex &output_with_index,
const KernelGraphPtr &graph) {
MS_EXCEPTION_IF_NULL(graph);
auto front_node_with_index = graph->GetFrontNodeWithIndexByGraphOutput(output_with_index);
// PyNative forward graph does not have a front node, so use the backend output instead.
if (front_node_with_index.first == nullptr) {
front_node_with_index = output_with_index;
}
return front_node_with_index;
}
} // namespace runtime
} // namespace mindspore

+ 25
- 0
mindspore/ccsrc/runtime/framework/actor/actor_common.h View File

@@ -25,12 +25,14 @@
#include <algorithm>
#include "mindrt/include/actor/op_actor.h"
#include "runtime/device/device_address.h"
#include "backend/session/anf_runtime_algorithm.h"
#include "backend/session/kernel_graph.h"
#include "utils/log_adapter.h"
#include "ir/tensor.h"

namespace mindspore {
namespace runtime {
using mindspore::session::KernelWithIndex;
using tensor::TensorPtr;
using DeviceTensor = mindspore::device::DeviceAddress;

@@ -45,6 +47,7 @@ enum class GraphExecutionStrategy {

enum class KernelTransformType {
kUnknown,
kDataPrepareActor,
kDeviceDataSourceActor,
kHostDataSourceActor,
kKernelActor,
@@ -79,6 +82,19 @@ enum class KernelTransformType {
return; \
}

#define SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(strategy, op_context, device_context, kernel_name, alloc_size) \
{ \
std::string message = "Device(id:" + std::to_string(device_context->device_context_key().device_id_) + \
") memory isn't enough and alloc failed, kernel name: " + kernel_name + \
", alloc size: " + std::to_string(alloc_size) + "B."; \
if (strategy == GraphExecutionStrategy::kStep) { \
MS_LOG(EXCEPTION) << message; \
} \
MS_LOG(ERROR) << message; \
op_context.SetFailed(kFailure); \
return; \
}

void ComputeThreadNums(size_t *actor_thread_num, size_t *OMP_thread_num, size_t *max_thread_num);

bool IsDeviceQueueDSActor(const AnfNodePtr &node, GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline);
@@ -109,6 +125,15 @@ bool IsGatherActor(const AnfNodePtr &front_node,

// Copy data from src_device_tensor to dst_device_tensor.
bool Copy(const DeviceTensor *dst_device_tensor, const DeviceTensor *src_device_tensor);

void UpdateRefCount(DeviceTensor *const device_tensor, bool is_max_ref_count = false);
// Update the reference count of device tensor by the output index of node.
void UpdateRefCount(const AnfNodePtr &node, size_t output_idx, bool is_max_ref_count = false);

// Get front node by backend node.
AnfNodePtr FetchFrontNodeByBackendNode(const AnfNodePtr &backend_node, const KernelGraphPtr &graph);
KernelWithIndex FetchFrontNodeWithIndexByGraphOutput(const KernelWithIndex &output_with_index,
const KernelGraphPtr &graph);
} // namespace runtime
} // namespace mindspore



+ 543
- 0
mindspore/ccsrc/runtime/framework/actor/data_prepare_actor.cc View File

@@ -0,0 +1,543 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "runtime/framework/actor/data_prepare_actor.h"
#include "runtime/framework/actor/memory_manager_actor.h"
#include "runtime/framework/actor/kernel_actor.h"
#include "runtime/hardware/device_context_manager.h"
#include "mindrt/include/async/async.h"
#include "utils/log_adapter.h"
#include "utils/convert_utils.h"
#include "common/trans.h"

namespace mindspore {
namespace runtime {
namespace {
// Ensure `device_tensor` owns device memory, then copy the contents of
// `host_tensor` into it. On allocation or sync failure the op context is
// failed (pipeline mode) or an exception is thrown (step mode) via the
// strategy-aware macros.
void SyncTensorData(const TensorPtr &host_tensor, const DeviceTensorPtr &device_tensor, const AnfNodePtr &node,
const DeviceContext *device_context, OpContext<DeviceTensor> *const context,
GraphExecutionStrategy strategy) {
MS_EXCEPTION_IF_NULL(host_tensor);
MS_EXCEPTION_IF_NULL(device_tensor);
MS_EXCEPTION_IF_NULL(node);
MS_EXCEPTION_IF_NULL(device_context);
MS_EXCEPTION_IF_NULL(context);

// Allocate device memory only when the tensor does not own any yet.
if ((device_tensor->GetPtr() == nullptr) &&
(!device_context->AllocateMemory(device_tensor.get(), device_tensor->GetSize()))) {
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(strategy, (*context), device_context, node->fullname_with_scope(),
device_tensor->GetSize());
}

// Copy data from host tensor to device.
auto host_tensor_size = LongToSize(host_tensor->data().nbytes());
auto host_tensor_type = host_tensor->data_type();
if (!device_tensor->SyncHostToDevice(trans::GetRuntimePaddingShape(node, 0), host_tensor_size, host_tensor_type,
host_tensor->data_c(), host_tensor->device_info().host_format_)) {
std::string error_info = "SyncHostToDevice failed, node name: " + node->fullname_with_scope() +
", host tensor size: " + std::to_string(host_tensor_size) +
", host tensor type: " + std::to_string(static_cast<int>(host_tensor_type)) +
", device tensor size: " + std::to_string(device_tensor->GetSize());
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy, (*context), error_info);
}
}

// Collect the device addresses and per-tensor sizes of either the inputs or
// the outputs of `node`, for allocating one continuous memory block.
// Outputs: addr_list - device tensors in order; size_list - matching sizes;
// total_size - sum of all sizes. All three are cleared/reset first.
void FetchContinuousMemoryInfo(const CNodePtr &node, std::vector<DeviceTensorPtr> *const addr_list,
                               std::vector<size_t> *const size_list, size_t *const total_size, bool is_input) {
  MS_EXCEPTION_IF_NULL(node);
  MS_EXCEPTION_IF_NULL(addr_list);
  MS_EXCEPTION_IF_NULL(size_list);
  MS_EXCEPTION_IF_NULL(total_size);
  const auto &kernel_mod = AnfAlgo::GetKernelMod(node);
  MS_EXCEPTION_IF_NULL(kernel_mod);
  addr_list->clear();
  size_list->clear();
  *total_size = 0;

  // Inputs and outputs differ only in the size list consulted and in how the
  // device address is fetched; unify the two previously duplicated loops.
  const auto &mem_sizes = is_input ? kernel_mod->GetInputSizeList() : kernel_mod->GetOutputSizeList();
  for (size_t i = 0; i < mem_sizes.size(); ++i) {
    const auto &device_tensor = is_input ? AnfAlgo::GetPrevNodeMutableOutputAddr(node, i, false)
                                         : AnfAlgo::GetMutableOutputAddr(node, i, false);
    MS_EXCEPTION_IF_NULL(device_tensor);
    *total_size += mem_sizes[i];
    (void)size_list->emplace_back(mem_sizes[i]);
    (void)addr_list->emplace_back(device_tensor);
  }
}
} // namespace
// Initialize the actor: cache the execution strategy, validate the
// graph/device-context pairing, and precompute the continuous-memory
// allocation lists from the registered nodes.
void DataPrepareActor::Init() {
MS_EXCEPTION_IF_NULL(graph_compiler_info_);
strategy_ = graph_compiler_info_->strategy_;
if (graph_compiler_info_->graphs_.size() != graph_compiler_info_->device_contexts_.size()) {
MS_LOG(EXCEPTION) << "The number of graphs is not equal to the number of device contexts.";
}

// For each registered node, build one allocation entry per side (inputs
// and/or outputs) that requires continuous memory. The four member vectors
// appended below stay index-aligned with each other.
for (auto &iter : continuous_memory_nodes_) {
size_t total_size = 0;
std::vector<size_t> size_list;
std::vector<DeviceTensorPtr> addr_list;
// Inputs need continuous memory.
if (iter.second.first == true) {
FetchContinuousMemoryInfo(iter.first.first, &addr_list, &size_list, &total_size, true);
(void)continuous_memory_alloc_list_list_.emplace_back(addr_list);
(void)size_list_list_.emplace_back(size_list);
(void)total_size_list_.emplace_back(total_size);
(void)continuous_memory_device_contexts_.emplace_back(iter.first.second);
}

// Outputs need continuous memory.
if (iter.second.second == true) {
FetchContinuousMemoryInfo(iter.first.first, &addr_list, &size_list, &total_size, false);
(void)continuous_memory_alloc_list_list_.emplace_back(addr_list);
(void)size_list_list_.emplace_back(size_list);
(void)total_size_list_.emplace_back(total_size);
(void)continuous_memory_device_contexts_.emplace_back(iter.first.second);
}
}
}

// Entry point of one step: on the first run fill the device tensor store,
// then feed the host tensor queue according to the execution strategy, and
// finally kick off the downstream actors (after continuous-memory allocation
// when any node requires it).
void DataPrepareActor::PrepareData(const std::vector<std::vector<TensorPtr>> &input_tensors,
                                   OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(context);

  if (first_running_) {
    PrepareDataForDeviceTensorStore(input_tensors, context);
    // The step execution mode has no concept of first running.
    first_running_ = (strategy_ == GraphExecutionStrategy::kStep);
  }

  switch (strategy_) {
    case GraphExecutionStrategy::kPipeline:
      PrepareDataForHostTensorQueue(input_tensors, context);
      break;
    case GraphExecutionStrategy::kStep:
      PrepareDataForStepMode(input_tensors, context);
      break;
    default:
      break;
  }

  // Allocate continuous memory and send output to trigger the step running.
  if (continuous_memory_alloc_list_list_.empty()) {
    SendOutput(context);
  } else {
    SendMemoryAllocReq(context);
  }
}

// Request continuous-memory allocation from the memory manager actor; the
// manager calls back via OnMemoryAllocFinish when the allocation completes.
void DataPrepareActor::SendMemoryAllocReq(OpContext<DeviceTensor> *const context) {
// Allocate continuous memory in the begin of the step running.
Async(memory_manager_aid_, &MemoryManagerActor::AllocateContinuousMemory, &continuous_memory_alloc_list_list_,
&size_list_list_, &total_size_list_, &continuous_memory_device_contexts_, context, GetAID());
}

// Callback from the memory manager actor: continuous memory is ready, so
// trigger the downstream actors to start the step.
void DataPrepareActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
SendOutput(context);
}

// Start the step: tell every data source actor to fetch its data, and send a
// control message to each kernel actor that has no data inputs (they would
// otherwise never be triggered).
void DataPrepareActor::SendOutput(OpContext<DeviceTensor> *const context) {
for (auto &data_source_aid : data_source_aids_) {
Async(data_source_aid, &DataSourceActor::FetchData, context);
}

auto source_aid = const_cast<AID *>(&GetAID());
for (auto &kernel_aid : no_input_kernel_aids_) {
Async(kernel_aid, &KernelActor::RunOpControl, source_aid, context);
}
}

// Fill the device tensor store with each graph's persistent data: value nodes
// and weight parameters, plus the entries needed by control nodes.
// NOTE(review): assumes input_tensors[i] is index-aligned with
// graphs_[i]->input_nodes() and that input_tensors.back() carries the control
// node tensors — confirm against the caller.
void DataPrepareActor::PrepareDataForDeviceTensorStore(const std::vector<std::vector<TensorPtr>> &input_tensors,
OpContext<DeviceTensor> *const context) {
for (size_t i = 0; i < graph_compiler_info_->graphs_.size(); ++i) {
const auto &graph = graph_compiler_info_->graphs_[i];
const auto &device_context = graph_compiler_info_->device_contexts_[i];
MS_EXCEPTION_IF_NULL(graph);
// Prepare the data of device tensor store(value nodes of graph).
for (const auto &value_node : graph->graph_value_nodes()) {
if (AnfAlgo::OutputAddrExist(value_node, 0)) {
PrepareDataForValueNode(value_node, device_context, context);
}
}

// Prepare the data of device tensor store(weights of graph).
const auto &input_nodes = graph->input_nodes();
const auto &tensors = input_tensors[i];
for (size_t j = 0; j < input_nodes.size(); ++j) {
const auto &input_node = input_nodes[j];
const auto &input_tensor = tensors[j];
MS_EXCEPTION_IF_NULL(input_node);
// Only persistent tensors (weights) belong in the device tensor store.
if (!IsPersistentDeviceTensor(input_node)) {
continue;
}
const auto front_node = FetchFrontNodeByBackendNode(input_node, graph);
PrepareDataForWeightNode(input_node, front_node, input_tensor, device_context, context);
}
}

PrepareDeviceTensorStoreForControlNode(graph_compiler_info_->control_node_parser_, input_tensors.back(), context);
}

// Fill the host tensor queue with this step's inputs for every graph (and the
// control nodes) in pipeline mode. Queue slot positions are assigned by the
// host data source actor.
void DataPrepareActor::PrepareDataForHostTensorQueue(const std::vector<std::vector<TensorPtr>> &input_tensors,
OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
// Nothing to do when the job has no host inputs.
if ((host_data_source_actor_ == nullptr) || (host_tensor_queue_ == nullptr)) {
return;
}

std::vector<TensorPtr> host_tensors;
host_tensors.resize(host_data_source_actor_->data_nodes().size());
// Fill host tensors.
for (size_t i = 0; i < graph_compiler_info_->graphs_.size(); ++i) {
const auto &graph = graph_compiler_info_->graphs_[i];
MS_EXCEPTION_IF_NULL(graph);

const auto &input_nodes = graph->input_nodes();
const auto &tensors = input_tensors[i];
for (size_t j = 0; j < input_nodes.size(); ++j) {
const auto &input_node = input_nodes[j];
const auto &input_tensor = tensors[j];
MS_EXCEPTION_IF_NULL(input_node);
if (!IsHostQueueDSActor(input_node, graph, graph_compiler_info_->origin_parameters_order_, strategy_)) {
continue;
}
// Each host input owns a fixed slot in the queue.
auto tensor_position = host_data_source_actor_->FetchNodePosition(input_node);
if (tensor_position >= host_tensors.size()) {
std::string error_info = "The position of tensor is out of range: " + std::to_string(tensor_position);
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
}
host_tensors[tensor_position] = input_tensor;

// Reuse the tensor's existing device address when its device type matches
// the node's address, avoiding a host-to-device copy later.
auto tensor_address = std::dynamic_pointer_cast<DeviceTensor>(input_tensor->device_address());
auto device_address = AnfAlgo::GetMutableOutputAddr(input_node, 0, false);
MS_EXCEPTION_IF_NULL(device_address);
if ((tensor_address != nullptr) && (tensor_address->DeviceType() == device_address->DeviceType())) {
AnfAlgo::SetOutputAddr(tensor_address, 0, input_node.get());
}
}
}

PrepareHostTensorQueueForControlNode(input_tensors.back(), &host_tensors, context);

host_tensor_queue_->Push(host_tensors);
}

// Prepare per-step inputs in step (op-by-op) mode: fill the host tensor queue
// slots and make sure every non-persistent graph input owns a device address
// whose data is synchronized from its host tensor.
void DataPrepareActor::PrepareDataForStepMode(const std::vector<std::vector<TensorPtr>> &input_tensors,
OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
std::vector<TensorPtr> host_tensors;
if ((host_data_source_actor_ != nullptr) && (host_tensor_queue_ != nullptr)) {
host_tensors.resize(host_data_source_actor_->data_nodes().size());
}

for (size_t i = 0; i < graph_compiler_info_->graphs_.size(); ++i) {
const auto &graph = graph_compiler_info_->graphs_[i];
const auto &device_context = graph_compiler_info_->device_contexts_[i];
MS_EXCEPTION_IF_NULL(graph);
MS_EXCEPTION_IF_NULL(device_context);

const auto &input_nodes = graph->input_nodes();
const auto &tensors = input_tensors[i];
for (size_t j = 0; j < input_nodes.size(); ++j) {
const auto &input_node = input_nodes[j];
const auto &input_tensor = tensors[j];
MS_EXCEPTION_IF_NULL(input_node);
MS_EXCEPTION_IF_NULL(input_tensor);
// Weights are handled via the device tensor store, not per step.
if (IsPersistentDeviceTensor(input_node)) {
continue;
}

if ((host_data_source_actor_ != nullptr) && (host_tensor_queue_ != nullptr)) {
auto tensor_position = host_data_source_actor_->FetchNodePosition(input_node);
if (tensor_position >= host_tensors.size()) {
std::string error_info = "The position of tensor is out of range: " + std::to_string(tensor_position);
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
}
host_tensors[tensor_position] = input_tensor;
}

// If the tensor already carries a device address, bind it to the node and
// skip allocation and synchronization.
auto host_tensor_address = std::dynamic_pointer_cast<DeviceTensor>(input_tensor->device_address());
if (host_tensor_address != nullptr) {
AnfAlgo::SetOutputAddr(host_tensor_address, 0, input_node.get());
continue;
}

// Create a device address for the node when none exists yet.
if (!AnfAlgo::OutputAddrExist(input_node, 0, false)) {
TypeId output_type_id = AnfAlgo::GetOutputDeviceDataType(input_node, 0);
if (output_type_id == kTypeUnknown) {
output_type_id = AnfAlgo::GetOutputInferDataType(input_node, 0);
}
size_t tensor_size = AnfAlgo::GetOutputTensorMemSize(input_node, 0);
auto device_address = device_context->CreateDeviceAddress(
nullptr, tensor_size, AnfAlgo::GetOutputFormat(input_node, 0), output_type_id);
AnfAlgo::SetOutputAddr(device_address, 0, input_node.get());
}
auto device_tensor = AnfAlgo::GetMutableOutputAddr(input_node, 0, false);
input_tensor->set_device_address(device_tensor);
// Pin the ref count: step-mode inputs must not be freed by ref counting.
UpdateRefCount(device_tensor.get(), true);

SyncTensorData(input_tensor, device_tensor, input_node, device_context, context, strategy_);
}
}

if ((host_data_source_actor_ != nullptr) && (host_tensor_queue_ != nullptr)) {
host_tensor_queue_->Push(host_tensors);
}
}

// The branch processing of PrepareDataForValueNode that value type is tensor
// (a single tensor or each element of a ValueTuple).
void DataPrepareActor::PrepareDataForValueNodeTensor(const ValueNodePtr &node, const ValuePtr &node_value,
                                                     const DeviceContext *device_context,
                                                     OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(node);
  MS_EXCEPTION_IF_NULL(node_value);
  MS_EXCEPTION_IF_NULL(device_context);
  MS_EXCEPTION_IF_NULL(context);

  std::vector<TensorPtr> tensors;
  TensorValueToTensor(node_value, &tensors);
  for (size_t i = 0; i < tensors.size(); i++) {
    const auto &tensor = tensors[i];
    if (tensor == nullptr) {
      MS_LOG(WARNING) << "Tensor is null";
      // Fix: a null element must not abort preparing the remaining outputs of
      // a ValueTuple (was `return`).
      continue;
    }

    const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, i, false);
    MS_EXCEPTION_IF_NULL(device_tensor);
    // If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
    if (device_tensor->GetPtr() != nullptr) {
      // Fix: skip only this already-prepared output; later outputs may still
      // need device data (was `return`).
      continue;
    }
    MS_LOG(INFO) << "Prepare device data for value node: " << node->fullname_with_scope() << ", output index: " << i;
    tensor->set_device_address(device_tensor);
    // Value node data is persistent: pin the ref count so it is never freed.
    UpdateRefCount(device_tensor.get(), true);

    SyncTensorData(tensor, device_tensor, node, device_context, context, strategy_);
  }
}

// Prepare the device data for persistent device tensor of value node.
// Tensor/tuple values are delegated to PrepareDataForValueNodeTensor; string
// immediates are copied to device as a uint8 buffer. Other value kinds are
// left untouched here.
void DataPrepareActor::PrepareDataForValueNode(const ValueNodePtr &node, const DeviceContext *device_context,
OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(node);
MS_EXCEPTION_IF_NULL(device_context);
MS_EXCEPTION_IF_NULL(context);
auto &node_value = node->value();
MS_EXCEPTION_IF_NULL(node_value);

if (node_value->isa<tensor::Tensor>() || node_value->isa<ValueTuple>()) {
// The branch processing that value type is tensor.
PrepareDataForValueNodeTensor(node, node_value, device_context, context);
} else if (node_value->isa<StringImm>()) {
const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, 0, false);
MS_EXCEPTION_IF_NULL(device_tensor);
// If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
if (device_tensor->GetPtr() != nullptr) {
return;
}
MS_LOG(INFO) << "Prepare device data for value node: " << node->fullname_with_scope();

if (!device_context->AllocateMemory(device_tensor.get(), device_tensor->GetSize())) {
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(strategy_, (*context), device_context, node->fullname_with_scope(),
device_tensor->GetSize());
}

// Copy data from value to device.
auto value = GetValue<std::string>(node_value);
size_t tensor_size = value.size();
// The string is laid out on device as a [1, len] uint8 buffer.
ShapeVector shape = {1, SizeToLong(tensor_size)};
if (!device_tensor->SyncHostToDevice(shape, tensor_size, kNumberTypeUInt8, value.data())) {
std::string error_info = "SyncHostToDevice failed, node name: " + node->fullname_with_scope();
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
}
}
}

// Prepare the device data for persistent device tensor of weight node from host tensor.
// Binds the host tensor's device address to the backend weight node (creating
// one if missing), registers it in the device tensor store keyed by the front
// node, syncs host data when needed, and copies the weight to any second
// device that also holds it.
void DataPrepareActor::PrepareDataForWeightNode(const AnfNodePtr &backend_node, const AnfNodePtr &front_node,
const TensorPtr &tensor, const DeviceContext *device_context,
OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(backend_node);
MS_EXCEPTION_IF_NULL(front_node);
MS_EXCEPTION_IF_NULL(device_context);
MS_EXCEPTION_IF_NULL(context);
// A missing tensor is tolerated: nothing to prepare.
if (tensor == nullptr) {
return;
}

auto device_tensor = AnfAlgo::GetMutableOutputAddr(backend_node, 0, false);
auto host_tensor_address = std::dynamic_pointer_cast<DeviceTensor>(tensor->device_address());
// Use the device address of host tensor to set device tensor.
if (host_tensor_address != device_tensor) {
if (host_tensor_address == nullptr) {
MS_EXCEPTION_IF_NULL(device_tensor);
// The host tensor has no address yet: create one mirroring the node's
// device tensor and pin its ref count (weights are persistent).
host_tensor_address = device_context->CreateDeviceAddress(nullptr, device_tensor->GetSize(),
device_tensor->format(), device_tensor->type_id());
tensor->set_device_address(host_tensor_address);
UpdateRefCount(host_tensor_address.get(), true);
}
MS_EXCEPTION_IF_NULL(host_tensor_address);
DeviceTensorStore::GetInstance().Insert(front_node.get(), host_tensor_address);
if (host_tensor_address->DeviceType() == device_tensor->DeviceType()) {
AnfAlgo::SetOutputAddr(host_tensor_address, 0, backend_node.get());
} else {
MS_LOG(INFO) << "The device type is not equal, host tensor type:" << host_tensor_address->DeviceType()
<< ", device tensor type:" << device_tensor->DeviceType();
}
}

// If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
if (host_tensor_address->GetPtr() == nullptr) {
MS_LOG(INFO) << "Prepare device data for weight node:" << backend_node->fullname_with_scope()
<< ", device type:" << host_tensor_address->DeviceType();
SyncTensorData(tensor, host_tensor_address, backend_node, device_context, context, strategy_);
}

// Allocate another device memory and copy data from host tensor to another device(if exist).
const auto &device_tensors = DeviceTensorStore::GetInstance().Fetch(front_node.get());
if (device_tensors.size() > 1) {
// NOTE(review): assumes the store holds at most two device tensors per
// front node (one per device type) — confirm against DeviceTensorStore.
auto another_device_tensor = (device_tensors[0] == host_tensor_address) ? device_tensors[1] : device_tensors[0];
MS_EXCEPTION_IF_NULL(another_device_tensor);
auto another_device_type = another_device_tensor->DeviceType();
const auto &another_device_context = device::DeviceContextManager::GetInstance().GetOrCreateDeviceContext(
{device::kDeviceTypeToName.at(another_device_type), device_context->device_context_key().device_id_});
MS_EXCEPTION_IF_NULL(another_device_context);
if ((another_device_tensor->GetPtr() == nullptr) &&
(!another_device_context->AllocateMemory(another_device_tensor.get(), another_device_tensor->GetSize()))) {
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(strategy_, (*context), another_device_context,
backend_node->fullname_with_scope(),
another_device_tensor->GetSize());
}

MS_LOG(INFO) << "Prepare device data for weight node:" << backend_node->fullname_with_scope()
<< ", device type:" << another_device_type;
if (!Copy(another_device_tensor.get(), host_tensor_address.get())) {
std::string error_info = "Sync data error.";
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
}
}
}

// In control flow, all weight nodes associated with the host weight parameter need to use the same device tensor.
// Prepares the weight's device data when the store entry is missing or lacks
// memory, then shares the front node's device tensors with every sub front
// node mapped from the host parameter.
void DataPrepareActor::PrepareDataForControlWeightNode(
  const AnfNodePtr &node, const AnfNodePtr &front_node, const TensorPtr &tensor, const DeviceContext *device_context,
  const std::unordered_map<AnfNodePtr, std::vector<AnfNodePtr>> &host_parameter_to_weights,
  OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(node);
  MS_EXCEPTION_IF_NULL(front_node);
  MS_EXCEPTION_IF_NULL(tensor);
  MS_EXCEPTION_IF_NULL(device_context);
  // Fix: validate the context like the sibling Prepare* methods do.
  MS_EXCEPTION_IF_NULL(context);

  // The store needs (re)filling when it has no entry yet or any entry still
  // lacks device memory.
  auto device_tensors = DeviceTensorStore::GetInstance().Fetch(front_node.get());
  bool need_update_device_tensor_store = device_tensors.empty();
  for (auto &device_tensor : device_tensors) {
    if (device_tensor->GetPtr() == nullptr) {
      need_update_device_tensor_store = true;
      break;
    }
  }
  if (need_update_device_tensor_store) {
    PrepareDataForWeightNode(node, front_node, tensor, device_context, context);
  }

  const auto iter = host_parameter_to_weights.find(front_node);
  if (iter == host_parameter_to_weights.end()) {
    return;
  }

  // Fetch all the device tensors of host weight node and insert as the weight of other nodes.
  // (Reuse the iterator instead of a second lookup via at().)
  const auto &sub_front_nodes = iter->second;
  device_tensors = DeviceTensorStore::GetInstance().Fetch(front_node.get());
  for (const auto &sub_front_node : sub_front_nodes) {
    for (const auto &device_tensor : device_tensors) {
      MS_EXCEPTION_IF_NULL(sub_front_node);
      DeviceTensorStore::GetInstance().Insert(sub_front_node.get(), device_tensor);
    }
  }
}

// Prepare the device tensor store entries required by control flow: front
// value nodes and the weight parameters referenced by control nodes.
// NOTE(review): assumes `tensors` is index-aligned with
// control_node_parser->control_node_parameters() — confirm at the caller.
void DataPrepareActor::PrepareDeviceTensorStoreForControlNode(const ControlNodeParserPtr &control_node_parser,
const std::vector<TensorPtr> &tensors,
OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(control_node_parser);
for (const auto &value_node_with_context : control_node_parser->front_value_nodes()) {
if (AnfAlgo::OutputAddrExist(value_node_with_context.first, 0)) {
PrepareDataForValueNode(value_node_with_context.first->cast<ValueNodePtr>(), value_node_with_context.second,
context);
}
}

const auto &control_node_parameters = control_node_parser->control_node_parameters();
for (size_t i = 0; i < control_node_parameters.size(); ++i) {
const auto &input_node = control_node_parameters[i];
const auto &input_tensor = tensors[i];
MS_EXCEPTION_IF_NULL(input_node);
if (IsPersistentDeviceTensor(input_node)) {
// A weight parameter must map to a backend node to locate its device.
const auto &front_to_backend_parameters = control_node_parser->front_to_backend_parameters();
const auto &iter = front_to_backend_parameters.find(input_node);
if (iter == front_to_backend_parameters.end()) {
MS_LOG(EXCEPTION) << "Cannot find backend node for weight parameter:"
<< AnfAlgo::GetNodeDebugString(input_node);
}
const auto &node_with_context = iter->second;
PrepareDataForControlWeightNode(node_with_context.first, input_node, input_tensor, node_with_context.second,
control_node_parser->host_parameter_to_weights(), context);
}
}
}

// Fill the host tensor queue slots that belong to control node parameters.
// NOTE(review): assumes `tensors` is index-aligned with the control node
// parameter list — confirm against the caller.
void DataPrepareActor::PrepareHostTensorQueueForControlNode(const std::vector<TensorPtr> &tensors,
std::vector<TensorPtr> *const host_tensors,
OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(graph_compiler_info_->control_node_parser_);
MS_EXCEPTION_IF_NULL(host_data_source_actor_);
MS_EXCEPTION_IF_NULL(host_tensors);

const auto &control_node_parameters = graph_compiler_info_->control_node_parser_->control_node_parameters();
for (size_t i = 0; i < control_node_parameters.size(); ++i) {
const auto &input_node = control_node_parameters[i];
const auto &input_tensor = tensors[i];
MS_EXCEPTION_IF_NULL(input_node);
// Weights go through the device tensor store instead of the host queue.
if (IsPersistentDeviceTensor(input_node)) {
continue;
}

// Skip parameters that are not part of the original host input order.
if (find(graph_compiler_info_->origin_parameters_order_.begin(),
graph_compiler_info_->origin_parameters_order_.end(),
input_node) == graph_compiler_info_->origin_parameters_order_.end()) {
continue;
}

auto tensor_position = host_data_source_actor_->FetchNodePosition(input_node);
if (tensor_position >= host_tensors->size()) {
std::string error_info = "The position of tensor is out of range: " + std::to_string(tensor_position);
SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
}
(*host_tensors)[tensor_position] = input_tensor;

// Reuse the tensor's device address when its device type matches the
// backend node's address, avoiding a later host-to-device copy.
const AnfNodePtr &backend_node = host_data_source_actor_->FetchNode(tensor_position);
auto tensor_address = std::dynamic_pointer_cast<DeviceTensor>(input_tensor->device_address());
auto device_address = AnfAlgo::GetMutableOutputAddr(backend_node, 0, false);
MS_EXCEPTION_IF_NULL(device_address);
if ((tensor_address != nullptr) && (tensor_address->DeviceType() == device_address->DeviceType())) {
AnfAlgo::SetOutputAddr(tensor_address, 0, backend_node.get());
}
}
}
} // namespace runtime
} // namespace mindspore

+ 121
- 0
mindspore/ccsrc/runtime/framework/actor/data_prepare_actor.h View File

@@ -0,0 +1,121 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_DATA_PREPARE_ACTOR_H_
#define MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_DATA_PREPARE_ACTOR_H_

#include <vector>
#include <string>
#include <memory>
#include <utility>
#include <unordered_map>
#include <map>
#include "runtime/framework/graph_compiler.h"
#include "runtime/framework/actor/actor_common.h"
#include "runtime/framework/actor/data_source_actor.h"
#include "runtime/framework/actor/memory_aware_actor.h"
#include "runtime/framework/device_tensor_store.h"
#include "runtime/hardware/device_context.h"

namespace mindspore {
namespace runtime {
using mindspore::device::DeviceContext;

// The data prepare actor runs at the beginning of each step: it prepares the data for the device tensor store
// (weights and value nodes) and for the host tensor queue (non-weight inputs), triggers the continuous-memory
// allocation required by some kernels, and finally sends controls to the data source actors and the no-input
// kernel actors to start the step.
class DataPrepareActor : public MemoryAwareActor {
 public:
  // graph_compiler_info: the executable-graph description this actor prepares data for (not owned).
  // host_data_source_actor / host_tensor_queue: the sink for non-weight input tensors of the host data source.
  DataPrepareActor(const std::string &name, const AID &memory_manager_aid, const GraphCompilerInfo *graph_compiler_info,
                   const HostQueueDSActorPtr &host_data_source_actor, const HostTensorQueuePtr &host_tensor_queue)
      : MemoryAwareActor(name, KernelTransformType::kDataPrepareActor, nullptr, memory_manager_aid),
        graph_compiler_info_(graph_compiler_info),
        strategy_(GraphExecutionStrategy::kPipeline),
        host_data_source_actor_(host_data_source_actor),
        host_tensor_queue_(host_tensor_queue),
        first_running_(true) {}
  ~DataPrepareActor() override = default;

  void Init() override;

  // The process entry of data prepare. The outer vector is indexed by graph, the inner vector by input position.
  void PrepareData(const std::vector<std::vector<TensorPtr>> &input_tensors, OpContext<DeviceTensor> *const context);

  // The continuous memory related operation interface: request allocation from the memory manager actor and
  // continue the step (send outputs) once the allocation has finished.
  void SendMemoryAllocReq(OpContext<DeviceTensor> *const context) override;
  void OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) override;

 private:
  friend class GraphScheduler;

  // Send output controls when finish data prepare.
  void SendOutput(OpContext<DeviceTensor> *const context);

  void PrepareDataForDeviceTensorStore(const std::vector<std::vector<TensorPtr>> &input_tensors,
                                       OpContext<DeviceTensor> *const context);
  void PrepareDataForHostTensorQueue(const std::vector<std::vector<TensorPtr>> &input_tensors,
                                     OpContext<DeviceTensor> *const context);
  // Data prepare for the step-by-step execution strategy (as opposed to the pipeline strategy).
  void PrepareDataForStepMode(const std::vector<std::vector<TensorPtr>> &input_tensors,
                              OpContext<DeviceTensor> *const context);

  // Prepare the device data for persistent device tensor of weight node from host tensor.
  void PrepareDataForWeightNode(const AnfNodePtr &backend_node, const AnfNodePtr &front_node, const TensorPtr &tensor,
                                const DeviceContext *device_context, OpContext<DeviceTensor> *const context);
  // Prepare the device data for persistent device tensor of value node.
  void PrepareDataForValueNode(const ValueNodePtr &node, const DeviceContext *device_context,
                               OpContext<DeviceTensor> *const context);
  // The branch processing of PrepareDataForValueNode that value type is tensor.
  void PrepareDataForValueNodeTensor(const ValueNodePtr &node, const ValuePtr &node_value,
                                     const DeviceContext *device_context, OpContext<DeviceTensor> *const context);

  // The data prepare in the control flow scene.
  void PrepareDeviceTensorStoreForControlNode(const ControlNodeParserPtr &control_node_parser,
                                              const std::vector<TensorPtr> &tensors,
                                              OpContext<DeviceTensor> *const context);
  void PrepareHostTensorQueueForControlNode(const std::vector<TensorPtr> &tensors,
                                            std::vector<TensorPtr> *const host_tensors,
                                            OpContext<DeviceTensor> *const context);
  // In control flow, all weight nodes associated with the host weight parameter need to use the same device tensor.
  void PrepareDataForControlWeightNode(
    const AnfNodePtr &node, const AnfNodePtr &front_node, const TensorPtr &tensor, const DeviceContext *device_context,
    const std::unordered_map<AnfNodePtr, std::vector<AnfNodePtr>> &host_parameter_to_weights,
    OpContext<DeviceTensor> *const context);

  // Not owned; its lifetime is managed by the graph compiler side.
  const GraphCompilerInfo *graph_compiler_info_;
  GraphExecutionStrategy strategy_;
  HostQueueDSActorPtr host_data_source_actor_;
  HostTensorQueuePtr host_tensor_queue_;
  // True only before the first PrepareData of this actor; lets one-time preparation be skipped on later steps.
  bool first_running_;

  // The output controls contain the data source actors and the no input kernel actors.
  std::vector<AID> data_source_aids_;
  std::vector<AID> no_input_kernel_aids_;

  // The nodes need continuous memory, which must allocate in the begin of step running. The first bool of pair
  // expresses the inputs of node need continuous memory, the second bool of pair expresses the outputs of node need
  // continuous memory.
  std::map<std::pair<CNodePtr, DeviceContext *>, std::pair<bool, bool>> continuous_memory_nodes_;
  // The members for continuous memory alloc fetched by continuous_memory_nodes_.
  std::vector<std::vector<DeviceTensorPtr>> continuous_memory_alloc_list_list_;
  std::vector<std::vector<size_t>> size_list_list_;
  std::vector<size_t> total_size_list_;
  std::vector<const DeviceContext *> continuous_memory_device_contexts_;
};  // class DataPrepareActor

using DataPrepareActorPtr = std::shared_ptr<DataPrepareActor>;
} // namespace runtime
} // namespace mindspore

#endif // MINDSPORE_CCSRC_RUNTIME_FRAMEWORK_ACTOR_DATA_PREPARE_ACTOR_H_

+ 7
- 0
mindspore/ccsrc/runtime/framework/actor/data_source_actor.cc View File

@@ -322,6 +322,13 @@ size_t HostQueueDataSourceActor::FetchNodePosition(const AnfNodePtr &data_node)
return iter->second;
}

AnfNodePtr HostQueueDataSourceActor::FetchNode(size_t node_position) const {
if (node_position >= data_nodes_.size()) {
MS_LOG(EXCEPTION) << "The position of node is out of range: " << node_position;
}
return data_nodes_[node_position];
}

bool HostQueueDataSourceActor::IsSameDeviceType() const {
for (size_t i = 1; i < device_contexts_.size(); i++) {
if (device_contexts_[i] != device_contexts_[0]) {


+ 2
- 0
mindspore/ccsrc/runtime/framework/actor/data_source_actor.h View File

@@ -131,6 +131,8 @@ class HostQueueDataSourceActor : public DataSourceActor {
void OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) override;

size_t FetchNodePosition(const AnfNodePtr &node) const override;
AnfNodePtr FetchNode(size_t node_position) const;
std::vector<AnfNodePtr> &data_nodes() { return data_nodes_; }

protected:
void FillDataBuffer() override;


+ 3
- 5
mindspore/ccsrc/runtime/framework/actor/kernel_actor.cc View File

@@ -277,11 +277,9 @@ void KernelActor::CopyInputDeviceTensor(const OpData<DeviceTensor> *input_data,
if (copy_input_device_tensors_[input_data->index_]->GetPtr() == nullptr) {
if (!device_contexts_[0]->AllocateMemory(copy_input_device_tensors_[input_data->index_].get(),
copy_input_device_tensors_[input_data->index_]->GetSize())) {
std::string error_info =
"Device(id:" + std::to_string(device_contexts_[0]->device_context_key().device_id_) +
") memory isn't enough and alloc failed, actor name: " + GetAID().Name() +
", alloc size: " + std::to_string(copy_input_device_tensors_[input_data->index_]->GetSize());
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kPipeline, (*context), device_contexts_[0],
GetAID().Name(),
copy_input_device_tensors_[input_data->index_]->GetSize());
}
}



+ 18
- 91
mindspore/ccsrc/runtime/framework/actor/loop_count_actor.cc View File

@@ -15,8 +15,7 @@
*/

#include "runtime/framework/actor/loop_count_actor.h"
#include "runtime/framework/actor/data_source_actor.h"
#include "runtime/framework/actor/kernel_actor.h"
#include "runtime/framework/actor/data_prepare_actor.h"
#include "runtime/framework/actor/output_actor.h"
#include "runtime/framework/actor/memory_manager_actor.h"
#include "runtime/framework/actor/recorder_actor.h"
@@ -26,78 +25,20 @@

namespace mindspore {
namespace runtime {
namespace {
void FetchContinuousMemoryInfo(const CNodePtr &node, std::vector<DeviceTensorPtr> *const addr_list,
std::vector<size_t> *const size_list, size_t *const total_size, bool is_input) {
MS_EXCEPTION_IF_NULL(node);
const auto &kernel_mod = AnfAlgo::GetKernelMod(node);
MS_EXCEPTION_IF_NULL(kernel_mod);
(*addr_list).clear();
(*size_list).clear();
*total_size = 0;

if (is_input) {
const auto &intput_sizes = kernel_mod->GetInputSizeList();
for (size_t i = 0; i < intput_sizes.size(); ++i) {
const auto &device_tensor = AnfAlgo::GetPrevNodeMutableOutputAddr(node, i, false);
MS_EXCEPTION_IF_NULL(device_tensor);
*total_size += intput_sizes[i];
(void)size_list->emplace_back(intput_sizes[i]);
(void)addr_list->emplace_back(device_tensor);
}
} else {
const auto &output_sizes = kernel_mod->GetOutputSizeList();
for (size_t i = 0; i < output_sizes.size(); ++i) {
const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, i, false);
MS_EXCEPTION_IF_NULL(device_tensor);
*total_size += output_sizes[i];
(void)size_list->emplace_back(output_sizes[i]);
(void)addr_list->emplace_back(device_tensor);
}
}
}
} // namespace
void LoopCountActor::Init() {
for (auto &iter : continuous_memory_nodes_) {
size_t total_size = 0;
std::vector<size_t> size_list;
std::vector<DeviceTensorPtr> addr_list;
// Inputs need continuous memory.
if (iter.second.first == true) {
FetchContinuousMemoryInfo(iter.first.first, &addr_list, &size_list, &total_size, true);
(void)continuous_memory_alloc_list_list_.emplace_back(addr_list);
(void)size_list_list_.emplace_back(size_list);
(void)total_size_list_.emplace_back(total_size);
(void)device_contexts_.emplace_back(iter.first.second);
}

// Outputs need continuous memory.
if (iter.second.second == true) {
FetchContinuousMemoryInfo(iter.first.first, &addr_list, &size_list, &total_size, false);
(void)continuous_memory_alloc_list_list_.emplace_back(addr_list);
(void)size_list_list_.emplace_back(size_list);
(void)total_size_list_.emplace_back(total_size);
(void)device_contexts_.emplace_back(iter.first.second);
}
}
}

void LoopCountActor::RunOpControl(AID *const input_control, OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
auto sequential_num = context->sequential_num_;
(void)input_op_controls_[sequential_num].emplace_back(input_control);
if (CheckRunningCondition(context)) {
IncreaseLoopCount(context);
// Need wait MemoryManagerActor running finished to avoid the illegal memory timing problem before
// LoopCountActor exits, because other processors which are not in actor also will process device tensor.
Async(memory_manager_aid_, &MemoryManagerActor::Wait, context, GetAID());
}
}

void LoopCountActor::SendDebugReq(OpContext<DeviceTensor> *const context) {
Async(*debug_aid_, &DebugActor::DebugOnStepEnd, context, &GetAID());
}

void LoopCountActor::OnDebugFinish(OpContext<DeviceTensor> *const context) {
void LoopCountActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
SendOutput(context);
IncreaseLoopCount(context);
}

void LoopCountActor::IncreaseLoopCount(OpContext<DeviceTensor> *const context) {
@@ -118,30 +59,21 @@ void LoopCountActor::IncreaseLoopCount(OpContext<DeviceTensor> *const context) {
SendOutput(context);
}

void LoopCountActor::SendDebugReq(OpContext<DeviceTensor> *const context) {
Async(*debug_aid_, &DebugActor::DebugOnStepEnd, context, &GetAID());
}

void LoopCountActor::OnDebugFinish(OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
SendOutput(context);
}

void LoopCountActor::SendOutput(OpContext<DeviceTensor> *const context) {
// Send recorder info.
if (recorder_aid_ != nullptr) {
Async(*recorder_aid_, &RecorderActor::RecordOnStepEnd, context);
}
SendMemoryAllocReq(context);
}

void LoopCountActor::SendMemoryAllocReq(OpContext<DeviceTensor> *const context) {
if (current_count_ == loop_count_) {
// Need wait MemoryManagerActor running finished to avoid the illegal memory timing problem before
// LoopCountActor exits, because other processors which are not in actor also will allocate or free memory.
Async(memory_manager_aid_, &MemoryManagerActor::Wait, context, GetAID());
} else if (continuous_memory_alloc_list_list_.size() > 0) {
// Allocate continuous memory in the begin of next step running.
Async(memory_manager_aid_, &MemoryManagerActor::AllocateContinuousMemory, &continuous_memory_alloc_list_list_,
&size_list_list_, &total_size_list_, &device_contexts_, context, GetAID());
} else {
OnMemoryAllocFinish(context);
}
}

void LoopCountActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {
MS_EXCEPTION_IF_NULL(context);
// Send loop count to output actor.
Async(output_aid_, &OutputActor::CollectLoopCount, current_count_, context);

@@ -151,14 +83,9 @@ void LoopCountActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context)
return;
}

// Send output control to trigger next step running.
for (auto &data_source_aid : data_source_aids_) {
Async(data_source_aid, &DataSourceActor::FetchData, context);
}
auto source_aid = const_cast<AID *>(&GetAID());
for (auto &kernel_aid : no_input_kernel_aids_) {
Async(kernel_aid, &KernelActor::RunOpControl, source_aid, context);
}
// Send to DataPrepareActor to trigger next step running.
std::vector<std::vector<TensorPtr>> input_tensors;
Async(data_prepare_aid_, &DataPrepareActor::PrepareData, input_tensors, context);
}
} // namespace runtime
} // namespace mindspore

+ 3
- 17
mindspore/ccsrc/runtime/framework/actor/loop_count_actor.h View File

@@ -43,14 +43,10 @@ class LoopCountActor : public DebugAwareActor {

~LoopCountActor() override = default;

void Init() override;

// The loop count actor run when receive the input control.
void RunOpControl(AID *const input_control, OpContext<DeviceTensor> *const context) override;

// The memory related operation interface.
void SendMemoryAllocReq(OpContext<DeviceTensor> *const context) override;
// The callback after memory alloc finished.
// The callback waits for the memory manager actor to finish all the message processing.
void OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) override;

// The debug related operation interface.
@@ -70,19 +66,9 @@ class LoopCountActor : public DebugAwareActor {
// The total running count represents the total step running count.
size_t total_running_count_;

// The output controls contain the data source actors and the no input kernel actors and output actor.
std::vector<AID> data_source_aids_;
std::vector<AID> no_input_kernel_aids_;
// The output controls contain the data prepare actor and output actor.
AID data_prepare_aid_;
AID output_aid_;

// The nodes need continuous memory, which must allocate in the begin of step running. The first bool of pair
// expresses the inputs of node need continuous memory, the second bool of pair expresses the outputs of node need
// continuous memory.
std::map<std::pair<CNodePtr, DeviceContext *>, std::pair<bool, bool>> continuous_memory_nodes_;
// The members for continuous memory alloc fetched by continuous_memory_nodes_.
std::vector<std::vector<DeviceTensorPtr>> continuous_memory_alloc_list_list_;
std::vector<std::vector<size_t>> size_list_list_;
std::vector<size_t> total_size_list_;
};

using LoopCountActorPtr = std::shared_ptr<LoopCountActor>;


+ 6
- 12
mindspore/ccsrc/runtime/framework/actor/memory_manager_actor.cc View File

@@ -36,10 +36,8 @@ void MemoryManagerActor::AllocateMemory(const std::vector<DeviceTensor *> *alloc
}
// Allocate memory through the device context.
if (!device_context->AllocateMemory(device_tensor, device_tensor->GetSize())) {
std::string error_info = "Device(id:" + std::to_string(device_context->device_context_key().device_id_) +
") memory isn't enough and alloc failed, actor name: " + from_aid.Name() +
", alloc size: " + std::to_string(device_tensor->GetSize());
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*op_context), error_info);
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kPipeline, (*op_context), device_context,
from_aid.Name(), device_tensor->GetSize());
}
}

@@ -71,10 +69,8 @@ void MemoryManagerActor::AllocateContinuousMemory(const std::vector<std::vector<
auto &device_context = (*device_contexts)[i];
// Allocate memory through the device context.
if (!device_context->AllocateContinuousMemory(alloc_list, total_size, size_list)) {
std::string error_info = "Device(id:" + std::to_string(device_context->device_context_key().device_id_) +
") memory isn't enough and alloc failed, actor name: " + from_aid.Name() +
", alloc size: " + std::to_string(total_size);
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*op_context), error_info);
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kPipeline, (*op_context), device_context,
from_aid.Name(), total_size);
}
}

@@ -104,10 +100,8 @@ void MemoryManagerActor::AllocateBatchMemory(const std::vector<DeviceTensor *> *

// Allocate memory through the device context.
if (!device_context->AllocateMemory(device_tensor, device_tensor->GetSize())) {
std::string error_info = "Device(id:" + std::to_string(device_context->device_context_key().device_id_) +
") memory isn't enough and alloc failed, actor name: " + from_aid.Name() +
", alloc size: " + std::to_string(device_tensor->GetSize());
SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*op_context), error_info);
SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kPipeline, (*op_context), device_context,
from_aid.Name(), device_tensor->GetSize());
}
}



+ 4
- 1
mindspore/ccsrc/runtime/framework/control_node_parser.h View File

@@ -90,7 +90,10 @@ class ControlNodeParser {
void Parse(const std::vector<AnfNodePtr> &control_nodes, const std::vector<KernelGraphPtr> &graphs,
const std::vector<DeviceContext *> &device_contexts, const FuncGraphPtr &root_graph);

std::vector<AnfNodePtr> GetControlNodeParameter() { return control_node_parameters_; }
std::vector<AnfNodePtr> &control_node_parameters() { return control_node_parameters_; }
FrontToBackendNodeWithContext &front_to_backend_parameters() { return front_to_backend_parameters_; }
HostParameterToWeight &host_parameter_to_weights() { return host_parameter_to_weights_; }
NodeWithDeviceContext &front_value_nodes() { return front_value_nodes_; }

// Get the output of funcgraph, usually there is only one output node, In the control flow, there are
// multiple branch outputs, there will be multiple output nodes.


+ 3
- 3
mindspore/ccsrc/runtime/framework/device_tensor_store.h View File

@@ -81,9 +81,9 @@ class DeviceTensorStore {
MS_EXCEPTION_IF_NULL(key);
const auto &iter = device_tensors_.find(key);
if (iter != device_tensors_.end()) {
for (const auto &devcie_tensor : iter->second) {
if (devcie_tensor->DeviceType() == value_type) {
return devcie_tensor.get();
for (const auto &device_tensor : iter->second) {
if (device_tensor->DeviceType() == value_type) {
return device_tensor.get();
}
}
}


+ 3
- 0
mindspore/ccsrc/runtime/framework/graph_compiler.cc View File

@@ -18,6 +18,7 @@
#include <numeric>
#include <map>
#include <utility>
#include "runtime/framework/graph_scheduler.h"
#include "runtime/device/device_address.h"
#include "common/trans.h"
#include "utils/convert_utils.h"
@@ -290,6 +291,8 @@ void UpdateRefCountForGraphOutput(const std::vector<KernelWithIndex> &output_wit
}
} // namespace

GraphCompilerInfo::~GraphCompilerInfo() { GraphScheduler::GetInstance().Clear(name_, graphs_); }

GraphId GraphCompiler::CompileGraph(const AnfNodePtrList &nodes, const AnfNodePtrList &outputs,
const DeviceContext *device_context) {
MS_EXCEPTION_IF_NULL(session_);


+ 51
- 0
mindspore/ccsrc/runtime/framework/graph_compiler.h View File

@@ -24,6 +24,8 @@
#include <map>
#include <set>
#include "runtime/hardware/device_context.h"
#include "runtime/framework/actor/actor_common.h"
#include "runtime/framework/control_node_parser.h"
#include "backend/session/session_basic.h"
#include "backend/session/session_factory.h"
#include "ir/tensor.h"
@@ -39,6 +41,55 @@ using session::OpRunInfo;
using tensor::TensorPtr;

namespace runtime {
// Position of kernel with index, the value pair<branch_id, vector<pos>> means the branch id of the kernel and the pos
// of the kernel. Generally, there is only one branch, and the branch id is 0 at this time. In control flow, there are
// multiple branch scenarios, and pos represents the position of the kernel in the branch.
using KernelMapPosition = std::map<KernelWithIndex, std::vector<size_t>, session::KernelWithIndexCmp>;

// The graph compiler info generated by graph compiler is the express of executable graph.
// The device context is unified interface of interaction with device of corresponding graph.
// The tensors mask is used to distinguish input tensor's type.
// The input tensor is used to link graphs in the dynamic build scenario.
// The control node is used to link graphs in the control flow scenario.
// The control node parser is used to parse the edge info in control nodes.
// The origin parameters order is used to correspond to the input args.
// The origin outputs order is used to correspond to the output args.
// The need_erase means need erase this GraphCompilerInfo object after run actor set.
struct GraphCompilerInfo {
GraphCompilerInfo(const std::vector<KernelGraphPtr> &graphs, const std::vector<DeviceContext *> &device_contexts,
const std::vector<std::vector<int64_t> *> &tensors_mask,
const std::vector<std::vector<TensorPtr> *> &input_tensors,
const std::vector<AnfNodePtr> &control_nodes,
const std::vector<AnfNodePtr> &origin_parameters_order, const ControlNodeParserPtr &parser,
const KernelMapPosition &origin_outputs_order, const size_t outputs_num, const std::string &name,
bool need_erase, GraphExecutionStrategy strategy)
: graphs_(graphs),
device_contexts_(device_contexts),
tensors_mask_(tensors_mask),
input_tensors_(input_tensors),
control_nodes_(control_nodes),
control_node_parser_(parser),
origin_parameters_order_(origin_parameters_order),
origin_outputs_order_(origin_outputs_order),
outputs_num_(outputs_num),
name_(name),
need_erase_(need_erase),
strategy_(strategy) {}
~GraphCompilerInfo();
std::vector<KernelGraphPtr> graphs_;
std::vector<DeviceContext *> device_contexts_;
std::vector<std::vector<int64_t> *> tensors_mask_;
std::vector<std::vector<TensorPtr> *> input_tensors_;
std::vector<AnfNodePtr> control_nodes_;
ControlNodeParserPtr control_node_parser_;
std::vector<AnfNodePtr> origin_parameters_order_;
KernelMapPosition origin_outputs_order_;
size_t outputs_num_;
std::string name_;
bool need_erase_;
GraphExecutionStrategy strategy_;
};

class GraphCompiler {
public:
GraphCompiler() { session_ = session::SessionFactory::Get().Create(kSessionBasic); }


+ 150
- 604
mindspore/ccsrc/runtime/framework/graph_scheduler.cc View File

@@ -30,7 +30,6 @@
#if !defined(_WIN32) && !defined(_WIN64)
#include "utils/signal_util.h"
#endif
#include "common/trans.h"
#include "debug/data_dump/dump_json_parser.h"
#ifdef ENABLE_DUMP_IR
#include "debug/rdr/recorder_manager.h"
@@ -55,353 +54,19 @@ bool IsNeedInsertCopyActor(const DeviceContext *from_device_context, const Devic
}
}

void UpdateRefCount(DeviceTensor *const device_tensor, bool is_max_ref_count = false) {
MS_EXCEPTION_IF_NULL(device_tensor);
if (is_max_ref_count) {
device_tensor->set_original_ref_count(SIZE_MAX);
} else {
device_tensor->IncreaseOriginalRefCount();
}
device_tensor->ResetRefCount();
}

// Update the reference count of device tensor by the output index of node.
void UpdateRefCount(const AnfNodePtr &node, size_t output_idx, bool is_max_ref_count = false) {
MS_EXCEPTION_IF_NULL(node);
auto device_tensor = AnfAlgo::GetMutableOutputAddr(node, output_idx, false);
UpdateRefCount(device_tensor.get(), is_max_ref_count);
}

AnfNodePtr FetchFrontNodeByBackendNode(const AnfNodePtr &backend_node, const KernelGraphPtr &graph) {
MS_EXCEPTION_IF_NULL(backend_node);
MS_EXCEPTION_IF_NULL(graph);

// Internal parameter ---> front node.
auto front_node_with_index = graph->GetFrontNodeByInternalParameter(backend_node);
if (front_node_with_index.first != nullptr) {
return front_node_with_index.first;
}

auto front_node = graph->GetFrontAnfByBackendAnf(backend_node);
// PyNative forward graph does not have a front node, using backend node instead.
if (front_node == nullptr) {
front_node = backend_node;
}
return front_node;
}

KernelWithIndex FetchFrontNodeWithIndexByGraphOutput(const KernelWithIndex &output_with_index,
const KernelGraphPtr &graph) {
MS_EXCEPTION_IF_NULL(graph);
auto front_node_with_index = graph->GetFrontNodeWithIndexByGraphOutput(output_with_index);
// PyNative forward graph does not have a front node, using backend node instead.
if (front_node_with_index.first == nullptr) {
front_node_with_index = output_with_index;
}
return front_node_with_index;
}

// The branch processing of PrepareDataForValueNode that value type is tensor.
void PrepareDataForValueNodeTensor(const ValueNodePtr &node, const ValuePtr &node_value,
const DeviceContext *device_context) {
MS_EXCEPTION_IF_NULL(node);
MS_EXCEPTION_IF_NULL(node_value);
MS_EXCEPTION_IF_NULL(device_context);

std::vector<TensorPtr> tensors;
TensorValueToTensor(node_value, &tensors);

for (size_t i = 0; i < tensors.size(); i++) {
const auto &tensor = tensors[i];
if (tensor == nullptr) {
MS_LOG(WARNING) << "Tensor is null";
return;
}

const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, i, false);
MS_EXCEPTION_IF_NULL(device_tensor);
// If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
if (device_tensor->GetPtr() != nullptr) {
return;
}
MS_LOG(INFO) << "Prepare device data for value node: " << node->fullname_with_scope() << ", output index: " << i;
tensor->set_device_address(device_tensor);

// Allocate device memory.
if (!device_context->AllocateMemory(device_tensor.get(), device_tensor->GetSize())) {
MS_LOG(EXCEPTION) << "Device(id:" << device_context->device_context_key().device_id_
<< ") memory isn't enough and alloc failed, node name: " << node->fullname_with_scope()
<< ", alloc size: " << device_tensor->GetSize();
}

// Copy data from host tensor to device.
if (!device_tensor->SyncHostToDevice(trans::GetRuntimePaddingShape(node, 0), LongToSize(tensor->data().nbytes()),
tensor->data_type(), tensor->data_c(), tensor->device_info().host_format_)) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed, node name: " << node->fullname_with_scope();
}
}
}

// Prepare the device data for persistent device tensor of value node.
void PrepareDataForValueNode(const ValueNodePtr &node, const DeviceContext *device_context) {
MS_EXCEPTION_IF_NULL(node);
MS_EXCEPTION_IF_NULL(device_context);
auto &node_value = node->value();
MS_EXCEPTION_IF_NULL(node_value);

if (node_value->isa<tensor::Tensor>() || node_value->isa<ValueTuple>()) {
// The branch processing that value type is tensor.
PrepareDataForValueNodeTensor(node, node_value, device_context);
} else if (node_value->isa<StringImm>()) {
const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, 0, false);
MS_EXCEPTION_IF_NULL(device_tensor);
// If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
if (device_tensor->GetPtr() != nullptr) {
return;
}
MS_LOG(INFO) << "Prepare device data for value node: " << node->fullname_with_scope();

// Allocate device memory.
if (!device_context->AllocateMemory(device_tensor.get(), device_tensor->GetSize())) {
MS_LOG(EXCEPTION) << "Device(id:" << device_context->device_context_key().device_id_
<< ") memory isn't enough and alloc failed, node name: " << node->fullname_with_scope()
<< ", alloc size: " << device_tensor->GetSize();
}

// Copy data from value to device.
auto value = GetValue<std::string>(node_value);
size_t tensor_size = value.size();
ShapeVector shape = {1, SizeToLong(tensor_size)};
if (!device_tensor->SyncHostToDevice(shape, tensor_size, kNumberTypeUInt8, value.data())) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed, node name: " << node->fullname_with_scope();
}
}
}

// Prepare the device data for persistent device tensor of weight node from host tensor.
void PrepareDataForWeightNode(const AnfNodePtr &backend_node, const AnfNodePtr &front_node, const TensorPtr &tensor,
const DeviceContext *device_context) {
MS_EXCEPTION_IF_NULL(backend_node);
MS_EXCEPTION_IF_NULL(front_node);
MS_EXCEPTION_IF_NULL(device_context);
if (tensor == nullptr) {
return;
}

auto device_tensor = AnfAlgo::GetMutableOutputAddr(backend_node, 0, false);
auto host_tensor_address = std::dynamic_pointer_cast<DeviceTensor>(tensor->device_address());
// Use the device address of host tensor to set device tensor.
if (host_tensor_address != device_tensor) {
if (host_tensor_address == nullptr) {
MS_EXCEPTION_IF_NULL(device_tensor);
host_tensor_address = device_context->CreateDeviceAddress(nullptr, device_tensor->GetSize(),
device_tensor->format(), device_tensor->type_id());
tensor->set_device_address(host_tensor_address);
UpdateRefCount(host_tensor_address.get(), true);
}
MS_EXCEPTION_IF_NULL(host_tensor_address);
DeviceTensorStore::GetInstance().Insert(front_node.get(), host_tensor_address);
if (host_tensor_address->DeviceType() == device_tensor->DeviceType()) {
AnfAlgo::SetOutputAddr(host_tensor_address, 0, backend_node.get());
} else {
MS_LOG(INFO) << "The device type is not equal, host tensor type:" << host_tensor_address->DeviceType()
<< ", device tensor type:" << device_tensor->DeviceType();
}
}

// If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
if (host_tensor_address->GetPtr() == nullptr) {
MS_LOG(INFO) << "Prepare device data for weight node:" << backend_node->fullname_with_scope()
<< ", device type:" << host_tensor_address->DeviceType();
// Allocate device memory and copy data from host tensor to device.
if (!device_context->AllocateMemory(host_tensor_address.get(), host_tensor_address->GetSize())) {
MS_LOG(EXCEPTION) << "Device(id:" << device_context->device_context_key().device_id_
<< ") memory isn't enough and alloc failed, node name: " << backend_node->fullname_with_scope();
}
if (!host_tensor_address->SyncHostToDevice(trans::GetRuntimePaddingShape(backend_node, 0),
LongToSize(tensor->data().nbytes()), tensor->data_type(),
tensor->data_c(), tensor->device_info().host_format_)) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed, node name: " << backend_node->fullname_with_scope();
}
}

// Allocate another device memory and copy data from host tensor to another device(if exist).
const auto &device_tensors = DeviceTensorStore::GetInstance().Fetch(front_node.get());
if (device_tensors.size() > 1) {
auto another_device_tensor = (device_tensors[0] == host_tensor_address) ? device_tensors[1] : device_tensors[0];
MS_EXCEPTION_IF_NULL(another_device_tensor);
auto another_device_type = another_device_tensor->DeviceType();
const auto &another_device_context = device::DeviceContextManager::GetInstance().GetOrCreateDeviceContext(
{device::kDeviceTypeToName.at(another_device_type), device_context->device_context_key().device_id_});
MS_EXCEPTION_IF_NULL(another_device_context);
if (another_device_tensor->GetPtr() == nullptr) {
if (!another_device_context->AllocateMemory(another_device_tensor.get(), another_device_tensor->GetSize())) {
MS_LOG(EXCEPTION) << "Device(id:" << another_device_context->device_context_key().device_id_
<< ") memory isn't enough and alloc failed, node name: "
<< backend_node->fullname_with_scope();
}
}
MS_LOG(INFO) << "Prepare device data for weight node:" << backend_node->fullname_with_scope()
<< ", device type:" << another_device_type;
if (!Copy(another_device_tensor.get(), host_tensor_address.get())) {
MS_LOG(EXCEPTION) << "Sync data error.";
}
}
}

// In control flow, all weight nodes associated with the host weight parameter need to use the same device tensor.
void PrepareDataForControlWeightNode(
const AnfNodePtr &node, const AnfNodePtr &front_node, const TensorPtr &tensor, const DeviceContext *device_context,
const std::unordered_map<AnfNodePtr, std::vector<AnfNodePtr>> &host_parameter_to_weights = {}) {
MS_EXCEPTION_IF_NULL(node);
MS_EXCEPTION_IF_NULL(front_node);
MS_EXCEPTION_IF_NULL(tensor);
MS_EXCEPTION_IF_NULL(device_context);

auto device_tensors = DeviceTensorStore::GetInstance().Fetch(front_node.get());
bool need_update_device_tensor_store = (device_tensors.size() == 0) ? true : false;
for (auto &device_tensor : device_tensors) {
if (device_tensor->GetPtr() == nullptr) {
need_update_device_tensor_store = true;
break;
}
}
if (need_update_device_tensor_store) {
PrepareDataForWeightNode(node, front_node, tensor, device_context);
}

const auto iter = host_parameter_to_weights.find(front_node);
if (iter == host_parameter_to_weights.end()) {
return;
}

// Fetch all the device tensors of host weight node and insert as the weight of other nodes.
const auto &sub_front_nodes = host_parameter_to_weights.at(front_node);
device_tensors = DeviceTensorStore::GetInstance().Fetch(front_node.get());
for (const auto &sub_front_node : sub_front_nodes) {
for (const auto &device_tensor : device_tensors) {
if (sub_front_node == nullptr) {
MS_LOG(EXCEPTION) << "Front node is empty!";
}
DeviceTensorStore::GetInstance().Insert(sub_front_node.get(), device_tensor);
}
}
}

// Fills the host tensor slot for a non-weighted parameter node and, when the tensor already carries a device
// address of the matching device type, reuses that address as the node's output address to skip a host-to-device
// copy. Nodes absent from data_node_position_map are ignored.
void PrepareDataForHostDataSourceActor(const std::unordered_map<AnfNodePtr, size_t> &data_node_position_map,
                                       const AnfNodePtr &node, const TensorPtr &tensor,
                                       std::vector<TensorPtr> *const host_tensors) {
  MS_EXCEPTION_IF_NULL(node);
  MS_EXCEPTION_IF_NULL(tensor);
  MS_EXCEPTION_IF_NULL(host_tensors);

  // Fill the host tensors for non weighted parameters.
  const auto &iter = data_node_position_map.find(node);
  if (iter == data_node_position_map.end()) {
    return;
  }

  // assumes host_tensors was pre-sized by the caller to cover iter->second — TODO confirm at call sites.
  (*host_tensors)[iter->second] = tensor;
  auto tensor_address = std::dynamic_pointer_cast<DeviceTensor>(tensor->device_address());
  auto device_address = AnfAlgo::GetMutableOutputAddr(node, 0, false);
  MS_EXCEPTION_IF_NULL(device_address);
  // Reuse the tensor's own device address only when its device type matches the node's output device type.
  if ((tensor_address != nullptr) && (tensor_address->DeviceType() == device_address->DeviceType())) {
    AnfAlgo::SetOutputAddr(tensor_address, 0, node.get());
  }
}

// Prepares the device data for a non-weighted input parameter in the single-op scenario: fills the host tensor
// slot, reuses the tensor's device address when present, otherwise creates/fetches the node's output device
// address, allocates device memory if needed, and syncs the host tensor data to device.
void PrepareDataForInputData(const HostQueueDataSourceActor *host_data_source_actor, const AnfNodePtr &node,
                             const TensorPtr &tensor, const DeviceContext *device_context,
                             std::vector<TensorPtr> *const host_tensors) {
  MS_EXCEPTION_IF_NULL(node);
  MS_EXCEPTION_IF_NULL(tensor);
  MS_EXCEPTION_IF_NULL(host_tensors);
  // Check before first use: the original checked device_context only after CreateDeviceAddress had already
  // dereferenced it.
  MS_EXCEPTION_IF_NULL(device_context);

  // Fill the host tensors for non weighted parameters.
  if (host_data_source_actor != nullptr) {
    (*host_tensors)[host_data_source_actor->FetchNodePosition(node)] = tensor;
  }

  // When the tensor already owns a device address, bind it to the node and skip allocation/sync entirely.
  auto device_address = std::dynamic_pointer_cast<DeviceTensor>(tensor->device_address());
  if (device_address != nullptr) {
    AnfAlgo::SetOutputAddr(device_address, 0, node.get());
    return;
  }

  // Create the node's output device address when it does not exist yet, otherwise fetch the existing one.
  DeviceTensorPtr node_device_address = nullptr;
  if (!AnfAlgo::OutputAddrExist(node, 0, false)) {
    TypeId output_type_id = AnfAlgo::GetOutputDeviceDataType(node, 0);
    if (output_type_id == kTypeUnknown) {
      output_type_id = AnfAlgo::GetOutputInferDataType(node, 0);
    }

    size_t tensor_size = AnfAlgo::GetOutputTensorMemSize(node, 0);
    auto new_device_address =
      device_context->CreateDeviceAddress(nullptr, tensor_size, AnfAlgo::GetOutputFormat(node, 0), output_type_id);
    AnfAlgo::SetOutputAddr(new_device_address, 0, node.get());
    node_device_address = new_device_address;
  } else {
    node_device_address = AnfAlgo::GetMutableOutputAddr(node, 0, false);
  }

  tensor->set_device_address(node_device_address);
  UpdateRefCount(node_device_address.get(), true);

  // Allocate device memory only when the address has no backing memory yet.
  if (node_device_address->GetPtr() == nullptr &&
      !device_context->AllocateMemory(node_device_address.get(), node_device_address->GetSize())) {
    MS_LOG(EXCEPTION) << "Device(id:" << device_context->device_context_key().device_id_
                      << ") memory isn't enough and alloc failed, node name: " << node->fullname_with_scope();
  }

  // Copy the host tensor data into the freshly prepared device memory.
  if (!node_device_address->SyncHostToDevice(trans::GetRuntimePaddingShape(node, 0),
                                             LongToSize(tensor->data().nbytes()), tensor->data_type(), tensor->data_c(),
                                             tensor->device_info().host_format_)) {
    MS_LOG(EXCEPTION) << "SyncHostToDevice failed.";
  }
}

inline bool IsSingleOpActorSet(const ActorSet *actor_set) {
MS_EXCEPTION_IF_NULL(actor_set);
return actor_set->kernel_actors_.size() == 1;
}

// Runs the actor set under the step execution strategy. In the single-op case the kernel actors are driven
// synchronously with the input tensors; otherwise the data source and no-input kernel actors are triggered
// asynchronously and this call blocks until the result promise is set.
// Returns true on success; on failure the pending exception (if any) is rethrown by CheckException.
bool RunInStepMode(const ActorSet *actor_set, const std::vector<TensorPtr> *input_tensors) {
  OpContext<DeviceTensor> op_context;
  // Step mode does not need sequential number.
  op_context.sequential_num_ = nullptr;

  // Trigger kernel actor running in the step execution strategy.
  if (IsSingleOpActorSet(actor_set)) {
    MS_EXCEPTION_IF_NULL(input_tensors);
    for (auto &kernel_actor : actor_set->kernel_actors_) {
      MS_EXCEPTION_IF_NULL(kernel_actor);
      // Synchronous call: the single kernel actor consumes the input tensors directly.
      kernel_actor->RunOpControlWithInputTensor(nullptr, &op_context, input_tensors);
    }
    return true;
  }

  // A single promise is enough: it is fulfilled once the whole step finishes (or fails).
  std::vector<Promise<int>> result(1);
  op_context.results_ = &result;

  // Trigger data source actor running.
  for (auto &data_source_actor : actor_set->data_source_actors_) {
    MS_EXCEPTION_IF_NULL(data_source_actor);
    Async(data_source_actor->GetAID(), &DataSourceActor::FetchData, &op_context);
  }

  // Trigger no input kernel actor running.
  for (auto &no_input_kernel_actor : actor_set->no_input_kernel_actors_) {
    MS_EXCEPTION_IF_NULL(no_input_kernel_actor);
    Async(no_input_kernel_actor->GetAID(), &KernelActor::RunOpControl, nullptr, &op_context);
  }

  // Block until the step completes, then surface any exception raised inside the actors.
  auto result_future = result[0].GetFuture();
  result_future.Wait();
  MsException::Instance().CheckException();
  return result_future.IsOK();
}

// Convert the actors vector by the actor set.
std::vector<ActorReference> CollectActors(const ActorSet *actor_set) {
MS_EXCEPTION_IF_NULL(actor_set);
std::vector<ActorReference> actors;

if (actor_set->data_prepare_actor_ != nullptr) {
(void)actors.emplace_back(static_cast<ActorReference>(actor_set->data_prepare_actor_));
}
for (auto &data_source_actor : actor_set->data_source_actors_) {
MS_EXCEPTION_IF_NULL(data_source_actor);
(void)actors.emplace_back(static_cast<ActorReference>(data_source_actor));
@@ -432,7 +97,7 @@ std::vector<ActorReference> CollectActors(const ActorSet *actor_set) {
return actors;
}

void ClearNodeInfo(const KernelGraphPtr graph) {
void ClearNodeInfo(const KernelGraphPtr &graph) {
MS_EXCEPTION_IF_NULL(graph);

// Clear input parameter device tensor and device tensor store.
@@ -487,8 +152,6 @@ void IntHandler(int, siginfo_t *, void *) {
#endif
} // namespace

GraphCompilerInfo::~GraphCompilerInfo() { GraphScheduler::GetInstance().Clear(name_, graphs_); }

void GraphScheduler::Clear(const ActorInfo &actor_info, const std::vector<KernelGraphPtr> &graphs) {
// Terminate the actors of actor info.
if (actors_.count(actor_info) > 0) {
@@ -497,6 +160,7 @@ void GraphScheduler::Clear(const ActorInfo &actor_info, const std::vector<Kernel
auto actor_set = actors_[actor_info];
auto base_actors = CollectActors(actor_set.get());
for (auto &base_actor : base_actors) {
(void)actor_name_to_actor_.erase(base_actor->GetAID().Name());
actorMgr->Terminate(base_actor->GetAID());
}
}
@@ -508,7 +172,6 @@ void GraphScheduler::Clear(const ActorInfo &actor_info, const std::vector<Kernel

// Clear global maps of actor info.
(void)actors_.erase(actor_info);
(void)actor_to_host_queue_.erase(actor_info);
}

void GraphScheduler::Clear() {
@@ -523,7 +186,6 @@ void GraphScheduler::Clear() {
// Clear global maps.
actors_.clear();
actor_name_to_actor_.clear();
actor_to_host_queue_.clear();
}

using DataArrowLinkFunc = void (GraphScheduler::*)(AbstractActor *const, KernelActor *const, const KernelWithIndex &,
@@ -609,7 +271,6 @@ ActorSet *GraphScheduler::Transform(const GraphCompilerInfo &graph_compiler_info
}

PersistDeviceTensor(graph_compiler_info);
auto strategy = graph_compiler_info.strategy_;
const auto &actor_set = Build(graph_compiler_info);
CacheGraphOutputToActor(graph_compiler_info);
Link(actor_set.get(), graph_compiler_info);
@@ -619,7 +280,7 @@ ActorSet *GraphScheduler::Transform(const GraphCompilerInfo &graph_compiler_info
(void)actors_.emplace(actor_set->name_, actor_set);

DumpActor(actor_set.get(), graph_compiler_info);
if (!CheckActorValid(actor_set.get(), strategy)) {
if (!CheckActorValid(actor_set.get(), graph_compiler_info.strategy_)) {
MS_LOG(EXCEPTION) << "The actor set of " << graph_compiler_info.name_ << " is invalid.";
}
MS_LOG(INFO) << "Graph(" << graph_compiler_info.name_ << ") transforms actor end.";
@@ -643,179 +304,13 @@ void GraphScheduler::Schedule(const ActorSet *actor_set) {
}
}

// Prepares all data needed before running the actor set in pipeline mode: device tensor store entries (value
// nodes and weights), continuous memory for communication kernels, control-node data, and the host tensor queue.
// input_tensors holds one tensor vector per graph, plus a trailing vector for control node parameters.
void GraphScheduler::PrepareRun(const ActorSet *actor_set, const GraphCompilerInfo &graph_compiler_info,
                                const std::vector<std::vector<TensorPtr>> &input_tensors) {
  MS_EXCEPTION_IF_NULL(actor_set);
  // Pre-size the host tensor list to the host data source actor's data node count (when such an actor exists).
  std::vector<TensorPtr> host_tensors;
  std::string actor_name = actor_set->name_ + "_HostDSActor";
  const auto &host_data_source_actor = dynamic_cast<HostQueueDataSourceActor *>(FetchActor(actor_name));
  if (host_data_source_actor != nullptr) {
    host_tensors.resize(host_data_source_actor->data_nodes_.size());
  }

  for (size_t i = 0; i < graph_compiler_info.graphs_.size(); ++i) {
    const auto &graph = graph_compiler_info.graphs_[i];
    const auto &device_context = graph_compiler_info.device_contexts_[i];
    MS_EXCEPTION_IF_NULL(graph);
    // 1.Prepare the data of device tensor store(value nodes of graph).
    for (const auto &value_node : graph->graph_value_nodes()) {
      if (AnfAlgo::OutputAddrExist(value_node, 0)) {
        PrepareDataForValueNode(value_node, device_context);
      }
    }

    // 2.Prepare the data of device tensor store(weights of graph), and fill host tensors for non weighted parameters.
    const auto &input_nodes = graph->input_nodes();
    const auto &tensors = input_tensors[i];
    for (size_t j = 0; j < input_nodes.size(); ++j) {
      const auto &input_node = input_nodes[j];
      const auto &input_tensor = tensors[j];
      MS_EXCEPTION_IF_NULL(input_node);
      if (IsPersistentDeviceTensor(input_node)) {
        // Prepare the device data for weights.
        const auto front_node = FetchFrontNodeByBackendNode(input_node, graph);
        PrepareDataForWeightNode(input_node, front_node, input_tensor, device_context);
      } else if (IsHostQueueDSActor(input_node, graph, graph_compiler_info.origin_parameters_order_,
                                    graph_compiler_info.strategy_)) {
        MS_EXCEPTION_IF_NULL(host_data_source_actor);
        PrepareDataForHostDataSourceActor(host_data_source_actor->data_node_position_map_, input_node, input_tensor,
                                          &host_tensors);
      }
    }
  }

  // 3.Prepare the continuous memory for communication kernel.
  if (actor_set->loop_count_actor_ != nullptr) {
    auto alloc_list_list = actor_set->loop_count_actor_->continuous_memory_alloc_list_list_;
    auto size_list_list = actor_set->loop_count_actor_->size_list_list_;
    auto total_size_list = actor_set->loop_count_actor_->total_size_list_;
    auto device_contexts = actor_set->loop_count_actor_->device_contexts_;
    // The four lists are parallel arrays; a size mismatch means the actor was built inconsistently.
    if ((alloc_list_list.size() != size_list_list.size()) || (size_list_list.size() != total_size_list.size()) ||
        (total_size_list.size() != device_contexts.size())) {
      MS_LOG(EXCEPTION)
        << "The size of alloc_list_list, size_list_list, total_size_list and device_contexts are not equal.";
    }
    for (size_t i = 0; i < alloc_list_list.size(); ++i) {
      auto &alloc_list = alloc_list_list[i];
      auto &size_list = size_list_list[i];
      auto &total_size = total_size_list[i];
      auto &device_context = device_contexts[i];
      if (!device_context->AllocateContinuousMemory(alloc_list, total_size, size_list)) {
        MS_LOG(EXCEPTION) << "Device memory isn't enough and alloc failed, alloc size: " << total_size;
      }
    }
  }

  // 4.Prepare the data which belongs to control node. input_tensors.back() carries the control node parameters.
  PrepareDataForControlNode(host_data_source_actor, graph_compiler_info.control_node_parser_,
                            graph_compiler_info.origin_parameters_order_, input_tensors.back(), &host_tensors);

  // 5.Prepare the data of host tensor queue(non weighted parameters of graph).
  if (host_data_source_actor != nullptr) {
    const auto &host_tensor_queue = FetchHostQueue(actor_set->name_);
    MS_EXCEPTION_IF_NULL(host_tensor_queue);
    host_tensor_queue->Push(host_tensors);
  }
}

// Prepares all data needed before running the actor set in the single-op scenario: device tensor store entries
// (value nodes and weights), device data for non-weighted inputs, and the host tensor queue. Unlike PrepareRun,
// weights use the backend node itself as the front node and there is no control-node or continuous-memory step.
void GraphScheduler::PrepareRunOp(const ActorSet *actor_set, const GraphCompilerInfo &graph_compiler_info,
                                  const std::vector<std::vector<TensorPtr>> &input_tensors) {
  MS_EXCEPTION_IF_NULL(actor_set);
  // Pre-size the host tensor list to the host data source actor's data node count (when such an actor exists).
  std::vector<TensorPtr> host_tensors;
  std::string actor_name = actor_set->name_ + "_HostDSActor";
  const auto &host_data_source_actor = dynamic_cast<HostQueueDataSourceActor *>(FetchActor(actor_name));
  if (host_data_source_actor != nullptr) {
    host_tensors.resize(host_data_source_actor->data_nodes_.size());
  }

  for (size_t i = 0; i < graph_compiler_info.graphs_.size(); ++i) {
    const auto &graph = graph_compiler_info.graphs_[i];
    const auto &device_context = graph_compiler_info.device_contexts_[i];
    MS_EXCEPTION_IF_NULL(graph);

    // 1.Prepare the data of device tensor store(value nodes of graph).
    for (const auto &value_node : graph->graph_value_nodes()) {
      if (AnfAlgo::OutputAddrExist(value_node, 0)) {
        PrepareDataForValueNode(value_node, device_context);
      }
    }

    // 2.Prepare the data of device tensor store(weights of graph), and fill host tensors for non weighted parameters.
    const auto &input_nodes = graph->input_nodes();
    const auto &tensors = input_tensors[i];
    for (size_t j = 0; j < input_nodes.size(); ++j) {
      const auto &input_node = input_nodes[j];
      const auto &input_tensor = tensors[j];
      MS_EXCEPTION_IF_NULL(input_node);
      if (IsPersistentDeviceTensor(input_node)) {
        // Prepare the device data for weights. In single-op mode the backend node doubles as the front node.
        PrepareDataForWeightNode(input_node, input_node, input_tensor, device_context);
      } else {
        PrepareDataForInputData(host_data_source_actor, input_node, input_tensor, device_context, &host_tensors);
      }
    }
  }

  // 3.Prepare the data of host tensor queue(non weighted parameters of graph).
  if (host_data_source_actor != nullptr) {
    const auto &host_tensor_queue = FetchHostQueue(actor_set->name_);
    MS_EXCEPTION_IF_NULL(host_tensor_queue);
    host_tensor_queue->Push(host_tensors);
  }
}

// Prepares the data owned by control nodes: weight parameters are routed to their backend nodes via the
// front-to-backend parameter map, non-weight origin parameters fill the host tensor slots of the host data
// source actor, and the control nodes' front value nodes get their device tensor store entries prepared.
// NOTE(review): control_node_parser and host_data_source_actor are dereferenced without null checks here —
// callers appear to guarantee both; confirm before reusing this path.
void GraphScheduler::PrepareDataForControlNode(HostQueueDataSourceActor *const host_data_source_actor,
                                               const ControlNodeParserPtr &control_node_parser,
                                               const std::vector<AnfNodePtr> &origin_parameters,
                                               const std::vector<TensorPtr> &tensors,
                                               std::vector<TensorPtr> *const host_tensors) {
  // assumes tensors is parallel to control_node_parameters (same size and order) — TODO confirm at call sites.
  const auto &control_node_parameters = control_node_parser->GetControlNodeParameter();

  for (size_t j = 0; j < control_node_parameters.size(); ++j) {
    const auto &input_node = control_node_parameters[j];
    const auto &input_tensor = tensors[j];
    MS_EXCEPTION_IF_NULL(input_node);
    if (IsPersistentDeviceTensor(input_node)) {
      // Weight parameter: locate its backend node and share the device tensor across associated weight nodes.
      const auto &front_to_backend_parameters = control_node_parser->front_to_backend_parameters_;
      const auto &iter = front_to_backend_parameters.find(input_node);
      if (iter == front_to_backend_parameters.end()) {
        MS_LOG(EXCEPTION) << "Cannot find backend node for weight parameter:"
                          << AnfAlgo::GetNodeDebugString(input_node);
      }
      const auto &node_with_context = iter->second;
      PrepareDataForControlWeightNode(node_with_context.first, input_node, input_tensor, node_with_context.second,
                                      control_node_parser->host_parameter_to_weights_);
    } else if (find(origin_parameters.begin(), origin_parameters.end(), input_node) != origin_parameters.end()) {
      // Non-weight origin parameter: fill its host tensor slot and bind an existing device address if present.
      const auto &iter = host_data_source_actor->data_node_position_map_.find(input_node);
      if (iter == host_data_source_actor->data_node_position_map_.end()) {
        MS_LOG(EXCEPTION) << "Cannot find node" << AnfAlgo::GetNodeDebugString(input_node) << " in data source actor";
      }
      const size_t pos = iter->second;
      const AnfNodePtr &backend_node = host_data_source_actor->data_nodes_[pos];
      (*host_tensors)[pos] = input_tensor;
      auto device_address = std::dynamic_pointer_cast<DeviceTensor>(input_tensor->device_address());
      if (device_address != nullptr) {
        AnfAlgo::SetOutputAddr(device_address, 0, backend_node.get());
      }
    }
  }

  // Prepare the device tensor store data for front value nodes of the control nodes.
  for (const auto &value_node_with_context : control_node_parser->front_value_nodes_) {
    if (AnfAlgo::OutputAddrExist(value_node_with_context.first, 0)) {
      PrepareDataForValueNode(value_node_with_context.first->cast<ValueNodePtr>(), value_node_with_context.second);
    }
  }
}

bool GraphScheduler::Run(const ActorSet *actor_set, GraphExecutionStrategy strategy,
const std::vector<TensorPtr> *input_tensors) {
bool GraphScheduler::Run(const ActorSet *actor_set, const std::vector<std::vector<TensorPtr>> &input_tensors,
const std::vector<TensorPtr> &input_tensors_with_value_node, GraphExecutionStrategy strategy) {
MS_EXCEPTION_IF_NULL(actor_set);
MS_EXCEPTION_IF_NULL(actor_set->data_prepare_actor_);
#if !defined(_WIN32) && !defined(_WIN64)
SignalGuard sg(IntHandler);
#endif
if (strategy == GraphExecutionStrategy::kStep) {
return RunInStepMode(actor_set, input_tensors);
}

// Construct OpContext.
OpContext<DeviceTensor> op_context;
@@ -824,17 +319,15 @@ bool GraphScheduler::Run(const ActorSet *actor_set, GraphExecutionStrategy strat
op_context.sequential_num_ = &sequential_num;
op_context.results_ = &result;

// Trigger data source actor running.
for (auto &data_source_actor : actor_set->data_source_actors_) {
MS_EXCEPTION_IF_NULL(data_source_actor);
Async(data_source_actor->GetAID(), &DataSourceActor::FetchData, &op_context);
if ((strategy == GraphExecutionStrategy::kStep) && IsSingleOpActorSet(actor_set)) {
actor_set->data_prepare_actor_->PrepareData(input_tensors, &op_context);
MS_EXCEPTION_IF_NULL(actor_set->kernel_actors_[0]);
actor_set->kernel_actors_[0]->RunOpControlWithInputTensor(nullptr, &op_context, &input_tensors_with_value_node);
return true;
}

// Trigger no input kernel actor running.
for (auto &no_input_kernel_actor : actor_set->no_input_kernel_actors_) {
MS_EXCEPTION_IF_NULL(no_input_kernel_actor);
Async(no_input_kernel_actor->GetAID(), &KernelActor::RunOpControl, nullptr, &op_context);
}
// Trigger data prepare actor running.
Async(actor_set->data_prepare_actor_->GetAID(), &DataPrepareActor::PrepareData, input_tensors, &op_context);

// Trigger output actor running when there are no data source actor and kernel actor.
if ((actor_set->data_source_actors_.size() == 0) && (actor_set->kernel_actors_.size() == 0)) {
@@ -865,11 +358,12 @@ ActorSetPtr GraphScheduler::Build(const GraphCompilerInfo &graph_compiler_info)
MS_EXCEPTION_IF_NULL(actor_set);

auto host_queue = std::make_shared<HostTensorQueue>();
(void)actor_to_host_queue_.emplace(actor_set->name_, host_queue);
actor_set->data_source_actors_ = BuildDataSourceActor(graph_compiler_info, host_queue);
actor_set->kernel_actors_ = BuildKernelActor(graph_compiler_info);
actor_set->loop_count_actor_ = BuildLoopCountActor(graph_compiler_info);
actor_set->output_actor_ = BuildOutputActor(graph_compiler_info);
actor_set->data_prepare_actor_ =
BuildDataPrepareActor(graph_compiler_info, actor_set->data_source_actors_, host_queue);
actor_set->switch_actors_ = BuildSwitchActor(graph_compiler_info);
actor_set->gather_actors_ = BuildGatherActor(graph_compiler_info);

@@ -972,25 +466,12 @@ void GraphScheduler::Link(ActorSet *actor_set, const GraphCompilerInfo &graph_co
LinkControlArrowBySendRecvNodes(graph);
}

// Link the control arrows by the communication nodes to ensure communication nodes running order.
LinkControlArrowByCommunicationNode(communication_nodes, graph_compiler_info);

// Link the arrow in the control flow scene.
if (graph_compiler_info.strategy_ == GraphExecutionStrategy::kPipeline) {
// Link the arrow by control node.
LinkArrowByControlNode(graph_compiler_info, actor_set);
}

// Auto monad actor may modify the device tensor store.
LinkDeviceTensorStoreForAutoMonadActor(auto_monad_actors);

// BuildNoInputKernelActor depends on whether kernel actors have input, so must be behind the link of kernel actors.
actor_set->no_input_kernel_actors_ = BuildNoInputKernelActor(actor_set, graph_compiler_info.strategy_);

// Link the control arrows of loop count actor, which depends on the no input kernel actors.
LinkControlArrowForLoopCountActor(actor_set->loop_count_actor_.get(), actor_set,
graph_compiler_info.control_node_parser_);

// Link the output result arrows for output actors.
LinkGlobalControlArrow(actor_set, communication_nodes, auto_monad_actors, graph_compiler_info);
LinkOutputResultArrowForOutputActor(actor_set->output_actor_.get(), graph_compiler_info);
}

@@ -1062,7 +543,7 @@ std::vector<DataSourceActorPtr> GraphScheduler::BuildDataSourceActor(const Graph

// Initialize the parameter in the control node, first get all the front parameters in the control node, then find
// the corresponding backend parameter from the map, and insert it into the host data source actor
std::vector<AnfNodePtr> control_node_parameters = graph_compiler_info.control_node_parser_->GetControlNodeParameter();
std::vector<AnfNodePtr> control_node_parameters = graph_compiler_info.control_node_parser_->control_node_parameters();
for (const auto parameter : control_node_parameters) {
if (IsPersistentDeviceTensor(parameter)) {
continue;
@@ -1141,30 +622,6 @@ LoopCountActorPtr GraphScheduler::BuildLoopCountActor(const GraphCompilerInfo &g
MS_LOG(INFO) << "Create loop count actor: " << actor_name;
MS_EXCEPTION_IF_NULL(loop_count_actor);

// Cache the nodes which need continuous memory.
for (size_t index = 0; index < graph_compiler_info.graphs_.size(); ++index) {
const auto &graph = graph_compiler_info.graphs_[index];
MS_EXCEPTION_IF_NULL(graph);
auto &execution_order = graph->execution_order();
for (auto &kernel : execution_order) {
if (!AnfAlgo::IsCommunicationOp(kernel)) {
continue;
}

auto key = std::make_pair(kernel, graph_compiler_info.device_contexts_[index]);
auto value = std::make_pair(false, false);
if (AnfAlgo::GetInputTensorNum(kernel) > 1) {
value.first = true;
}
if (AnfAlgo::GetOutputTensorNum(kernel) > 1) {
value.second = true;
}
if ((value.first == true) || (value.second == true)) {
loop_count_actor->continuous_memory_nodes_[key] = value;
}
}
}

InsertActor(loop_count_actor.get());
return loop_count_actor;
}
@@ -1186,6 +643,53 @@ OutputActorPtr GraphScheduler::BuildOutputActor(const GraphCompilerInfo &graph_c
return output_actor;
}

// Builds the data prepare actor for the actor set: locates the host queue data source actor (so the prepare
// actor can fill the host tensor queue), creates the actor, caches the communication kernels that need
// continuous memory (pipeline mode only), and registers the actor by name.
DataPrepareActorPtr GraphScheduler::BuildDataPrepareActor(const GraphCompilerInfo &graph_compiler_info,
                                                          const std::vector<DataSourceActorPtr> &data_source_actors,
                                                          const HostTensorQueuePtr &host_queue) {
  HostQueueDSActorPtr host_queue_ds_actor = nullptr;
  auto iter = std::find_if(data_source_actors.begin(), data_source_actors.end(), [&](const auto &data_source_actor) {
    return data_source_actor->type_ == KernelTransformType::kHostDataSourceActor;
  });
  if (iter != data_source_actors.end()) {
    host_queue_ds_actor = std::dynamic_pointer_cast<HostQueueDataSourceActor>(*iter);
  }

  auto actor_name = graph_compiler_info.name_ + "_DataPrepareActor";
  auto data_prepare_actor = std::make_shared<DataPrepareActor>(actor_name, memory_manager_aid_, &graph_compiler_info,
                                                               host_queue_ds_actor, host_queue);
  MS_LOG(INFO) << "Create data prepare actor: " << actor_name;
  MS_EXCEPTION_IF_NULL(data_prepare_actor);

  // Cache the nodes which need continuous memory.
  if (graph_compiler_info.strategy_ == GraphExecutionStrategy::kPipeline) {
    for (size_t index = 0; index < graph_compiler_info.graphs_.size(); ++index) {
      const auto &graph = graph_compiler_info.graphs_[index];
      MS_EXCEPTION_IF_NULL(graph);
      auto &execution_order = graph->execution_order();
      for (auto &kernel : execution_order) {
        if (!AnfAlgo::IsCommunicationOp(kernel)) {
          continue;
        }

        auto key = std::make_pair(kernel, graph_compiler_info.device_contexts_[index]);
        // value.first: inputs need continuous memory; value.second: outputs need continuous memory.
        auto value = std::make_pair(AnfAlgo::GetInputTensorNum(kernel) > 1, AnfAlgo::GetOutputTensorNum(kernel) > 1);
        if (value.first || value.second) {
          data_prepare_actor->continuous_memory_nodes_[key] = value;
        }
      }
    }
  }

  InsertActor(data_prepare_actor.get());
  return data_prepare_actor;
}

std::vector<KernelActorPtr> GraphScheduler::BuildNoInputKernelActor(const ActorSet *actor_set,
GraphExecutionStrategy strategy) {
MS_EXCEPTION_IF_NULL(actor_set);
@@ -1817,6 +1321,29 @@ void GraphScheduler::LinkControlArrowBySendRecvNodes(const KernelGraphPtr &graph
}
}

// Links the set-wide control arrows: communication-node ordering, auto-monad device tensor store links, the
// no-input kernel actor list, the data prepare actor's outgoing arrows, and the loop count actor's arrows.
// Must run after the per-graph data/control arrows are linked, since it depends on actors' input counts.
void GraphScheduler::LinkGlobalControlArrow(ActorSet *const actor_set, const std::vector<CNodePtr> &communication_nodes,
                                            const std::vector<KernelActor *> &auto_monad_actors,
                                            const GraphCompilerInfo &graph_compiler_info) {
  MS_EXCEPTION_IF_NULL(actor_set);

  // Link the control arrows by the communication nodes to ensure communication nodes running order.
  LinkControlArrowByCommunicationNode(communication_nodes, graph_compiler_info);

  // Auto monad actor may modify the device tensor store.
  LinkDeviceTensorStoreForAutoMonadActor(auto_monad_actors);

  // BuildNoInputKernelActor depends on whether kernel actors have input, so must be behind the link of kernel actors.
  actor_set->no_input_kernel_actors_ = BuildNoInputKernelActor(actor_set, graph_compiler_info.strategy_);

  // Link the control arrows of data prepare actor, which depends on the no input kernel actors.
  // NOTE(review): in single-op step mode the data prepare actor is invoked synchronously, so no control
  // arrows are linked for it — confirm against the Run() step-mode path.
  if ((graph_compiler_info.strategy_ == GraphExecutionStrategy::kPipeline) || (!IsSingleOpActorSet(actor_set))) {
    LinkControlArrowForDataPrepareActor(actor_set->data_prepare_actor_.get(), actor_set);
  }

  LinkControlArrowForLoopCountActor(actor_set->loop_count_actor_.get(), actor_set,
                                    graph_compiler_info.control_node_parser_);
}

void GraphScheduler::LinkControlArrowByCommunicationNode(const std::vector<CNodePtr> &communication_nodes,
const GraphCompilerInfo &graph_compiler_info) {
const size_t kCommunicationNodesMinNum = 2;
@@ -1849,6 +1376,25 @@ void GraphScheduler::LinkControlArrowByCommunicationNode(const std::vector<CNode
}
}

// Links the data prepare actor's outgoing control arrows: one to every data source actor, and one to every
// kernel actor that has no data inputs (each such kernel actor gains one expected input control).
void GraphScheduler::LinkControlArrowForDataPrepareActor(DataPrepareActor *data_prepare_actor,
                                                         const ActorSet *actor_set) {
  MS_EXCEPTION_IF_NULL(data_prepare_actor);
  MS_EXCEPTION_IF_NULL(actor_set);

  // Data prepare actor --> data source actor.
  for (auto &ds_actor : actor_set->data_source_actors_) {
    MS_EXCEPTION_IF_NULL(ds_actor);
    (void)data_prepare_actor->data_source_aids_.emplace_back(ds_actor->GetAID());
  }

  // Data prepare actor --> no input kernel actor.
  for (auto &kernel_actor : actor_set->no_input_kernel_actors_) {
    MS_EXCEPTION_IF_NULL(kernel_actor);
    (void)data_prepare_actor->no_input_kernel_aids_.emplace_back(kernel_actor->GetAID());
    ++(kernel_actor->input_controls_num_);
  }
}

void GraphScheduler::LinkControlArrowForLoopCountActor(LoopCountActor *loop_count_actor, const ActorSet *actor_set,
const ControlNodeParserPtr &parser) {
MS_EXCEPTION_IF_NULL(actor_set);
@@ -1878,24 +1424,16 @@ void GraphScheduler::LinkControlArrowForLoopCountActor(LoopCountActor *loop_coun
(void)no_output_actors.emplace_back(copy_actor.get());
}
}

// No output actor --> loop count actor.
for (auto &no_output_actor : no_output_actors) {
(void)no_output_actor->output_control_arrows_.emplace_back(loop_count_actor->GetAID());
loop_count_actor->input_controls_num_++;
}

// Loop count actor --> data source actor.
for (auto &data_source_actor : actor_set->data_source_actors_) {
MS_EXCEPTION_IF_NULL(data_source_actor);
(void)loop_count_actor->data_source_aids_.emplace_back(data_source_actor->GetAID());
}

// Loop count actor --> no input kernel actor.
for (auto &no_input_kernel_actor : actor_set->no_input_kernel_actors_) {
MS_EXCEPTION_IF_NULL(no_input_kernel_actor);
(void)loop_count_actor->no_input_kernel_aids_.emplace_back(no_input_kernel_actor->GetAID());
no_input_kernel_actor->input_controls_num_++;
}
// Loop count actor --> data prepare actor.
MS_EXCEPTION_IF_NULL(actor_set->data_prepare_actor_);
loop_count_actor->data_prepare_aid_ = actor_set->data_prepare_actor_->GetAID();

// Loop count actor --> output actor.
MS_EXCEPTION_IF_NULL(actor_set->output_actor_);
@@ -2751,15 +2289,6 @@ void GraphScheduler::PersistDeviceTensor(const GraphCompilerInfo &graph_compiler
}
}

HostTensorQueue *GraphScheduler::FetchHostQueue(const ActorInfo &actor_info) const {
const auto &iter = actor_to_host_queue_.find(actor_info);
if (iter != actor_to_host_queue_.end()) {
return iter->second.get();
} else {
return nullptr;
}
}

void GraphScheduler::FetchKernelTransformTypeAndName(const AnfNodePtr &node, const KernelGraphPtr &graph,
const GraphCompilerInfo &graph_compiler_info,
KernelTransformType *const kernel_type,
@@ -2828,6 +2357,12 @@ void GraphScheduler::DumpActor(const ActorSet *actor_set, const GraphCompilerInf
ofs << "[Device tensor stores]\n";
DumpDeviceTensorStore(graph_compiler_info, ofs);

const auto &data_prepare_actor = actor_set->data_prepare_actor_;
ofs << "\n\n[Data prepare actor:" << (data_prepare_actor != nullptr ? 1 : 0) << "]\n";
if (data_prepare_actor != nullptr) {
DumpDataPrepareActor(data_prepare_actor.get(), ofs);
}

ofs << "\n\n[Data source actors:" << actor_set->data_source_actors_.size() << "]\n";
for (const auto &data_source_actor : actor_set->data_source_actors_) {
DumpDSActor(data_source_actor.get(), ofs);
@@ -2945,12 +2480,33 @@ void GraphScheduler::DumpAbstractActor(const AbstractActor *actor, std::ofstream
}
}

void GraphScheduler::DumpDataPrepareActor(const DataPrepareActor *actor, std::ofstream &ofs) const {
MS_EXCEPTION_IF_NULL(actor);
ofs << "\tactor_name:" << actor->GetAID().Name() << "\n";
DumpAbstractActor(actor, ofs);

ofs << "\t\toutput_control_arrows:" << actor->data_source_aids_.size() + actor->no_input_kernel_aids_.size() << "\n ";
for (const auto &aid : actor->data_source_aids_) {
ofs << "\t\t\tto_actor_name:" << aid.Name() << "\n";
}
for (const auto &aid : actor->no_input_kernel_aids_) {
ofs << "\t\t\tto_actor_name:" << aid.Name() << "\n";
}

ofs << "\t\tcontinuous_memory_nodes:" << actor->continuous_memory_nodes_.size() << "\n ";
for (const auto &iter : actor->continuous_memory_nodes_) {
ofs << "\t\t\tnode_name:" << iter.first.first->fullname_with_scope()
<< "\tdevice_context:" << iter.first.second->device_context_key().ToString()
<< "\tis_input_need:" << iter.second.first << "\tis_output_need:" << iter.second.second << "\n";
}
}

void GraphScheduler::DumpDSActor(const DataSourceActor *actor, std::ofstream &ofs) const {
MS_EXCEPTION_IF_NULL(actor);
const auto &actor_name = actor->GetAID().Name();
ofs << "\tactor_name:" << actor_name << "\n";

if (actor_name.find("_DeviceDSActor") != string::npos) {
if (actor->type_ == KernelTransformType::kDeviceDataSourceActor) {
// Dump the member info of device queue data source actor.
const auto &device_queue_ds_actor = dynamic_cast<const DeviceQueueDataSourceActor *>(actor);
const auto &data_kernel = device_queue_ds_actor->data_kernel_;
@@ -2964,7 +2520,7 @@ void GraphScheduler::DumpDSActor(const DataSourceActor *actor, std::ofstream &of
ofs << "\t\t\toutput_index:" << i << "\tptr:" << device_tensor->GetPtr() << "\tsize:" << device_tensor->GetSize()
<< "\toriginal_ref_count:" << device_tensor->original_ref_count() << "\n ";
}
} else if (actor_name.find("_HostDSActor") != string::npos) {
} else if (actor->type_ == KernelTransformType::kHostDataSourceActor) {
// Dump the member info of host queue data source actor.
const auto &host_queue_ds_actor = dynamic_cast<const HostQueueDataSourceActor *>(actor);
ofs << "\t\tdata_nodes:" << host_queue_ds_actor->data_nodes_.size() << "\n";
@@ -2988,22 +2544,10 @@ void GraphScheduler::DumpLoopCountActor(const LoopCountActor *actor, std::ofstre
ofs << "\tactor_name:" << actor->GetAID().Name() << "\tloop_count:" << actor->loop_count_ << "\n";
DumpAbstractActor(actor, ofs);

ofs << "\t\toutput_control_arrows:" << (actor->data_source_aids_.size() + actor->no_input_kernel_aids_.size() + 1)
<< "\n ";
for (const auto &aid : actor->data_source_aids_) {
ofs << "\t\t\tto_actor_name:" << aid.Name() << "\n";
}
for (const auto &aid : actor->no_input_kernel_aids_) {
ofs << "\t\t\tto_actor_name:" << aid.Name() << "\n";
}
const size_t kOutputControlArrowsNum = 2;
ofs << "\t\toutput_control_arrows:" << kOutputControlArrowsNum << "\n ";
ofs << "\t\t\tto_actor_name:" << actor->output_aid_.Name() << "\n";

ofs << "\t\tcontinuous_memory_nodes:" << actor->continuous_memory_nodes_.size() << "\n ";
for (const auto &iter : actor->continuous_memory_nodes_) {
ofs << "\t\t\tnode_name:" << iter.first.first->fullname_with_scope()
<< "\tdevice_context:" << iter.first.second->device_context_key().ToString()
<< "\tis_input_need:" << iter.second.first << "\tis_output_need:" << iter.second.second << "\n";
}
ofs << "\t\t\tto_actor_name:" << actor->data_prepare_aid_.Name() << "\n";
}

void GraphScheduler::DumpKernelActor(const KernelActor *actor, std::ofstream &ofs) const {
@@ -3059,7 +2603,8 @@ void GraphScheduler::DumpDeviceTensorStore(const GraphCompilerInfo &graph_compil
const auto &front_node = FetchFrontNodeByBackendNode(value_node, graph);
MS_EXCEPTION_IF_NULL(front_node);
const auto device_tensors = DeviceTensorStore::GetInstance().Fetch(front_node.get());
ofs << "\t\tdevice tensor key:" << front_node->DebugString() << "\tvalue size:" << device_tensors.size() << "\n";
ofs << "\t\tdevice tensor key:" << front_node->fullname_with_scope() << "\tvalue size:" << device_tensors.size()
<< "\n";
for (const auto &device_tensor : device_tensors) {
MS_EXCEPTION_IF_NULL(device_tensor);
ofs << "\t\t\tdevice tensor value:" << device_tensor << "\tptr:" << device_tensor->GetPtr()
@@ -3081,7 +2626,8 @@ void GraphScheduler::DumpDeviceTensorStore(const GraphCompilerInfo &graph_compil
}
const auto device_tensors = DeviceTensorStore::GetInstance().Fetch(front_node.get());
MS_EXCEPTION_IF_NULL(front_node);
ofs << "\t\tdevice tensor key:" << front_node->DebugString() << "\tvalue size:" << device_tensors.size() << "\n";
ofs << "\t\tdevice tensor key:" << front_node->fullname_with_scope() << "\tvalue size:" << device_tensors.size()
<< "\n";
for (const auto &device_tensor : device_tensors) {
MS_EXCEPTION_IF_NULL(device_tensor);
ofs << "\t\t\tdevice tensor value:" << device_tensor << "\tptr:" << device_tensor->GetPtr()


+ 22
- 77
mindspore/ccsrc/runtime/framework/graph_scheduler.h View File

@@ -27,6 +27,8 @@
#include <set>
#include <algorithm>
#include <fstream>
#include "runtime/framework/graph_compiler.h"
#include "runtime/framework/actor/data_prepare_actor.h"
#include "runtime/framework/actor/data_source_actor.h"
#include "runtime/framework/actor/loop_count_actor.h"
#include "runtime/framework/actor/kernel_actor.h"
@@ -34,8 +36,6 @@
#include "runtime/framework/actor/switch_actor.h"
#include "runtime/framework/actor/gather_actor.h"
#include "runtime/framework/actor/copy_actor.h"
#include "runtime/hardware/device_context.h"
#include "backend/session/kernel_graph.h"
#include "thread/actor_threadpool.h"

namespace mindspore {
@@ -43,61 +43,15 @@ namespace runtime {
using mindspore::device::DeviceContext;
using mindspore::session::KernelGraph;
using mindspore::session::KernelWithIndex;
// Position of kernel with index, the value pair<branch_id, vector<pos>> means the branch id of the kernel and the pos
// of the kernel. Generally, there is only one branch, and the branch id is 0 at this time. In control flow, there are
// multiple branch scenarios, and pos represents the position of the kernel in the branch.
using KernelMapPosition = std::map<KernelWithIndex, std::vector<size_t>, session::KernelWithIndexCmp>;
using ActorInfo = std::string;

// The second element of pair represents the output index of abstract actor corresponding to the graph output node.
using GraphOutputPair = std::pair<AbstractActor *, size_t>;

// The graph compiler info generated by graph compiler is the express of executable graph.
// The device context is unified interface of interaction with device of corresponding graph.
// The tensors mask is used to distinguish input tensor's type.
// The input tensor is used to link graphs in the dynamic build scenario.
// The control node is used to link graphs in the control flow scenario.
// The control node parser is used to parse the edge info in control nodes.
// The origin parameters order is used to correspond to the input args.
// The origin outputs order is used to correspond to the output args.
// The need_erase means need erase this GraphCompilerInfo object after run actor set.
struct GraphCompilerInfo {
GraphCompilerInfo(const std::vector<KernelGraphPtr> &graphs, const std::vector<DeviceContext *> &device_contexts,
const std::vector<std::vector<int64_t> *> &tensors_mask,
const std::vector<std::vector<TensorPtr> *> &input_tensors,
const std::vector<AnfNodePtr> &control_nodes,
const std::vector<AnfNodePtr> &origin_parameters_order, const ControlNodeParserPtr &parser,
const KernelMapPosition &origin_outputs_order, const size_t outputs_num, const std::string &name,
bool need_erase, GraphExecutionStrategy strategy)
: graphs_(graphs),
device_contexts_(device_contexts),
tensors_mask_(tensors_mask),
input_tensors_(input_tensors),
control_nodes_(control_nodes),
control_node_parser_(parser),
origin_parameters_order_(origin_parameters_order),
origin_outputs_order_(origin_outputs_order),
outputs_num_(outputs_num),
name_(name),
need_erase_(need_erase),
strategy_(strategy) {}
~GraphCompilerInfo();
std::vector<KernelGraphPtr> graphs_;
std::vector<DeviceContext *> device_contexts_;
std::vector<std::vector<int64_t> *> tensors_mask_;
std::vector<std::vector<TensorPtr> *> input_tensors_;
std::vector<AnfNodePtr> control_nodes_;
ControlNodeParserPtr control_node_parser_;
std::vector<AnfNodePtr> origin_parameters_order_;
KernelMapPosition origin_outputs_order_;
size_t outputs_num_;
std::string name_;
bool need_erase_;
GraphExecutionStrategy strategy_;
};

// The actor set generated by graph transformer is the execution unit of actor runtime.
// It includes data source actor, kernel actor, switch actor, copy actor, loop count actor and output actor.
// The data prepare actor is used to prepare data for device tensor store and host tensor queue to represent the begin
// of one step.
// The data source actor is used to obtain data and process them into device tensors, and send them to kernel actor.
// The kernel actor is used to receive the device tensors to launch kernel. Specifically notice the no input
// kernel actor, it means that this actor has no input device tensor, need be triggered externally.
@@ -110,6 +64,7 @@ struct GraphCompilerInfo {
// The output actor is used to receive the output result of actor which represents the graph output.
struct ActorSet {
explicit ActorSet(const ActorInfo &name) : name_(name) {}
DataPrepareActorPtr data_prepare_actor_{nullptr};
std::vector<DataSourceActorPtr> data_source_actors_;
std::vector<KernelActorPtr> kernel_actors_;
// No input kernel actors need be triggered specifically.
@@ -145,22 +100,10 @@ class GraphScheduler {
// will be supported in the future.
void Schedule(const ActorSet *actor_set);

// The prepare processing before run. (used in pipeline mode):
// 1. Prepare the data of device tensor store(such as weights and value nodes of graph).
// 2. Prepare the data of host tensor queue(such as non weighted parameters of graph).
// 3. Prepare the continuous memory for communication kernel.
void PrepareRun(const ActorSet *actor_set, const GraphCompilerInfo &graph_compiler_info,
const std::vector<std::vector<TensorPtr>> &input_tensors);

// The prepare processing before run. (used in step mode):
// 1. Prepare the data of device tensor store(such as weights and value nodes of graph).
// 2. Prepare the data of host tensor queue(such as non weighted parameters of graph).
void PrepareRunOp(const ActorSet *actor_set, const GraphCompilerInfo &graph_compiler_info,
const std::vector<std::vector<TensorPtr>> &input_tensors);

// The processing entry of actors running.
bool Run(const ActorSet *actor_set, GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline,
const std::vector<TensorPtr> *input_tensors = nullptr);
// The processing entry of actors running. The third parameter is used only in the step execution strategy.
bool Run(const ActorSet *actor_set, const std::vector<std::vector<TensorPtr>> &input_tensors,
const std::vector<TensorPtr> &input_tensors_with_value_node = {},
GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline);

// Fetch the actor set by actor info.
ActorSet *Fetch(const ActorInfo &actor_info) const;
@@ -184,6 +127,9 @@ class GraphScheduler {
std::vector<KernelActorPtr> BuildKernelActor(const GraphCompilerInfo &graph_compiler_info);
LoopCountActorPtr BuildLoopCountActor(const GraphCompilerInfo &graph_compiler_info);
OutputActorPtr BuildOutputActor(const GraphCompilerInfo &graph_compiler_info);
DataPrepareActorPtr BuildDataPrepareActor(const GraphCompilerInfo &graph_compiler_info,
const std::vector<DataSourceActorPtr> &data_source_actors,
const HostTensorQueuePtr &host_queue);
std::vector<KernelActorPtr> BuildNoInputKernelActor(const ActorSet *actor_set, GraphExecutionStrategy strategy);
std::vector<SwitchActorPtr> BuildSwitchActor(const GraphCompilerInfo &graph_compiler_info);
std::vector<GatherActorPtr> BuildGatherActor(const GraphCompilerInfo &graph_compiler_info);
@@ -192,7 +138,7 @@ class GraphScheduler {
// previous graph and the head of next graph.
void CacheGraphOutputToActor(const GraphCompilerInfo &graph_compiler_info);

// The processing of actors link statically.
// The processing of actors linking.
// 1. The processing of linking data arrows.
// The gather of linking data arrows of kernel, it will call following functions by the different from actor type.
void LinkDataArrow(KernelActor *const to_actor, const GraphCompilerInfo &graph_compiler_info,
@@ -223,17 +169,23 @@ class GraphScheduler {
const KernelWithIndex &to_kernel_with_input_idx);

// 2. The processing of linking control arrows.
void LinkControlArrowForLoopCountActor(LoopCountActor *loop_count_actor, const ActorSet *actor_set,
const ControlNodeParserPtr &parser);
void LinkControlArrowByAutoMonad(KernelActor *to_actor, const AnfNodePtr &from_node, const KernelGraphPtr &graph);
// The skipped node doesn't run, so need link the control arrow between the inputs and user of skipped node.
void LinkControlArrowBySkippedNode(KernelActor *to_actor, const AnfNodePtr &skipped_node);
// Link the control arrows for allreduce kernel by the send/recv nodes in the kernel graph.
void LinkControlArrowBySendRecvNodes(const KernelGraphPtr &graph);

// The gather of linking the global control arrows, it will call following functions:
void LinkGlobalControlArrow(ActorSet *const actor_set, const std::vector<CNodePtr> &communication_nodes,
const std::vector<KernelActor *> &auto_monad_actors,
const GraphCompilerInfo &graph_compiler_info);
// Link the control arrows by the communication nodes in the kernel graph to ensure communication nodes running order.
void LinkControlArrowByCommunicationNode(const std::vector<CNodePtr> &communication_nodes,
const GraphCompilerInfo &graph_compiler_info);
void LinkDeviceTensorStoreForAutoMonadActor(const std::vector<KernelActor *> &auto_monad_actors);
void LinkControlArrowForDataPrepareActor(DataPrepareActor *data_prepare_actor, const ActorSet *actor_set);
void LinkControlArrowForLoopCountActor(LoopCountActor *loop_count_actor, const ActorSet *actor_set,
const ControlNodeParserPtr &parser);

// 3. The processing of linking output result arrows.
void LinkOutputResultArrowForOutputActor(OutputActor *to_actor, const GraphCompilerInfo &graph_compiler_info);
@@ -267,10 +219,6 @@ class GraphScheduler {
void LinkBranchArrowForSwitchActor(const GraphCompilerInfo &graph_compiler_info);
void LinkBranchArrowForGatherActor(const GraphCompilerInfo &graph_compiler_info);
void LinkOutputResultArrowForSwitchActor(const GraphCompilerInfo &graph_compiler_info, const ActorSet *actor_set);
void PrepareDataForControlNode(HostQueueDataSourceActor *const host_data_source_actor,
const ControlNodeParserPtr &control_node_parser,
const std::vector<AnfNodePtr> &origin_parameters,
const std::vector<TensorPtr> &tensors, std::vector<TensorPtr> *const host_tensors);
// Add input for switch actor. Since part of the input of funcgraph is on call node, these inputs need to be added
// to switch actor.
void PrepareInputNodeForSwitchActor(const std::vector<AnfNodePtr> &control_nodes);
@@ -282,9 +230,6 @@ class GraphScheduler {
// Persist device tensors of graph's some nodes(such as weights and value nodes).
void PersistDeviceTensor(const GraphCompilerInfo &graph_compiler_info);

// Fetch the host tensor queue by actor info.
HostTensorQueue *FetchHostQueue(const ActorInfo &actor_info) const;

// The fetch results are kernel_type and kernel_name.
void FetchKernelTransformTypeAndName(const AnfNodePtr &node, const KernelGraphPtr &graph,
const GraphCompilerInfo &graph_compiler_info,
@@ -297,6 +242,7 @@ class GraphScheduler {
// Display the actor information of corresponding kernel graph.
void DumpActor(const ActorSet *actor_set, const GraphCompilerInfo &graph_compiler_info) const;
void DumpAbstractActor(const AbstractActor *actor, std::ofstream &ofs) const;
void DumpDataPrepareActor(const DataPrepareActor *actor, std::ofstream &ofs) const;
void DumpDSActor(const DataSourceActor *actor, std::ofstream &ofs) const;
void DumpLoopCountActor(const LoopCountActor *actor, std::ofstream &ofs) const;
void DumpKernelActor(const KernelActor *actor, std::ofstream &ofs) const;
@@ -309,7 +255,6 @@ class GraphScheduler {
// The global maps, only be cleared in the deconstruction.
std::unordered_map<ActorInfo, ActorSetPtr> actors_;
std::unordered_map<std::string, OpActor<DeviceTensor> *> actor_name_to_actor_;
std::unordered_map<ActorInfo, HostTensorQueuePtr> actor_to_host_queue_;

// The local maps and vectors, will be cleared at the end of each graph transform:
// 1.The second element of pair represents the output index of op actor corresponding to the graph output front node.


+ 10
- 1
mindspore/ccsrc/runtime/hardware/gpu/gpu_device_context.cc View File

@@ -359,6 +359,7 @@ void GPUDeviceContext::SetOperatorInfo(const std::vector<CNodePtr> &nodes) const
void GPUDeviceContext::CreateKernel(const std::vector<CNodePtr> &nodes) const { CreateGPUKernel(nodes); }

void GPUDeviceContext::UpdateDynamicShape(const CNodePtr &kernel) const {
MS_EXCEPTION_IF_NULL(kernel);
auto ms_context = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(ms_context);
bool is_pynative_infer = ms_context->get_param<bool>(MS_CTX_ENABLE_PYNATIVE_INFER);
@@ -371,7 +372,7 @@ void GPUDeviceContext::UpdateDynamicShape(const CNodePtr &kernel) const {
MS_EXCEPTION_IF_NULL(kernel_mod);

if (session::AnfRuntimeAlgorithm::GetKernelType(kernel) == KernelType::AKG_KERNEL) {
MS_LOG(EXCEPTION) << "Akg kernels do not support dynamic shape by now.";
MS_LOG(EXCEPTION) << "Akg kernel do not support dynamic shape: " << kernel->fullname_with_scope();
}

kernel::GpuKernel *gpu_kernel = dynamic_cast<kernel::GpuKernel *>(kernel_mod);
@@ -410,6 +411,14 @@ bool GPUDeviceContext::LaunchKernel(const CNodePtr &kernel, const std::vector<Ad
return false;
}

// Sync running.
auto ms_context = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(ms_context);
if ((ms_context->get_param<int>(MS_CTX_EXECUTION_MODE) == kPynativeMode) &&
ms_context->get_param<bool>(MS_CTX_ENABLE_PYNATIVE_SYNCHRONIZE) && !SyncStream()) {
return false;
}

// Processing after execution of dynamic kernel to update output shape.
if (is_dynamic_shape) {
kernel::GpuKernel *gpu_kernel = dynamic_cast<kernel::GpuKernel *>(kernel_mod);


+ 11
- 27
mindspore/ccsrc/vm/backend.cc View File

@@ -779,7 +779,7 @@ void MindRTBackend::RunGraphBySingleOp(const std::vector<KernelGraphPtr> &graphs
}

void MindRTBackend::RunGraph(const ActorInfo &actor_info, const VectorRef &args, VectorRef *outputs) {
MS_LOG(DEBUG) << "Run actor begin, actor name: " << actor_info;
MS_LOG(INFO) << "Run actor begin, actor name: " << actor_info;
MS_EXCEPTION_IF_NULL(root_graph_);
if (IsGraphOutputValueNodeOrParameter(root_graph_->output(), args, outputs)) {
return;
@@ -817,7 +817,7 @@ void MindRTBackend::RunGraph(const ActorInfo &actor_info, const VectorRef &args,
std::vector<tensor::TensorPtr> input_tensor;
MS_EXCEPTION_IF_NULL(graph_compiler_info.control_node_parser_);
// Get inputs of control node which come from the host actor.
const auto &control_node_parameters = graph_compiler_info.control_node_parser_->GetControlNodeParameter();
const auto &control_node_parameters = graph_compiler_info.control_node_parser_->control_node_parameters();
for (const auto &parameter : control_node_parameters) {
PushTensor(args, origin_parameters, parameter, &input_tensor);
}
@@ -831,14 +831,14 @@ void MindRTBackend::RunGraph(const ActorInfo &actor_info, const VectorRef &args,
return;
}

mindspore::ScopedLongRunning long_running;
// Debugger pre-execute graph.
PrepareForDebuggr(graph_compiler_info);

// Run actor DAG.
mindspore::ScopedLongRunning long_running;
const auto &actor_set = runtime::GraphScheduler::GetInstance().Fetch(actor_info);
MS_EXCEPTION_IF_NULL(actor_set);
runtime::GraphScheduler::GetInstance().PrepareRun(actor_set, graph_compiler_info, input_tensors);
// Debugger pre-execute graph.
PrepareForDebuggr(graph_compiler_info);
if (!runtime::GraphScheduler::GetInstance().Run(actor_set)) {
if (!runtime::GraphScheduler::GetInstance().Run(actor_set, input_tensors)) {
#ifdef ENABLE_DUMP_IR
mindspore::RDR::TriggerAll();
#endif
@@ -866,12 +866,13 @@ void MindRTBackend::RunGraph(const ActorInfo &actor_info, const VectorRef &args,
size_t output_position = 0;
ConstructOutputs(root_graph_->output(), output_tensors, &output_position, outputs);
}
MS_LOG(INFO) << "Run actor end, actor name: " << actor_info;
MS_EXCEPTION_IF_NULL(graph_compiler_);
graph_compiler_->Summary(graph_compiler_info.graphs_);

// Update device address for output node of graph.
actor_set->output_actor_->UpdateOutputDeviceAddress();
MS_LOG(INFO) << "Run actor end, actor name: " << actor_info;
}

void MindRTBackend::ConstructOutputs(const AnfNodePtr &output_node,
@@ -1031,20 +1032,6 @@ void MindRTBackend::EraseSingleOpCache(const ActorInfo &actor_info, const Kernel
actor_to_graph_compiler_info_.erase(actor_info);
}

void DebugStreamSync(const GraphCompilerInfo &graph_compiler_info) {
auto ms_context = MsContext::GetInstance();
MS_EXCEPTION_IF_NULL(ms_context);
auto enable_sync_run = ms_context->get_param<bool>(MS_CTX_ENABLE_PYNATIVE_SYNCHRONIZE);
if (enable_sync_run) {
if (!graph_compiler_info.device_contexts_.empty()) {
MS_EXCEPTION_IF_NULL(graph_compiler_info.device_contexts_[0]);
if (!graph_compiler_info.device_contexts_[0]->SyncStream()) {
MS_LOG(EXCEPTION) << "Sync stream failed!";
}
}
}
}

void MindRTBackend::RunGraph(const ActorInfo &actor_info, OpRunInfo *op_run_info,
const std::vector<int64_t> *tensors_mask,
const std::vector<tensor::TensorPtr> *input_tensors, VectorRef *outputs) {
@@ -1079,14 +1066,11 @@ void MindRTBackend::RunGraph(const ActorInfo &actor_info, OpRunInfo *op_run_info
}
}

runtime::GraphScheduler::GetInstance().PrepareRunOp(actor_set, graph_compiler_info, {tensors_without_value_node});
if (!runtime::GraphScheduler::GetInstance().Run(actor_set, runtime::GraphExecutionStrategy::kStep, input_tensors)) {
if (!runtime::GraphScheduler::GetInstance().Run(actor_set, {tensors_without_value_node}, *input_tensors,
runtime::GraphExecutionStrategy::kStep)) {
MS_LOG(EXCEPTION) << "The actor runs failed, actor name: " << actor_set->name_;
}

// Debug for pynative
DebugStreamSync(graph_compiler_info);

// Fetch outputs.
const auto &graph = graph_compiler_info.graphs_.front();
MS_EXCEPTION_IF_NULL(graph);


Loading…
Cancel
Save