Browse Source

Pre Merge pull request !1506 from 赵之轩/my_dev

pull/1506/MERGE
赵之轩 Gitee 4 years ago
parent
commit
b777978689
16 changed files with 291 additions and 3 deletions
  1. +32
    -0
      ge/client/ge_api.cc
  2. +63
    -0
      ge/graph/execute/graph_execute.cc
  3. +9
    -0
      ge/graph/execute/graph_execute.h
  4. +12
    -0
      ge/graph/load/graph_loader.cc
  5. +58
    -0
      ge/graph/manager/graph_manager.cc
  6. +12
    -0
      ge/graph/manager/graph_manager.h
  7. +1
    -0
      ge/graph/manager/graph_manager_utils.cc
  8. +3
    -0
      ge/graph/manager/graph_manager_utils.h
  9. +6
    -1
      ge/model/ge_root_model.h
  10. +37
    -0
      ge/session/inner_session.cc
  11. +3
    -0
      ge/session/inner_session.h
  12. +27
    -0
      ge/session/session_manager.cc
  13. +14
    -0
      ge/session/session_manager.h
  14. +12
    -0
      inc/external/ge/ge_api.h
  15. +1
    -1
      metadef
  16. +1
    -1
      parser

+ 32
- 0
ge/client/ge_api.cc View File

@@ -582,6 +582,38 @@ Status Session::RunGraph(uint32_t graph_id, const std::vector<Tensor> &inputs, s
return ret;
}

// Run Graph with stream Asynchronously
Status Session::RunGraphWithStreamAsync(uint32_t graph_id, const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs, void *stream) {
ErrorManager::GetInstance().SetStage(ErrorMessage::kModelCompile, ErrorMessage::kOther);
GELOGT(TRACE_INIT, "Session RunGraphWithStreamAsync start");

ErrorManager::GetInstance().GenWorkStreamIdBySessionGraph(sessionId_, graph_id);
std::shared_ptr<GELib> instance_ptr = ge::GELib::GetInstance();
if (instance_ptr == nullptr || !instance_ptr->InitFlag()) {
GELOGE(GE_CLI_GE_NOT_INITIALIZED,
"[Run][Graph]RunGraphWithStreamAsyncFailed, the GELib instance is nullptr or is not InitFlag.");
REPORT_INNER_ERROR("E19999",
"RunGraphWithStreamAsync Failed, the GELib instance is nullptr or is not InitFlag.");
return FAILED;
}
GELOGT(TRACE_RUNNING, "Run Graph RunGraphWithStreamAsync");
Status ret = instance_ptr->SessionManagerObj().RunGraphWithStreamAsync(sessionId_, graph_id, inputs, outputs,
stream);
if (ret != SUCCESS) {
GELOGE(ret, "[Run][Graph]RunGraphWithStreamAsync Failed, error code:%u, session_id:%lu, graph_id:%u.",
ret, sessionId_, graph_id);
return FAILED;
}

if (!outputs.empty()) {
PrintOutputResult(outputs);
}

GELOGT(TRACE_STOP, "Session RunGraph finished");
return SUCCESS;
}

// Register Call Back
Status Session::RegisterCallBackFunc(const std::string &key, const pCallBackFunc &callback) {
ErrorManager::GetInstance().GenWorkStreamIdDefault();


+ 63
- 0
ge/graph/execute/graph_execute.cc View File

@@ -404,6 +404,69 @@ Status GraphExecutor::ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &
return SUCCESS;
}

void GraphExecutor::GetInputOutputData(const std::vector<GeTensor> &input_tensor,
std::vector<GeTensor> &output_tensor,
InputData &inputs,
OutputData &outputs) {
inputs.index = 0;
inputs.timeout = 0;
inputs.timestamp = 0;
for (auto &tensor : input_tensor) {
DataBuffer in_data_buf;
in_data_buf.data = const_cast<uint8_t *>(tensor.GetData().data());
in_data_buf.length = tensor.GetData().size();
in_data_buf.isDataSupportMemShare = false;
inputs.blobs.emplace_back(in_data_buf);
}

outputs.index = 0;
for (auto &tensor : output_tensor) {
DataBuffer out_data_buf;
out_data_buf.data = const_cast<uint8_t *>(tensor.GetData().data());
out_data_buf.length = tensor.GetData().size();
out_data_buf.isDataSupportMemShare = false;
outputs.blobs.emplace_back(out_data_buf);
}
}

