From d12006027d59d495997844f3ebd8d74a892759fa Mon Sep 17 00:00:00 2001 From: zhou_chao1993 Date: Wed, 3 Feb 2021 15:05:00 +0800 Subject: [PATCH] cal stream and event resource --- ge/CMakeLists.txt | 6 + ge/executor/CMakeLists.txt | 1 + ge/executor/ge_executor.cc | 4 +- ge/graph/execute/inter_graph_tensor_cache.cc | 67 +++++++++ ge/graph/execute/inter_graph_tensor_cache.h | 42 ++++++ ge/graph/load/model_manager/davinci_model.cc | 26 ++-- ge/graph/load/model_manager/davinci_model.h | 6 +- ge/graph/load/model_manager/model_manager.cc | 130 +++++++++++++++++- ge/graph/load/model_manager/model_manager.h | 10 +- ge/graph/load/model_manager/model_utils.cc | 57 ++++++++ ge/graph/load/model_manager/model_utils.h | 15 ++ .../executor/hybrid_model_async_executor.cc | 44 ++++-- ge/session/session_manager.cc | 2 + ge/single_op/task/op_task.cc | 2 +- inc/external/ge/ge_api_types.h | 27 +++- inc/framework/common/ge_types.h | 7 +- tests/ut/ge/CMakeLists.txt | 8 ++ .../inter_graph_tensor_cache_unittest.cc | 52 +++++++ .../ge/graph/load/model_manager_unittest.cc | 118 ++++++++++++++++ 19 files changed, 585 insertions(+), 39 deletions(-) create mode 100644 ge/graph/execute/inter_graph_tensor_cache.cc create mode 100644 ge/graph/execute/inter_graph_tensor_cache.h mode change 100755 => 100644 ge/graph/load/model_manager/davinci_model.cc mode change 100755 => 100644 ge/graph/load/model_manager/davinci_model.h mode change 100755 => 100644 ge/graph/load/model_manager/model_manager.cc mode change 100755 => 100644 ge/graph/load/model_manager/model_manager.h create mode 100644 tests/ut/ge/graph/inter_graph_tensor_cache_unittest.cc create mode 100644 tests/ut/ge/graph/load/model_manager_unittest.cc diff --git a/ge/CMakeLists.txt b/ge/CMakeLists.txt index 0a5d7ba4..eadd7dd9 100755 --- a/ge/CMakeLists.txt +++ b/ge/CMakeLists.txt @@ -123,6 +123,9 @@ set(TRAIN_SRC_LIST "graph/common/omg_util.cc" "graph/common/transop_util.cc" "graph/execute/graph_execute.cc" + "graph/execute/inter_graph_tensor_cache.cc" + "hybrid/common/tensor_value.cc" + "hybrid/common/npu_memory_allocator.cc" "graph/label/case_label_maker.cc" "graph/label/if_label_maker.cc" "graph/label/label_maker.cc" @@ -664,6 +667,9 @@ set(INFER_SRC_LIST "graph/build/memory/hybrid_mem_assigner.cc" "graph/build/memory/max_block_mem_assigner.cc" "graph/build/memory/var_mem_assign_util.cc" + "graph/execute/inter_graph_tensor_cache.cc" + "hybrid/common/tensor_value.cc" + "hybrid/common/npu_memory_allocator.cc" ) if (NOT ENABLE_D AND NOT ENABLE_ACL AND NOT ENABLE_MS_TESTCASES) diff --git a/ge/executor/CMakeLists.txt b/ge/executor/CMakeLists.txt index 05d627de..25c71ff4 100644 --- a/ge/executor/CMakeLists.txt +++ b/ge/executor/CMakeLists.txt @@ -20,6 +20,7 @@ set(SRC_LIST "../common/profiling/ge_profiling.cc" "../graph/load/graph_loader.cc" "../graph/execute/graph_execute.cc" + "../graph/execute/inter_graph_tensor_cache.cc" "../omm/csa_interact.cc" "../graph/manager/graph_manager_utils.cc" "../graph/manager/graph_var_manager.cc" diff --git a/ge/executor/ge_executor.cc b/ge/executor/ge_executor.cc index af8237e0..1a09b9d2 100755 --- a/ge/executor/ge_executor.cc +++ b/ge/executor/ge_executor.cc @@ -79,7 +79,7 @@ void GetDomiInputData(const ge::RunModelData &input_data, ge::InputData &inputs) inputs.timeout = input_data.timeout; inputs.request_id = input_data.request_id; for (const auto &data_item : input_data.blobs) { - ge::DataBuffer dataBuf{data_item.data, data_item.length, data_item.isDataSupportMemShare}; + ge::DataBuffer dataBuf{data_item.data, data_item.length, 
data_item.isDataSupportMemShare, 0}; inputs.blobs.emplace_back(dataBuf); } } @@ -88,7 +88,7 @@ void GetDomiOutputData(const ge::RunModelData &output_data, ge::OutputData &outp outputs.index = output_data.index; outputs.model_id = output_data.modelId; for (const auto &data_item : output_data.blobs) { - ge::DataBuffer dataBuf(data_item.data, data_item.length, data_item.isDataSupportMemShare); + ge::DataBuffer dataBuf(data_item.data, data_item.length, data_item.isDataSupportMemShare, 0); outputs.blobs.emplace_back(dataBuf); } } diff --git a/ge/graph/execute/inter_graph_tensor_cache.cc b/ge/graph/execute/inter_graph_tensor_cache.cc new file mode 100644 index 00000000..1fe2efdf --- /dev/null +++ b/ge/graph/execute/inter_graph_tensor_cache.cc @@ -0,0 +1,67 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "graph/execute/inter_graph_tensor_cache.h" + +namespace ge { +InterGraphTensorCache &InterGraphTensorCache::GetInstance() { + static InterGraphTensorCache instance; + return instance; +} + +void InterGraphTensorCache::SetDynamicExecuteDevAddr(uint64_t session_id, + const std::map<const void *, hybrid::TensorValue> &tensor_values) { + std::lock_guard<std::mutex> lock(dev_addr_mutex_); + dynamic_execute_dev_addr_.emplace(session_id, tensor_values); +} + +std::map<const void *, hybrid::TensorValue> InterGraphTensorCache::GetTensorValue(uint64_t session_id) { + std::lock_guard<std::mutex> lock(dev_addr_mutex_); + auto it = dynamic_execute_dev_addr_.find(session_id); + if (it != dynamic_execute_dev_addr_.end()) { + return it->second; + } + return {}; +} + +void InterGraphTensorCache::RemoveTensorValue(uint64_t session_id, void *data_dev_addr) { + std::lock_guard<std::mutex> lock(dev_addr_mutex_); + auto it = dynamic_execute_dev_addr_.find(session_id); + if (it == dynamic_execute_dev_addr_.end()) { + GELOGW("Can not find session, session id is %lu", session_id); + return; + } + auto &tensor_values = it->second; + auto tensor_value_it = tensor_values.find(data_dev_addr); + if (tensor_value_it == tensor_values.end()) { + GELOGW("Can not find data device addr in map"); + return; + } + tensor_values.erase(tensor_value_it); + GELOGD("Remove tensor value success"); +} + +void InterGraphTensorCache::RemoveSessionTensorValue(uint64_t session_id) { + std::lock_guard<std::mutex> lock(dev_addr_mutex_); + auto it = dynamic_execute_dev_addr_.find(session_id); + if (it != dynamic_execute_dev_addr_.end()) { + dynamic_execute_dev_addr_.erase(it); + } else { + GELOGW("Session id %lu is not found", session_id); + } + GELOGD("Remove session: %lu tensor value success", session_id); +} +} // namespace ge diff --git a/ge/graph/execute/inter_graph_tensor_cache.h b/ge/graph/execute/inter_graph_tensor_cache.h new file mode 100644 index 00000000..a2a771a3 --- /dev/null +++ b/ge/graph/execute/inter_graph_tensor_cache.h @@ -0,0 +1,42 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef GE_GRAPH_INTER_GRAPH_TENSOR_CACHE_H_ +#define GE_GRAPH_INTER_GRAPH_TENSOR_CACHE_H_ + +#include <map> +#include <mutex> + +#include "hybrid/common/tensor_value.h" +#include "common/ge_inner_error_codes.h" +#include "common/debug/log.h" + +namespace ge { +class InterGraphTensorCache { + public: + static InterGraphTensorCache &GetInstance(); + void SetDynamicExecuteDevAddr(uint64_t session_id, const std::map<const void *, hybrid::TensorValue> &tensor_values); + std::map<const void *, hybrid::TensorValue> GetTensorValue(uint64_t session_id); + void RemoveTensorValue(uint64_t session_id, void *data_dev_addr); + void RemoveSessionTensorValue(uint64_t session_id); + + private: + std::map<uint64_t, std::map<const void *, hybrid::TensorValue>> dynamic_execute_dev_addr_; + std::mutex dev_addr_mutex_; +}; +} // namespace ge + +#endif // GE_GRAPH_INTER_GRAPH_TENSOR_CACHE_H_ diff --git a/ge/graph/load/model_manager/davinci_model.cc b/ge/graph/load/model_manager/davinci_model.cc old mode 100755 new mode 100644 index ed2428d9..f2802205 --- a/ge/graph/load/model_manager/davinci_model.cc +++ b/ge/graph/load/model_manager/davinci_model.cc @@ -59,6 +59,7 @@ #include "securec.h" #include "graph/common/local_context.h" #include "common/formats/utils/formats_trans_utils.h" +#include "graph/execute/inter_graph_tensor_cache.h" // create std::thread, catch exceptions using try/catch #define CREATE_STD_THREAD(thread_id, func, args) \ @@ -82,7 +83,7 @@ const uint32_t kAddrLen = sizeof(void *); const int kDecimal = 10; const int kBytes = 8; const uint32_t kDataMemAlignSizeCompare = 64; -const uint32_t kDumpL1FusionOpMByteSize = 2097152; // 2 * 1024 * 1024 +const uint32_t kDumpL1FusionOpMByteSize = 2097152; // 2 * 1024 * 1024 const uint32_t kDumpFlagOfL1Fusion = 0; const char *const kDefaultBatchLable = "Batch_default"; const char *const kGetDynamicDimsName = "ascend_mbatch_get_dynamic_dims_node"; @@ -335,7 +336,6 @@ Status DavinciModel::InitWeightMem(void *dev_ptr, void *weight_ptr, size_t weigh return SUCCESS; } - Status DavinciModel::InitFeatureMapAndP2PMem(void *dev_ptr, size_t mem_size) { if (is_feature_map_mem_has_inited_) { GELOGE(PARAM_INVALID, "call InitFeatureMapMem more than once."); @@ -2098,8 +2098,7 @@ Status DavinciModel::GetOutputDescInfo(vector &output_descs return SUCCESS; } -Status DavinciModel::CopyInputData(const InputData &input_data, bool device_data) { - rtMemcpyKind_t kind = device_data ?
RT_MEMCPY_DEVICE_TO_DEVICE : RT_MEMCPY_HOST_TO_DEVICE; +Status DavinciModel::CopyInputData(const InputData &input_data) { const std::vector &blobs = input_data.blobs; for (const auto &data : input_data_info_) { if (data.first >= blobs.size()) { @@ -2108,8 +2107,13 @@ Status DavinciModel::CopyInputData(const InputData &input_data, bool device_data data.second.GetOpName().c_str()); return FAILED; } - + rtMemcpyKind_t kind; const DataBuffer &data_buf = blobs[data.first]; + if (data_buf.placement == kPlacemetHost) { + kind = RT_MEMCPY_HOST_TO_DEVICE; + } else { + kind = RT_MEMCPY_DEVICE_TO_DEVICE; + } if (data_buf.length == 0) { GELOGW("No data need to memcpy!"); return SUCCESS; @@ -2125,6 +2129,9 @@ Status DavinciModel::CopyInputData(const InputData &input_data, bool device_data runtime_param_.graph_id, data.second.GetOpName().c_str(), data.first, mem_addr, data_buf_addr, data_size, data_buf_length); GE_CHK_RT_RET(rtMemcpy(mem_addr, data_size, data_buf_addr, data_buf_length, kind)); + if (data_buf.placement == kPlacementDevice) { + InterGraphTensorCache::GetInstance().RemoveTensorValue(session_id_, data_buf_addr); + } } return SUCCESS; @@ -2534,14 +2541,15 @@ Status DavinciModel::GenOutputTensorInfo(OutputData *output_data, vectorblobs.push_back({data_buf.get(), static_cast(output_buffer_size[i]), false}); + output_data->blobs.push_back({data_buf.get(), static_cast(output_buffer_size[i]), false, kPlacemetHost}); OutputTensorInfo output; output.dims = output_shape_info[i]; output.data = std::move(data_buf); output.length = output_buffer_size[i]; + output.placement = kPlacemetHost; outputs.emplace_back(std::move(output)); - GELOGD("Output index:%zu, output dims is %s, data length:%lu.", i, - formats::JoinToString(output.dims).c_str(), output.length); + GELOGD("Output index:%zu, output dims is %s, data length:%lu.", i, formats::JoinToString(output.dims).c_str(), + output.length); } return SUCCESS; @@ -2677,7 +2685,7 @@ void *DavinciModel::Run(DavinciModel *model) { GELOGI("Copy input data, model id:%u", model_id); GE_IF_BOOL_EXEC(ProfilingManager::Instance().ProfilingModelExecuteOn(), model->SetProfileTime(MODEL_PRE_PROC_START)); - ret = model->CopyInputData(current_data, false); + ret = model->CopyInputData(current_data); GE_CHK_BOOL_TRUE_EXEC_WITH_LOG( ret != SUCCESS, (void)model->ReturnResult(current_data.index, false, false, data_wrapper->GetOutput()); CsaInteract::GetInstance().StoreInternalErrorCode(ret, ERROR_MODULE_FMK, JOBSUBSTATE_GRAPH_EXEC); diff --git a/ge/graph/load/model_manager/davinci_model.h b/ge/graph/load/model_manager/davinci_model.h old mode 100755 new mode 100644 index 8ed82912..6bb6f14e --- a/ge/graph/load/model_manager/davinci_model.h +++ b/ge/graph/load/model_manager/davinci_model.h @@ -49,9 +49,9 @@ #include "task_info/task_info.h" #include "graph/common/local_context.h" +using std::multimap; using std::mutex; using std::thread; -using std::multimap; namespace ge { // op debug need 2048 bits buffer @@ -272,6 +272,8 @@ class DavinciModel { const vector &GetLabelList() const { return label_list_; } + uint64_t GetAllStreamNum() const { return stream_list_.size() + all_hccl_stream_list_.size(); } + Status DestroyThread(); // get Op @@ -605,7 +607,7 @@ class DavinciModel { Status UpdateIoTaskArgs(const map &data_info, bool is_input, const vector &blobs, bool is_dynamic, const string &batch_label); - Status CopyInputData(const InputData &input_data, bool device_data = false); + Status CopyInputData(const InputData &input_data); Status CopyOutputData(uint32_t data_id, 
OutputData &output_data, rtMemcpyKind_t kind); diff --git a/ge/graph/load/model_manager/model_manager.cc b/ge/graph/load/model_manager/model_manager.cc old mode 100755 new mode 100644 index 4eb3254b..7009d585 --- a/ge/graph/load/model_manager/model_manager.cc +++ b/ge/graph/load/model_manager/model_manager.cc @@ -35,6 +35,7 @@ #include "graph/utils/attr_utils.h" #include "common/formats/utils/formats_trans_utils.h" #include "hybrid/hybrid_davinci_model.h" +#include "graph/utils/graph_utils.h" namespace ge { thread_local uint32_t device_count = 0; @@ -52,9 +53,12 @@ const std::string kCmdTypeProfModelSubscribe = "prof_model_subscribe"; const std::string kCmdTypeProfModelUnsubscribe = "prof_model_cancel_subscribe"; const char *const kBatchLoadBuf = "batchLoadsoFrombuf"; const char *const kDeleteCustOp = "deleteCustOp"; +const char *const kStreamResource = "stream"; +const char *const kEventResource = "event"; const int kTimeSpecNano = 1000000000; const int kTimeSpecMiro = 1000000; const int kOpNameMaxSize = 100; +const int kMaxEventNum = 10; struct CustAicpuSoBuf { uint64_t kernelSoBuf; uint32_t kernelSoBufLen; @@ -347,7 +351,7 @@ Status ModelManager::LoadModelOnline(uint32_t &model_id, const shared_ptrAssign(ge_model)), GELOGW("assign model to modeldef failed."); break;); GE_TIMESTAMP_END(Assign, "GraphLoader::ModelAssign"); - + GE_CHK_STATUS_RET(CheckAndReleaseStreamEventResource(ge_model, model_id), "Check stream and event resource failed"); GE_TIMESTAMP_START(Init); GE_IF_BOOL_EXEC(SUCCESS != (ret = davinci_model->Init()), GELOGW("DavinciInit failed."); break;); GE_TIMESTAMP_END(Init, "GraphLoader::ModelInit"); @@ -378,6 +382,130 @@ void ModelManager::InsertModel(uint32_t id, shared_ptr free_stream_num) { + status = ReleaseResource(need_stream_num, free_stream_num, kStreamResource); + if (status != SUCCESS) { + GELOGE(FAILED, "Release stream resource failed"); + return FAILED; + } + } + + int64_t free_event_num = 0; + GetFreeEvent(free_event_num); + if (need_event_num > free_event_num) { + status = ReleaseResource(need_event_num, free_event_num, kEventResource); + if (status != SUCCESS) { + GELOGE(FAILED, "Release event resource failed"); + return FAILED; + } + } + return SUCCESS; +} + + +Status ModelManager::ReleaseResource(int64_t need_resource, int64_t free_resource, const string &resource_kind) { + while (need_resource > free_resource) { + uint32_t max_stream_model_id = 0; + uint32_t max_event_model_id = 0; + GetMaxStreamAndEventModel(max_stream_model_id, max_event_model_id); + GELOGD("The max stream num model is: %u, the max event num model is %u", max_stream_model_id, max_event_model_id); + std::lock_guard lock(map_mutex_); + if (resource_kind == kStreamResource) { + uint64_t max_stream_num = model_map_.at(max_stream_model_id)->GetAllStreamNum(); + Status ret = Unload(max_stream_model_id); + if (ret != SUCCESS) { + GELOGE(FAILED, "Unload max stream model failed, model id: %u", max_stream_model_id); + return FAILED; + } + free_resource = free_resource + max_stream_num; + GELOGD("Unload model for stream, model id: %u, stream num: %zu", max_stream_model_id, + max_stream_num); + } + + if (resource_kind == kEventResource) { + uint64_t max_event_num = model_map_.at(max_event_model_id)->GetEventList().size(); + Status ret = Unload(max_event_model_id); + if (ret != SUCCESS) { + GELOGE(FAILED, "Unload max event model failed, model id: %u", max_event_model_id); + return FAILED; + } + free_resource = free_resource + max_event_num; + GELOGD("Unload model for event, model id: %u, event num: %zu",
max_event_model_id, + max_event_num); + } + } + return SUCCESS; +} + + +Status ModelManager::GetFreeStream(int64_t &free_stream) { + uint32_t max_stream_cout; + uint32_t max_task_cout; + rtError_t ret = rtGetMaxStreamAndTask(RT_NORMAL_STREAM, &max_stream_cout, &max_task_cout); + if (ret != RT_ERROR_NONE) { + GELOGE(FAILED, "Get max stream and task cout failed"); + return FAILED; + } + GELOGD("Allowed max stream count: %u,max task cout per stream:%u", max_stream_cout, max_task_cout); + std::lock_guard lock(map_mutex_); + uint64_t stream_sum = 0; + + for (auto &it : model_map_) { + stream_sum = stream_sum + it.second->GetAllStreamNum(); + } + free_stream = max_stream_cout - stream_sum; + return SUCCESS; +} + +void ModelManager::GetFreeEvent(int64_t &free_event) { + std::lock_guard lock(map_mutex_); + uint64_t event_sum = 0; + for (auto &it : model_map_) { + event_sum = event_sum + it.second->GetEventList().size(); + } + free_event = kMaxEventNum - event_sum; +} + +void ModelManager::GetMaxStreamAndEventModel(uint32_t &max_stream_model, uint32_t &max_event_model) { + std::lock_guard lock(map_mutex_); + uint64_t max_stream_num = 0; + uint64_t max_event_num = 0; + for (auto &it : model_map_) { + if (it.second->GetAllStreamNum() > max_stream_num) { + max_stream_num = it.second->GetAllStreamNum(); + max_stream_model = it.first; + } + if (it.second->GetEventList().size() > max_event_num) { + max_event_num = it.second->GetEventList().size(); + max_event_model = it.first; + } + } +} + Status ModelManager::DeleteModel(uint32_t id) { std::lock_guard lock(map_mutex_); diff --git a/ge/graph/load/model_manager/model_manager.h b/ge/graph/load/model_manager/model_manager.h old mode 100755 new mode 100644 index aa0753b1..39cd18c9 --- a/ge/graph/load/model_manager/model_manager.h +++ b/ge/graph/load/model_manager/model_manager.h @@ -177,8 +177,7 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager { static ge::Status HandleProfStartCommand(const Command &command); static ge::Status HandleProfStopCommand(const Command &command); - static ge::Status GetModelByCmd(const Command &command, - std::shared_ptr &davinci_model); + static ge::Status GetModelByCmd(const Command &command, std::shared_ptr &davinci_model); /// /// @ingroup domi_ome /// @brief get model memory usage @@ -333,6 +332,12 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager { void InsertModel(uint32_t id, std::shared_ptr &davinci_model); void InsertModel(uint32_t id, std::shared_ptr &hybrid_model); + Status CheckAndReleaseStreamEventResource(const GeModelPtr &ge_model, uint32_t model_id); + Status ReleaseResource(int64_t need_resource, int64_t free_resource, const string &resource_kind); + Status GetFreeStream(int64_t &free_stream); + void GetFreeEvent(int64_t &free_event); + void GetMaxStreamAndEventModel(uint32_t &max_stream_model, uint32_t &max_event_model); + /// /// @ingroup domi_ome /// @brief delete model from model manager set @@ -353,7 +358,6 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager { std::vector exception_infos_; std::mutex cust_aicpu_mutex_; std::map> cust_aicpu_so_; - static DumpProperties dump_properties_; }; } // namespace ge diff --git a/ge/graph/load/model_manager/model_utils.cc b/ge/graph/load/model_manager/model_utils.cc index 410e9364..940d20ea 100755 --- a/ge/graph/load/model_manager/model_utils.cc +++ b/ge/graph/load/model_manager/model_utils.cc @@ -21,6 +21,7 @@ #include "graph/utils/tensor_utils.h" #include "graph/manager/graph_var_manager.h" #include 
"graph/types.h" +#include "graph/utils/graph_utils.h" #define VALIDATE_MEM_RANGE(OP, SIZE, OFFSET) \ do { \ @@ -30,6 +31,9 @@ } \ } while (0) +namespace { + const char *const kUsedStreamNum = "used_stream_num"; +} // namespace namespace ge { /// /// @ingroup ge @@ -574,4 +578,57 @@ Status ModelUtils::GetRtAddress(const RuntimeParam ¶m, uintptr_t logic_addr, mem_addr = runtime_base_addr + logic_addr; return SUCCESS; } + +Status ModelUtils::CalculateFollowStream(const GeModelPtr &ge_model, int64_t &hccl_fellow_stream_num) { + const auto &model_def = ge_model->GetModelTaskDefPtr(); + GE_CHECK_NOTNULL(model_def); + Graph graph = ge_model->GetGraph(); + ComputeGraphPtr compute_graph = GraphUtils::GetComputeGraph(graph); + GE_CHECK_NOTNULL(compute_graph); + + map op_list; + for (auto &node : compute_graph->GetDirectNode()) { + OpDescPtr op_desc = node->GetOpDesc(); + if (op_desc == nullptr) { + GELOGE(PARAM_INVALID, "Op desc is nullptr"); + return PARAM_INVALID; + } + op_list.emplace(op_desc->GetId(), op_desc); + } + + std::multimap main_follow_num; + for (int i = 0; i < model_def->task_size(); i++) { + const domi::TaskDef &task = model_def->task(i); + if (static_cast(task.type()) == RT_MODEL_TASK_HCCL) { + auto hccl_def = task.kernel_hccl(); + OpDescPtr hccl_op_desc = op_list.at(hccl_def.op_index()); + int64_t main_stream_id = hccl_op_desc->GetStreamId(); + int64_t follow_stream_num = 0; + if (!ge::AttrUtils::GetInt(hccl_op_desc, kUsedStreamNum, follow_stream_num)) { + GELOGW("Get used_stream_num failed, op is %s", hccl_op_desc->GetName().c_str()); + } + main_follow_num.emplace(main_stream_id, follow_stream_num); + } + } + hccl_fellow_stream_num = CalFollowStreamSum(main_follow_num); + return SUCCESS; +} + +int64_t ModelUtils::CalFollowStreamSum(const std::multimap &hccl_stream_map) { + int64_t need_follow_stream_num = 0; + std::map max_follow_stream_map; + for (auto &it : hccl_stream_map) { + auto max_it = max_follow_stream_map.find(it.first); + if (max_it == max_follow_stream_map.end()) { + max_follow_stream_map.emplace(it.first, it.second); + } else if (it.second > max_it->second) { + max_follow_stream_map.at(max_it->first) = it.second; + } + } + for (auto &follow_it : max_follow_stream_map) { + need_follow_stream_num = need_follow_stream_num + follow_it.second; + } + return need_follow_stream_num; +} + } // namespace ge diff --git a/ge/graph/load/model_manager/model_utils.h b/ge/graph/load/model_manager/model_utils.h index 26f8d700..e3603889 100755 --- a/ge/graph/load/model_manager/model_utils.h +++ b/ge/graph/load/model_manager/model_utils.h @@ -24,6 +24,7 @@ #include "graph/load/model_manager/task_info/task_info.h" #include "graph/op_desc.h" #include "graph/utils/tensor_adapter.h" +#include "model/ge_model.h" using std::vector; @@ -108,6 +109,20 @@ class ModelUtils { /// static Status GetRtAddress(const RuntimeParam &model_param, uintptr_t logic_addr, uint8_t *&mem_addr); + /// + /// @ingroup ge + /// @brief Calculate hccl follow stream + /// @return Status + /// + static Status CalculateFollowStream(const GeModelPtr &ge_model, int64_t &hccl_fellow_stream_num); + + /// + /// @ingroup ge + /// @brief Calculate the sum of follow stream + /// @return int64_t + /// + static int64_t CalFollowStreamSum(const std::multimap &hccl_stream_map); + private: /// /// @ingroup ge diff --git a/ge/hybrid/executor/hybrid_model_async_executor.cc b/ge/hybrid/executor/hybrid_model_async_executor.cc index a30f7daf..3296be48 100644 --- a/ge/hybrid/executor/hybrid_model_async_executor.cc +++ 
b/ge/hybrid/executor/hybrid_model_async_executor.cc @@ -20,12 +20,15 @@ #include "graph/utils/type_utils.h" #include "graph/ge_context.h" #include "omm/csa_interact.h" +#include "graph/execute/inter_graph_tensor_cache.h" +#include "graph/debug/ge_attr_define.h" namespace ge { namespace hybrid { namespace { const int kDataOutputIndex = 0; const size_t kMinimumPiplineStages = 2; +const char *const kLazyRecompile = "lazy_recompile"; } HybridModelAsyncExecutor::HybridModelAsyncExecutor(HybridModel *model) : model_(model), run_flag_(false) { @@ -62,6 +65,8 @@ Status HybridModelAsyncExecutor::Start(const std::shared_ptr &lis future_ = std::async(std::launch::async, [&]() -> Status { GetThreadLocalContext() = *executor_->GetContext()->ge_context; GetContext().SetSessionId(executor_->GetContext()->session_id); + GE_CHECK_NOTNULL(executor_->GetContext()->ge_context); + GetThreadLocalContext() = *executor_->GetContext()->ge_context; return RunInternal(); }); @@ -223,8 +228,8 @@ Status HybridModelAsyncExecutor::SyncVarData() { Status HybridModelAsyncExecutor::PrepareInputs(const InputData ¤t_data, HybridModelExecutor::ExecuteArgs &args) { if (current_data.blobs.size() < input_tensor_desc_.size()) { - GELOGE(PARAM_INVALID, "Blob size mismatches, expect at least %zu, but got %zu", - input_tensor_desc_.size(), current_data.blobs.size()); + GELOGE(PARAM_INVALID, "Blob size mismatches, expect at least %zu, but got %zu", input_tensor_desc_.size(), + current_data.blobs.size()); return PARAM_INVALID; } @@ -343,6 +348,13 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a } GELOGD("Number of outputs = %zu", output_tensor_desc_list.size()); + map tensor_value_info; + string execute_mode; + auto result = ge::GetContext().GetOption(OPTION_EXEC_DYNAMIC_EXECUTE_MODE, execute_mode); + if (result != SUCCESS) { + GELOGW("Can not get dynamic execute mode attr"); + } + GELOGD("The dynamic execute attr is %s", execute_mode.c_str()); for (size_t i = 0; i < output_tensors.size(); ++i) { GELOGD("Start to process output[%zu]", i); auto &output_tensor = output_tensors[i]; @@ -380,19 +392,25 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a output.dims = tensor_desc->GetShape().GetDims(); output.length = output_size; if (output_size > 0) { - std::unique_ptr data_buf(new(std::nothrow) uint8_t[output_size]); - GE_CHECK_NOTNULL(data_buf); - GE_CHK_RT_RET(rtMemcpy(data_buf.get(), - output_size, - output_tensor.GetData(), - output_size, - RT_MEMCPY_DEVICE_TO_HOST)); - output.data = std::move(data_buf); - output_data->blobs.emplace_back(data_buf.get(), static_cast(output_size), false); + if (execute_mode != kLazyRecompile) { + std::unique_ptr data_buf(new (std::nothrow) uint8_t[output_size]); + GE_CHECK_NOTNULL(data_buf); + GE_CHK_RT_RET( + rtMemcpy(data_buf.get(), output_size, output_tensor.GetData(), output_size, RT_MEMCPY_DEVICE_TO_HOST)); + output.data = std::move(data_buf); + output.placement = kPlacemetHost; + output_data->blobs.emplace_back(data_buf.get(), static_cast(output_size), false, kPlacemetHost); + } else { + output.dev_data = output_tensor.MutableData(); + output.placement = kPlacementDevice; + tensor_value_info.emplace(output_tensor.GetData(), output_tensor); + output_data->blobs.emplace_back(output_tensor.MutableData(), static_cast(output_size), false, kPlacementDevice); + } + } else { GELOGW("Output[%zu] is empty. 
shape = [%s]", i, tensor_desc->GetShape().ToString().c_str()); output.data = nullptr; - output_data->blobs.emplace_back(nullptr, 0U, false); + output_data->blobs.emplace_back(nullptr, 0U, false, kPlacemetHost); } outputs.emplace_back(std::move(output)); @@ -403,6 +421,8 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a output_size); } + uint64_t session_id = executor_->GetContext()->session_id; + InterGraphTensorCache::GetInstance().SetDynamicExecuteDevAddr(session_id, tensor_value_info); return SUCCESS; } diff --git a/ge/session/session_manager.cc b/ge/session/session_manager.cc index 3c531747..2c5c3004 100755 --- a/ge/session/session_manager.cc +++ b/ge/session/session_manager.cc @@ -22,6 +22,7 @@ #include "graph/ge_context.h" #include "graph/load/model_manager/model_manager.h" #include "graph/manager/util/rt_context_util.h" +#include "graph/execute/inter_graph_tensor_cache.h" using std::map; using std::string; @@ -105,6 +106,7 @@ Status SessionManager::DestroySession(SessionId session_id) { ModelManager::GetInstance()->DestroyAicpuSession(session_id); } + InterGraphTensorCache::GetInstance().RemoveSessionTensorValue(session_id); // Unified destruct rt_context RtContextUtil::GetInstance().DestroyRtContexts(session_id); diff --git a/ge/single_op/task/op_task.cc b/ge/single_op/task/op_task.cc index ff200806..865c5f4c 100755 --- a/ge/single_op/task/op_task.cc +++ b/ge/single_op/task/op_task.cc @@ -787,7 +787,7 @@ Status AiCpuTask::LaunchKernel(const std::vector &input_desc, if (unknown_type_ == DEPEND_COMPUTE) { std::vector summary_buffers; for (size_t i = 0; i < num_outputs_; ++i) { - summary_buffers.emplace_back(output_summary_[i], sizeof(aicpu::FWKAdapter::ResultSummary), false); + summary_buffers.emplace_back(output_summary_[i], sizeof(aicpu::FWKAdapter::ResultSummary), false, kPlacemetHost); } GE_CHK_STATUS_RET_NOLOG(UpdateIoAddr(input_buffers, summary_buffers)); } else { diff --git a/inc/external/ge/ge_api_types.h b/inc/external/ge/ge_api_types.h index 250252f9..effd6afc 100644 --- a/inc/external/ge/ge_api_types.h +++ b/inc/external/ge/ge_api_types.h @@ -311,12 +311,18 @@ const std::string HCOM_MULTI_MODE = "ge.hcomMultiMode"; // Graph run mode enum GraphRunMode { PREDICTION = 0, TRAIN }; +// if data addr is in host +const uint32_t kPlacemetHost = 0; + +// if data addr is in device +const uint32_t kPlacementDevice = 1; // Input/Output tensor info struct InputTensorInfo { - uint32_t data_type; // data type - std::vector dims; // shape description - void *data; // tensor data - int64_t length; // tensor length + uint32_t data_type; // data type + std::vector dims; // shape description + void *data; // tensor data + int64_t length; // tensor length + uint32_t placement = kPlacemetHost; // data placement }; struct OutputTensorInfo { @@ -324,9 +330,16 @@ struct OutputTensorInfo { std::vector dims; // shape description std::unique_ptr data; // tensor data int64_t length; // tensor length - OutputTensorInfo() : data_type(0), dims({}), data(nullptr), length(0) {} + uint32_t placement; // data placement + void *dev_data; // device data addr + OutputTensorInfo() : data_type(0), dims({}), data(nullptr), length(0), placement(0), dev_data(nullptr) {} OutputTensorInfo(OutputTensorInfo &&out) - : data_type(out.data_type), dims(out.dims), data(std::move(out.data)), length(out.length) {} + : data_type(out.data_type), + dims(out.dims), + data(std::move(out.data)), + length(out.length), + placement(out.placement), + dev_data(out.dev_data) {} OutputTensorInfo 
&operator=(OutputTensorInfo &&out) { if (this != &out) { @@ -334,6 +347,8 @@ struct OutputTensorInfo { dims = out.dims; data = std::move(out.data); length = out.length; + placement = out.placement; + dev_data = out.dev_data; } return *this; } diff --git a/inc/framework/common/ge_types.h b/inc/framework/common/ge_types.h index ec5adcba..be7a30da 100644 --- a/inc/framework/common/ge_types.h +++ b/inc/framework/common/ge_types.h @@ -67,10 +67,11 @@ struct DataBuffer { void *data; // Data address uint64_t length; // Data length bool isDataSupportMemShare = false; - DataBuffer(void *dataIn, uint64_t len, bool isSupportMemShare) - : data(dataIn), length(len), isDataSupportMemShare(isSupportMemShare) {} + uint32_t placement; + DataBuffer(void *dataIn, uint64_t len, bool isSupportMemShare, uint32_t placement) - : data(dataIn), length(len), isDataSupportMemShare(isSupportMemShare) {} + : data(dataIn), length(len), isDataSupportMemShare(isSupportMemShare), placement(placement) {} - DataBuffer() : data(nullptr), length(0), isDataSupportMemShare(false) {} + DataBuffer() : data(nullptr), length(0), isDataSupportMemShare(false), placement(0) {} }; /// diff --git a/tests/ut/ge/CMakeLists.txt b/tests/ut/ge/CMakeLists.txt index 4f014714..4fc5432e 100755 --- a/tests/ut/ge/CMakeLists.txt +++ b/tests/ut/ge/CMakeLists.txt @@ -138,6 +138,7 @@ set(COMMON_SRC_FILES "${GE_CODE_DIR}/ge/session/inner_session.cc" "${GE_CODE_DIR}/ge/graph/manager/util/rt_context_util.cc" "${GE_CODE_DIR}/ge/graph/execute/graph_execute.cc" + "${GE_CODE_DIR}/ge/graph/execute/inter_graph_tensor_cache.cc" "${GE_CODE_DIR}/ge/graph/preprocess/graph_preprocess.cc" "${GE_CODE_DIR}/ge/hybrid/hybrid_davinci_model_stub.cc" "${GE_CODE_DIR}/ge/graph/load/model_manager/davinci_model.cc" @@ -310,6 +311,8 @@ set(COMMON_SRC_FILES "${GE_CODE_DIR}/ge/common/dump/dump_op.cc" "${GE_CODE_DIR}/ge/common/model_saver.cc" "${GE_CODE_DIR}/ge/hybrid/node_executor/aicpu/aicpu_ext_info.cc" + "${GE_CODE_DIR}/ge/hybrid/common/tensor_value.cc" + "${GE_CODE_DIR}/ge/hybrid/common/npu_memory_allocator.cc" "${GE_CODE_DIR}/ge/common/ge/datatype_util.cc" "${GE_CODE_DIR}/metadef/register/ops_kernel_builder_registry.cc" "${GE_CODE_DIR}/metadef/register/op_tiling.cpp" @@ -632,7 +635,9 @@ set(DISTINCT_GRAPH_LOAD_TEST_FILES #"graph/graph_load_unittest.cc" "graph/ge_executor_unittest.cc" "graph/load/model_helper_unittest.cc" "graph/load/model_utils_unittest.cc" + "graph/inter_graph_tensor_cache_unittest.cc" + "graph/load/model_manager_unittest.cc" ) set(PASS_TEST_FILES diff --git a/tests/ut/ge/graph/inter_graph_tensor_cache_unittest.cc b/tests/ut/ge/graph/inter_graph_tensor_cache_unittest.cc new file mode 100644 index 00000000..e5b635f7 --- /dev/null +++ b/tests/ut/ge/graph/inter_graph_tensor_cache_unittest.cc @@ -0,0 +1,52 @@ +/** + * Copyright 2019-2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include +#include +#include +#include + +#define protected public +#define private public +#include "graph/execute/inter_graph_tensor_cache.h" +#include "hybrid/common/tensor_value.h" +#include "graph/load/new_model_manager/model_manager.h" +#include "graph/manager/graph_manager_utils.h" +#include "model/ge_model.h" +#undef private +#undef protected + +using namespace testing; +namespace ge { + +class UtestInterGraphTensorCache : public testing::Test { + protected: + void SetUp() {} + + void TearDown() {} +}; + +TEST_F(UtestInterGraphTensorCache, set_dynamic_execute_dev_addr) { + hybrid::TensorValue tensor_value; + std::map tensor_value_map = {{(const void *)0x12345, tensor_value}}; + InterGraphTensorCache::GetInstance().SetDynamicExecuteDevAddr(1, tensor_value_map); + InterGraphTensorCache::GetInstance().RemoveTensorValue(1, (void *)0x12345); + InterGraphTensorCache::GetInstance().RemoveTensorValue(1, (void *)0x123456); + InterGraphTensorCache::GetInstance().RemoveSessionTensorValue(2); + InterGraphTensorCache::GetInstance().RemoveSessionTensorValue(1); +} + +} // namespace ge diff --git a/tests/ut/ge/graph/load/model_manager_unittest.cc b/tests/ut/ge/graph/load/model_manager_unittest.cc new file mode 100644 index 00000000..37a1072c --- /dev/null +++ b/tests/ut/ge/graph/load/model_manager_unittest.cc @@ -0,0 +1,118 @@ +/** + * Copyright 2019-2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <gtest/gtest.h> + +#include <memory> +#include "common/types.h" + +#define private public +#define protected public +#include "graph/load/new_model_manager/model_manager.h" + +#include "common/helper/om_file_helper.h" +#include "common/op/ge_op_utils.h" +#include "graph/load/graph_loader.h" +#include "graph/load/new_model_manager/davinci_model.h" +#include "graph/load/new_model_manager/davinci_model_parser.h" +#include "graph/load/new_model_manager/model_utils.h" + +#undef private +#undef protected + +using namespace std; using namespace testing; namespace ge { class UtestModelManagerModelManager : public testing::Test { protected: void SetUp() {} void TearDown() {} }; +TEST_F(UtestModelManagerModelManager, cal_follow_stream_sum) { + std::multimap<int64_t, int64_t> hccl_stream_map = {{1, 10}, {1, 20}, {2, 10}, {2, 5}}; + int64_t result = ModelUtils::CalFollowStreamSum(hccl_stream_map); + EXPECT_EQ(result, 30); +} +TEST_F(UtestModelManagerModelManager, get_max_stream_and_event) { + ModelManager mm; + auto model1 = std::make_shared<DavinciModel>(1, nullptr); + auto model2 = std::make_shared<DavinciModel>(2, nullptr); + rtStream_t stream = nullptr; + rtStream_t stream2 = nullptr; + rtStream_t stream3 = nullptr; + rtStream_t stream4 = nullptr; + rtEvent_t event = nullptr; + rtEvent_t event2 = nullptr; + rtEvent_t event3 = nullptr; + model1->stream_list_ = {stream, stream2, stream3, stream4}; + model1->event_list_ = {event, event2}; + model2->stream_list_ = {stream, stream2}; + model2->event_list_ = {event, event2, event3}; + + mm.InsertModel(1, model1); + mm.InsertModel(2, model2); + uint32_t max_stream_model; + uint32_t max_event_model; + mm.GetMaxStreamAndEventModel(max_stream_model, max_event_model); + EXPECT_EQ(max_stream_model, 1); + EXPECT_EQ(max_event_model, 2); + + int64_t free_stream; + int64_t free_event; + Status ret = mm.GetFreeStream(free_stream); + mm.GetFreeEvent(free_event); + EXPECT_EQ(ge::SUCCESS, ret); + +} + + +TEST_F(UtestModelManagerModelManager, release_resource_stream) { + ModelManager mm; + auto model1 = std::make_shared<DavinciModel>(1, nullptr); + auto model2 = std::make_shared<DavinciModel>(2, nullptr); + rtStream_t stream = nullptr; + rtStream_t stream2 = nullptr; + rtStream_t stream3 = nullptr; + rtStream_t stream4 = nullptr; + rtEvent_t event = nullptr; + rtEvent_t event2 = nullptr; + rtEvent_t event3 = nullptr; + model1->stream_list_ = {stream, stream2, stream3, stream4}; + model1->event_list_ = {event, event2}; + model2->stream_list_ = {stream, stream2}; + model2->event_list_ = {event, event2, event3}; + + mm.InsertModel(1, model1); + mm.InsertModel(2, model2); + string kind = "stream"; + Status ret = mm.ReleaseResource(110, 109, kind); + EXPECT_EQ(ge::SUCCESS, ret); + + string kind2 = "event"; + Status ret2 = mm.ReleaseResource(110, 109, kind2); + EXPECT_EQ(ge::SUCCESS, ret2); +} + + +TEST_F(UtestModelManagerModelManager, check_stream_and_event_resource) { + ModelManager mm; + auto ge_model = make_shared<GeModel>(); + Status ret = mm.CheckAndReleaseStreamEventResource(ge_model, 1); + EXPECT_EQ(ge::FAILED, ret); +} + +} // namespace ge