
calculate stream and event resources

pull/879/head
zhou_chao1993, 5 years ago
commit d12006027d
19 changed files with 585 additions and 39 deletions
  1. ge/CMakeLists.txt (+6, -0)
  2. ge/executor/CMakeLists.txt (+1, -0)
  3. ge/executor/ge_executor.cc (+2, -2)
  4. ge/graph/execute/inter_graph_tensor_cache.cc (+67, -0)
  5. ge/graph/execute/inter_graph_tensor_cache.h (+42, -0)
  6. ge/graph/load/model_manager/davinci_model.cc (+17, -9)
  7. ge/graph/load/model_manager/davinci_model.h (+4, -2)
  8. ge/graph/load/model_manager/model_manager.cc (+129, -1)
  9. ge/graph/load/model_manager/model_manager.h (+7, -3)
  10. ge/graph/load/model_manager/model_utils.cc (+57, -0)
  11. ge/graph/load/model_manager/model_utils.h (+15, -0)
  12. ge/hybrid/executor/hybrid_model_async_executor.cc (+32, -12)
  13. ge/session/session_manager.cc (+2, -0)
  14. ge/single_op/task/op_task.cc (+1, -1)
  15. inc/external/ge/ge_api_types.h (+21, -6)
  16. inc/framework/common/ge_types.h (+4, -3)
  17. tests/ut/ge/CMakeLists.txt (+8, -0)
  18. tests/ut/ge/graph/inter_graph_tensor_cache_unittest.cc (+52, -0)
  19. tests/ut/ge/graph/load/model_manager_unittest.cc (+118, -0)

ge/CMakeLists.txt (+6, -0)

@@ -123,6 +123,9 @@ set(TRAIN_SRC_LIST
"graph/common/omg_util.cc"
"graph/common/transop_util.cc"
"graph/execute/graph_execute.cc"
"graph/execute/inter_graph_tensor_cache.cc"
"hybrid/common/tensor_value.cc"
"hybrid/common/npu_memory_allocator.cc"
"graph/label/case_label_maker.cc"
"graph/label/if_label_maker.cc"
"graph/label/label_maker.cc"
@@ -664,6 +667,9 @@ set(INFER_SRC_LIST
"graph/build/memory/hybrid_mem_assigner.cc"
"graph/build/memory/max_block_mem_assigner.cc"
"graph/build/memory/var_mem_assign_util.cc"
"graph/execute/inter_graph_tensor_cache.cc"
"hybrid/common/tensor_value.cc"
"hybrid/common/npu_memory_allocator.cc"
)

if (NOT ENABLE_D AND NOT ENABLE_ACL AND NOT ENABLE_MS_TESTCASES)


ge/executor/CMakeLists.txt (+1, -0)

@@ -20,6 +20,7 @@ set(SRC_LIST
"../common/profiling/ge_profiling.cc"
"../graph/load/graph_loader.cc"
"../graph/execute/graph_execute.cc"
"../graph/execute/inter_graph_tensor_cache.cc"
"../omm/csa_interact.cc"
"../graph/manager/graph_manager_utils.cc"
"../graph/manager/graph_var_manager.cc"


ge/executor/ge_executor.cc (+2, -2)

@@ -79,7 +79,7 @@ void GetDomiInputData(const ge::RunModelData &input_data, ge::InputData &inputs)
inputs.timeout = input_data.timeout;
inputs.request_id = input_data.request_id;
for (const auto &data_item : input_data.blobs) {
- ge::DataBuffer dataBuf{data_item.data, data_item.length, data_item.isDataSupportMemShare};
+ ge::DataBuffer dataBuf{data_item.data, data_item.length, data_item.isDataSupportMemShare, 0};
inputs.blobs.emplace_back(dataBuf);
}
}
@@ -88,7 +88,7 @@ void GetDomiOutputData(const ge::RunModelData &output_data, ge::OutputData &outp
outputs.index = output_data.index;
outputs.model_id = output_data.modelId;
for (const auto &data_item : output_data.blobs) {
- ge::DataBuffer dataBuf(data_item.data, data_item.length, data_item.isDataSupportMemShare);
+ ge::DataBuffer dataBuf(data_item.data, data_item.length, data_item.isDataSupportMemShare, 0);
outputs.blobs.emplace_back(dataBuf);
}
}


ge/graph/execute/inter_graph_tensor_cache.cc (+67, -0)

@@ -0,0 +1,67 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "graph/execute/inter_graph_tensor_cache.h"

namespace ge {
InterGraphTensorCache &InterGraphTensorCache::GetInstance() {
static InterGraphTensorCache instance;
return instance;
}

void InterGraphTensorCache::SetDynamicExecuteDevAddr(uint64_t session_id,
const std::map<const void *, hybrid::TensorValue> &tensor_values) {
std::lock_guard<std::mutex> lock(dev_addr_mutex_);
dynamic_execute_dev_addr_[session_id] = tensor_values;
}

std::map<const void *, hybrid::TensorValue> InterGraphTensorCache::GetTensorValue(uint64_t session_id) {
std::lock_guard<std::mutex> lock(dev_addr_mutex_);
auto it = dynamic_execute_dev_addr_.find(session_id);
if (it != dynamic_execute_dev_addr_.end()) {
return it->second;
}
return {};
}

void InterGraphTensorCache::RemoveTensorValue(uint64_t session_id, void *data_dev_addr) {
std::lock_guard<std::mutex> lock(dev_addr_mutex_);
auto it = dynamic_execute_dev_addr_.find(session_id);
if (it == dynamic_execute_dev_addr_.end()) {
GELOGW("Cannot find session, session id is %lu", session_id);
return;
}
auto &tensor_values = it->second;
auto tensor_value_it = tensor_values.find(data_dev_addr);
if (tensor_value_it == tensor_values.end()) {
GELOGW("Cannot find data device addr in map");
return;
}
tensor_values.erase(tensor_value_it);
GELOGD("Remove tensor value success");
}

void InterGraphTensorCache::RemoveSessionTensorValue(uint64_t session_id) {
std::lock_guard<std::mutex> lock(dev_addr_mutex_);
auto it = dynamic_execute_dev_addr_.find(session_id);
if (it != dynamic_execute_dev_addr_.end()) {
dynamic_execute_dev_addr_.erase(it);
GELOGD("Remove session %lu tensor value success", session_id);
} else {
GELOGW("Session id %lu is not found", session_id);
}
}
} // namespace ge
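
For illustration, the cache above boils down to a mutex-guarded singleton holding a per-session map from device address to tensor value. Below is a minimal standalone sketch of the same pattern; SessionTensorCache and FakeTensor are hypothetical stand-ins, not GE types, and operator[] is used deliberately so a repeated Set refreshes the session entry (std::map::emplace would keep a stale one):

// sketch_session_cache.cc -- illustrative sketch only, not part of this commit.
#include <cstdint>
#include <map>
#include <mutex>

struct FakeTensor {  // hypothetical stand-in for hybrid::TensorValue
  void *dev_addr = nullptr;
};

class SessionTensorCache {
 public:
  static SessionTensorCache &GetInstance() {
    static SessionTensorCache instance;  // thread-safe magic static (C++11)
    return instance;
  }
  void Set(uint64_t session_id, const std::map<const void *, FakeTensor> &tensors) {
    std::lock_guard<std::mutex> lock(mu_);
    cache_[session_id] = tensors;  // overwrite; emplace() would skip an existing key
  }
  void RemoveTensor(uint64_t session_id, const void *dev_addr) {
    std::lock_guard<std::mutex> lock(mu_);
    auto it = cache_.find(session_id);
    if (it != cache_.end()) {
      it->second.erase(dev_addr);
    }
  }
  void RemoveSession(uint64_t session_id) {
    std::lock_guard<std::mutex> lock(mu_);
    cache_.erase(session_id);  // erase-by-key is a no-op when the key is absent
  }

 private:
  std::map<uint64_t, std::map<const void *, FakeTensor>> cache_;
  std::mutex mu_;
};

int main() {
  auto &cache = SessionTensorCache::GetInstance();
  int buf = 0;
  cache.Set(1, {{&buf, FakeTensor{}}});
  cache.RemoveTensor(1, &buf);
  cache.RemoveSession(1);
  return 0;
}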

ge/graph/execute/inter_graph_tensor_cache.h (+42, -0)

@@ -0,0 +1,42 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef GE_GRAPH_INTER_GRAPH_TENSOR_CACHE_H_
#define GE_GRAPH_INTER_GRAPH_TENSOR_CACHE_H_

#include <map>
#include <mutex>

#include "hybrid/common/tensor_value.h"
#include "common/ge_inner_error_codes.h"
#include "common/debug/log.h"

namespace ge {
class InterGraphTensorCache {
public:
static InterGraphTensorCache &GetInstance();
void SetDynamicExecuteDevAddr(uint64_t session_id, const std::map<const void *, hybrid::TensorValue> &tensor_values);
std::map<const void *, hybrid::TensorValue> GetTensorValue(uint64_t session_id);
void RemoveTensorValue(uint64_t session_id, void *data_dev_addr);
void RemoveSessionTensorValue(uint64_t session_id);

private:
std::map<uint64_t, std::map<const void *, hybrid::TensorValue>> dynamic_execute_dev_addr_;
std::mutex dev_addr_mutex_;
};
} // namespace ge

#endif // GE_GRAPH_INTER_GRAPH_TENSOR_CACHE_H_

ge/graph/load/model_manager/davinci_model.cc (+17, -9)

@@ -59,6 +59,7 @@
#include "securec.h"
#include "graph/common/local_context.h"
#include "common/formats/utils/formats_trans_utils.h"
#include "graph/execute/inter_graph_tensor_cache.h"

// create std::thread, catch exceptions using try/catch
#define CREATE_STD_THREAD(thread_id, func, args) \
@@ -82,7 +83,7 @@ const uint32_t kAddrLen = sizeof(void *);
const int kDecimal = 10;
const int kBytes = 8;
const uint32_t kDataMemAlignSizeCompare = 64;
- const uint32_t kDumpL1FusionOpMByteSize = 2097152; // 2 * 1024 * 1024
+ const uint32_t kDumpL1FusionOpMByteSize = 2097152;  // 2 * 1024 * 1024
const uint32_t kDumpFlagOfL1Fusion = 0;
const char *const kDefaultBatchLable = "Batch_default";
const char *const kGetDynamicDimsName = "ascend_mbatch_get_dynamic_dims_node";
@@ -335,7 +336,6 @@ Status DavinciModel::InitWeightMem(void *dev_ptr, void *weight_ptr, size_t weigh
return SUCCESS;
}


Status DavinciModel::InitFeatureMapAndP2PMem(void *dev_ptr, size_t mem_size) {
if (is_feature_map_mem_has_inited_) {
GELOGE(PARAM_INVALID, "call InitFeatureMapMem more than once.");
@@ -2098,8 +2098,7 @@ Status DavinciModel::GetOutputDescInfo(vector<InputOutputDescInfo> &output_descs
return SUCCESS;
}

- Status DavinciModel::CopyInputData(const InputData &input_data, bool device_data) {
-   rtMemcpyKind_t kind = device_data ? RT_MEMCPY_DEVICE_TO_DEVICE : RT_MEMCPY_HOST_TO_DEVICE;
+ Status DavinciModel::CopyInputData(const InputData &input_data) {
const std::vector<DataBuffer> &blobs = input_data.blobs;
for (const auto &data : input_data_info_) {
if (data.first >= blobs.size()) {
@@ -2108,8 +2107,13 @@ Status DavinciModel::CopyInputData(const InputData &input_data, bool device_data
data.second.GetOpName().c_str());
return FAILED;
}
rtMemcpyKind_t kind;
const DataBuffer &data_buf = blobs[data.first];
if (data_buf.placement == kPlacemetHost) {
kind = RT_MEMCPY_HOST_TO_DEVICE;
} else {
kind = RT_MEMCPY_DEVICE_TO_DEVICE;
}
if (data_buf.length == 0) {
GELOGW("No data need to memcpy!");
return SUCCESS;
@@ -2125,6 +2129,9 @@ Status DavinciModel::CopyInputData(const InputData &input_data, bool device_data
runtime_param_.graph_id, data.second.GetOpName().c_str(), data.first, mem_addr, data_buf_addr, data_size,
data_buf_length);
GE_CHK_RT_RET(rtMemcpy(mem_addr, data_size, data_buf_addr, data_buf_length, kind));
if (data_buf.placement == kPlacementDevice) {
InterGraphTensorCache::GetInstance().RemoveTensorValue(session_id_, data_buf_addr);
}
}

return SUCCESS;
@@ -2534,14 +2541,15 @@ Status DavinciModel::GenOutputTensorInfo(OutputData *output_data, vector<OutputT
GELOGE(GE_GRAPH_MALLOC_FAILED, "Malloc buffer failed.");
return GE_GRAPH_MALLOC_FAILED;
}
- output_data->blobs.push_back({data_buf.get(), static_cast<uint64_t>(output_buffer_size[i]), false});
+ output_data->blobs.push_back({data_buf.get(), static_cast<uint64_t>(output_buffer_size[i]), false, kPlacemetHost});
OutputTensorInfo output;
output.dims = output_shape_info[i];
output.data = std::move(data_buf);
output.length = output_buffer_size[i];
output.placement = kPlacemetHost;
outputs.emplace_back(std::move(output));
- GELOGD("Output index:%zu, output dims is %s, data length:%lu.", i,
-        formats::JoinToString(output.dims).c_str(), output.length);
+ GELOGD("Output index:%zu, output dims is %s, data length:%lu.", i, formats::JoinToString(output.dims).c_str(),
+        output.length);
}

return SUCCESS;
@@ -2677,7 +2685,7 @@ void *DavinciModel::Run(DavinciModel *model) {
GELOGI("Copy input data, model id:%u", model_id);
GE_IF_BOOL_EXEC(ProfilingManager::Instance().ProfilingModelExecuteOn(),
model->SetProfileTime(MODEL_PRE_PROC_START));
- ret = model->CopyInputData(current_data, false);
+ ret = model->CopyInputData(current_data);
GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(
ret != SUCCESS, (void)model->ReturnResult(current_data.index, false, false, data_wrapper->GetOutput());
CsaInteract::GetInstance().StoreInternalErrorCode(ret, ERROR_MODULE_FMK, JOBSUBSTATE_GRAPH_EXEC);
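
CopyInputData now picks the memcpy direction per blob from DataBuffer::placement instead of the old caller-wide device_data flag. A small self-contained sketch of that decision, with a hypothetical CopyKind enum standing in for the runtime's rtMemcpyKind_t:

// sketch_copy_kind.cc -- illustrative sketch only.
#include <cassert>
#include <cstdint>

enum class CopyKind { kHostToDevice, kDeviceToDevice };  // stand-in for rtMemcpyKind_t

constexpr uint32_t kHostPlacement = 0;    // mirrors kPlacemetHost in ge_api_types.h
constexpr uint32_t kDevicePlacement = 1;  // mirrors kPlacementDevice

CopyKind KindForPlacement(uint32_t placement) {
  // Host-resident blobs are staged host-to-device; device-resident blobs are
  // copied device-to-device, matching the branch in CopyInputData above.
  return placement == kHostPlacement ? CopyKind::kHostToDevice : CopyKind::kDeviceToDevice;
}

int main() {
  assert(KindForPlacement(kHostPlacement) == CopyKind::kHostToDevice);
  assert(KindForPlacement(kDevicePlacement) == CopyKind::kDeviceToDevice);
  return 0;
}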


ge/graph/load/model_manager/davinci_model.h (+4, -2)

@@ -49,9 +49,9 @@
#include "task_info/task_info.h"
#include "graph/common/local_context.h"

+ using std::multimap;
using std::mutex;
using std::thread;
- using std::multimap;

namespace ge {
// op debug need 2048 bits buffer
@@ -272,6 +272,8 @@ class DavinciModel {

const vector<rtLabel_t> &GetLabelList() const { return label_list_; }

uint64_t GetAllStreamNum() const { return stream_list_.size() + all_hccl_stream_list_.size(); }

Status DestroyThread();

// get Op
@@ -605,7 +607,7 @@ class DavinciModel {
Status UpdateIoTaskArgs(const map<uint32_t, ZeroCopyOffset> &data_info, bool is_input,
const vector<DataBuffer> &blobs, bool is_dynamic, const string &batch_label);

- Status CopyInputData(const InputData &input_data, bool device_data = false);
+ Status CopyInputData(const InputData &input_data);

Status CopyOutputData(uint32_t data_id, OutputData &output_data, rtMemcpyKind_t kind);



ge/graph/load/model_manager/model_manager.cc (+129, -1)

@@ -35,6 +35,7 @@
#include "graph/utils/attr_utils.h"
#include "common/formats/utils/formats_trans_utils.h"
#include "hybrid/hybrid_davinci_model.h"
#include "graph/utils/graph_utils.h"

namespace ge {
thread_local uint32_t device_count = 0;
@@ -52,9 +53,12 @@ const std::string kCmdTypeProfModelSubscribe = "prof_model_subscribe";
const std::string kCmdTypeProfModelUnsubscribe = "prof_model_cancel_subscribe";
const char *const kBatchLoadBuf = "batchLoadsoFrombuf";
const char *const kDeleteCustOp = "deleteCustOp";
const char *const kStreamResource = "stream";
const char *const kEventResource = "event";
const int kTimeSpecNano = 1000000000;
const int kTimeSpecMiro = 1000000;
const int kOpNameMaxSize = 100;
const int kMaxEventNum = 10;
struct CustAicpuSoBuf {
uint64_t kernelSoBuf;
uint32_t kernelSoBufLen;
@@ -347,7 +351,7 @@ Status ModelManager::LoadModelOnline(uint32_t &model_id, const shared_ptr<ge::Ge
GE_IF_BOOL_EXEC(SUCCESS != (ret = davinci_model->Assign(ge_model)), GELOGW("assign model to modeldef failed.");
break;);
GE_TIMESTAMP_END(Assign, "GraphLoader::ModelAssign");
GE_CHK_STATUS_RET(CheckAndReleaseStreamEventResource(ge_model, model_id), "Check stream and event resource failed");
GE_TIMESTAMP_START(Init);
GE_IF_BOOL_EXEC(SUCCESS != (ret = davinci_model->Init()), GELOGW("DavinciInit failed."); break;);
GE_TIMESTAMP_END(Init, "GraphLoader::ModelInit");
@@ -378,6 +382,130 @@ void ModelManager::InsertModel(uint32_t id, shared_ptr<hybrid::HybridDavinciMode
hybrid_model_map_[id] = hybrid_model;
}

Status ModelManager::CheckAndReleaseStreamEventResource(const GeModelPtr &ge_model, uint32_t model_id) {
GE_CHK_BOOL_EXEC(ge_model != nullptr, return FAILED, "ge model ptr is null");
int64_t value = 0;
bool ret = ge::AttrUtils::GetInt(ge_model, ATTR_MODEL_STREAM_NUM, value);
int64_t need_stream_num = ret ? value : 0;
ret = ge::AttrUtils::GetInt(ge_model, ATTR_MODEL_EVENT_NUM, value);
int64_t need_event_num = ret ? value : 0;
GELOGD("The main stream number is %ld, event number is %ld", need_stream_num, need_event_num);

int64_t hccl_follow_stream = 0;
Status status = ModelUtils::CalculateFollowStream(ge_model, hccl_follow_stream);
if (status != SUCCESS) {
GELOGE(FAILED, "Calculate hccl follow stream failed");
return FAILED;
}
need_stream_num = need_stream_num + hccl_follow_stream;
GELOGD("Model %u needs stream num %ld", model_id, need_stream_num);

int64_t free_stream_num = 0;
status = GetFreeStream(free_stream_num);
if (status != SUCCESS) {
GELOGE(FAILED, "Get free stream num failed");
return FAILED;
}
if (need_stream_num > free_stream_num) {
status = ReleaseResource(need_stream_num, free_stream_num, kStreamResource);
if (status != SUCCESS) {
GELOGE(FAILED, "Release stream resource failed");
return FAILED;
}
}

int64_t free_event_num = 0;
GetFreeEvent(free_event_num);
if (need_event_num > free_event_num) {
status = ReleaseResource(need_event_num, free_event_num, kEventResource);
if (status != SUCCESS) {
GELOGE(FAILED, "Release event resource failed");
return FAILED;
}
}
return SUCCESS;
}


Status ModelManager::ReleaseResource(int64_t need_resource, int64_t free_resource, const string &resource_kind) {
while (need_resource > free_resource) {
uint32_t max_stream_model_id = 0;
uint32_t max_event_model_id = 0;
GetMaxStreamAndEventModel(max_stream_model_id, max_event_model_id);
GELOGD("The max stream num model is %u, the max event num model is %u", max_stream_model_id, max_event_model_id);
std::lock_guard<std::recursive_mutex> lock(map_mutex_);
if (resource_kind == kStreamResource) {
uint64_t max_stream_num = model_map_.at(max_stream_model_id)->GetAllStreamNum();
Status ret = Unload(max_stream_model_id);
if (ret != SUCCESS) {
GELOGE(FAILED, "Unload max stream model failed, model id: %u", max_stream_model_id);
return FAILED;
}
free_resource = free_resource + max_stream_num;
GELOGD("Unload model for stream, model id: %u, stream num: %lu", max_stream_model_id, max_stream_num);
}

if (resource_kind == kEventResource) {
uint64_t max_event_num = model_map_.at(max_event_model_id)->GetEventList().size();
Status ret = Unload(max_event_model_id);
if (ret != SUCCESS) {
GELOGE(FAILED, "Unload max event model failed, model id: %u", max_event_model_id);
return FAILED;
}
free_resource = free_resource + max_event_num;
GELOGD("Unload model for event, model id: %u, event num: %lu", max_event_model_id, max_event_num);
}
}
return SUCCESS;
}


Status ModelManager::GetFreeStream(int64_t &free_stream) {
uint32_t max_stream_count = 0;
uint32_t max_task_count = 0;
rtError_t ret = rtGetMaxStreamAndTask(RT_NORMAL_STREAM, &max_stream_count, &max_task_count);
if (ret != RT_ERROR_NONE) {
GELOGE(FAILED, "Get max stream and task count failed");
return FAILED;
}
GELOGD("Allowed max stream count: %u, max task count per stream: %u", max_stream_count, max_task_count);
std::lock_guard<std::recursive_mutex> lock(map_mutex_);
uint64_t stream_sum = 0;

for (auto &it : model_map_) {
stream_sum = stream_sum + it.second->GetAllStreamNum();
}
free_stream = max_stream_count - stream_sum;
return SUCCESS;
}

void ModelManager::GetFreeEvent(int64_t &free_event) {
std::lock_guard<std::recursive_mutex> lock(map_mutex_);
uint64_t event_sum = 0;
for (auto &it : model_map_) {
event_sum = event_sum + it.second->GetEventList().size();
}
free_event = kMaxEventNum - event_sum;
}

void ModelManager::GetMaxStreamAndEventModel(uint32_t &max_stream_model, uint32_t &max_event_model) {
std::lock_guard<std::recursive_mutex> lock(map_mutex_);
uint64_t max_stream_num = 0;
uint64_t max_event_num = 0;
for (auto &it : model_map_) {
if (it.second->GetAllStreamNum() > max_stream_num) {
max_stream_num = it.second->GetAllStreamNum();
max_stream_model = it.first;
}
if (it.second->GetEventList().size() > max_event_num) {
max_event_num = it.second->GetEventList().size();
max_event_model = it.first;
}
}
}

Status ModelManager::DeleteModel(uint32_t id) {
std::lock_guard<std::recursive_mutex> lock(map_mutex_);
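
The resource check above is plain accounting: sum the streams and events the incoming model needs, compare with what is free, and unload the heaviest current consumer until the demand fits. A standalone sketch of that loop with made-up model ids and counts:

// sketch_release_loop.cc -- illustrative sketch only.
#include <cstdint>
#include <iostream>
#include <map>

// Unload the model holding the most streams until need <= free.
// Returns false if nothing is left to unload.
bool ReleaseUntilFits(std::map<uint32_t, int64_t> &model_streams, int64_t need, int64_t &free_streams) {
  while (need > free_streams) {
    if (model_streams.empty()) {
      return false;
    }
    auto heaviest = model_streams.begin();
    for (auto it = model_streams.begin(); it != model_streams.end(); ++it) {
      if (it->second > heaviest->second) {
        heaviest = it;
      }
    }
    free_streams += heaviest->second;  // unloading returns its streams to the pool
    model_streams.erase(heaviest);
  }
  return true;
}

int main() {
  std::map<uint32_t, int64_t> loaded = {{1, 6}, {2, 2}};  // model id -> stream count
  int64_t free_streams = 3;
  bool ok = ReleaseUntilFits(loaded, 8, free_streams);  // unloads model 1 -> free 9
  std::cout << ok << " free=" << free_streams << "\n";  // prints: 1 free=9
}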



ge/graph/load/model_manager/model_manager.h (+7, -3)

@@ -177,8 +177,7 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager {
static ge::Status HandleProfStartCommand(const Command &command);
static ge::Status HandleProfStopCommand(const Command &command);

- static ge::Status GetModelByCmd(const Command &command,
-                                 std::shared_ptr<DavinciModel> &davinci_model);
+ static ge::Status GetModelByCmd(const Command &command, std::shared_ptr<DavinciModel> &davinci_model);
///
/// @ingroup domi_ome
/// @brief get model memory usage
@@ -333,6 +332,12 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager {
void InsertModel(uint32_t id, std::shared_ptr<DavinciModel> &davinci_model);
void InsertModel(uint32_t id, std::shared_ptr<hybrid::HybridDavinciModel> &hybrid_model);

Status CheckAndReleaseStreamEventResource(const GeModelPtr &ge_model, uint32_t model_id);
Status ReleaseResource(int64_t need_resource, int64_t free_resource, const string &resource_kind);
Status GetFreeStream(int64_t &free_stream);
void GetFreeEvent(int64_t &free_event);
void GetMaxStreamAndEventModel(uint32_t &max_stream_model, uint32_t &max_event_model);

///
/// @ingroup domi_ome
/// @brief delete model from model manager set
@@ -353,7 +358,6 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager {
std::vector<rtExceptionInfo> exception_infos_;
std::mutex cust_aicpu_mutex_;
std::map<uintptr_t, std::map<std::string, CustAICPUKernelPtr>> cust_aicpu_so_;

static DumpProperties dump_properties_;
};
} // namespace ge


ge/graph/load/model_manager/model_utils.cc (+57, -0)

@@ -21,6 +21,7 @@
#include "graph/utils/tensor_utils.h"
#include "graph/manager/graph_var_manager.h"
#include "graph/types.h"
#include "graph/utils/graph_utils.h"

#define VALIDATE_MEM_RANGE(OP, SIZE, OFFSET) \
do { \
@@ -30,6 +31,9 @@
} \
} while (0)

namespace {
const char *const kUsedStreamNum = "used_stream_num";
} // namespace
namespace ge {
///
/// @ingroup ge
@@ -574,4 +578,57 @@ Status ModelUtils::GetRtAddress(const RuntimeParam &param, uintptr_t logic_addr,
mem_addr = runtime_base_addr + logic_addr;
return SUCCESS;
}

Status ModelUtils::CalculateFollowStream(const GeModelPtr &ge_model, int64_t &hccl_follow_stream_num) {
const auto &model_def = ge_model->GetModelTaskDefPtr();
GE_CHECK_NOTNULL(model_def);
Graph graph = ge_model->GetGraph();
ComputeGraphPtr compute_graph = GraphUtils::GetComputeGraph(graph);
GE_CHECK_NOTNULL(compute_graph);

map<uint32_t, OpDescPtr> op_list;
for (auto &node : compute_graph->GetDirectNode()) {
OpDescPtr op_desc = node->GetOpDesc();
if (op_desc == nullptr) {
GELOGE(PARAM_INVALID, "Op desc is nullptr");
return PARAM_INVALID;
}
op_list.emplace(op_desc->GetId(), op_desc);
}

std::multimap<int64_t, int64_t> main_follow_num;
for (int i = 0; i < model_def->task_size(); i++) {
const domi::TaskDef &task = model_def->task(i);
if (static_cast<rtModelTaskType_t>(task.type()) == RT_MODEL_TASK_HCCL) {
auto hccl_def = task.kernel_hccl();
OpDescPtr hccl_op_desc = op_list.at(hccl_def.op_index());
int64_t main_stream_id = hccl_op_desc->GetStreamId();
int64_t follow_stream_num = 0;
if (!ge::AttrUtils::GetInt(hccl_op_desc, kUsedStreamNum, follow_stream_num)) {
GELOGW("Get used_stream_num failed, op is %s", hccl_op_desc->GetName().c_str());
}
main_follow_num.emplace(main_stream_id, follow_stream_num);
}
}
hccl_follow_stream_num = CalFollowStreamSum(main_follow_num);
return SUCCESS;
}

int64_t ModelUtils::CalFollowStreamSum(const std::multimap<int64_t, int64_t> &hccl_stream_map) {
int64_t need_follow_stream_num = 0;
std::map<int64_t, int64_t> max_follow_stream_map;
for (auto &it : hccl_stream_map) {
auto max_it = max_follow_stream_map.find(it.first);
if (max_it == max_follow_stream_map.end()) {
max_follow_stream_map.emplace(it.first, it.second);
} else if (it.second > max_it->second) {
max_follow_stream_map.at(max_it->first) = it.second;
}
}
for (auto &follow_it : max_follow_stream_map) {
need_follow_stream_num = need_follow_stream_num + follow_it.second;
}
return need_follow_stream_num;
}

} // namespace ge
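
CalFollowStreamSum keeps, per main stream id, the maximum follow-stream count over that stream's HCCL tasks, then sums the maxima. The standalone rerun below uses the unit test's data: {1→10, 1→20, 2→10, 2→5} reduces to 20 + 10 = 30.

// sketch_follow_stream_sum.cc -- same reduction as CalFollowStreamSum above.
#include <cstdint>
#include <iostream>
#include <map>

int64_t FollowStreamSum(const std::multimap<int64_t, int64_t> &hccl_stream_map) {
  std::map<int64_t, int64_t> max_per_main;  // main stream id -> max follow num
  for (const auto &kv : hccl_stream_map) {
    auto it = max_per_main.find(kv.first);
    if (it == max_per_main.end() || kv.second > it->second) {
      max_per_main[kv.first] = kv.second;
    }
  }
  int64_t sum = 0;
  for (const auto &kv : max_per_main) {
    sum += kv.second;
  }
  return sum;
}

int main() {
  std::multimap<int64_t, int64_t> m = {{1, 10}, {1, 20}, {2, 10}, {2, 5}};
  std::cout << FollowStreamSum(m) << "\n";  // prints 30, matching the unit test
}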

ge/graph/load/model_manager/model_utils.h (+15, -0)

@@ -24,6 +24,7 @@
#include "graph/load/model_manager/task_info/task_info.h"
#include "graph/op_desc.h"
#include "graph/utils/tensor_adapter.h"
#include "model/ge_model.h"

using std::vector;

@@ -108,6 +109,20 @@ class ModelUtils {
///
static Status GetRtAddress(const RuntimeParam &model_param, uintptr_t logic_addr, uint8_t *&mem_addr);

///
/// @ingroup ge
/// @brief Calculate hccl follow stream
/// @return Status
///
static Status CalculateFollowStream(const GeModelPtr &ge_model, int64_t &hccl_follow_stream_num);

///
/// @ingroup ge
/// @brief Calculate the sum of follow stream
/// @return int64_t
///
static int64_t CalFollowStreamSum(const std::multimap<int64_t, int64_t> &hccl_stream_map);

private:
///
/// @ingroup ge


ge/hybrid/executor/hybrid_model_async_executor.cc (+32, -12)

@@ -20,12 +20,15 @@
#include "graph/utils/type_utils.h"
#include "graph/ge_context.h"
#include "omm/csa_interact.h"
#include "graph/execute/inter_graph_tensor_cache.h"
#include "graph/debug/ge_attr_define.h"

namespace ge {
namespace hybrid {
namespace {
const int kDataOutputIndex = 0;
const size_t kMinimumPiplineStages = 2;
const char *const kLazyRecompile = "lazy_recompile";
}
HybridModelAsyncExecutor::HybridModelAsyncExecutor(HybridModel *model)
: model_(model), run_flag_(false) {
@@ -62,6 +65,8 @@ Status HybridModelAsyncExecutor::Start(const std::shared_ptr<ModelListener> &lis
future_ = std::async(std::launch::async, [&]() -> Status {
GetThreadLocalContext() = *executor_->GetContext()->ge_context;
GetContext().SetSessionId(executor_->GetContext()->session_id);
+ GE_CHECK_NOTNULL(executor_->GetContext()->ge_context);
+ GetThreadLocalContext() = *executor_->GetContext()->ge_context;
return RunInternal();
});

@@ -223,8 +228,8 @@ Status HybridModelAsyncExecutor::SyncVarData() {

Status HybridModelAsyncExecutor::PrepareInputs(const InputData &current_data, HybridModelExecutor::ExecuteArgs &args) {
if (current_data.blobs.size() < input_tensor_desc_.size()) {
- GELOGE(PARAM_INVALID, "Blob size mismatches, expect at least %zu, but got %zu",
-        input_tensor_desc_.size(), current_data.blobs.size());
+ GELOGE(PARAM_INVALID, "Blob size mismatches, expect at least %zu, but got %zu", input_tensor_desc_.size(),
+        current_data.blobs.size());
return PARAM_INVALID;
}

@@ -343,6 +348,13 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a
}

GELOGD("Number of outputs = %zu", output_tensor_desc_list.size());
map<const void *, TensorValue> tensor_value_info;
string execute_mode;
auto result = ge::GetContext().GetOption(OPTION_EXEC_DYNAMIC_EXECUTE_MODE, execute_mode);
if (result != SUCCESS) {
GELOGW("Cannot get dynamic execute mode attr");
}
GELOGD("The dynamic execute attr is %s", execute_mode.c_str());
for (size_t i = 0; i < output_tensors.size(); ++i) {
GELOGD("Start to process output[%zu]", i);
auto &output_tensor = output_tensors[i];
@@ -380,19 +392,25 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a
output.dims = tensor_desc->GetShape().GetDims();
output.length = output_size;
if (output_size > 0) {
- std::unique_ptr<uint8_t[]> data_buf(new(std::nothrow) uint8_t[output_size]);
- GE_CHECK_NOTNULL(data_buf);
- GE_CHK_RT_RET(rtMemcpy(data_buf.get(),
-                        output_size,
-                        output_tensor.GetData(),
-                        output_size,
-                        RT_MEMCPY_DEVICE_TO_HOST));
- output.data = std::move(data_buf);
- output_data->blobs.emplace_back(data_buf.get(), static_cast<uint32_t>(output_size), false);
+ if (execute_mode != kLazyRecompile) {
+   std::unique_ptr<uint8_t[]> data_buf(new (std::nothrow) uint8_t[output_size]);
+   GE_CHECK_NOTNULL(data_buf);
+   GE_CHK_RT_RET(
+       rtMemcpy(data_buf.get(), output_size, output_tensor.GetData(), output_size, RT_MEMCPY_DEVICE_TO_HOST));
+   output_data->blobs.emplace_back(data_buf.get(), static_cast<uint32_t>(output_size), false, kPlacemetHost);
+   output.data = std::move(data_buf);
+   output.placement = kPlacemetHost;
+ } else {
+   output.dev_data = output_tensor.MutableData();
+   output.placement = kPlacementDevice;
+   tensor_value_info.emplace(output_tensor.GetData(), output_tensor);
+   output_data->blobs.emplace_back(output_tensor.MutableData(), static_cast<uint32_t>(output_size), false,
+                                   kPlacementDevice);
+ }

} else {
GELOGW("Output[%zu] is empty. shape = [%s]", i, tensor_desc->GetShape().ToString().c_str());
output.data = nullptr;
- output_data->blobs.emplace_back(nullptr, 0U, false);
+ output_data->blobs.emplace_back(nullptr, 0U, false, kPlacemetHost);
}

outputs.emplace_back(std::move(output));
@@ -403,6 +421,8 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a
output_size);
}

uint64_t session_id = executor_->GetContext()->session_id;
InterGraphTensorCache::GetInstance().SetDynamicExecuteDevAddr(session_id, tensor_value_info);
return SUCCESS;
}
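
CopyOutputs now branches on the OPTION_EXEC_DYNAMIC_EXECUTE_MODE option: under lazy_recompile the device pointer is handed out directly and recorded in the cache, otherwise the tensor is copied back to host as before. A condensed sketch of the branch, with Output standing in for OutputTensorInfo and memcpy standing in for rtMemcpy:

// sketch_output_placement.cc -- illustrative sketch only.
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

struct Output {
  std::vector<uint8_t> host_data;  // filled on the copy-to-host path
  const void *dev_data = nullptr;  // set on the lazy_recompile path
};

Output MakeOutput(const std::string &execute_mode, const void *dev_ptr, size_t size) {
  Output out;
  if (execute_mode != "lazy_recompile") {
    out.host_data.resize(size);
    std::memcpy(out.host_data.data(), dev_ptr, size);  // stands in for rtMemcpy D2H
  } else {
    out.dev_data = dev_ptr;  // no copy; the caller keeps the device address alive
  }
  return out;
}

int main() {
  uint8_t fake_device_buf[4] = {1, 2, 3, 4};
  Output copied = MakeOutput("dynamic_execute", fake_device_buf, sizeof(fake_device_buf));
  Output lazy = MakeOutput("lazy_recompile", fake_device_buf, sizeof(fake_device_buf));
  std::cout << copied.host_data.size() << " " << (lazy.dev_data != nullptr) << "\n";  // 4 1
}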



ge/session/session_manager.cc (+2, -0)

@@ -22,6 +22,7 @@
#include "graph/ge_context.h"
#include "graph/load/model_manager/model_manager.h"
#include "graph/manager/util/rt_context_util.h"
#include "graph/execute/inter_graph_tensor_cache.h"

using std::map;
using std::string;
@@ -105,6 +106,7 @@ Status SessionManager::DestroySession(SessionId session_id) {
ModelManager::GetInstance()->DestroyAicpuSession(session_id);
}

InterGraphTensorCache::GetInstance().RemoveSessionTensorValue(session_id);
// Unified destruct rt_context
RtContextUtil::GetInstance().DestroyRtContexts(session_id);



ge/single_op/task/op_task.cc (+1, -1)

@@ -787,7 +787,7 @@ Status AiCpuTask::LaunchKernel(const std::vector<GeTensorDesc> &input_desc,
if (unknown_type_ == DEPEND_COMPUTE) {
std::vector<DataBuffer> summary_buffers;
for (size_t i = 0; i < num_outputs_; ++i) {
- summary_buffers.emplace_back(output_summary_[i], sizeof(aicpu::FWKAdapter::ResultSummary), false);
+ summary_buffers.emplace_back(output_summary_[i], sizeof(aicpu::FWKAdapter::ResultSummary), false, kPlacemetHost);
}
GE_CHK_STATUS_RET_NOLOG(UpdateIoAddr(input_buffers, summary_buffers));
} else {


inc/external/ge/ge_api_types.h (+21, -6)

@@ -311,12 +311,18 @@ const std::string HCOM_MULTI_MODE = "ge.hcomMultiMode";
// Graph run mode
enum GraphRunMode { PREDICTION = 0, TRAIN };

// if data addr is in host
const uint32_t kPlacemetHost = 0;

// if data addr is in device
const uint32_t kPlacementDevice = 1;
// Input/Output tensor info
struct InputTensorInfo {
- uint32_t data_type;  // data type
- std::vector<int64_t> dims;  // shape description
- void *data;  // tensor data
- int64_t length;  // tensor length
+ uint32_t data_type;  // data type
+ std::vector<int64_t> dims;  // shape description
+ void *data;  // tensor data
+ int64_t length;  // tensor length
+ uint32_t placement = kPlacemetHost;  // data placement
};

struct OutputTensorInfo {
@@ -324,9 +330,16 @@ struct OutputTensorInfo {
std::vector<int64_t> dims; // shape description
std::unique_ptr<uint8_t[]> data; // tensor data
int64_t length; // tensor length
- OutputTensorInfo() : data_type(0), dims({}), data(nullptr), length(0) {}
+ uint32_t placement;  // data placement
+ void *dev_data;  // device data addr
+ OutputTensorInfo() : data_type(0), dims({}), data(nullptr), length(0), placement(0), dev_data(nullptr) {}
OutputTensorInfo(OutputTensorInfo &&out)
- : data_type(out.data_type), dims(out.dims), data(std::move(out.data)), length(out.length) {}
+ : data_type(out.data_type),
+   dims(out.dims),
+   data(std::move(out.data)),
+   length(out.length),
+   placement(out.placement),
+   dev_data(out.dev_data) {}

OutputTensorInfo &operator=(OutputTensorInfo &&out) {
if (this != &out) {
@@ -334,6 +347,8 @@ struct OutputTensorInfo {
dims = out.dims;
data = std::move(out.data);
length = out.length;
placement = out.placement;
dev_data = out.dev_data;
}
return *this;
}


inc/framework/common/ge_types.h (+4, -3)

@@ -67,10 +67,11 @@ struct DataBuffer {
void *data; // Data address
uint64_t length; // Data length
bool isDataSupportMemShare = false;
- DataBuffer(void *dataIn, uint64_t len, bool isSupportMemShare)
-     : data(dataIn), length(len), isDataSupportMemShare(isSupportMemShare) {}
+ uint32_t placement;
+ DataBuffer(void *dataIn, uint64_t len, bool isSupportMemShare, uint32_t placement)
+     : data(dataIn), length(len), isDataSupportMemShare(isSupportMemShare), placement(placement) {}

- DataBuffer() : data(nullptr), length(0), isDataSupportMemShare(false) {}
+ DataBuffer() : data(nullptr), length(0), isDataSupportMemShare(false), placement(0) {}
};

///
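
With the widened constructor, every DataBuffer call site now states where its memory lives. A minimal usage sketch that mirrors the struct above, assuming 0 means host and 1 means device as in ge_api_types.h:

// sketch_data_buffer.cc -- mirrors the struct above for illustration.
#include <cstdint>

struct DataBuffer {
  void *data;
  uint64_t length;
  bool isDataSupportMemShare = false;
  uint32_t placement;
  DataBuffer(void *dataIn, uint64_t len, bool isSupportMemShare, uint32_t placement)
      : data(dataIn), length(len), isDataSupportMemShare(isSupportMemShare), placement(placement) {}
  DataBuffer() : data(nullptr), length(0), isDataSupportMemShare(false), placement(0) {}
};

int main() {
  uint8_t host_buf[16] = {};
  DataBuffer host_blob(host_buf, sizeof(host_buf), false, 0);  // 0: host placement
  DataBuffer device_blob(nullptr, 0, false, 1);                // 1: device placement
  return (host_blob.placement == 0 && device_blob.placement == 1) ? 0 : 1;
}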


tests/ut/ge/CMakeLists.txt (+8, -0)

@@ -138,6 +138,7 @@ set(COMMON_SRC_FILES
"${GE_CODE_DIR}/ge/session/inner_session.cc"
"${GE_CODE_DIR}/ge/graph/manager/util/rt_context_util.cc"
"${GE_CODE_DIR}/ge/graph/execute/graph_execute.cc"
"${GE_CODE_DIR}/ge/graph/execute/inter_graph_tensor_cache.cc"
"${GE_CODE_DIR}/ge/graph/preprocess/graph_preprocess.cc"
"${GE_CODE_DIR}/ge/hybrid/hybrid_davinci_model_stub.cc"
"${GE_CODE_DIR}/ge/graph/load/model_manager/davinci_model.cc"
@@ -310,6 +311,8 @@ set(COMMON_SRC_FILES
"${GE_CODE_DIR}/ge/common/dump/dump_op.cc"
"${GE_CODE_DIR}/ge/common/model_saver.cc"
"${GE_CODE_DIR}/ge/hybrid/node_executor/aicpu/aicpu_ext_info.cc"
"${GE_CODE_DIR}/ge/hybrid/common/tensor_value.cc"
"${GE_CODE_DIR}/ge/hybrid/common/npu_memory_allocator.cc"
"${GE_CODE_DIR}/ge/common/ge/datatype_util.cc"
"${GE_CODE_DIR}/metadef/register/ops_kernel_builder_registry.cc"
"${GE_CODE_DIR}/metadef/register/op_tiling.cpp"
@@ -632,7 +635,12 @@ set(DISTINCT_GRAPH_LOAD_TEST_FILES
#"graph/graph_load_unittest.cc"
"graph/ge_executor_unittest.cc"
"graph/load/model_helper_unittest.cc"
"graph/load/model_utils_unittest.cc"
"graph/inter_graph_tensor_cache_unittest.cc"
"graph/load/model_manager_unittest.cc"
)

set(PASS_TEST_FILES


tests/ut/ge/graph/inter_graph_tensor_cache_unittest.cc (+52, -0)

@@ -0,0 +1,52 @@
/**
* Copyright 2019-2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gtest/gtest.h>
#include <memory>
#include <vector>
#include <map>

#define protected public
#define private public
#include "graph/execute/inter_graph_tensor_cache.h"
#include "hybrid/common/tensor_value.h"
#include "graph/load/new_model_manager/model_manager.h"
#include "graph/manager/graph_manager_utils.h"
#include "model/ge_model.h"
#undef private
#undef protected

using namespace testing;
namespace ge {

class UtestInterGraphTensorCache : public testing::Test {
protected:
void SetUp() {}

void TearDown() {}
};

TEST_F(UtestInterGraphTensorCache, set_dynamic_execute_dev_addr) {
hybrid::TensorValue tensor_value;
std::map<const void *, hybrid::TensorValue> tensor_value_map = {{(const void *)0x12345, tensor_value}};
InterGraphTensorCache::GetInstance().SetDynamicExecuteDevAddr(1, tensor_value_map);
InterGraphTensorCache::GetInstance().RemoveTensorValue(1, (void *)0x12345);
InterGraphTensorCache::GetInstance().RemoveTensorValue(1, (void *)0x123456);
InterGraphTensorCache::GetInstance().RemoveSessionTensorValue(2);
InterGraphTensorCache::GetInstance().RemoveSessionTensorValue(1);
}

} // namespace ge

tests/ut/ge/graph/load/model_manager_unittest.cc (+118, -0)

@@ -0,0 +1,118 @@
/**
* Copyright 2019-2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gtest/gtest.h>

#include <cce/compiler_stub.h>
#include "common/types.h"

#define private public
#define protected public
#include "graph/load/new_model_manager/model_manager.h"

#include "common/helper/om_file_helper.h"
#include "common/op/ge_op_utils.h"
#include "graph/load/graph_loader.h"
#include "graph/load/new_model_manager/davinci_model.h"
#include "graph/load/new_model_manager/davinci_model_parser.h"
#include "graph/load/new_model_manager/model_utils.h"

#undef private
#undef protected

using namespace std;
using namespace testing;
namespace ge {
class UtestModelManagerModelManager : public testing::Test {
protected:
void SetUp() {}
void TearDown() {}
};

TEST_F(UtestModelManagerModelManager, cal_follow_stream_sum) {
std::multimap<int64_t, int64_t> hccl_stream_map = {{1, 10}, {1, 20}, {2, 10}, {2, 5}};
int64_t result = ModelUtils::CalFollowStreamSum(hccl_stream_map);
EXPECT_EQ(result, 30);
}
TEST_F(UtestModelManagerModelManager, get_max_stream_and_event) {
ModelManager mm;
auto model1 = std::make_shared<DavinciModel>(1, nullptr);
auto model2 = std::make_shared<DavinciModel>(2, nullptr);
rtStream_t stream = nullptr;
rtStream_t stream2 = nullptr;
rtStream_t stream3 = nullptr;
rtStream_t stream4 = nullptr;
rtEvent_t event = nullptr;
rtEvent_t event2 = nullptr;
rtEvent_t event3 = nullptr;
model1->stream_list_ = {stream, stream2, stream3, stream4};
model1->event_list_ = {event, event2};
model2->stream_list_ = {stream, stream2};
model2->event_list_ = {event, event2, event3};
mm.InsertModel(1, model1);
mm.InsertModel(2, model2);
uint32_t max_stream_model;
uint32_t max_event_model;
mm.GetMaxStreamAndEventModel(max_stream_model, max_event_model);
EXPECT_EQ(max_stream_model, 1);
EXPECT_EQ(max_event_model, 2);

int64_t free_stream;
int64_t free_event;
Status ret = mm.GetFreeStream(free_stream);
mm.GetFreeEvent(free_event);
EXPECT_EQ(ge::SUCCESS, ret);
}


TEST_F(UtestModelManagerModelManager, release_resource_stream) {
ModelManager mm;
auto model1 = std::make_shared<DavinciModel>(1, nullptr);
auto model2 = std::make_shared<DavinciModel>(2, nullptr);
rtStream_t stream = nullptr;
rtStream_t stream2 = nullptr;
rtStream_t stream3 = nullptr;
rtStream_t stream4 = nullptr;
rtEvent_t event = nullptr;
rtEvent_t event2 = nullptr;
rtEvent_t event3 = nullptr;
model1->stream_list_ = {stream, stream2, stream3, stream4};
model1->event_list_ = {event, event2};
model2->stream_list_ = {stream, stream2};
model2->event_list_ = {event, event2, event3};
mm.InsertModel(1, model1);
mm.InsertModel(2, model2);
string kind = "stream";
Status ret = mm.ReleaseResource(110, 109, kind);
EXPECT_EQ(ge::SUCCESS, ret);

string kind2 = "event";
Status ret2 = mm.ReleaseResource(110, 109, kind2);
EXPECT_EQ(ge::SUCCESS, ret2);
}


TEST_F(UtestModelManagerModelManager, check_stream_and_event_resource) {
ModelManager mm;
auto ge_model = make_shared<GeModel>();
Status ret = mm.CheckAndReleaseStreamEventResource(ge_model, 1);
EXPECT_EQ(ge::FAILED, ret);
}

} // namespace ge