Status GraphExecutor::ExecuteGraphWithStream(GraphId graph_id,
const GeRootModelPtr &ge_root_model,
const std::vector<GeTensor> &input_tensor,
std::vector<GeTensor> &output_tensor,
rtStream_t stream) {
GELOGI("[GraphExecutor] Start to execute graph with stream, graph_id=%u", graph_id);
if (!init_flag_) {
GELOGE(GE_GRAPH_EXECUTE_NOT_INIT, "[GraphExecutor] AI Core Engine without calling SetCondition!");
return GE_GRAPH_EXECUTE_NOT_INIT;
}

if (graph_id != last_graph_id_) {
auto ret = FreeExecuteMemory();
if (ret != SUCCESS) {
return ret;
}
}
last_graph_id_ = graph_id;

GE_CHECK_NOTNULL_EXEC(ge_root_model, return FAILED);
auto model_id = ge_root_model->GetModelId();
InputData input_data;
OutputData output_data;
input_data.model_id = model_id;
output_data.model_id = model_id;
GetInputOutputData(input_tensor, output_tensor, input_data, output_data);

auto async_mode = true;
std::vector<GeTensorDesc> input_ge_desc;
std::vector<GeTensorDesc> output_ge_desc;
auto model_manager = ge::ModelManager::GetInstance();
GE_CHECK_NOTNULL(model_manager);
model_manager->ExecuteModel(model_id, stream, async_mode, input_data, input_ge_desc, output_data, output_ge_desc);

GELOGI("[GraphExecutor] execute model success, modelId=%u.", model_id);
return SUCCESS;
}

bool CompareByLoad(const Uint32Pair &lhs, const Uint32Pair &rhs) {
return lhs.second < rhs.second;
}


+ 9
- 0
ge/graph/execute/graph_execute.h View File

@@ -52,6 +52,12 @@ class GraphExecutor {
ge::Status ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &ge_root_model,
const std::vector<InputTensorInfo> &input_tensor, const RunAsyncCallback &callback);

Status ExecuteGraphWithStream(GraphId graph_id,
const GeRootModelPtr &ge_root_model,
const std::vector<GeTensor> &input_tensor,
std::vector<GeTensor> &output_tensor,
rtStream_t stream);

Status SetCondition(std::mutex *mutex, std::condition_variable *cond, std::shared_ptr<GraphModelListener> listener);

Status SetGraphContext(GraphContextPtr graph_context_ptr);
@@ -122,6 +128,9 @@ class GraphExecutor {
Status PrepareInputData(const std::vector<GeTensor> &input_tensor, InputData &graph_input_data,
OutputData &graph_output_data, std::vector<InputOutputDescInfo> &output_desc);

void GetInputOutputData(const std::vector<GeTensor> &input_tensor, std::vector<GeTensor> &output_tensor,
InputData &inputs, OutputData &outputs);

Status SyncExecuteModel(uint32_t model_id, const std::vector<GeTensor> &input_tensor,
std::vector<GeTensor> &output_tensor);



+ 12
- 0
ge/graph/load/graph_loader.cc View File

@@ -80,6 +80,18 @@ Status GraphLoader::LoadModelOnline(uint32_t &model_id, const std::shared_ptr<ge
return ret;
}

if (ge_root_model_ptr->CheckIsSpecificStream()) {
GELOGI("No need to start a new thread to run model in specific scene");
rt_ret = rtDeviceReset(GetContext().DeviceId());
if (rt_ret != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtDeviceReset failed, device_id:%u, ret:0x%X",
GetContext().DeviceId(), rt_ret);
GELOGE(RT_FAILED, "Call rt api failed, ret: 0x%X", rt_ret);
}

return SUCCESS;
}

