From 012f70a753e379f28d3445b264530349862463ed Mon Sep 17 00:00:00 2001 From: chuxing Date: Wed, 30 Dec 2020 17:25:12 +0800 Subject: [PATCH 1/3] Single op with multiple tasks --- ge/hybrid/common/tensor_value.cc | 2 +- ge/hybrid/executor/hybrid_model_executor.cc | 3 +- ge/hybrid/executor/subgraph_executor.cc | 44 ++++++++++++++++++++- ge/hybrid/executor/subgraph_executor.h | 15 ++++++- ge/hybrid/model/hybrid_model.cc | 8 +++- ge/hybrid/model/hybrid_model.h | 2 +- ge/hybrid/model/hybrid_model_builder.cc | 12 ++++++ ge/hybrid/model/hybrid_model_builder.h | 1 + ge/hybrid/node_executor/task_context.cc | 2 +- ge/single_op/single_op.cc | 21 +++++++++- ge/single_op/single_op.h | 5 ++- ge/single_op/single_op_model.cc | 21 ++++++++++ ge/single_op/stream_resource.cc | 4 ++ ge/single_op/stream_resource.h | 1 + 14 files changed, 129 insertions(+), 12 deletions(-) diff --git a/ge/hybrid/common/tensor_value.cc b/ge/hybrid/common/tensor_value.cc index 16ecfaa4..c691c6f3 100644 --- a/ge/hybrid/common/tensor_value.cc +++ b/ge/hybrid/common/tensor_value.cc @@ -71,7 +71,7 @@ TensorValue::TensorValue(void *buffer, size_t size) : ref_buffer_(buffer), ref_s TensorValue::~TensorValue() { Destroy(); } void TensorValue::Destroy() { - if (buffer_ != nullptr || ref_buffer_ != nullptr) { + if (buffer_ != nullptr) { GELOGD("Unref tensor: %s", DebugString().c_str()); buffer_.reset(); } diff --git a/ge/hybrid/executor/hybrid_model_executor.cc b/ge/hybrid/executor/hybrid_model_executor.cc index e17998db..4043c964 100755 --- a/ge/hybrid/executor/hybrid_model_executor.cc +++ b/ge/hybrid/executor/hybrid_model_executor.cc @@ -68,7 +68,8 @@ Status HybridModelExecutor::ExecuteGraphInternal(SubgraphExecutor &executor, GE_CHK_STATUS_RET_NOLOG(ResetExecutionContext(context_)); RECORD_MODEL_EXECUTION_EVENT(&context_, "[InitContext] End"); - GE_CHK_STATUS_RET(executor.ExecuteAsync(args.inputs, args.input_desc), "Failed to execute partitioned call."); + GE_CHK_STATUS_RET(executor.ExecuteAsync(args.inputs, args.input_desc, args.outputs), + "Failed to execute partitioned call."); RECORD_MODEL_EXECUTION_EVENT(&context_, "[ExecuteAsync] End"); GE_CHK_STATUS_RET(executor.Synchronize(), "Failed to sync root graph."); diff --git a/ge/hybrid/executor/subgraph_executor.cc b/ge/hybrid/executor/subgraph_executor.cc index 4b6dddab..b6b333d9 100644 --- a/ge/hybrid/executor/subgraph_executor.cc +++ b/ge/hybrid/executor/subgraph_executor.cc @@ -131,10 +131,14 @@ Status SubgraphExecutor::InitInputsForKnownShape(const std::vector } Status SubgraphExecutor::ExecuteAsync(const std::vector &inputs, - const std::vector &input_desc) { + const std::vector &input_desc, + const std::vector &outputs) { GELOGD("[%s] is dynamic = %s", graph_item_->GetName().c_str(), graph_item_->IsDynamic() ? "true" : "false"); GE_CHK_STATUS_RET(Init(inputs, input_desc), "[%s] Failed to init executor.", graph_item_->GetName().c_str()); - + if (!outputs.empty()) { + GE_CHK_STATUS_RET(EnableOutputZeroCopy(outputs), + "Failed to enable output zero copy by user provided outputs."); + } if (!graph_item_->IsDynamic()) { return ExecuteAsyncForKnownShape(inputs); } @@ -144,6 +148,11 @@ Status SubgraphExecutor::ExecuteAsync(const std::vector &inputs, return SUCCESS; } +Status SubgraphExecutor::ExecuteAsync(const std::vector &inputs, + const std::vector &input_desc) { + return ExecuteAsync(inputs, input_desc, {}); +} + Status SubgraphExecutor::ExecuteAsyncForKnownShape(const std::vector &inputs) { GELOGD("[%s] subgraph is not dynamic.", graph_item_->GetName().c_str()); if (graph_item_->GetAllNodes().size() != 1) { @@ -413,5 +422,36 @@ Status SubgraphExecutor::SetOutputsToParentNode(TaskContext &task_context) { return SUCCESS; } +Status SubgraphExecutor::EnableOutputZeroCopy(const vector &outputs) { + GELOGD("To enable zero copy, output number = %zu", outputs.size()); + const auto &output_edges = graph_item_->GetOutputEdges(); + // Op -> MetOutput, set the output tensor of Op that output to the NetOutput node + if (outputs.size() != output_edges.size()) { + GELOGE(PARAM_INVALID, "Output number mismatches, expect = %zu, but given = %zu", + output_edges.size(), + outputs.size()); + return PARAM_INVALID; + } + + for (size_t i = 0; i < outputs.size(); ++i) { + auto &output_tensor = outputs[i]; + auto &output_node = output_edges[i].first; + int output_idx = output_edges[i].second; + GELOGD("[%s] Set output tensor[%zu] to [%s]'s output[%d], tensor = %s", + graph_item_->GetName().c_str(), + i, + output_node->NodeName().c_str(), + output_idx, + output_tensor.DebugString().c_str()); + + GE_CHK_STATUS_RET(subgraph_context_->SetOutput(*output_node, output_idx, output_tensor), + "[%s] Failed to set input tensor[%zu]", + graph_item_->GetName().c_str(), + i); + } + + GELOGD("Done enabling zero copy for outputs successfully."); + return SUCCESS; +} } // namespace hybrid } // namespace ge diff --git a/ge/hybrid/executor/subgraph_executor.h b/ge/hybrid/executor/subgraph_executor.h index d1949947..2a227535 100644 --- a/ge/hybrid/executor/subgraph_executor.h +++ b/ge/hybrid/executor/subgraph_executor.h @@ -43,7 +43,19 @@ class SubgraphExecutor { * @param input_desc input tensor descriptions * @return SUCCESS on success, error code otherwise */ - Status ExecuteAsync(const std::vector &inputs, const std::vector &input_desc); + Status ExecuteAsync(const std::vector &inputs, + const std::vector &input_desc); + + /** + * Execute subgraph async, output tensor address(not data) and output tensor descriptions are + * valid after this method returned + * @param inputs input tensors + * @param input_desc input tensor descriptions + * @return SUCCESS on success, error code otherwise + */ + Status ExecuteAsync(const std::vector &inputs, + const std::vector &input_desc, + const std::vector &outputs); /** * Execute subgraph async, output tensor address(not data) and output tensor descriptions are @@ -75,6 +87,7 @@ class SubgraphExecutor { Status GetOutputs(std::vector &outputs, std::vector &output_desc); private: + Status EnableOutputZeroCopy(const std::vector &outputs); static Status PrepareForExecution(GraphExecutionContext *ctx, NodeState &node_state); static Status InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state); Status Init(const std::vector &inputs, diff --git a/ge/hybrid/model/hybrid_model.cc b/ge/hybrid/model/hybrid_model.cc index 91b6a549..0e09501a 100644 --- a/ge/hybrid/model/hybrid_model.cc +++ b/ge/hybrid/model/hybrid_model.cc @@ -40,9 +40,13 @@ HybridModel::~HybridModel() { GELOGD("[%s] HybridModel destroyed.", model_name_.c_str()); } -Status HybridModel::Init() { +Status HybridModel::Init(bool is_single_op) { GELOGD("Start to init hybrid model."); - GE_CHK_STATUS_RET(HybridModelBuilder(*this).Build(), "Failed to build hybrid model."); + if (is_single_op) { + GE_CHK_STATUS_RET(HybridModelBuilder(*this).BuildForSingleOp(), "Failed to build hybrid model."); + } else { + GE_CHK_STATUS_RET(HybridModelBuilder(*this).Build(), "Failed to build hybrid model."); + } GELOGD("HybridModel initialized successfully."); return SUCCESS; } diff --git a/ge/hybrid/model/hybrid_model.h b/ge/hybrid/model/hybrid_model.h index e521b776..488036c5 100644 --- a/ge/hybrid/model/hybrid_model.h +++ b/ge/hybrid/model/hybrid_model.h @@ -37,7 +37,7 @@ class HybridModel { ~HybridModel(); - Status Init(); + Status Init(bool is_single_op = false); const NodeItem *GetNodeItem(const NodePtr &node) const; diff --git a/ge/hybrid/model/hybrid_model_builder.cc b/ge/hybrid/model/hybrid_model_builder.cc index 46c9c39b..064737b4 100755 --- a/ge/hybrid/model/hybrid_model_builder.cc +++ b/ge/hybrid/model/hybrid_model_builder.cc @@ -138,6 +138,18 @@ Status HybridModelBuilder::Build() { return SUCCESS; } +Status HybridModelBuilder::BuildForSingleOp() { + GE_CHK_STATUS_RET(ValidateParams(), "Failed to validate GeRootModel"); + hybrid_model_.model_name_ = ge_root_model_->GetRootGraph()->GetName(); + GELOGI("[%s] Start to build hybrid model.", GetGraphName()); + GE_CHK_STATUS_RET(IndexTaskDefs(), "[%s] Failed to index task defs", GetGraphName()); + GE_CHK_STATUS_RET(LoadGraph(), "[%s] Failed to load graph", GetGraphName()); + GE_CHK_STATUS_RET(InitWeights(), "[%s] Failed to init weights", GetGraphName()); + GE_CHK_STATUS_RET(LoadTasks(), "[%s] Failed to load tasks", GetGraphName()); + GELOGI("[%s] Done building hybrid model for single op successfully.", GetGraphName()); + return SUCCESS; +} + Status HybridModelBuilder::ValidateParams() { GE_CHECK_NOTNULL(ge_root_model_); GE_CHECK_NOTNULL(ge_root_model_->GetRootGraph()); diff --git a/ge/hybrid/model/hybrid_model_builder.h b/ge/hybrid/model/hybrid_model_builder.h index a11faae2..24b75f61 100644 --- a/ge/hybrid/model/hybrid_model_builder.h +++ b/ge/hybrid/model/hybrid_model_builder.h @@ -35,6 +35,7 @@ class HybridModelBuilder { explicit HybridModelBuilder(HybridModel &hybrid_model); ~HybridModelBuilder() = default; Status Build(); + Status BuildForSingleOp(); private: static Status UpdateAnchorStatus(const NodePtr &node); diff --git a/ge/hybrid/node_executor/task_context.cc b/ge/hybrid/node_executor/task_context.cc index d15ea978..6b847078 100644 --- a/ge/hybrid/node_executor/task_context.cc +++ b/ge/hybrid/node_executor/task_context.cc @@ -388,7 +388,7 @@ Status TaskContext::PropagateOutputs() { subgraph_context_->all_inputs_[input_offset] = *tensor; if (execution_context_->trace_enabled) { subgraph_context_->all_inputs_[input_offset].SetName( - node_item_->NodeName() + "_in_" + std::to_string(dst_input_idx)); + dst_node_item ->NodeName() + "_in_" + std::to_string(dst_input_idx)); } } } diff --git a/ge/single_op/single_op.cc b/ge/single_op/single_op.cc index 1f3fc5c5..661af3fc 100755 --- a/ge/single_op/single_op.cc +++ b/ge/single_op/single_op.cc @@ -254,10 +254,27 @@ Status DynamicSingleOp::ExecuteAsync(const vector &input_desc, const vector &input_buffers, vector &output_desc, vector &output_buffers) { - GE_CHECK_NOTNULL(op_task_); GE_CHK_STATUS_RET_NOLOG(ValidateParams(input_desc, input_buffers, output_desc, output_buffers)); - std::lock_guard lk(*stream_mutex_); + if (hybrid_model_executor_ != nullptr) { + GELOGD("Execute multi-task dynamic single op by hybrid model executor"); + hybrid::HybridModelExecutor::ExecuteArgs args; + for (auto &input : input_buffers) { + args.inputs.emplace_back(hybrid::TensorValue(input.data, input.length)); + } + for (auto &output : output_buffers) { + args.outputs.emplace_back(hybrid::TensorValue(output.data, output.length)); + } + for (auto &tensor_desc : input_desc) { + auto desc = MakeShared(tensor_desc); + GE_CHECK_NOTNULL(desc); + args.input_desc.emplace_back(desc); + } + return hybrid_model_executor_->Execute(args); + } + + std::lock_guard lk(*stream_mutex_); + GE_CHECK_NOTNULL(op_task_); GE_CHK_STATUS_RET_NOLOG(op_task_->LaunchKernel(input_desc, input_buffers, output_desc, output_buffers, stream_)); GE_CHK_STATUS_RET_NOLOG(ProfilingTaskInfo(op_task_.get(), kShapeTypeDynamic)); return SUCCESS; diff --git a/ge/single_op/single_op.h b/ge/single_op/single_op.h index d677f94a..b350b684 100755 --- a/ge/single_op/single_op.h +++ b/ge/single_op/single_op.h @@ -28,6 +28,7 @@ #include "runtime/stream.h" #include "task/op_task.h" #include "cce/aicpu_engine_struct.h" +#include "hybrid/executor/hybrid_model_executor.h" namespace ge { class StreamResource; @@ -46,7 +47,7 @@ class SingleOp { Status GetArgs(const std::vector &inputs, const std::vector &outputs); friend class SingleOpModel; - StreamResource *stream_resource_; + StreamResource *stream_resource_ = nullptr; std::mutex *stream_mutex_; rtStream_t stream_ = nullptr; std::vector input_addr_list_; @@ -77,6 +78,8 @@ class DynamicSingleOp { std::vector &outputs) const; std::unique_ptr op_task_; + std::unique_ptr hybrid_model_; + std::unique_ptr hybrid_model_executor_; uintptr_t resource_id_ = 0; std::mutex *stream_mutex_; rtStream_t stream_ = nullptr; diff --git a/ge/single_op/single_op_model.cc b/ge/single_op/single_op_model.cc index 25bf6855..4657c6ae 100755 --- a/ge/single_op/single_op_model.cc +++ b/ge/single_op/single_op_model.cc @@ -31,6 +31,8 @@ #include "task/aicpu_task_builder.h" #include "task/aicpu_kernel_task_builder.h" #include "task/tbe_task_builder.h" +#include "hybrid/executor/hybrid_model_executor.h" +#include "hybrid/node_executor/node_executor.h" static std::atomic aicpu_kernel_id(0); @@ -477,6 +479,25 @@ Status SingleOpModel::BuildDynamicOp(StreamResource &resource, DynamicSingleOp & single_op.num_inputs_ = data_ops_.size(); single_op.num_outputs_ = netoutput_op_->GetAllInputsSize(); GE_CHK_STATUS_RET_NOLOG(InitModelMem(resource)); + + auto ge_model = model_helper_.GetGeModel(); + GE_CHECK_NOTNULL(ge_model); + if (ge_model->GetModelTaskDefPtr()->task_num() > 1) { + GELOGD("Build single op with multiple tasks"); + GE_CHK_STATUS_RET_NOLOG(hybrid::NodeExecutorManager::GetInstance().EnsureInitialized()); + single_op.hybrid_model_.reset(new (std::nothrow)hybrid::HybridModel(model_helper_.GetGeRootModel())); + GE_CHECK_NOTNULL(single_op.hybrid_model_); + GE_CHK_STATUS_RET(single_op.hybrid_model_->Init(true), "Failed to init hybrid model"); + int32_t device_id = 0; + GE_CHK_RT_RET(rtGetDevice(&device_id)); + single_op.hybrid_model_executor_.reset(new (std::nothrow)hybrid::HybridModelExecutor(single_op.hybrid_model_.get(), + device_id, + resource.GetStream())); + GE_CHECK_NOTNULL(single_op.hybrid_model_executor_); + GE_CHK_STATUS_RET(single_op.hybrid_model_executor_->Init(), "Failed to init hybrid model"); + return SUCCESS; + } + return BuildTaskListForDynamicOp(single_op); } } // namespace ge diff --git a/ge/single_op/stream_resource.cc b/ge/single_op/stream_resource.cc index db6b7c47..a3acf6b7 100755 --- a/ge/single_op/stream_resource.cc +++ b/ge/single_op/stream_resource.cc @@ -61,6 +61,10 @@ DynamicSingleOp *StreamResource::GetDynamicOperator(const void *key) { return it->second.get(); } +rtStream_t StreamResource::GetStream() const { + return stream_; +} + void StreamResource::SetStream(rtStream_t stream) { stream_ = stream; } diff --git a/ge/single_op/stream_resource.h b/ge/single_op/stream_resource.h index d5bc941a..d2c1ca36 100755 --- a/ge/single_op/stream_resource.h +++ b/ge/single_op/stream_resource.h @@ -37,6 +37,7 @@ class StreamResource { StreamResource(StreamResource &&) = delete; StreamResource &operator=(const StreamResource &) = delete; StreamResource &operator=(StreamResource &&) = delete; + rtStream_t GetStream() const; void SetStream(rtStream_t stream); SingleOp *GetOperator(const void *key); From f0ba6d638e271c792131d7af0623586409bf2b76 Mon Sep 17 00:00:00 2001 From: chuxing Date: Wed, 30 Dec 2020 19:31:28 +0800 Subject: [PATCH 2/3] fix compile --- ge/single_op/single_op_model.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ge/single_op/single_op_model.cc b/ge/single_op/single_op_model.cc index 4657c6ae..51a62481 100755 --- a/ge/single_op/single_op_model.cc +++ b/ge/single_op/single_op_model.cc @@ -482,7 +482,7 @@ Status SingleOpModel::BuildDynamicOp(StreamResource &resource, DynamicSingleOp & auto ge_model = model_helper_.GetGeModel(); GE_CHECK_NOTNULL(ge_model); - if (ge_model->GetModelTaskDefPtr()->task_num() > 1) { + if (ge_model->GetModelTaskDefPtr()->task_size() > 1) { GELOGD("Build single op with multiple tasks"); GE_CHK_STATUS_RET_NOLOG(hybrid::NodeExecutorManager::GetInstance().EnsureInitialized()); single_op.hybrid_model_.reset(new (std::nothrow)hybrid::HybridModel(model_helper_.GetGeRootModel())); From c6e714abdf3e3e6d7c7ae175513d99e3d93ec3d7 Mon Sep 17 00:00:00 2001 From: chuxing Date: Wed, 30 Dec 2020 19:47:06 +0800 Subject: [PATCH 3/3] atc do not need single op runtime --- ge/CMakeLists.txt | 9 --------- 1 file changed, 9 deletions(-) diff --git a/ge/CMakeLists.txt b/ge/CMakeLists.txt index 306b3eda..09db63b1 100755 --- a/ge/CMakeLists.txt +++ b/ge/CMakeLists.txt @@ -583,15 +583,6 @@ set(INFER_SRC_LIST "graph/load/new_model_manager/task_info/model_exit_task_info.cc" "graph/load/new_model_manager/task_info/super_kernel/super_kernel_factory.cc" "graph/load/new_model_manager/task_info/super_kernel/super_kernel.cc" - "single_op/task/op_task.cc" - "single_op/task/build_task_utils.cc" - "single_op/task/tbe_task_builder.cc" - "single_op/task/aicpu_task_builder.cc" - "single_op/task/aicpu_kernel_task_builder.cc" - "single_op/single_op.cc" - "single_op/single_op_model.cc" - "single_op/stream_resource.cc" - "single_op/single_op_manager.cc" "hybrid/hybrid_davinci_model_stub.cc" "ir_build/ge_ir_build.cc" "ir_build/atc_ir_common.cc"