
Run graph with stream.

pull/1505/head
zhaozhixuan  4 years ago
commit a86cd3d469
14 changed files with 284 additions and 0 deletions
  1. +32  -0  ge/client/ge_api.cc
  2. +64  -0  ge/graph/execute/graph_execute.cc
  3. +6   -0  ge/graph/execute/graph_execute.h
  4. +12  -0  ge/graph/load/graph_loader.cc
  5. +58  -0  ge/graph/manager/graph_manager.cc
  6. +12  -0  ge/graph/manager/graph_manager.h
  7. +1   -0  ge/graph/manager/graph_manager_utils.cc
  8. +3   -0  ge/graph/manager/graph_manager_utils.h
  9. +3   -0  ge/model/ge_root_model.h
  10. +37  -0  ge/session/inner_session.cc
  11. +3   -0  ge/session/inner_session.h
  12. +27  -0  ge/session/session_manager.cc
  13. +14  -0  ge/session/session_manager.h
  14. +12  -0  inc/external/ge/ge_api.h

+32  -0  ge/client/ge_api.cc

@@ -582,6 +582,38 @@ Status Session::RunGraph(uint32_t graph_id, const std::vector<Tensor> &inputs, s
return ret;
}

// Run Graph with stream Asynchronously
Status Session::RunGraphWithStreamAsync(uint32_t graph_id, const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs, void *stream) {
ErrorManager::GetInstance().SetStage(ErrorMessage::kModelCompile, ErrorMessage::kOther);
GELOGT(TRACE_INIT, "Session RunGraphWithStreamAsync start");

ErrorManager::GetInstance().GenWorkStreamIdBySessionGraph(sessionId_, graph_id);
std::shared_ptr<GELib> instance_ptr = ge::GELib::GetInstance();
if (instance_ptr == nullptr || !instance_ptr->InitFlag()) {
GELOGE(GE_CLI_GE_NOT_INITIALIZED,
"[Run][Graph]RunGraphWithStreamAsyncFailed, the GELib instance is nullptr or is not InitFlag.");
REPORT_INNER_ERROR("E19999",
"RunGraphWithStreamAsync Failed, the GELib instance is nullptr or is not InitFlag.");
return FAILED;
}
GELOGT(TRACE_RUNNING, "Run Graph RunGraphWithStreamAsync");
Status ret = instance_ptr->SessionManagerObj().RunGraphWithStreamAsync(sessionId_, graph_id, inputs, outputs,
stream);
if (ret != SUCCESS) {
GELOGE(ret, "[Run][Graph]RunGraphWithStreamAsync Failed, error code:%u, session_id:%lu, graph_id:%u.",
ret, sessionId_, graph_id);
return FAILED;
}

if (!outputs.empty()) {
PrintOutputResult(outputs);
}

GELOGT(TRACE_STOP, "Session RunGraphWithStreamAsync finished");
return SUCCESS;
}

// Register Call Back
Status Session::RegisterCallBackFunc(const std::string &key, const pCallBackFunc &callback) {
ErrorManager::GetInstance().GenWorkStreamIdDefault();


+64  -0  ge/graph/execute/graph_execute.cc

@@ -400,6 +400,70 @@ Status GraphExecutor::ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &
return SUCCESS;
}

void GraphExecutor::GetInputOutputData(const std::vector<GeTensor> &input_tensor,
std::vector<GeTensor> &output_tensor,
InputData &inputs,
OutputData &outputs) {
inputs.index = 0;
inputs.timeout = 0;
inputs.timestamp = 0;

for (const auto &tensor : input_tensor) {
DataBuffer in_data_buf;
in_data_buf.data = const_cast<uint8_t *>(tensor.GetData().data());
in_data_buf.length = tensor.GetData().size();
in_data_buf.isDataSupportMemShare = false;
inputs.blobs.emplace_back(in_data_buf);
}

outputs.index = 0;
for (const auto &tensor : output_tensor) {
DataBuffer out_data_buf;
out_data_buf.data = const_cast<uint8_t *>(tensor.GetData().data());
out_data_buf.length = tensor.GetData().size();
out_data_buf.isDataSupportMemShare = false;
outputs.blobs.emplace_back(out_data_buf);
}
}

Status GraphExecutor::ExecuteGraphWithStream(GraphId graph_id,
const GeRootModelPtr &ge_root_model,
const std::vector<GeTensor> &input_tensor,
std::vector<GeTensor> &output_tensor,
rtStream_t stream) {
GELOGI("[GraphExecutor] Start to execute graph with stream, graph_id=%u", graph_id);
if (!init_flag_) {
GELOGE(GE_GRAPH_EXECUTE_NOT_INIT, "[GraphExecutor] AI Core Engine without calling SetCondition!");
return GE_GRAPH_EXECUTE_NOT_INIT;
}

if (graph_id != last_graph_id_) {
auto ret = FreeExecuteMemory();
if (ret != SUCCESS) {
return ret;
}
}
last_graph_id_ = graph_id;

GE_CHECK_NOTNULL_EXEC(ge_root_model, return FAILED);
auto model_id = ge_root_model->GetModelId();
InputData input_data;
OutputData output_data;
input_data.model_id = model_id;
output_data.model_id = model_id;
GetInputOutputData(input_tensor, output_tensor, input_data, output_data);

auto async_mode = true;
std::vector<GeTensorDesc> input_ge_desc;
std::vector<GeTensorDesc> output_ge_desc;
auto model_manager = ge::ModelManager::GetInstance();
GE_CHECK_NOTNULL(model_manager);
auto ret = model_manager->ExecuteModel(model_id, stream, async_mode, input_data, input_ge_desc,
output_data, output_ge_desc);
if (ret != SUCCESS) {
GELOGE(ret, "[GraphExecutor] execute model failed, model_id=%u.", model_id);
return ret;
}

GELOGI("[GraphExecutor] execute model success, modelId=%u.", model_id);
return SUCCESS;
}

Status GraphExecutor::AsyncExecuteModel(uint32_t model_id, const std::vector<InputTensorInfo> &inputs) {
try {
auto model_manager = ge::ModelManager::GetInstance();


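Note: the DataBuffers built in GetInputOutputData point straight at each tensor's existing memory and never allocate or copy, so callers of the stream path must size their output tensors before the run. A minimal caller-side sketch of preparing one output tensor (illustrative only, not part of this commit; the shape, dtype, and the external Tensor/TensorDesc calls shown are assumptions):

// Pre-size one output tensor so the stream execution has a buffer to write into.
// Shape and dtype below are made-up placeholders.
ge::TensorDesc out_desc(ge::Shape({1, 1000}), ge::FORMAT_ND, ge::DT_FLOAT);
std::vector<uint8_t> host_buf(1000 * sizeof(float), 0);
ge::Tensor out_tensor(out_desc);
(void)out_tensor.SetData(host_buf.data(), host_buf.size());  // tensor now carries a correctly sized buffer
std::vector<ge::Tensor> outputs{out_tensor};
// `outputs` can then be passed to Session::RunGraphWithStreamAsync together with the inputs.
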
+6   -0  ge/graph/execute/graph_execute.h

@@ -52,6 +52,12 @@ class GraphExecutor {
ge::Status ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &ge_root_model,
const std::vector<InputTensorInfo> &input_tensor);

Status ExecuteGraphWithStream(GraphId graph_id,
const GeRootModelPtr &ge_root_model,
const std::vector<GeTensor> &input_tensor,
std::vector<GeTensor> &output_tensor,
rtStream_t stream);

Status SetCondition(std::mutex *mutex, std::condition_variable *cond, std::shared_ptr<GraphModelListener> listener);

Status SetGraphContext(GraphContextPtr graph_context_ptr);


+12  -0  ge/graph/load/graph_loader.cc

@@ -81,6 +81,18 @@ Status GraphLoader::LoadModelOnline(uint32_t &model_id, const std::shared_ptr<ge
return ret;
}

if (ge_root_model_ptr->CheckIsSpecificStream()) {
GELOGI("No need to start a new thread to run model in specific scene");
rt_ret = rtDeviceReset(GetContext().DeviceId());
if (rt_ret != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtDeviceReset failed, device_id:%u, ret:0x%X",
GetContext().DeviceId(), rt_ret);
GELOGE(RT_FAILED, "Call rt api failed, ret: 0x%X", rt_ret);
}

return SUCCESS;
}

ret = model_manager->Start(model_id);
if (ret != SUCCESS) {
if (model_manager->Unload(model_id) != SUCCESS) {


+58  -0  ge/graph/manager/graph_manager.cc

@@ -1004,6 +1004,7 @@ Status GraphManager::LoadGraph(const GeRootModelPtr &ge_root_model, const GraphN
GE_CHK_STATUS_RET(CheckAndReleaseMemory(ge_model, graph_node));
}
}
ge_root_model->SetIsSpecificStream(graph_node->CheckIsSpecificStream());
GE_TIMESTAMP_START(LoadGraph);
Status ret = GraphLoader::LoadModelOnline(model_id_info.model_id, ge_root_model, model_listener);
GE_TIMESTAMP_EVENT_END(LoadGraph, "GraphManager::LoadGraph");
@@ -1127,6 +1128,63 @@ Status GraphManager::InnerRunGraph(GraphNodePtr &graph_node, const GraphId &grap
return SUCCESS;
}

Status GraphManager::RunGraphWithStreamAsync(const GraphId &graph_id, const std::vector<GeTensor> &inputs,
std::vector<GeTensor> &outputs, rtStream_t stream, uint64_t session_id) {
ErrorManager::GetInstance().SetStage(ErrorMessage::kModelCompile, ErrorMessage::kOther);
std::lock_guard<std::mutex> lock(run_mutex_);
GELOGI("[RunGraphWithStreamAsync] start to run graph, graph_id = %u, is_train_graph: %d", graph_id, GetTrainFlag());

if (inputs.empty()) {
GELOGI("[RunGraphWithStreamAsync] initialize sub graph has no inputs");
}

// find graph
GraphNodePtr graph_node = nullptr;
Status ret = GetGraphNode(graph_id, graph_node);
if (ret != SUCCESS) {
GELOGE(ret, "[RunGraphWithStreamAsync] graph not exist, graph_id = %u.", graph_id);
return ret;
}
if (graph_node == nullptr) {
GELOGE(GE_GRAPH_GRAPH_NODE_NULL, "[RunGraphWithStreamAsync] graph node is NULL, graph_id = %u.", graph_id);
return GE_GRAPH_GRAPH_NODE_NULL;
}
if (graph_node->GetRunFlag()) {
GELOGE(GE_GRAPH_ALREADY_RUNNING, "[RunGraphWithStreamAsync] graph already running, graph id = %u", graph_id);
return GE_GRAPH_ALREADY_RUNNING;
}

UpdateLocalOmgContext(graph_id);
// set graph's run flag
graph_node->SetRunFlag(true);
graph_node->SetIsSpecificStream(true);
ComputeGraphPtr compute_graph_tmp = GraphUtils::GetComputeGraph(*(graph_node->GetGraph()));

// when set incre build, add cache helper map
AddModelCacheHelperToMap(graph_id, session_id, compute_graph_tmp);
if (options_.local_fmk_op_flag) {
GetCompilerStages(graph_id).optimizer.TranFrameOp(compute_graph_tmp);
}
GeRootModelPtr ge_root_model = nullptr;
ret = StartForRunGraph(graph_node, inputs, ge_root_model, session_id);
if (ret != SUCCESS) {
GELOGE(ret, "[RunGraphWithStreamAsync] StartForRunGraph failed!");
graph_node->SetRunFlag(false);
return ret;
}

ret = graph_executor_.ExecuteGraphWithStream(graph_id, graph_node->GetGeRootModel(),
inputs, outputs, stream);
graph_node->SetRunFlag(false);
graph_node->SetIsSpecificStream(false);
if (ret != SUCCESS) {
GELOGE(ret, "[RunGraphWithStreamAsync] execute graph failed, graph_id = %u.", graph_id);
return ret;
}
GELOGI("[RunGraphWithStreamAsync] run graph success, graph_id = %u.", graph_id);
return SUCCESS;
}

Status GraphManager::RunGraph(const GraphId &graph_id, const std::vector<GeTensor> &inputs,
std::vector<GeTensor> &outputs, uint64_t session_id) {
ErrorManager::GetInstance().SetStage(ErrorMessage::kModelCompile, ErrorMessage::kOther);


+12  -0  ge/graph/manager/graph_manager.h

@@ -103,6 +103,18 @@ class GraphManager {
Status RunGraph(const GraphId &graph_id, const std::vector<GeTensor> &inputs, std::vector<GeTensor> &outputs,
uint64_t session_id = INVALID_SESSION_ID);

///
/// @ingroup ge_graph
/// @brief run specific graph with specific session id and stream asynchronously
/// @param [in] graph_id graph id
/// @param [in] inputs input data
/// @param [out] outputs output data
/// @param [in] stream specific stream
/// @param [in] session_id session id
/// @return Status result of function
///
Status RunGraphWithStreamAsync(const GraphId &graph_id, const std::vector<GeTensor> &inputs,
std::vector<GeTensor> &outputs, rtStream_t stream, uint64_t session_id);

///
/// @ingroup ge_graph
/// @brief build specific graph


+1   -0  ge/graph/manager/graph_manager_utils.cc

@@ -41,6 +41,7 @@ GraphNode::GraphNode(GraphId graph_id)
build_flag_(false),
load_flag_(false),
async_(false),
is_specific_stream_(false),
ge_model_(nullptr),
sem_(1) {
graph_run_async_listener_ = MakeShared<RunAsyncListener>();


+3   -0  ge/graph/manager/graph_manager_utils.h

@@ -164,6 +164,8 @@ class GraphNode {
bool GetLoadFlag() const { return load_flag_; }
void SetLoadFlag(bool load_flag) { load_flag_ = load_flag; }
void SetGeModel(const GeModelPtr &ge_model) { ge_model_ = ge_model; }
void SetIsSpecificStream(bool specific_stream) { is_specific_stream_ = specific_stream; }
bool CheckIsSpecificStream() { return is_specific_stream_; }
GeModelPtr GetGeModel() const { return ge_model_; }
void SetGeRootModel(const GeRootModelPtr &ge_root_model) { ge_root_model_ = ge_root_model; }
GeRootModelPtr GetGeRootModel() const { return ge_root_model_; }
@@ -186,6 +188,7 @@ class GraphNode {
bool build_flag_;
bool load_flag_;
bool async_;
bool is_specific_stream_;
GeModelPtr ge_model_;
GeRootModelPtr ge_root_model_;
BlockingQueue<uint8_t> sem_;


+3   -0  ge/model/ge_root_model.h

@@ -34,6 +34,8 @@ class GeRootModel {

const ComputeGraphPtr &GetRootGraph() const { return root_graph_; };
void SetModelId(uint32_t model_id) { model_id_ = model_id; }
void SetIsSpecificStream(bool is_specific_stream) { is_specific_stream_ = is_specific_stream; }
bool CheckIsSpecificStream() { return is_specific_stream_; }
uint32_t GetModelId() const { return model_id_; }
Status CheckIsUnknownShape(bool &is_dynamic_shape);
void SetRootGraph(ComputeGraphPtr graph) { root_graph_ = graph; }
@@ -41,6 +43,7 @@ class GeRootModel {
ComputeGraphPtr root_graph_ = nullptr;
std::map<std::string, GeModelPtr> subgraph_instance_name_to_model_;
uint32_t model_id_ = 0;
bool is_specific_stream_ = false;
};
} // namespace ge
using GeRootModelPtr = std::shared_ptr<ge::GeRootModel>;


+37  -0  ge/session/inner_session.cc

@@ -262,6 +262,43 @@ Status InnerSession::RunGraph(uint32_t graph_id, const std::vector<Tensor> &inpu
}
}

Status InnerSession::RunGraphWithStreamAsync(uint32_t graph_id, const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs, rtStream_t stream) {
GELOGI("[InnerSession:%lu] run graph with stream on session, graph_id=%u.", session_id_, graph_id);
if (mutex_.try_lock()) { // Is this necessary?
std::lock_guard<std::mutex> lock(mutex_, std::adopt_lock);
if (!init_flag_) { // Is this necessary?
GELOGE(GE_SESS_INIT_FAILED, "[InnerSession:%lu] initialize failed.", session_id_);
return GE_SESS_INIT_FAILED;
}
UpdateThreadContext(graph_id);
vector<GeTensor> ge_inputs;
for (auto &item : inputs) {
ge_inputs.emplace_back(TensorAdapter::AsGeTensor(item));
}
vector<GeTensor> ge_outputs;
for (auto &item : outputs) {
ge_outputs.emplace_back(TensorAdapter::AsGeTensor(item));
}
Status ret = graph_manager_.RunGraphWithStreamAsync(graph_id, ge_inputs, ge_outputs, stream, session_id_);
domi::GetContext().out_nodes_map.clear();
domi::GetContext().user_out_nodes.clear();
if (ret != SUCCESS) {
GELOGE(ret, "[Run][GraphWithStreamAsync]failed, InnerSession:%lu graph_id=%u.", session_id_, graph_id);
REPORT_CALL_ERROR("E19999",
"GraphManager RunGrapWithStreamhAsync failed, InnerSession:%lu graph_id=%u.",
session_id_, graph_id);
return ret;
}

GELOGI("[InnerSession:%lu] run graph success, graph_id=%u.", session_id_, graph_id);
return SUCCESS;
} else {
GELOGE(GE_SESS_ALREADY_RUNNING, "[InnerSession:%lu] run graph failed, graph_id=%u.", session_id_, graph_id);
return GE_SESS_ALREADY_RUNNING;
}
}

Status InnerSession::RemoveGraph(uint32_t graph_id) {
std::lock_guard<std::mutex> lock(resource_mutex_);
if (!init_flag_) {


+3   -0  ge/session/inner_session.h

@@ -41,6 +41,9 @@ class InnerSession {

Status RunGraph(uint32_t graph_id, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs);

Status RunGraphWithStreamAsync(uint32_t graph_id, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs,
rtStream_t stream);

Status RemoveGraph(uint32_t graph_id);

Status BuildGraph(uint32_t graph_id, const std::vector<InputTensorInfo> &inputs);


+27  -0  ge/session/session_manager.cc

@@ -242,6 +242,33 @@ Status SessionManager::RunGraph(SessionId session_id, uint32_t graph_id, const s
return innerSession->RunGraph(graph_id, inputs, outputs);
}

Status SessionManager::RunGraphWithStreamAsync(SessionId session_id,
uint32_t graph_id,
const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs,
rtStream_t stream) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT,
"[RunWithStream][Graph]Session manager is not initialized, session_id:%lu, graph_id:%u.",
session_id, graph_id);
REPORT_INNER_ERROR("E19999",
"RunGraphWithStreamAsync fail for Session manager is not initialized, session_id:%lu, graph_id:%u.",
session_id, graph_id);
return GE_SESSION_MANAGER_NOT_INIT;
}
SessionPtr innerSession = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
std::map<SessionId, SessionPtr>::iterator it = session_manager_map_.find(session_id);
if (it == session_manager_map_.end()) {
return GE_SESSION_NOT_EXIST;
} else {
innerSession = it->second;
}
}
return innerSession->RunGraphWithStreamAsync(graph_id, inputs, outputs, stream);
}

Status SessionManager::RemoveGraph(SessionId session_id, uint32_t graph_id) {
if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT,


+14  -0  ge/session/session_manager.h

@@ -25,6 +25,7 @@
#include "common/ge_inner_error_codes.h"
#include "ge/ge_api_types.h"
#include "session/inner_session.h"
#include "runtime/base.h"

namespace ge {
using SessionPtr = std::shared_ptr<InnerSession>;
@@ -96,6 +97,19 @@ class SessionManager {
Status RunGraph(SessionId session_id, uint32_t graph_id, const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs);

///
/// @ingroup ge_session
/// @brief run a graph of the session with specific stream asynchronously
/// @param [in] session_id session id
/// @param [in] graph_id graph id
/// @param [in] inputs input data
/// @param [in] stream specific stream
/// @param [out] outputs output data
/// @return Status result of function
///
Status RunGraphWithStreamAsync(SessionId session_id, uint32_t graph_id, const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs, rtStream_t stream);

///
/// @ingroup ge_session
/// @brief remove a graph from the session with specific session id


+12  -0  inc/external/ge/ge_api.h

@@ -121,6 +121,18 @@ class GE_FUNC_VISIBILITY Session {
///
Status RunGraph(uint32_t graphId, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs);

///
/// @ingroup ge_graph
/// @brief run a graph of the session with a specific stream asynchronously
/// @param [in] graph_id graph id
/// @param [in] inputs input data
/// @param [out] outputs output data
/// @param [in] stream specific stream
/// @return Status result of function
///
Status RunGraphWithStreamAsync(uint32_t graph_id, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs,
void *stream);

///
/// @ingroup ge_graph
/// @brief build graph in the session with specific session id


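For context, a minimal caller-side sketch of the new public entry point (illustrative only, not part of this commit; it assumes a graph already added via Session::AddGraph, pre-sized output tensors, and the runtime stream helpers rtStreamCreate/rtStreamSynchronize/rtStreamDestroy from the runtime headers):

#include <vector>
#include "ge/ge_api.h"   // ge::Session, ge::Tensor, ge::Status
#include "runtime/rt.h"  // rtStream_t and the stream helpers (assumed aggregate header)

// Hypothetical helper: enqueue a graph on a caller-owned stream and wait for it.
ge::Status RunOnCallerStream(ge::Session &session, uint32_t graph_id,
                             const std::vector<ge::Tensor> &inputs,
                             std::vector<ge::Tensor> &outputs) {
  rtStream_t stream = nullptr;
  if (rtStreamCreate(&stream, 0) != RT_ERROR_NONE) {  // priority 0
    return ge::FAILED;
  }
  // The call only enqueues the work; it returns before the device has finished.
  ge::Status ret = session.RunGraphWithStreamAsync(graph_id, inputs, outputs, stream);
  if (ret == ge::SUCCESS && rtStreamSynchronize(stream) != RT_ERROR_NONE) {
    ret = ge::FAILED;  // output tensors are only valid after the stream has drained
  }
  (void)rtStreamDestroy(stream);
  return ret;
}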