diff --git a/ge/client/ge_api.cc b/ge/client/ge_api.cc index 0c63c6e3..510e43b8 100644 --- a/ge/client/ge_api.cc +++ b/ge/client/ge_api.cc @@ -582,6 +582,38 @@ Status Session::RunGraph(uint32_t graph_id, const std::vector &inputs, s return ret; } +// Run Graph with stream Asynchronously +Status Session::RunGraphWithStreamAsync(uint32_t graph_id, const std::vector &inputs, + std::vector &outputs, void *stream) { + ErrorManager::GetInstance().SetStage(ErrorMessage::kModelCompile, ErrorMessage::kOther); + GELOGT(TRACE_INIT, "Session RunGraphWithStreamAsync start"); + + ErrorManager::GetInstance().GenWorkStreamIdBySessionGraph(sessionId_, graph_id); + std::shared_ptr instance_ptr = ge::GELib::GetInstance(); + if (instance_ptr == nullptr || !instance_ptr->InitFlag()) { + GELOGE(GE_CLI_GE_NOT_INITIALIZED, + "[Run][Graph]RunGraphWithStreamAsyncFailed, the GELib instance is nullptr or is not InitFlag."); + REPORT_INNER_ERROR("E19999", + "RunGraphWithStreamAsync Failed, the GELib instance is nullptr or is not InitFlag."); + return FAILED; + } + GELOGT(TRACE_RUNNING, "Run Graph RunGraphWithStreamAsync"); + Status ret = instance_ptr->SessionManagerObj().RunGraphWithStreamAsync(sessionId_, graph_id, inputs, outputs, + stream); + if (ret != SUCCESS) { + GELOGE(ret, "[Run][Graph]RunGraphWithStreamAsync Failed, error code:%u, session_id:%lu, graph_id:%u.", + ret, sessionId_, graph_id); + return FAILED; + } + + if (!outputs.empty()) { + PrintOutputResult(outputs); + } + + GELOGT(TRACE_STOP, "Session RunGraph finished"); + return SUCCESS; +} + // Register Call Back Status Session::RegisterCallBackFunc(const std::string &key, const pCallBackFunc &callback) { ErrorManager::GetInstance().GenWorkStreamIdDefault(); diff --git a/ge/graph/execute/graph_execute.cc b/ge/graph/execute/graph_execute.cc index d924302c..8dddca06 100755 --- a/ge/graph/execute/graph_execute.cc +++ b/ge/graph/execute/graph_execute.cc @@ -400,6 +400,70 @@ Status GraphExecutor::ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr & return SUCCESS; } +void GraphExecutor::GetInputOutputData(const std::vector &input_tensor, + std::vector &output_tensor, + InputData &inputs, + OutputData &outputs) { + graph_input_data.index = 0; + graph_input_data.timeout = 0; + graph_input_data.timestamp = 0; + + for (const auto &tensor : input_tensor) { + DataBuffer in_data_buf; + in_data_buf.data = reinterpret_cast(tensor.GetData().data()); + in_data_buf.length = tensor.GetData().size(); + in_data_buf.isDataSupportMemShare = false; + inputs.blobs.emplace_back(in_data_buf); + } + + outputs.index = 0; + for (const auto &tensor : output_tensor) { + DataBuffer out_data_buf; + out_data_buf.data = reinterpret_cast(tensor.GetData().data()); + out_data_buf.length = tensor.GetData().size(); + out_data_buf.isDataSupportMemShare = false; + outputs.blobs.emplace_back(out_data_buf); + } +} + +Status GraphExecutor::ExecuteGraphWithStream(GraphId graph_id, + const GeRootModelPtr &ge_root_model, + const std::vector &input_tensor, + std::vector &output_tensor, + rtStream_t stream) { + GELOGI("[GraphExecutor] Start to execute graph with stream, graph_id=%u", graph_id); + if (!init_flag_) { + GELOGE(GE_GRAPH_EXECUTE_NOT_INIT, "[GraphExecutor] AI Core Engine without calling SetCondition!"); + return GE_GRAPH_EXECUTE_NOT_INIT; + } + + if (graph_id != last_graph_id_) { + auto ret = FreeExecuteMemory(); + if (ret != SUCCESS) { + return ret; + } + } + last_graph_id_ = graph_id; + + GE_CHECK_NOTNULL_EXEC(ge_root_model, return FAILED); + auto model_id = ge_root_model->GetModelId(); + InputData input_data; + OutputData output_data; + input_data.model_id = model_id; + output_data.model_id = model_id; + GetInputOutputData(input_tensor, output_tensor, input_data, output_data); + + auto async_mode = true; + std::vector input_ge_desc; + std::vector output_ge_desc; + auto model_manager = ge::ModelManager::GetInstance(); + GE_CHECK_NOTNULL(model_manager); + model_manager->ExecuteModel(model_id, stream, async_mode, input_data, input_ge_desc, output_data, output_ge_desc); + + GELOGI("[GraphExecutor] execute model success, modelId=%u.", model_id); + return SUCCESS; +} + Status GraphExecutor::AsyncExecuteModel(uint32_t model_id, const std::vector &inputs) { try { auto model_manager = ge::ModelManager::GetInstance(); diff --git a/ge/graph/execute/graph_execute.h b/ge/graph/execute/graph_execute.h index d2a92e47..14c4c858 100755 --- a/ge/graph/execute/graph_execute.h +++ b/ge/graph/execute/graph_execute.h @@ -52,6 +52,12 @@ class GraphExecutor { ge::Status ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &ge_root_model, const std::vector &input_tensor); + Status ExecuteGraphWithStream(GraphId graph_id, + const GeRootModelPtr &ge_root_model, + const std::vector &input_tensor, + std::vector &output_tensor, + rtStream_t stream); + Status SetCondition(std::mutex *mutex, std::condition_variable *cond, std::shared_ptr listener); Status SetGraphContext(GraphContextPtr graph_context_ptr); diff --git a/ge/graph/load/graph_loader.cc b/ge/graph/load/graph_loader.cc index cf95b271..a38655c5 100755 --- a/ge/graph/load/graph_loader.cc +++ b/ge/graph/load/graph_loader.cc @@ -81,6 +81,18 @@ Status GraphLoader::LoadModelOnline(uint32_t &model_id, const std::shared_ptrCheckIsSpecificStream()) { + GELOGI("No need to start a new thread to run model in specific scene"); + rt_ret = rtDeviceReset(GetContext().DeviceId()); + if (rt_ret != RT_ERROR_NONE) { + REPORT_CALL_ERROR("E19999", "Call rtDeviceReset failed, device_id:%u, ret:0x%X", + GetContext().DeviceId(), rt_ret); + GELOGE(RT_FAILED, "Call rt api failed, ret: 0x%X", rt_ret); + } + + return SUCCESS; + } + ret = model_manager->Start(model_id); if (ret != SUCCESS) { if (model_manager->Unload(model_id) != SUCCESS) { diff --git a/ge/graph/manager/graph_manager.cc b/ge/graph/manager/graph_manager.cc index f7357d9d..02a02fa8 100755 --- a/ge/graph/manager/graph_manager.cc +++ b/ge/graph/manager/graph_manager.cc @@ -1004,6 +1004,7 @@ Status GraphManager::LoadGraph(const GeRootModelPtr &ge_root_model, const GraphN GE_CHK_STATUS_RET(CheckAndReleaseMemory(ge_model, graph_node)); } } + ge_root_model->SetIsSpecificStream(graph_node->CheckIsSpecificStream()); GE_TIMESTAMP_START(LoadGraph); Status ret = GraphLoader::LoadModelOnline(model_id_info.model_id, ge_root_model, model_listener); GE_TIMESTAMP_EVENT_END(LoadGraph, "GraphManager::LoadGraph"); @@ -1127,6 +1128,63 @@ Status GraphManager::InnerRunGraph(GraphNodePtr &graph_node, const GraphId &grap return SUCCESS; } +Status RunGraphWithStreamAsync(const GraphId &graph_id, const std::vector &inputs, + std::vector &outputs, rtStream_t stream, uint64_t session_id) { + ErrorManager::GetInstance().SetStage(ErrorMessage::kModelCompile, ErrorMessage::kOther); + std::lock_guard lock(run_mutex_); + GELOGI("[RunGraphWithStreamAsync] start to run graph, graph_id = %u, is_train_graph: %d", graph_id, GetTrainFlag()); + + if (inputs.empty()) { + GELOGI("[RunGraphWithStreamAsync] initialize sub graph has no inputs"); + } + + // find graph + GraphNodePtr graph_node = nullptr; + Status ret = GetGraphNode(graph_id, graph_node); + if (ret != SUCCESS) { + GELOGE(ret, "[RunGraphWithStreamAsync] graph not exist, graph_id = %u.", graph_id); + return ret; + } + if (graph_node == nullptr) { + GELOGE(GE_GRAPH_GRAPH_NODE_NULL, "[RunGraphWithStreamAsync] graph node is NULL, graph_id = %u.", graph_id); + return GE_GRAPH_GRAPH_NODE_NULL; + } + if (graph_node->GetRunFlag()) { + GELOGE(GE_GRAPH_ALREADY_RUNNING, "[RunGraphWithStreamAsync] graph already running, graph id = %u", graph_id); + return GE_GRAPH_ALREADY_RUNNING; + } + + UpdateLocalOmgContext(graph_id); + // set graph's run flag + graph_node->SetRunFlag(true); + graph_node->SetIsSpecificStream(true); + ComputeGraphPtr compute_graph_tmp = GraphUtils::GetComputeGraph(*(graph_node->GetGraph())); + + // when set incre build, add cache helper map + AddModelCacheHelperToMap(graph_id, session_id, compute_graph_tmp); + if (options_.local_fmk_op_flag) { + GetCompilerStages(graph_id).optimizer.TranFrameOp(compute_graph_tmp); + } + GeRootModelPtr ge_root_model = nullptr; + ret = StartForRunGraph(graph_node, inputs, ge_root_model, session_id); + if (ret != SUCCESS) { + GELOGE(ret, "[RunGraphWithStreamAsync] StartForRunGraph failed!"); + graph_node->SetRunFlag(false); + return ret; + } + + auto ret = graph_executor_.ExecuteGraphWithStream(graph_id, graph_node->GetGeRootModel(), + inputs, outputs, stream); + graph_node->SetRunFlag(false); + graph_node->SetIsSpecificStream(false); + if (ret != SUCCESS) { + GELOGE(ret, "[RunGraphWithStreamAsync] execute graph failed, graph_id = %u.", graph_id); + return ret; + } + GELOGI("[RunGraphWithStreamAsync] run graph success, graph_id = %u.", graph_id); + return SUCCESS; +} + Status GraphManager::RunGraph(const GraphId &graph_id, const std::vector &inputs, std::vector &outputs, uint64_t session_id) { ErrorManager::GetInstance().SetStage(ErrorMessage::kModelCompile, ErrorMessage::kOther); diff --git a/ge/graph/manager/graph_manager.h b/ge/graph/manager/graph_manager.h index b63b138a..df8b8776 100644 --- a/ge/graph/manager/graph_manager.h +++ b/ge/graph/manager/graph_manager.h @@ -103,6 +103,18 @@ class GraphManager { Status RunGraph(const GraphId &graph_id, const std::vector &inputs, std::vector &outputs, uint64_t session_id = INVALID_SESSION_ID); + /// + /// @ingroup ge_graph + /// @brief run specific graph with specific session id and stream + /// @param [in] graph_id graph id + /// @param [in] inputs input data + /// @param [in] stream specific stream + /// @param [out] outputs output data + /// @return Status result of function + /// + Status RunGraphWithStreamAsync(const GraphId &graph_id, const std::vector &inputs, + std::vector &outputs, rtStream_t stream, uint64_t session_id); + /// /// @ingroup ge_graph /// @brief build specific graph diff --git a/ge/graph/manager/graph_manager_utils.cc b/ge/graph/manager/graph_manager_utils.cc index 3a8d577c..cd7eb2a5 100644 --- a/ge/graph/manager/graph_manager_utils.cc +++ b/ge/graph/manager/graph_manager_utils.cc @@ -41,6 +41,7 @@ GraphNode::GraphNode(GraphId graph_id) build_flag_(false), load_flag_(false), async_(false), + is_specific_stream_(false), ge_model_(nullptr), sem_(1) { graph_run_async_listener_ = MakeShared(); diff --git a/ge/graph/manager/graph_manager_utils.h b/ge/graph/manager/graph_manager_utils.h index cfe6588f..31dd6c11 100644 --- a/ge/graph/manager/graph_manager_utils.h +++ b/ge/graph/manager/graph_manager_utils.h @@ -164,6 +164,8 @@ class GraphNode { bool GetLoadFlag() const { return load_flag_; } void SetLoadFlag(bool load_flag) { load_flag_ = load_flag; } void SetGeModel(const GeModelPtr &ge_model) { ge_model_ = ge_model; } + void SetIsSpecificStream(bool specific_stream) { is_specific_stream_ = specific_stream; } + bool CheckIsSpecificStream() { return is_specific_stream_; } GeModelPtr GetGeModel() const { return ge_model_; } void SetGeRootModel(const GeRootModelPtr &ge_root_model) { ge_root_model_ = ge_root_model; } GeRootModelPtr GetGeRootModel() const { return ge_root_model_; } @@ -186,6 +188,7 @@ class GraphNode { bool build_flag_; bool load_flag_; bool async_; + bool is_specific_stream_; GeModelPtr ge_model_; GeRootModelPtr ge_root_model_; BlockingQueue sem_; diff --git a/ge/model/ge_root_model.h b/ge/model/ge_root_model.h index aa5a4d47..3f98c091 100755 --- a/ge/model/ge_root_model.h +++ b/ge/model/ge_root_model.h @@ -34,6 +34,8 @@ class GeRootModel { const ComputeGraphPtr &GetRootGraph() const { return root_graph_; }; void SetModelId(uint32_t model_id) { model_id_ = model_id; } + void SetIsSpecificStream(bool is_specific_stream) { is_specific_stream_ = is_specific_stream; } + bool CheckIsSpecificStream() {return is_specific_stream_; } uint32_t GetModelId() const { return model_id_; } Status CheckIsUnknownShape(bool &is_dynamic_shape); void SetRootGraph(ComputeGraphPtr graph) { root_graph_ = graph; } @@ -41,6 +43,7 @@ class GeRootModel { ComputeGraphPtr root_graph_ = nullptr; std::map subgraph_instance_name_to_model_; uint32_t model_id_ = 0; + bool is_specific_stream_ = false; }; } // namespace ge using GeRootModelPtr = std::shared_ptr; diff --git a/ge/session/inner_session.cc b/ge/session/inner_session.cc index e8b3ae0e..49e3a079 100755 --- a/ge/session/inner_session.cc +++ b/ge/session/inner_session.cc @@ -262,6 +262,43 @@ Status InnerSession::RunGraph(uint32_t graph_id, const std::vector &inpu } } +Status InnerSession::RunGraphWithStreamAsync(uint32_t graph_id, const std::vector &inputs, + std::vector &outputs, rtStream_t stream) { + GELOGI("[InnerSession:%lu] run graph with stream on session, graph_id=%u.", session_id_, graph_id); + if (mutex_.try_lock()) { // 必要的吗? + std::lock_guard lock(mutex_, std::adopt_lock); + if (!init_flag_) { // 必要的吗? + GELOGE(GE_SESS_INIT_FAILED, "[InnerSession:%lu] initialize failed.", session_id_); + return GE_SESS_INIT_FAILED; + } + UpdateThreadContext(graph_id); + vector geInputs; + for (auto &item : inputs) { + geInputs.emplace_back(TensorAdapter::AsGeTensor(item)); + } + vector geOutputs; + for (auto &item : outputs) { + ge_outputs.emplace_back(TensorAdapter::AsGeTensor(item)); + } + Status ret = graph_manager_.RunGraphWithStreamAsync(graph_id, geInputs, geOutputs, stream, session_id_); + domi::GetContext().out_nodes_map.clear(); + domi::GetContext().user_out_nodes.clear(); + if (ret != SUCCESS) { + GELOGE(ret, "[Run][GraphWithStreamAsync]failed, InnerSession:%lu graph_id=%u.", session_id_, graph_id); + REPORT_CALL_ERROR("E19999", + "GraphManager RunGrapWithStreamhAsync failed, InnerSession:%lu graph_id=%u.", + session_id_, graph_id); + return ret; + } + + GELOGI("[InnerSession:%lu] run graph success, graph_id=%u.", session_id_, graph_id); + return SUCCESS; + } else { + GELOGE(GE_SESS_ALREADY_RUNNING, "[InnerSession:%lu] run graph failed, graph_id=%u.", session_id_, graph_id); + return GE_SESS_ALREADY_RUNNING; + } +} + Status InnerSession::RemoveGraph(uint32_t graph_id) { std::lock_guard lock(resource_mutex_); if (!init_flag_) { diff --git a/ge/session/inner_session.h b/ge/session/inner_session.h index 5cab43d8..6fa23266 100644 --- a/ge/session/inner_session.h +++ b/ge/session/inner_session.h @@ -41,6 +41,9 @@ class InnerSession { Status RunGraph(uint32_t graph_id, const std::vector &inputs, std::vector &outputs); + Status RunGraphWithStreamAsync(uint32_t graph_id, const std::vector &inputs, std::vector &outputs, + rtStream_t stream); + Status RemoveGraph(uint32_t graph_id); Status BuildGraph(uint32_t graph_id, const std::vector &inputs); diff --git a/ge/session/session_manager.cc b/ge/session/session_manager.cc index 1e4efa6b..8cd54830 100755 --- a/ge/session/session_manager.cc +++ b/ge/session/session_manager.cc @@ -242,6 +242,33 @@ Status SessionManager::RunGraph(SessionId session_id, uint32_t graph_id, const s return innerSession->RunGraph(graph_id, inputs, outputs); } +Status SessionManager::RunGraphWithStreamAsync(SessionId session_id, + uint32_t graph_id, + const std::vector &inputs, + std::vector &outputs, + rtStream_t stream) { + if (!init_flag_) { + GELOGE(GE_SESSION_MANAGER_NOT_INIT, + "[RunWithStream][Graph]Session manager is not initialized, session_id:%lu, graph_id:%u.", + session_id, graph_id); + REPORT_INNER_ERROR("E19999", + "RunGraphWithStreamAsync fail for Session manager is not initialized, session_id:%lu, graph_id:%u.", + session_id, graph_id); + return GE_SESSION_MANAGER_NOT_INIT; + } + SessionPtr innerSession = nullptr; + { + std::lock_guard lock(mutex_); + std::map::iterator it = session_manager_map_.find(session_id); + if (it == session_manager_map_.end()) { + return GE_SESSION_NOT_EXIST; + } else { + innerSession = it->second; + } + } + return innerSession->RunGraphWithStreamAsync(graph_id, inputs, outputs, stream); +} + Status SessionManager::RemoveGraph(SessionId session_id, uint32_t graph_id) { if (!init_flag_) { GELOGE(GE_SESSION_MANAGER_NOT_INIT, diff --git a/ge/session/session_manager.h b/ge/session/session_manager.h index da23219c..bcf967a0 100644 --- a/ge/session/session_manager.h +++ b/ge/session/session_manager.h @@ -25,6 +25,7 @@ #include "common/ge_inner_error_codes.h" #include "ge/ge_api_types.h" #include "session/inner_session.h" +#include "runtime/base.h" namespace ge { using SessionPtr = std::shared_ptr; @@ -96,6 +97,19 @@ class SessionManager { Status RunGraph(SessionId session_id, uint32_t graph_id, const std::vector &inputs, std::vector &outputs); + /// + /// @ingroup ge_session + /// @brief run a graph of the session with specific stream asynchronously + /// @param [in] session_id session id + /// @param [in] graph_id graph id + /// @param [in] inputs input data + /// @param [in] stream specific stream + /// @param [out] outputs output data + /// @return Status result of function + /// + Status RunGraphWithStreamAsync(SessionId session_id, uint32_t graph_id, const std::vector &inputs, + std::vector &outputs, rtStream_t stream); + /// /// @ingroup ge_session /// @brief remove a graph from the session with specific session id diff --git a/inc/external/ge/ge_api.h b/inc/external/ge/ge_api.h index c8b5a8ec..7d1b37e2 100644 --- a/inc/external/ge/ge_api.h +++ b/inc/external/ge/ge_api.h @@ -121,6 +121,18 @@ class GE_FUNC_VISIBILITY Session { /// Status RunGraph(uint32_t graphId, const std::vector &inputs, std::vector &outputs); + /// + /// @ingroup ge_graph + /// @brief run a graph of the session with specific session id and specific stream asynchronously + /// @param [in] graph_id graph id + /// @param [in] inputs input data + /// @param [in] stream specific stream + /// @param [out] outputs output data + /// @return Status result of function + /// + Status RunGraphWithStreamAsync(uint32_t graph_id, const std::vector &inputs, std::vector &outputs, + void *stream); + /// /// @ingroup ge_graph /// @brief build graph in the session with specific session id