From 321164314e0e9965b6583260948b3e6a687ffb94 Mon Sep 17 00:00:00 2001 From: zhou_chao1993 Date: Thu, 17 Dec 2020 16:53:25 +0800 Subject: [PATCH] dynamic input shape --- ge/CMakeLists.txt | 2 + ge/executor/CMakeLists.txt | 1 + ge/executor/module.mk | 1 + ge/ge_inference.mk | 1 + ge/ge_runner.mk | 1 + ge/graph/execute/dynamic_execute_addr.cc | 46 ++++ ge/graph/execute/dynamic_execute_addr.h | 39 +++ .../load/new_model_manager/davinci_model.cc | 13 +- .../load/new_model_manager/model_manager.cc | 234 +++++++++++++++--- .../load/new_model_manager/model_manager.h | 16 +- ge/graph/manager/graph_manager.cc | 14 +- .../executor/hybrid_model_async_executor.cc | 31 ++- inc/external/ge/ge_api_types.h | 8 + inc/framework/common/ge_types.h | 1 + 14 files changed, 369 insertions(+), 39 deletions(-) create mode 100644 ge/graph/execute/dynamic_execute_addr.cc create mode 100644 ge/graph/execute/dynamic_execute_addr.h diff --git a/ge/CMakeLists.txt b/ge/CMakeLists.txt index 26a7ee99..2c417dc8 100755 --- a/ge/CMakeLists.txt +++ b/ge/CMakeLists.txt @@ -79,6 +79,7 @@ set(TRAIN_SRC_LIST "graph/common/omg_util.cc" "graph/common/transop_util.cc" "graph/execute/graph_execute.cc" + "graph/execute/dynamic_execute_addr.cc" "graph/label/case_label_maker.cc" "graph/label/if_label_maker.cc" "graph/label/label_maker.cc" @@ -384,6 +385,7 @@ set(INFER_SRC_LIST "graph/preprocess/multi_batch_options.cc" "graph/preprocess/multi_batch_copy_graph.cc" "graph/execute/graph_execute.cc" + "graph/execute/dynamic_execute_addr.cc" "graph/load/graph_loader.cc" "graph/optimize/graph_optimize.cc" "graph/optimize/mem_rw_conflict_optimize.cc" diff --git a/ge/executor/CMakeLists.txt b/ge/executor/CMakeLists.txt index cc5c1710..2d8829a3 100644 --- a/ge/executor/CMakeLists.txt +++ b/ge/executor/CMakeLists.txt @@ -20,6 +20,7 @@ set(SRC_LIST "../common/profiling/ge_profiling.cc" "../graph/load/graph_loader.cc" "../graph/execute/graph_execute.cc" + "../graph/execute/dynamic_execute_addr.cc" "../omm/csa_interact.cc" 
"../graph/manager/graph_manager_utils.cc" "../graph/manager/graph_var_manager.cc" diff --git a/ge/executor/module.mk b/ge/executor/module.mk index 34c2a37e..4ecbae66 100644 --- a/ge/executor/module.mk +++ b/ge/executor/module.mk @@ -11,6 +11,7 @@ local_ge_executor_src_files := \ ../common/profiling/ge_profiling.cc \ ../graph/load/graph_loader.cc \ ../graph/execute/graph_execute.cc \ + ../graph/execute/dynamic_execute_addr.cc \ ../omm/csa_interact.cc \ ../graph/manager/graph_manager_utils.cc \ ../graph/manager/graph_var_manager.cc \ diff --git a/ge/ge_inference.mk b/ge/ge_inference.mk index 80887e8b..63dea3e9 100755 --- a/ge/ge_inference.mk +++ b/ge/ge_inference.mk @@ -50,6 +50,7 @@ GRAPH_MANAGER_LOCAL_SRC_FILES := \ graph/preprocess/multi_batch_options.cc \ graph/preprocess/multi_batch_copy_graph.cc \ graph/execute/graph_execute.cc \ + graph/execute/dynamic_execute_addr.cc \ graph/load/graph_loader.cc \ graph/optimize/graph_optimize.cc \ graph/optimize/mem_rw_conflict_optimize.cc \ diff --git a/ge/ge_runner.mk b/ge/ge_runner.mk index c0f59320..85ea2994 100644 --- a/ge/ge_runner.mk +++ b/ge/ge_runner.mk @@ -48,6 +48,7 @@ LIBGE_LOCAL_SRC_FILES := \ graph/common/omg_util.cc \ graph/common/transop_util.cc \ graph/execute/graph_execute.cc \ + graph/execute/dynamic_execute_addr.cc \ graph/label/case_label_maker.cc \ graph/label/if_label_maker.cc \ graph/label/label_maker.cc \ diff --git a/ge/graph/execute/dynamic_execute_addr.cc b/ge/graph/execute/dynamic_execute_addr.cc new file mode 100644 index 00000000..79f6b990 --- /dev/null +++ b/ge/graph/execute/dynamic_execute_addr.cc @@ -0,0 +1,46 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "dynamic_execute_addr.h"
+
+#include "common/ge_inner_error_codes.h"
+#include "common/debug/log.h"
+
+namespace ge {
+// Returns the single shared instance (function-local static, built on first call).
+DynamicExecuteAddr &DynamicExecuteAddr::GetInstance() {
+  static DynamicExecuteAddr instance;
+  return instance;
+}
+
+// Returns a copy of the tensors recorded for session_id, or an empty map for an unknown session.
+// NOTE(review): the template arguments were missing from this patch text and are inferred - confirm.
+std::map<void *, hybrid::TensorValue> DynamicExecuteAddr::GetTensorValue(uint64_t session_id) {
+  const auto it = dynamic_execute_dev_addr_.find(session_id);
+  return (it == dynamic_execute_dev_addr_.end()) ? std::map<void *, hybrid::TensorValue>() : it->second;
+}
+
+// Forgets the tensor recorded under data_dev_addr for session_id; a miss is only logged as a warning.
+void DynamicExecuteAddr::RemoveTensorValue(uint64_t session_id, void *data_dev_addr) {
+  const auto it = dynamic_execute_dev_addr_.find(session_id);
+  if (it == dynamic_execute_dev_addr_.end()) {
+    GELOGW("Can not find session,session id is %lu", session_id);
+  } else if (it->second.erase(data_dev_addr) == 0) {  // erase in the stored map, not in a copy of it
+    GELOGW("Can not find data device addr");
+  }
+}
+
+}  // namespace ge
\ No newline at end of file
diff --git a/ge/graph/execute/dynamic_execute_addr.h b/ge/graph/execute/dynamic_execute_addr.h
new file mode 100644
index 00000000..e1cb40fd
--- /dev/null
+++ b/ge/graph/execute/dynamic_execute_addr.h
@@ -0,0 +1,54 @@
+/**
+ * Copyright 2020 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef GE_GRAPH_EXECUTE_DYNAMIC_EXECUTE_ADDR_H_
+#define GE_GRAPH_EXECUTE_DYNAMIC_EXECUTE_ADDR_H_
+
+#include "hybrid/common/tensor_value.h"
+
+#include <map>
+
+namespace ge {
+// Process-wide table that maps a session id to the tensors recorded for it, keyed by device address.
+// NOTE(review): every template argument (and the system include) was missing from this patch text;
+// <map>, std::map<void *, hybrid::TensorValue> and the uint64_t outer key are inferred - please confirm.
+class DynamicExecuteAddr {
+ public:
+  // Returns the single shared instance.
+  static DynamicExecuteAddr &GetInstance();
+
+  // Adds tensor_value to the entries of session_id; an address that is already present gets the new tensor.
+  void SetDynamicExecuteDevAddr(uint64_t session_id, std::map<void *, hybrid::TensorValue> tensor_value) {
+    auto &session_values = dynamic_execute_dev_addr_[session_id];
+    for (const auto &item : tensor_value) {
+      // emplace never overwrites, so drop any previous entry for this address first.
+      session_values.erase(item.first);
+      session_values.emplace(item.first, item.second);
+    }
+  }
+
+  // Returns a copy of the tensors recorded for session_id (empty if the session is unknown).
+  std::map<void *, hybrid::TensorValue> GetTensorValue(uint64_t session_id);
+
+  // Removes the tensor recorded under data_dev_addr for session_id, if any.
+  void RemoveTensorValue(uint64_t session_id, void *data_dev_addr);
+
+ private:
+  std::map<uint64_t, std::map<void *, hybrid::TensorValue>> dynamic_execute_dev_addr_;
+};
+}  // namespace ge
+
+#endif  // GE_GRAPH_EXECUTE_DYNAMIC_EXECUTE_ADDR_H_
diff --git a/ge/graph/load/new_model_manager/davinci_model.cc b/ge/graph/load/new_model_manager/davinci_model.cc index 720c3c28..3e7255ee 100755 --- a/ge/graph/load/new_model_manager/davinci_model.cc +++ b/ge/graph/load/new_model_manager/davinci_model.cc @@ -59,6 +59,7 @@ #include "securec.h" #include "graph/common/local_context.h" #include "common/formats/utils/formats_trans_utils.h" +#include "graph/execute/dynamic_execute_addr.h" // create std::thread, catch exceptions using try/catch #define CREATE_STD_THREAD(thread_id, func, args) \ @@ -624,7 +625,7 @@ Status DavinciModel::Init(void *dev_ptr, size_t mem_size, void *weight_ptr, size if (hcom_streams_.find(i) != hcom_streams_.end()) { GE_CHK_RT_RET(rtStreamCreateWithFlags(&stream, priority_, stream_flags | RT_STREAM_FORCE_COPY)); - } else { + } else { GE_CHK_RT_RET(rtStreamCreateWithFlags(&stream, priority_, stream_flags)); } @@ -2208,6 +2209,9 @@ Status
DavinciModel::CopyInputData(const InputData &input_data, bool device_data runtime_param_.graph_id, data.second.GetOpName().c_str(), data.first, mem_addr, data_buf_addr, data_size, data_buf_length); GE_CHK_RT_RET(rtMemcpy(mem_addr, data_size, data_buf_addr, data_buf_length, kind)); + if (device_data) { + DynamicExecuteAddr::GetInstance().RemoveTensorValue(session_id_,data_buf_addr); + } } return SUCCESS; @@ -2787,7 +2791,12 @@ void *DavinciModel::Run(DavinciModel *model) { GELOGI("Copy input data, model id:%u", model_id); GE_IF_BOOL_EXEC(ProfilingManager::Instance().ProfilingModelExecuteOn(), model->SetProfileTime(MODEL_PRE_PROC_START)); - ret = model->CopyInputData(current_data, false); + auto placement = current_data.blobs[0].placement; + if (placement == 1) { + ret = model->CopyInputData(current_data, true); + } else { + ret = model->CopyInputData(current_data, false); + } GE_CHK_BOOL_TRUE_EXEC_WITH_LOG( ret != SUCCESS, (void)model->ReturnResult(current_data.index, false, false, data_wrapper->GetOutput()); CsaInteract::GetInstance().StoreInternalErrorCode(ret, ERROR_MODULE_FMK, JOBSUBSTATE_GRAPH_EXEC); diff --git a/ge/graph/load/new_model_manager/model_manager.cc b/ge/graph/load/new_model_manager/model_manager.cc index 96ed2fbf..251931d6 100755 --- a/ge/graph/load/new_model_manager/model_manager.cc +++ b/ge/graph/load/new_model_manager/model_manager.cc @@ -32,6 +32,7 @@ #include "graph/common/local_context.h" #include "common/formats/utils/formats_trans_utils.h" #include "hybrid/hybrid_davinci_model.h" +#include "graph/utils/graph_utils.h" namespace ge { thread_local uint32_t device_count = 0; @@ -49,9 +50,13 @@ const std::string kCmdTypeProfModelSubscribe = "prof_model_subscribe"; const std::string kCmdTypeProfModelUnsubscribe = "prof_model_cancel_subscribe"; const char *const kBatchLoadBuf = "batchLoadsoFrombuf"; const char *const kDeleteCustOp = "deleteCustOp"; +const char *const kUsedStreamNum = "used_stream_num"; +const char *const kStreamResource = 
"stream"; +const char *const kEventResource = "event"; const int kTimeSpecNano = 1000000000; const int kTimeSpecMiro = 1000000; const int kSessionMaxBias = 100; +const int kMaxEventNum = 1024; struct CustAicpuSoBuf { uint64_t kernelSoBuf; uint32_t kernelSoBufLen; @@ -69,7 +74,7 @@ std::mutex ModelManager::exeception_infos_mutex_; std::shared_ptr ModelManager::GetInstance() { static const std::shared_ptr instance_ptr = - shared_ptr(new (std::nothrow) ModelManager(), ModelManager::FinalizeForPtr); + shared_ptr(new (std::nothrow) ModelManager(), ModelManager::FinalizeForPtr); return instance_ptr; } @@ -119,7 +124,7 @@ Status ModelManager::KernelLaunchEx(aicpu::FWKAdapter::FWKOperateType op_type, u } rt_ret = - rtMemcpy(devicebase, sizeof(STR_FWK_OP_KERNEL), ¶m_base, sizeof(STR_FWK_OP_KERNEL), RT_MEMCPY_HOST_TO_DEVICE); + rtMemcpy(devicebase, sizeof(STR_FWK_OP_KERNEL), ¶m_base, sizeof(STR_FWK_OP_KERNEL), RT_MEMCPY_HOST_TO_DEVICE); if (rt_ret != RT_ERROR_NONE) { GELOGE(RT_FAILED, "memory copy to device failed. 
ret: 0x%X", rt_ret); GE_IF_BOOL_EXEC(aicpu_kernel_addr != nullptr, GE_CHK_RT(rtFree(aicpu_kernel_addr))); @@ -368,6 +373,177 @@ void ModelManager::InsertModel(uint32_t id, shared_ptr free_stream_num) { + status = ReleaseRsource(need_stream_num, free_stream_num, kStreamResource); + if (status != SUCCESS) { + GELOGE(FAILED, "Release stream resource failed"); + return FAILED; + } + } + if (need_event_num > free_event_num) { + status = ReleaseRsource(need_event_num, free_event_num, kEventResource); + if (status != SUCCESS) { + GELOGE(FAILED, "Release event resource failed"); + return FAILED; + } + } + return SUCCESS; +} + +Status ModelManager::CalculateFollowStream(const GeModelPtr &ge_model, int64_t &hccl_fellow_stream_num) { + const auto &model_def = ge_model->GetModelTaskDefPtr(); + GE_CHECK_NOTNULL(model_def); + Graph graph = ge_model->GetGraph(); + ComputeGraphPtr compute_graph = GraphUtils::GetComputeGraph(graph); + GE_CHECK_NOTNULL(compute_graph); + + map op_list; + for (auto &node : compute_graph->GetDirectNode()) { + OpDescPtr op_desc = node->GetOpDesc(); + if (op_desc == nullptr) { + GELOGE(PARAM_INVALID, "Op desc is nullptr"); + return PARAM_INVALID; + } + op_list.emplace(op_desc->GetId(), op_desc); + } + + std::multimap main_follow_num; + for (int i = 0; i < model_def->task_size(); i++) { + const domi::TaskDef &task = model_def->task(i); + if (static_cast(task.type()) == RT_MODEL_TASK_HCCL) { + auto hccl_def = task.kernel_hccl(); + OpDescPtr hccl_op_desc = op_list.at(hccl_def.op_index()); + int64_t main_stream_id = hccl_op_desc->GetStreamId(); + int64_t follow_stream_num = 0; + if (!ge::AttrUtils::GetInt(hccl_op_desc, kUsedStreamNum, follow_stream_num)) { + GELOGW("Get used_stream_num failed, op is %s", hccl_op_desc->GetName().c_str()); + } + main_follow_num.emplace(main_stream_id, follow_stream_num); + } + } + hccl_fellow_stream_num = CalFollowStreamSum(main_follow_num); + return SUCCESS; +} + +int64_t ModelManager::CalFollowStreamSum(const std::multimap 
&hccl_stream_map) {
+  // Result: for every main stream take the largest follow-stream count requested on it, then add those up.
+  std::map<int64_t, int64_t> max_follow_stream_map;
+  for (const auto &it : hccl_stream_map) {
+    const auto max_it = max_follow_stream_map.emplace(it.first, it.second).first;
+    if (it.second > max_it->second) {
+      max_it->second = it.second;
+    }
+  }
+  int64_t need_follow_stream_num = 0;
+  for (const auto &follow_it : max_follow_stream_map) {
+    need_follow_stream_num += follow_it.second;
+  }
+  return need_follow_stream_num;
+}
+
+// Unloads the biggest holder of resource_kind ("stream"/"event") until free_resource covers need_resource.
+Status ModelManager::ReleaseRsource(int64_t need_resource, int64_t free_resource, const string &resource_kind) {
+  const bool release_stream = (resource_kind == kStreamResource);
+  if (!release_stream && (resource_kind != kEventResource)) {
+    GELOGE(PARAM_INVALID, "Unknown resource kind: %s", resource_kind.c_str());
+    return PARAM_INVALID;
+  }
+  auto &resource_map = release_stream ? stream_map_ : event_map_;
+  while (need_resource > free_resource) {
+    uint32_t max_stream_model_id = 0;
+    uint32_t max_event_model_id = 0;
+    GetMaxStreamAndEventModel(max_stream_model_id, max_event_model_id);
+    GELOGD("The max stream num model is: %u,the max event num model is %u", max_stream_model_id, max_event_model_id);
+    const uint32_t model_id = release_stream ? max_stream_model_id : max_event_model_id;
+    std::lock_guard<std::mutex> lock(resource_mutex_);
+    const auto it = resource_map.find(model_id);
+    if (it == resource_map.end()) {
+      GELOGE(FAILED, "No loaded model left to release %s resource", resource_kind.c_str());
+      return FAILED;
+    }
+    const int64_t released_num = it->second;  // read before the entry is erased below
+    if (Unload(model_id) != SUCCESS) {
+      GELOGE(FAILED, "Unload max %s model failed ,model id : %u", resource_kind.c_str(), model_id);
+      return FAILED;
+    }
+    resource_map.erase(model_id);
+    free_resource += released_num;  // unloading returns resources, so the free pool grows
+    GELOGD("Unload model for %s, model id : %u, %s num :%ld", resource_kind.c_str(), model_id,
+           resource_kind.c_str(), released_num);
+  }
+  return SUCCESS;
+}
+
+Status ModelManager::GetFreeStreamAndEvent(int64_t &free_stream, int64_t &free_event) {
+  uint32_t max_stream_cout;
+
uint32_t max_task_cout; + rtError_t ret = rtGetMaxStreamAndTask(RT_NORMAL_STREAM, &max_stream_cout, &max_task_cout); + if (ret != RT_ERROR_NONE) { + GELOGE(FAILED, "Get max stream and task cout failed"); + return FAILED; + } + GELOGD("Allowed max stream count: %u,max task cout per stream:%u", max_stream_cout, max_task_cout); + std::lock_guard lock(resource_mutex_); + int64_t stream_sum = 0; + int64_t event_sum = 0; + for (auto &it : stream_map_) { + stream_sum = stream_sum + it.second; + } + for (auto &it : event_map_) { + event_sum = event_sum + it.second; + } + free_stream = max_stream_cout - stream_sum; + free_event = kMaxEventNum - event_sum; + return SUCCESS; +} + +void ModelManager::GetMaxStreamAndEventModel(uint32_t &max_stream_model, uint32_t &max_event_model) { + std::lock_guard lock(resource_mutex_); + int64_t max_stream_num = 0; + for (auto &it : stream_map_) { + if (it.second > max_stream_num) { + max_stream_num = it.second; + max_stream_model = it.first; + } + } + int64_t max_event_num = 0; + for (auto &it : event_map_) { + if (it.second > max_event_num) { + max_event_num = it.second; + max_event_model = it.first; + } + } +} + +void ModelManager::InsertModelResource(uint32_t model_id, int64_t stream_num, int64_t event_num) { + std::lock_guard lock(resource_mutex_); + stream_map_.emplace(model_id, stream_num); + event_map_.emplace(model_id, event_num); +} + Status ModelManager::DeleteModel(uint32_t id) { std::lock_guard lock(map_mutex_); @@ -459,8 +635,7 @@ Status ModelManager::GetCurDynamicDims(const vector> &user_real_ vector &cur_dynamic_dims) { GELOGD(" Start get cur dynamic dims."); if (user_real_input_dims.size() != user_input_dims.size()) { - GELOGE(INTERNAL_ERROR, - "The input count of user: %zu should be equal to the data count of graph: %zu", + GELOGE(INTERNAL_ERROR, "The input count of user: %zu should be equal to the data count of graph: %zu", user_real_input_dims.size(), user_input_dims.size()); return INTERNAL_ERROR; } @@ -516,6 +691,7 @@ 
Status ModelManager::DataInputTensor(uint32_t model_id, const std::vector(cur_dynamic_dims.size() * sizeof(int64_t)); GE_CHK_BOOL_EXEC(memcpy_s(data.data, length, cur_dynamic_dims.data(), length) == EOK, return INTERNAL_ERROR, @@ -630,11 +806,13 @@ Status ModelManager::Stop(uint32_t model_id) { /// Status ModelManager::HandleCommand(const Command &command) { static const std::map> cmds = { - {kCmdTypeDump, HandleDumpCommand}, {kCmdTypeProfInit, HandleProfInitCommand}, - {kCmdTypeProfFinalize, HandleProfFinalizeCommand}, {kCmdTypeProfStart, HandleProfStartCommand}, - {kCmdTypeProfStop, HandleProfStopCommand}, - {kCmdTypeProfModelSubscribe, HandleProfModelSubscribeCommand}, - {kCmdTypeProfModelUnsubscribe, HandleProfModelUnsubscribeCommand}}; + {kCmdTypeDump, HandleDumpCommand}, + {kCmdTypeProfInit, HandleProfInitCommand}, + {kCmdTypeProfFinalize, HandleProfFinalizeCommand}, + {kCmdTypeProfStart, HandleProfStartCommand}, + {kCmdTypeProfStop, HandleProfStopCommand}, + {kCmdTypeProfModelSubscribe, HandleProfModelSubscribeCommand}, + {kCmdTypeProfModelUnsubscribe, HandleProfModelUnsubscribeCommand}}; auto iter = cmds.find(command.cmd_type); if (iter == cmds.end()) { @@ -645,17 +823,16 @@ Status ModelManager::HandleCommand(const Command &command) { } } -Status ModelManager::GetModelByCmd(const Command &command, - std::shared_ptr &davinci_model) { +Status ModelManager::GetModelByCmd(const Command &command, std::shared_ptr &davinci_model) { if (command.cmd_params.size() < kCmdParSize) { GELOGE(PARAM_INVALID, "When the cmd_type is '%s', the size of cmd_params must larger than 2.", - command.cmd_type.c_str()); + command.cmd_type.c_str()); return PARAM_INVALID; } std::string map_key = command.cmd_params[0]; std::string value = command.cmd_params[1]; - if (map_key == PROFILE_MODEL_ID) { + if (map_key == PROFILE_MODEL_ID) { int32_t model_id = 0; try { model_id = std::stoi(value); @@ -692,8 +869,8 @@ Status ModelManager::HandleProfModelSubscribeCommand(const Command &command) { 
return ret; } - if (ProfilingManager::Instance().ProfModelSubscribe(command.module_index, - static_cast(davinci_model.get())) != SUCCESS) { + if (ProfilingManager::Instance().ProfModelSubscribe(command.module_index, static_cast(davinci_model.get())) != + SUCCESS) { GELOGE(FAILED, "Handle prof model subscribe failed."); return FAILED; } @@ -1011,8 +1188,7 @@ Status ModelManager::GetInputOutputDescInfoForZeroCopy(const uint32_t model_id, Status ModelManager::GetAIPPInfo(const uint32_t model_id, uint32_t index, AippConfigInfo &aipp_info) { std::shared_ptr davinci_model = GetModel(model_id); GE_CHK_BOOL_RET_STATUS(davinci_model != nullptr, ACL_ERROR_GE_EXEC_MODEL_ID_INVALID, - "GetAIPPInfo failed, invalid model_id is %u.", - model_id); + "GetAIPPInfo failed, invalid model_id is %u.", model_id); return davinci_model->GetAIPPInfo(index, aipp_info); } @@ -1020,8 +1196,7 @@ Status ModelManager::GetAIPPInfo(const uint32_t model_id, uint32_t index, AippCo Status ModelManager::GetAippType(uint32_t model_id, uint32_t index, InputAippType &type, size_t &aipp_index) { std::shared_ptr davinci_model = GetModel(model_id); GE_CHK_BOOL_RET_STATUS(davinci_model != nullptr, ACL_ERROR_GE_EXEC_MODEL_ID_INVALID, - "GetAIPPInfo failed, invalid model_id is %u.", - model_id); + "GetAIPPInfo failed, invalid model_id is %u.", model_id); return davinci_model->GetAippType(index, type, aipp_index); } @@ -1047,8 +1222,8 @@ Status ModelManager::GenSessionId(uint64_t &session_id) { Status ModelManager::LoadModelOffline(uint32_t &model_id, const ModelData &model, shared_ptr listener, void *dev_ptr, size_t mem_size, void *weight_ptr, size_t weight_size) { - GE_CHK_BOOL_RET_STATUS(model.key.empty() || mmAccess2(model.key.c_str(), M_F_OK) == EN_OK, - ACL_ERROR_GE_PARAM_INVALID, "input key file path %s is invalid, %s", model.key.c_str(), strerror(errno)); + GE_CHK_BOOL_RET_STATUS(model.key.empty() || mmAccess2(model.key.c_str(), M_F_OK) == EN_OK, ACL_ERROR_GE_PARAM_INVALID, + "input key file path %s is 
invalid, %s", model.key.c_str(), strerror(errno)); GenModelId(&model_id); shared_ptr davinci_model = nullptr; @@ -1142,8 +1317,8 @@ Status ModelManager::LoadModelWithQ(uint32_t &model_id, const ModelData &model_d const std::vector &input_queue_ids, const std::vector &output_queue_ids) { GE_CHK_BOOL_RET_STATUS(model_data.key.empty() || mmAccess2(model_data.key.c_str(), M_F_OK) == EN_OK, - ACL_ERROR_GE_PARAM_INVALID, "input key file path %s is not valid, %s", - model_data.key.c_str(), strerror(errno)); + ACL_ERROR_GE_PARAM_INVALID, "input key file path %s is not valid, %s", model_data.key.c_str(), + strerror(errno)); ModelHelper model_helper; Status ret = model_helper.LoadModel(model_data); @@ -1344,8 +1519,8 @@ Status ModelManager::LaunchKernelCustAicpuSo(const string &kernel_name) { } allocated_mem.push_back(d_so_name); GE_CHK_RT(rtMemcpy(d_aicpu_data, aicpu_data_length, aicpu_data, aicpu_data_length, RT_MEMCPY_HOST_TO_DEVICE)); - GE_CHK_RT(rtMemcpy(d_so_name, so_name.size(), reinterpret_cast(so_name.c_str()), - so_name.size(), RT_MEMCPY_HOST_TO_DEVICE)); + GE_CHK_RT(rtMemcpy(d_so_name, so_name.size(), reinterpret_cast(so_name.c_str()), so_name.size(), + RT_MEMCPY_HOST_TO_DEVICE)); CustAicpuSoBuf cust_aicpu_so_buf; cust_aicpu_so_buf.kernelSoBuf = static_cast(reinterpret_cast(d_aicpu_data)); @@ -1379,8 +1554,8 @@ Status ModelManager::LaunchKernelCustAicpuSo(const string &kernel_name) { return RT_ERROR_TO_GE_STATUS(status); } allocated_mem.push_back(batch_args); - GE_CHK_RT(rtMemcpy(batch_args, batch_args_size, static_cast(&batch_cust_so), - batch_args_size, RT_MEMCPY_HOST_TO_DEVICE)); + GE_CHK_RT(rtMemcpy(batch_args, batch_args_size, static_cast(&batch_cust_so), batch_args_size, + RT_MEMCPY_HOST_TO_DEVICE)); GE_CHK_RT(rtStreamCreate(&stream, 0)); GE_CHK_RT(rtCpuKernelLaunch(nullptr, kernel_name.c_str(), 1, batch_args, batch_args_size, nullptr, stream)); @@ -1473,8 +1648,7 @@ void ModelManager::GenModelId(uint32_t *id) { Status ModelManager::GetOrigInputInfo(uint32_t 
model_id, uint32_t index, OriginInputInfo &orig_input_info) { std::shared_ptr davinci_model = GetModel(model_id); GE_CHK_BOOL_RET_STATUS(davinci_model != nullptr, ACL_ERROR_GE_EXEC_MODEL_ID_INVALID, - "GetOrigInputInfo failed, invalid model_id is %u.", - model_id); + "GetOrigInputInfo failed, invalid model_id is %u.", model_id); return davinci_model->GetOrigInputInfo(index, orig_input_info); } diff --git a/ge/graph/load/new_model_manager/model_manager.h b/ge/graph/load/new_model_manager/model_manager.h index fc98d9c2..c998f77a 100755 --- a/ge/graph/load/new_model_manager/model_manager.h +++ b/ge/graph/load/new_model_manager/model_manager.h @@ -177,8 +177,7 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager { static ge::Status HandleProfStartCommand(const Command &command); static ge::Status HandleProfStopCommand(const Command &command); - static ge::Status GetModelByCmd(const Command &command, - std::shared_ptr &davinci_model); + static ge::Status GetModelByCmd(const Command &command, std::shared_ptr &davinci_model); /// /// @ingroup domi_ome /// @brief get model memory usage @@ -341,6 +340,15 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager { void InsertModel(uint32_t id, std::shared_ptr &davinci_model); void InsertModel(uint32_t id, std::shared_ptr &hybrid_model); + void InsertModelResource(uint32_t model_id, int64_t stream_num, int64_t event_num); + + Status CheckStreamAndEventResource(const GeModelPtr &ge_model); + Status CalculateFollowStream(const GeModelPtr &ge_model, int64_t &hccl_fellow_stream_num); + int64_t CalFollowStreamSum(const std::multimap &hccl_stream_map); + Status ReleaseRsource(int64_t need_resource, int64_t free_resource, const string &resource_kind); + Status GetFreeStreamAndEvent(int64_t &free_stream, int64_t &free_event); + void GetMaxStreamAndEventModel(uint32_t &max_stream_model, uint32_t &max_event_model); + /// /// @ingroup domi_ome /// @brief delete model from model manager set @@ -362,7 +370,9 
@@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager { std::vector exception_infos_; std::mutex cust_aicpu_mutex_; std::map> cust_aicpu_so_; - + std::map stream_map_; + std::map event_map_; + std::mutex resource_mutex_; static DumpProperties dump_properties_; }; } // namespace ge diff --git a/ge/graph/manager/graph_manager.cc b/ge/graph/manager/graph_manager.cc index 2c2495b4..e33d9fa5 100755 --- a/ge/graph/manager/graph_manager.cc +++ b/ge/graph/manager/graph_manager.cc @@ -133,6 +133,7 @@ const char *const kShapeDataName = "ascend_mbatch_shape_data"; const char *const kGetNextName = "IteratorV2"; const char *const kExtAttrDataNodes = "data_nodes"; const char *const kExtAttrGetNextNoSink = "getnext_no_sink"; +const char *const kLazyRecompile = "LazyRecompile"; bool IsTailingOptimization() { string is_tailing_optimization_option; @@ -379,6 +380,17 @@ Status GraphManager::AddGraph(const GraphId &graph_id, const Graph &graph, graph_node->SetOptions(options); AddGraphNode(graph_id, graph_node); + string dynamic_input; + Status status = ParseOption(options,DYNAMIC_INPUT,dynamic_input); + string dynamic_execute; + status = ParseOption(options,DYNAMIC_EXECUTE,dynamic_execute); + + if (dynamic_input == "1" && dynamic_execute == kLazyRecompile) { + if (!AttrUtils::SetStr(*compute_graph, ATTR_NAME_DYNAMIC_EXECUTE, dynamic_execute)) { + GELOGW("Set attribute dynamic execute failed."); + } + } + AddLocalOmgContext(graph_id, omg_context); if (!options_.output_datatype.empty()) { GetLocalOmgContext().output_type = options_.output_datatype; @@ -390,7 +402,7 @@ Status GraphManager::AddGraph(const GraphId &graph_id, const Graph &graph, CompilerStages &stages = GetCompilerStages(graph_id); stages.preparer.SetOptions(options_); - Status status = stages.optimizer.SetOptions(options_); + status = stages.optimizer.SetOptions(options_); if (status != SUCCESS) { GELOGE(status, "Graph optimizer set options failed."); return status; diff --git 
a/ge/hybrid/executor/hybrid_model_async_executor.cc b/ge/hybrid/executor/hybrid_model_async_executor.cc index ba717a2d..c6eaf59c 100644 --- a/ge/hybrid/executor/hybrid_model_async_executor.cc +++ b/ge/hybrid/executor/hybrid_model_async_executor.cc @@ -20,10 +20,13 @@ #include "graph/utils/type_utils.h" #include "graph/ge_context.h" #include "omm/csa_interact.h" +#include "graph/execute/dynamic_execute_addr.h" +#include "graph/debug/ge_attr_define.h" namespace ge { namespace hybrid { namespace { +const char *const kLazyRecompile = "LazyRecompile";ss int kDataOutputIndex = 0; } HybridModelAsyncExecutor::HybridModelAsyncExecutor(HybridModel *model) @@ -290,6 +293,8 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a } GELOGD("Number of outputs = %zu", output_tensor_desc_list.size()); + map tensor_value_info; + map > session_tensor_values; for (size_t i = 0; i < output_tensors.size(); ++i) { GELOGD("Start to process output[%zu]", i); auto &output_tensor = output_tensors[i]; @@ -326,16 +331,33 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a output.data_type = static_cast(tensor_desc->GetDataType()); output.dims = tensor_desc->GetShape().GetDims(); output.length = output_size; + GeRootModelPtr ge_root_model = model_->ge_root_model_; + GE_CHECK_NOTNULL(ge_root_model); + ComputeGraphPtr computer_graph = ge_root_model->GetRootGraph(); + string execute_mode; + if (!AttrUtils::GetStr(*computer_graph, ATTR_NAME_DYNAMIC_EXECUTE, execute_mode)) { + GELOGW("Can not get dynamic execute attr"); + } if (output_size > 0) { - std::unique_ptr data_buf(new(std::nothrow) uint8_t[output_size]); - GE_CHECK_NOTNULL(data_buf); - GE_CHK_RT_RET(rtMemcpy(data_buf.get(), + if (execute_mode != kLazyRecompile){ + std::unique_ptr data_buf(new(std::nothrow) uint8_t[output_size]); + GE_CHECK_NOTNULL(data_buf); + GE_CHK_RT_RET(rtMemcpy(data_buf.get(), output_size, output_tensor.GetData(), output_size, RT_MEMCPY_DEVICE_TO_HOST)); 
output.data = std::move(data_buf); + output.placement = 0; output_data->blobs.emplace_back(data_buf.get(), static_cast(output_size), false); + } else { + GELOGD("The dynamic execute attr is %s", execute_mode.c_str()); + output.data.reset(reinterpret_cast(const_cast(output_tensor.GetData()))); + output.placement = 1; + tensor_value_info.emplace(output_tensor.GetData(),output_tensor); + output_data->blobs.emplace_back(output_tensor.GetData(), static_cast(output_size), false); + } + } else { GELOGW("Output[%zu] is empty. shape = [%s]", i, tensor_desc->GetShape().ToString().c_str()); output.data = nullptr; @@ -350,6 +372,9 @@ Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &a output_size); } + uint64_t session_id = executor_->GetContext()->session_id; + session_tensor_values.emplace(session_id,tensor_value_info); + DynamicExecuteAddr::GetInstance().SetDynamicExecuteDevAddr(session_id,session_tensor_values); return SUCCESS; } diff --git a/inc/external/ge/ge_api_types.h b/inc/external/ge/ge_api_types.h index cce17f93..dfe2a0f1 100644 --- a/inc/external/ge/ge_api_types.h +++ b/inc/external/ge/ge_api_types.h @@ -297,12 +297,19 @@ const std::string OP_BANK_PATH_FLAG = "ge.op_bank_path"; // Graph run mode enum GraphRunMode { PREDICTION = 0, TRAIN }; +// Dynamic input shape for training +const std::string DYNAMIC_INPUT = "ge.dynamic_input"; + +// Dynamic execute mode +const std::string DYNAMIC_EXECUTE = "ge.dynamic_execute"; + // Input/Output tensor info struct InputTensorInfo { uint32_t data_type; // data type std::vector dims; // shape description void *data; // tensor data int64_t length; // tensor length + uint32_t placement; }; struct OutputTensorInfo { @@ -310,6 +317,7 @@ struct OutputTensorInfo { std::vector dims; // shape description std::unique_ptr data; // tensor data int64_t length; // tensor length + uint32_t placement; OutputTensorInfo() : data_type(0), dims({}), data(nullptr), length(0) {} OutputTensorInfo(OutputTensorInfo &&out) : 
data_type(out.data_type), dims(out.dims), data(std::move(out.data)), length(out.length) {} diff --git a/inc/framework/common/ge_types.h b/inc/framework/common/ge_types.h index fb1f0be1..a513657b 100644 --- a/inc/framework/common/ge_types.h +++ b/inc/framework/common/ge_types.h @@ -59,6 +59,7 @@ struct DataBuffer { void *data; // Data address uint64_t length; // Data length bool isDataSupportMemShare = false; + uint32_t placement; DataBuffer(void *dataIn, uint64_t len, bool isSupportMemShare) : data(dataIn), length(len), isDataSupportMemShare(isSupportMemShare) {}