ret = model_manager->Start(model_id);
if (ret != SUCCESS) {
if (model_manager->Unload(model_id) != SUCCESS) {


+ 58
- 0
ge/graph/manager/graph_manager.cc View File

@@ -1124,6 +1124,7 @@ Status GraphManager::LoadGraph(const GeRootModelPtr &ge_root_model, const GraphN
GE_CHK_STATUS_RET(CheckAndReleaseMemory(ge_model, graph_node));
}
}
ge_root_model->SetIsSpecificStream(graph_node->CheckIsSpecificStream());
GE_TIMESTAMP_START(LoadGraph);
Status ret = GraphLoader::LoadModelOnline(model_id_info.model_id, ge_root_model, model_listener);
GE_TIMESTAMP_EVENT_END(LoadGraph, "GraphManager::LoadGraph");
@@ -1247,6 +1248,63 @@ Status GraphManager::InnerRunGraph(GraphNodePtr &graph_node, const GraphId &grap
return SUCCESS;
}

Status GraphManager::RunGraphWithStreamAsync(const GraphId &graph_id, const std::vector<GeTensor> &inputs,
std::vector<GeTensor> &outputs, rtStream_t stream, uint64_t session_id) {
ErrorManager::GetInstance().SetStage(ErrorMessage::kModelCompile, ErrorMessage::kOther);
std::lock_guard<std::mutex> lock(run_mutex_);
GELOGI("[RunGraphWithStreamAsync] start to run graph, graph_id = %u, is_train_graph: %d", graph_id, GetTrainFlag());

if (inputs.empty()) {
GELOGI("[RunGraphWithStreamAsync] initialize sub graph has no inputs");
}

// find graph
GraphNodePtr graph_node = nullptr;
Status ret = GetGraphNode(graph_id, graph_node);
if (ret != SUCCESS) {
GELOGE(ret, "[RunGraphWithStreamAsync] graph not exist, graph_id = %u.", graph_id);
return ret;
}
if (graph_node == nullptr) {
GELOGE(GE_GRAPH_GRAPH_NODE_NULL, "[RunGraphWithStreamAsync] graph node is NULL, graph_id = %u.", graph_id);
return GE_GRAPH_GRAPH_NODE_NULL;
}
if (graph_node->GetRunFlag()) {
GELOGE(GE_GRAPH_ALREADY_RUNNING, "[RunGraphWithStreamAsync] graph already running, graph id = %u", graph_id);
return GE_GRAPH_ALREADY_RUNNING;
}

UpdateLocalOmgContext(graph_id);
// set graph's run flag
graph_node->SetRunFlag(true);
graph_node->SetIsSpecificStream(true);
ComputeGraphPtr compute_graph_tmp = GraphUtils::GetComputeGraph(*(graph_node->GetGraph()));

// when set incre build, add cache helper map
AddModelCacheHelperToMap(graph_id, session_id, compute_graph_tmp);
if (options_.local_fmk_op_flag) {
GetCompilerStages(graph_id).optimizer.TranFrameOp(compute_graph_tmp);
}
GeRootModelPtr ge_root_model = nullptr;
ret = StartForRunGraph(graph_node, inputs, ge_root_model, session_id);
if (ret != SUCCESS) {
GELOGE(ret, "[RunGraphWithStreamAsync] StartForRunGraph failed!");
graph_node->SetRunFlag(false);
return ret;
}

ret = graph_executor_.ExecuteGraphWithStream(graph_id, graph_node->GetGeRootModel(),
inputs, outputs, stream);
graph_node->SetRunFlag(false);
graph_node->SetIsSpecificStream(false);
if (ret != SUCCESS) {
GELOGE(ret, "[RunGraphWithStreamAsync] execute graph failed, graph_id = %u.", graph_id);
return ret;
}
GELOGI("[RunGraphWithStreamAsync] run graph success, graph_id = %u.", graph_id);
return SUCCESS;
}

Status GraphManager::RunGraph(const GraphId &graph_id, const std::vector<GeTensor> &inputs,
std::vector<GeTensor> &outputs, uint64_t session_id) {
ErrorManager::GetInstance().SetStage(ErrorMessage::kModelCompile, ErrorMessage::kOther);


+ 12
- 0
ge/graph/manager/graph_manager.h View File

@@ -103,6 +103,18 @@ class GraphManager {
Status RunGraph(const GraphId &graph_id, const std::vector<GeTensor> &inputs, std::vector<GeTensor> &outputs,
uint64_t session_id = INVALID_SESSION_ID);

///
/// @ingroup ge_graph
/// @brief run specific graph with specific session id and stream
/// @param [in] graph_id graph id
/// @param [in] inputs input data
/// @param [in] stream specific stream
/// @param [out] outputs output data
/// @return Status result of function
///
Status RunGraphWithStreamAsync(const GraphId &graph_id, const std::vector<GeTensor> &inputs,
std::vector<GeTensor> &outputs, rtStream_t stream, uint64_t session_id);

///
/// @ingroup ge_graph
/// @brief build specific graph


+ 1
- 0
ge/graph/manager/graph_manager_utils.cc View File

@@ -41,6 +41,7 @@ GraphNode::GraphNode(GraphId graph_id)
build_flag_(false),
load_flag_(false),
async_(false),
is_specific_stream_(false),
ge_model_(nullptr),
sem_(1) {
graph_run_async_listener_ = MakeShared<RunAsyncListener>();


+ 3
- 0
ge/graph/manager/graph_manager_utils.h View File

@@ -167,6 +167,8 @@ class GraphNode {
void UpdateLoadFlag() { load_flag_ = load_count_ == 0 || load_record_ >= kMaxLoadNum; }
void SetLoadFlag(bool load_flag) { load_flag_ = load_flag; }
void SetGeModel(const GeModelPtr &ge_model) { ge_model_ = ge_model; }
void SetIsSpecificStream(bool specific_stream) { is_specific_stream_ = specific_stream; }
bool CheckIsSpecificStream() { return is_specific_stream_; }
GeModelPtr GetGeModel() const { return ge_model_; }
void SetGeRootModel(const GeRootModelPtr &ge_root_model) { ge_root_model_ = ge_root_model; }
GeRootModelPtr GetGeRootModel() const { return ge_root_model_; }
@@ -197,6 +199,7 @@ class GraphNode {
// load_flag_ is true if more than 1 model were loaded
bool load_flag_;
bool async_;
bool is_specific_stream_;
GeModelPtr ge_model_;
GeRootModelPtr ge_root_model_;
BlockingQueue<uint8_t> sem_;


+ 6
- 1
ge/model/ge_root_model.h View File

@@ -32,12 +32,16 @@ class GeRootModel {
return subgraph_instance_name_to_model_;
};

const ComputeGraphPtr &GetRootGraph() const { return root_graph_; }
const ComputeGraphPtr &GetRootGraph() const { return root_graph_; };
void SetModelId(uint32_t model_id) {
model_id_ = model_id;
// cached for removement
model_ids_.emplace_back(model_id);
}

void SetIsSpecificStream(bool is_specific_stream) { is_specific_stream_ = is_specific_stream; }
bool CheckIsSpecificStream() {return is_specific_stream_; }

uint32_t GetModelId() const { return model_id_; }

std::vector<uint32_t> GetAllModelId() const { return model_ids_; }
@@ -58,6 +62,7 @@ class GeRootModel {
ComputeGraphPtr root_graph_ = nullptr;
std::map<std::string, GeModelPtr> subgraph_instance_name_to_model_;
uint32_t model_id_ = 0;
bool is_specific_stream_ = false;
// In multithread online secenario, same graph can owns different davinci_model for for concurrency
std::vector<uint32_t> model_ids_;
bool train_flag_ = false;


+ 37
- 0
ge/session/inner_session.cc View File

@@ -262,6 +262,43 @@ Status InnerSession::RunGraph(uint32_t graph_id, const std::vector<Tensor> &inpu
}
}

Status InnerSession::RunGraphWithStreamAsync(uint32_t graph_id, const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs, rtStream_t stream) {
GELOGI("[InnerSession:%lu] run graph with stream on session, graph_id=%u.", session_id_, graph_id);
if (mutex_.try_lock()) { // 必要的吗?
std::lock_guard<std::mutex> lock(mutex_, std::adopt_lock);
if (!init_flag_) { // 必要的吗?
GELOGE(GE_SESS_INIT_FAILED, "[InnerSession:%lu] initialize failed.", session_id_);
return GE_SESS_INIT_FAILED;
}
UpdateThreadContext(graph_id);
vector<GeTensor> ge_inputs;
for (auto &item : inputs) {
ge_inputs.emplace_back(TensorAdapter::AsGeTensor(item));
}
vector<GeTensor> ge_outputs;
for (auto &item : outputs) {
ge_outputs.emplace_back(TensorAdapter::AsGeTensor(item));
}
Status ret = graph_manager_.RunGraphWithStreamAsync(graph_id, ge_inputs, ge_outputs, stream, session_id_);
domi::GetContext().out_nodes_map.clear();
domi::GetContext().user_out_nodes.clear();
if (ret != SUCCESS) {
GELOGE(ret, "[Run][GraphWithStreamAsync]failed, InnerSession:%lu graph_id=%u.", session_id_, graph_id);
REPORT_CALL_ERROR("E19999",
"GraphManager RunGrapWithStreamhAsync failed, InnerSession:%lu graph_id=%u.",
session_id_, graph_id);
return ret;
}

GELOGI("[InnerSession:%lu] run graph success, graph_id=%u.", session_id_, graph_id);
return SUCCESS;
} else {
GELOGE(GE_SESS_ALREADY_RUNNING, "[InnerSession:%lu] run graph failed, graph_id=%u.", session_id_, graph_id);
return GE_SESS_ALREADY_RUNNING;
}
}

Status InnerSession::RemoveGraph(uint32_t graph_id) {
std::lock_guard<std::mutex> lock(resource_mutex_);
if (!init_flag_) {


+ 3
- 0
ge/session/inner_session.h View File

@@ -41,6 +41,9 @@ class InnerSession {

Status RunGraph(uint32_t graph_id, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs);

Status RunGraphWithStreamAsync(uint32_t graph_id, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs,
rtStream_t stream);

Status RemoveGraph(uint32_t graph_id);

Status BuildGraph(uint32_t graph_id, const std::vector<InputTensorInfo> &inputs);


+ 27
- 0
ge/session/session_manager.cc View File

@@ -242,6 +242,33 @@ Status SessionManager::RunGraph(SessionId session_id, uint32_t graph_id, const s
return innerSession->RunGraph(graph_id, inputs, outputs);
}

Status SessionManager::RunGraphWithStreamAsync(SessionId session_id,
uint32_t graph_id,
const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs,
rtStream_t stream) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT,
"[RunWithStream][Graph]Session manager is not initialized, session_id:%lu, graph_id:%u.",
session_id, graph_id);
REPORT_INNER_ERROR("E19999",
"RunGraphWithStreamAsync fail for Session manager is not initialized, session_id:%lu, graph_id:%u.",
session_id, graph_id);
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionPtr innerSession = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
} else {
innerSession = it->second;
}
}
return innerSession->RunGraphWithStreamAsync(graph_id, inputs, outputs, stream);
}

Status SessionManager::RemoveGraph(SessionId session_id, uint32_t graph_id) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT,


+ 14
- 0
ge/session/session_manager.h View File

@@ -25,6 +25,7 @@
#include "common/ge_inner_error_codes.h"
#include "ge/ge_api_types.h"
#include "session/inner_session.h"
#include "runtime/base.h"

namespace ge {
using SessionPtr = std::shared_ptr<InnerSession>;
@@ -96,6 +97,19 @@ class SessionManager {
Status RunGraph(SessionId session_id, uint32_t graph_id, const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs);

///
/// @ingroup ge_session
/// @brief run a graph of the session with specific stream asynchronously
/// @param [in] session_id session id
/// @param [in] graph_id graph id
/// @param [in] inputs input data
/// @param [in] stream specific stream
/// @param [out] outputs output data
/// @return Status result of function
///
Status RunGraphWithStreamAsync(SessionId session_id, uint32_t graph_id, const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs, rtStream_t stream);

///
/// @ingroup ge_session
/// @brief remove a graph from the session with specific session id


+ 12
- 0
inc/external/ge/ge_api.h View File

@@ -121,6 +121,18 @@ class GE_FUNC_VISIBILITY Session {
///
Status RunGraph(uint32_t graphId, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs);

///
/// @ingroup ge_graph
/// @brief run a graph of the session with specific session id and specific stream asynchronously
/// @param [in] graph_id graph id
/// @param [in] inputs input data
/// @param [in] stream specific stream
/// @param [out] outputs output data
/// @return Status result of function
///
Status RunGraphWithStreamAsync(uint32_t graph_id, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs,
void *stream);

///
/// @ingroup ge_graph
/// @brief build graph in the session with specific session id


+ 1
- 1
metadef

@@ -1 +1 @@
Subproject commit fcebf37d7428caf4e0bd6e6c3a4f8143f6eac8b7
Subproject commit 1e88df1d6bfe60faae0aa9fa2d87f273b793aeb0

+ 1
- 1
parser

@@ -1 +1 @@
Subproject commit 424ac0609fe17f455865436462a2c62f85aea2b1
Subproject commit df9abef65f902f37ca664f6dda4c60727dac2aca

Loading…
Cancel
Save