From 2a48b2ecb84604f846cc82a327eaaea1b8775ef6 Mon Sep 17 00:00:00 2001 From: kswang Date: Mon, 12 Apr 2021 11:13:00 +0800 Subject: [PATCH] reconstruct session code --- .../ccsrc/backend/session/ascend_session.cc | 266 ++++++++++++------ .../ccsrc/backend/session/ascend_session.h | 8 +- .../ccsrc/backend/session/cpu_session.cc | 35 +-- mindspore/ccsrc/backend/session/cpu_session.h | 6 +- mindspore/ccsrc/backend/session/executor.cc | 3 +- .../ccsrc/backend/session/gpu_session.cc | 67 +++-- mindspore/ccsrc/backend/session/gpu_session.h | 8 +- .../ccsrc/backend/session/session_basic.cc | 139 ++------- .../ccsrc/backend/session/session_basic.h | 12 +- 9 files changed, 275 insertions(+), 269 deletions(-) diff --git a/mindspore/ccsrc/backend/session/ascend_session.cc b/mindspore/ccsrc/backend/session/ascend_session.cc index 6639f42020..4b3d9a87d2 100644 --- a/mindspore/ccsrc/backend/session/ascend_session.cc +++ b/mindspore/ccsrc/backend/session/ascend_session.cc @@ -59,6 +59,7 @@ #include "debug/data_dump/e2e_dump.h" #include "debug/anf_ir_dump.h" #include "debug/dump_proto.h" +#include "abstract/utils.h" #ifdef ENABLE_DEBUGGER #include "debug/debugger/proto_exporter.h" #else @@ -209,69 +210,6 @@ void GenOpOutputStubTensor(const KernelGraphPtr &single_op_graph, const CNodePtr (*op_output_info)[kernel_with_index] = output_tensor_info; } } -} // namespace - -void AscendSession::Init(uint32_t device_id) { InitExecutor(kAscendDevice, device_id); } - -void AscendSession::UnifyMindIR(const KernelGraphPtr &graph) { - auto context_ptr = MsContext::GetInstance(); - MS_EXCEPTION_IF_NULL(context_ptr); - bool save_graphs = context_ptr->get_param(MS_CTX_SAVE_GRAPHS_FLAG); - if (save_graphs) { - std::string file_name = "hwopt_d_before_unify_mindir_graph_" + std::to_string(graph->graph_id()) + ".ir"; - DumpIR(file_name, graph); - DumpIRProto(graph, "before_unify_mindir_hwopt_" + std::to_string(graph->graph_id())); - } - auto optimizer = std::make_shared(); - auto unify_mindir_pm = std::make_shared("unify_mindir_pm"); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - auto ms_context = MsContext::GetInstance(); - MS_EXCEPTION_IF_NULL(ms_context); - if (ms_context->get_param(MS_CTX_EXECUTION_MODE) == kGraphMode) { - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - } else { - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - } - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - unify_mindir_pm->AddPass(std::make_shared()); - - optimizer->AddPassManager(unify_mindir_pm); - (void)optimizer->Optimize(graph); - graph->SetExecOrderByDefault(); - if (save_graphs) { - std::string file_name = "hwopt_d_after_unify_mindir_graph_" + std::to_string(graph->graph_id()) + ".ir"; - DumpIR(file_name, graph); - } -} - -GraphId AscendSession::CompileGraphImpl(const AnfNodePtrList &lst, const AnfNodePtrList &outputs) { - MS_LOG(INFO) << "Start"; - // construct graph, if successfully, graph_sum_ + 1 - auto graph = ConstructKernelGraph(lst, outputs); - auto graph_id = graph->graph_id(); - InitAllBucket(graph); - MS_LOG(INFO) << "Compile graph " << graph_id << " success"; - return graph_id; -} bool IsBackward(const CNodePtr &cnode) { auto prim = GetValueNode(cnode->input(0)); @@ -357,6 +295,183 @@ void ReorderSendRecv(std::vector *execution_order) { } } +size_t LoadCtrlInputTensor(const std::shared_ptr &graph, std::vector *inputs) { + MS_EXCEPTION_IF_NULL(graph); + MS_LOG(INFO) << "Load kInputCtrlTensors"; + auto inputs_params = graph->input_ctrl_tensors(); + if (inputs_params == nullptr) { + return 0; + } + if (inputs_params->size() < 3) { + MS_LOG(EXCEPTION) << "Illegal inputs_params size"; + } + // update current loop tensor to 0 per iterator + auto cur_loop_tensor = (*inputs_params)[0]; + MS_EXCEPTION_IF_NULL(cur_loop_tensor); + auto *cur_val = static_cast(cur_loop_tensor->data_c()); + MS_EXCEPTION_IF_NULL(cur_val); + *cur_val = 0; + cur_loop_tensor->set_sync_status(kNeedSyncHostToDevice); + // set loop_count to zero + MS_EXCEPTION_IF_NULL(inputs); + inputs->push_back(cur_loop_tensor); + + // update next loop tensor to 0 per iterator + auto next_loop_tensor = (*inputs_params)[1]; + MS_EXCEPTION_IF_NULL(next_loop_tensor); + auto *next_val = static_cast(next_loop_tensor->data_c()); + MS_EXCEPTION_IF_NULL(next_val); + *next_val = 0; + next_loop_tensor->set_sync_status(kNeedSyncHostToDevice); + // set loop_count to zero + MS_EXCEPTION_IF_NULL(inputs); + inputs->push_back(next_loop_tensor); + + auto epoch_tensor = (*inputs_params)[2]; + MS_EXCEPTION_IF_NULL(epoch_tensor); + auto *epoch_val = static_cast(epoch_tensor->data_c()); + MS_EXCEPTION_IF_NULL(epoch_val); + *epoch_val = graph->current_epoch(); + epoch_tensor->set_sync_status(kNeedSyncHostToDevice); + inputs->push_back(epoch_tensor); + MS_LOG(INFO) << "Load epoch_val:" << *epoch_val; + graph->set_current_epoch(graph->current_epoch() + 1); + return inputs_params->size(); +} + +bool TensorNeedSync(const AnfNodePtr ¶meter, const tensor::TensorPtr &tensor) { + auto ms_context = MsContext::GetInstance(); + MS_EXCEPTION_IF_NULL(ms_context); + auto device_address = AnfAlgo::GetMutableOutputAddr(parameter, 0); + if (ms_context->get_param(MS_CTX_ENABLE_PYNATIVE_INFER)) { + return tensor->device_address().get() == nullptr || tensor->device_address() != device_address; + } + if (tensor->NeedSyncHostToDevice()) { + return true; + } + auto tensor_address = tensor->device_address(); + if (tensor_address != device_address) { + tensor->data_sync(false); + return true; + } + return false; +} +} // namespace + +void AscendSession::Init(uint32_t device_id) { InitExecutor(kAscendDevice, device_id); } + +void AscendSession::UnifyMindIR(const KernelGraphPtr &graph) { + auto context_ptr = MsContext::GetInstance(); + MS_EXCEPTION_IF_NULL(context_ptr); + bool save_graphs = context_ptr->get_param(MS_CTX_SAVE_GRAPHS_FLAG); + if (save_graphs) { + std::string file_name = "hwopt_d_before_unify_mindir_graph_" + std::to_string(graph->graph_id()) + ".ir"; + DumpIR(file_name, graph); + DumpIRProto(graph, "before_unify_mindir_hwopt_" + std::to_string(graph->graph_id())); + } + auto optimizer = std::make_shared(); + auto unify_mindir_pm = std::make_shared("unify_mindir_pm"); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + auto ms_context = MsContext::GetInstance(); + MS_EXCEPTION_IF_NULL(ms_context); + if (ms_context->get_param(MS_CTX_EXECUTION_MODE) == kGraphMode) { + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + } else { + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + } + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + unify_mindir_pm->AddPass(std::make_shared()); + + optimizer->AddPassManager(unify_mindir_pm); + (void)optimizer->Optimize(graph); + graph->SetExecOrderByDefault(); + if (save_graphs) { + std::string file_name = "hwopt_d_after_unify_mindir_graph_" + std::to_string(graph->graph_id()) + ".ir"; + DumpIR(file_name, graph); + } +} + +void AscendSession::LoadInputData(const std::shared_ptr &kernel_graph, + const std::vector &inputs_const) const { + std::vector inputs(inputs_const); + size_t input_ctrl_size = 3; + MS_EXCEPTION_IF_NULL(kernel_graph); + if (kernel_graph->input_ctrl_tensors()) { + input_ctrl_size = LoadCtrlInputTensor(kernel_graph, &inputs); + } + auto &input_nodes = kernel_graph->input_nodes(); + auto extra_param_size = kernel_graph->GetExtraParamAndTensor().size(); + if ((inputs.size() + input_ctrl_size) - 3 != input_nodes.size() - extra_param_size) { + MS_LOG(EXCEPTION) << "Tensor input:" << inputs.size() << " is not equal graph inputs:" << input_nodes.size() + << ", input_ctrl_size:" << input_ctrl_size << ", extra_param_size:" << extra_param_size; + } + auto ms_context = MsContext::GetInstance(); + MS_EXCEPTION_IF_NULL(ms_context); + for (size_t i = 0; i < inputs.size(); ++i) { + auto tensor = inputs[i]; + MS_EXCEPTION_IF_NULL(tensor); + auto input_node = input_nodes[i]; + MS_EXCEPTION_IF_NULL(input_node); + auto size = LongToSize(tensor->data().nbytes()); + if (input_node->isa() && input_node->cast()->is_used_by_dynamic_kernel()) { + auto tensor_shape = tensor->shape(); + std::vector shape_tmp; + (void)std::transform(tensor_shape.begin(), tensor_shape.end(), std::back_inserter(shape_tmp), IntToSize); + AnfAlgo::SetOutputInferTypeAndShape({AnfAlgo::GetOutputInferDataType(input_node, 0)}, {shape_tmp}, + input_node.get()); + size = abstract::ShapeSize(shape_tmp) * abstract::TypeIdSize(tensor->data_type()); + } + if (input_node->isa() && AnfAlgo::OutputAddrExist(input_node, 0) && TensorNeedSync(input_node, tensor)) { +#if (ENABLE_CPU && (ENABLE_D || ENABLE_GPU)) + const std::string ¶m_name = input_node->fullname_with_scope(); + if (ps::ps_cache_instance.IsHashTable(param_name)) { + continue; + } +#endif + auto device_address = AnfAlgo::GetMutableOutputAddr(input_node, 0); + MS_EXCEPTION_IF_NULL(device_address); + if (size != 0 && !device_address->SyncHostToDevice(trans::GetRuntimePaddingShape(input_node, 0), size, + tensor->data_type(), tensor->data_c())) { + MS_LOG(EXCEPTION) << "SyncHostToDevice failed."; + } + if (ms_context->get_param(MS_CTX_EXECUTION_MODE) == kPynativeMode || + AnfAlgo::IsParameterWeight(input_node->cast())) { + tensor->set_device_address(device_address); + } + } + tensor->set_sync_status(kNoNeedSync); + } +} + +GraphId AscendSession::CompileGraphImpl(const AnfNodePtrList &lst, const AnfNodePtrList &outputs) { + MS_LOG(INFO) << "Start"; + // construct graph, if successfully, graph_sum_ + 1 + auto graph = ConstructKernelGraph(lst, outputs); + auto graph_id = graph->graph_id(); + InitAllBucket(graph); + MS_LOG(INFO) << "Compile graph " << graph_id << " success"; + return graph_id; +} + GraphId AscendSession::CompileGraphImpl(NotNull func_graph) { MS_LOG(INFO) << "Start"; std::vector all_graphs; @@ -559,16 +674,8 @@ void AscendSession::CompileChildGraph(const KernelGraphPtr &child_graph) { bool AscendSession::IsSupportSummary() { return !device::KernelAdjust::NeedInsertSwitch(); } -void AscendSession::RunGraphImpl(const GraphId &graph_id, const std::vector &inputs, - VectorRef *const outputs) { - MS_LOG(INFO) << "Start"; - auto kernel_graph = GetGraph(graph_id); - MS_EXCEPTION_IF_NULL(kernel_graph); - // if none of child graph and no anf output exists - if (!kernel_graph->executable()) { - MS_LOG(INFO) << "No child graph has anf output"; - return; - } +void AscendSession::PreExecuteGraph(const std::shared_ptr &kernel_graph, + const std::vector &inputs, VectorRef *const outputs) { // load data to extra params std::set memo; SyncDataToExtraParams(NOT_NULL(kernel_graph), NOT_NULL(&memo)); @@ -580,14 +687,14 @@ void AscendSession::RunGraphImpl(const GraphId &graph_id, const std::vector &kernel_graph, + const std::vector &inputs, VectorRef *const outputs) { // summary Summary(kernel_graph.get()); // load tensor from device for debugger @@ -598,9 +705,10 @@ void AscendSession::RunGraphImpl(const GraphId &graph_id, const std::vectorPostExecute(); } - MS_LOG(INFO) << "Finish!"; } +void AscendSession::ExecuteGraph(const std::shared_ptr &kernel_graph) { Execute(kernel_graph, true); } + void AscendSession::RunOpHardwareOptimize(const std::shared_ptr &kernel_graph) const { MS_LOG(INFO) << "Start"; // data layout optimization diff --git a/mindspore/ccsrc/backend/session/ascend_session.h b/mindspore/ccsrc/backend/session/ascend_session.h index 27042c0157..0d23e58fe3 100644 --- a/mindspore/ccsrc/backend/session/ascend_session.h +++ b/mindspore/ccsrc/backend/session/ascend_session.h @@ -50,7 +50,13 @@ class AscendSession : public SessionBasic { GraphId CompileGraphImpl(const AnfNodePtrList &lst, const AnfNodePtrList &outputs) override; GraphId CompileGraphImpl(NotNull func_graph) override; bool IsSupportSummary() override; - void RunGraphImpl(const GraphId &graph_id, const std::vector &inputs, VectorRef *outputs) override; + void LoadInputData(const std::shared_ptr &kernel_graph, + const std::vector &inputs_const) const override; + void PreExecuteGraph(const std::shared_ptr &kernel_graph, const std::vector &inputs, + VectorRef *const outputs) override; + void PostExecuteGraph(const std::shared_ptr &kernel_graph, const std::vector &inputs, + VectorRef *const outputs) override; + void ExecuteGraph(const std::shared_ptr &kernel_graph) override; void BuildGraphImpl(GraphId) override; void BuildOpImpl(const OpRunInfo &op_run_info, const GraphInfo &graph_info, const std::vector &input_tensors, diff --git a/mindspore/ccsrc/backend/session/cpu_session.cc b/mindspore/ccsrc/backend/session/cpu_session.cc index 8f6042025d..e076b77f14 100644 --- a/mindspore/ccsrc/backend/session/cpu_session.cc +++ b/mindspore/ccsrc/backend/session/cpu_session.cc @@ -102,20 +102,19 @@ GraphId CPUSession::CompileGraphImpl(const AnfNodePtrList &lst, const AnfNodePtr Optimize(graph); MS_LOG(INFO) << "Build kernel"; BuildKernel(graph.get()); - // Remove reorder after PS feature finish adapting push/pull in auto_monad. auto execution_order = graph->execution_order(); Reorder(&execution_order); graph->set_execution_order(execution_order); - // runtime init if (!runtime_.Init()) { MS_LOG(EXCEPTION) << "Kernel runtime init error."; } - MS_LOG(INFO) << "Assign kernel address"; runtime_.AssignKernelAddress(graph.get()); - + // set summary node + SetSummaryNodes(graph.get()); + runtime_.IncreaseSummaryRefCount(graph->summary_nodes()); DumpGraph(graph); return graph_id; } @@ -154,38 +153,26 @@ void CPUSession::LoadInputData(const std::shared_ptr &kernel_graph, } } -void CPUSession::RunGraphImpl(const GraphId &graph_id, const std::vector &inputs, - VectorRef *outputs) { - auto kernel_graph = GetGraph(graph_id); - MS_EXCEPTION_IF_NULL(kernel_graph); +void CPUSession::PreExecuteGraph(const std::shared_ptr &kernel_graph, + const std::vector &inputs, VectorRef *const outputs) { MS_LOG(INFO) << "Bind input output address"; runtime_.BindInputOutput(kernel_graph.get(), inputs, outputs); #if (ENABLE_CPU && (ENABLE_D || ENABLE_GPU)) InitPSParamAndOptim(kernel_graph, inputs); #endif +} - MS_LOG(INFO) << "Run graph start"; - - bool enable_summary = summary_callback_ != nullptr; - NamedSummaryOutputs summary_outputs; - if (enable_summary) { - SetSummaryNodes(kernel_graph.get()); - summary_outputs = kernel_graph->summary_nodes(); - runtime_.IncreaseSummaryRefCount(summary_outputs); - } +void CPUSession::PostExecuteGraph(const std::shared_ptr &kernel_graph, + const std::vector &inputs, VectorRef *const outputs) { + Summary(kernel_graph.get()); +} +void CPUSession::ExecuteGraph(const std::shared_ptr &kernel_graph) { bool ret = runtime_.Run(kernel_graph.get(), false); if (!ret) { MS_LOG(EXCEPTION) << "Run graph failed"; } - - if (enable_summary) { - Summary(kernel_graph.get()); - runtime_.DecreaseSummaryRefCount(summary_outputs); - } - - MS_LOG(INFO) << "Run graph end"; } void CPUSession::BuildOpImpl(const OpRunInfo &op_run_info, const GraphInfo &graph_info, diff --git a/mindspore/ccsrc/backend/session/cpu_session.h b/mindspore/ccsrc/backend/session/cpu_session.h index e52ae8693f..53e99e0a5e 100644 --- a/mindspore/ccsrc/backend/session/cpu_session.h +++ b/mindspore/ccsrc/backend/session/cpu_session.h @@ -36,7 +36,11 @@ class CPUSession : public SessionBasic { void CreateOutputTensors(const GraphId &graph_id, const std::vector &input_tensors, VectorRef *, std::map *tensor_to_node) override; GraphId CompileGraphImpl(const AnfNodePtrList &lst, const AnfNodePtrList &outputs) override; - void RunGraphImpl(const GraphId &graph_id, const std::vector &inputs, VectorRef *outputs) override; + void PreExecuteGraph(const std::shared_ptr &kernel_graph, const std::vector &inputs, + VectorRef *const outputs) override; + void PostExecuteGraph(const std::shared_ptr &kernel_graph, const std::vector &inputs, + VectorRef *const outputs) override; + void ExecuteGraph(const std::shared_ptr &kernel_graph) override; ParameterPtr CreateNewParameterFromParameter(const AnfNodePtr &anf, KernelGraph *graph) override; void Optimize(const std::shared_ptr &kernel_graph); void BuildOpImpl(const OpRunInfo &op_run_info, const GraphInfo &graph_info, diff --git a/mindspore/ccsrc/backend/session/executor.cc b/mindspore/ccsrc/backend/session/executor.cc index 843e2cce1b..1c915ebd84 100644 --- a/mindspore/ccsrc/backend/session/executor.cc +++ b/mindspore/ccsrc/backend/session/executor.cc @@ -236,6 +236,7 @@ void Executor::WorkerLoop() { done_tasks_.emplace_back(task); } if (task->type_ != kRunGraph || task->sync_run_) { + std::lock_guard lock(task_mutex_); sync_run_task_finished_ = true; sync_cond_var_.notify_all(); } @@ -310,9 +311,9 @@ void Executor::ClearDoneTasks() { } void Executor::RunTask(const std::shared_ptr &task, bool sync, bool long_run) { - sync_run_task_finished_ = false; { std::lock_guard lock(task_mutex_); + sync_run_task_finished_ = false; ready_tasks_.push(task); } task_cond_var_.notify_all(); diff --git a/mindspore/ccsrc/backend/session/gpu_session.cc b/mindspore/ccsrc/backend/session/gpu_session.cc index 5f172d79d4..5a9242e02a 100644 --- a/mindspore/ccsrc/backend/session/gpu_session.cc +++ b/mindspore/ccsrc/backend/session/gpu_session.cc @@ -299,14 +299,6 @@ void GPUSession::LoadInputData(const std::shared_ptr &kernel_graph, } } -void GPUSession::Execute(const std::shared_ptr &kernel_graph) const { - auto runtime_instance = device::KernelRuntimeManager::Instance().GetSingleKernelRuntime(kGPUDevice, device_id_); - MS_EXCEPTION_IF_NULL(runtime_instance); - if (!runtime_instance->Run(kernel_graph.get(), false)) { - MS_LOG(EXCEPTION) << "GPU execute graph failed!"; - } -} - GraphId GPUSession::CompileGraphImpl(const AnfNodePtrList &lst, const AnfNodePtrList &outputs) { // Construct graph, if successfully, graph_sum_ + 1 auto graph = ConstructKernelGraph(lst, outputs); @@ -419,10 +411,8 @@ GraphId GPUSession::CompileGraphImpl(KernelGraphPtr graph) { return graph->graph_id(); } -void GPUSession::RunGraphImpl(const GraphId &graph_id, const std::vector &inputs, - VectorRef *outputs) { - auto &kernel_graph = graphs_[graph_id]; - MS_LOG(INFO) << "RunGraph graph_id: " << graph_id; +void GPUSession::PreExecuteGraph(const std::shared_ptr &kernel_graph, + const std::vector &inputs, VectorRef *outputs) { if (debugger_) { debugger_->PreExecute(kernel_graph, graph_sum_); } @@ -430,26 +420,48 @@ void GPUSession::RunGraphImpl(const GraphId &graph_id, const std::vector &kernel_graph, + const std::vector &inputs, VectorRef *outputs) { + // Summary + auto context_ptr = MsContext::GetInstance(); + MS_EXCEPTION_IF_NULL(context_ptr); + if (context_ptr->get_param(MS_CTX_ENABLE_GPU_SUMMARY)) { + Summary(kernel_graph.get()); + } + bool dump_enabled = DumpDataEnabledIteration(); + // debug used for dump + if (debugger_ && dump_enabled) { + Dump(kernel_graph); + } else { + DumpJsonParser::GetInstance().UpdateDumpIter(); + } + if (debugger_) { + debugger_->PostExecute(); + } +} + +void GPUSession::ExecuteGraph(const std::shared_ptr &kernel_graph) { int kernel_num = kernel_graph->execution_order().size(); int64_t loopsize = (kernel_num > 1) ? ConfigManager::GetInstance().gpu_loopsink_size() : 1; for (int64_t i = 0; i < loopsize; i++) { #if ENABLE_CPU && ENABLE_GPU std::string channel_name; - if (ps::PsDataPrefetch::GetInstance().cache_enable() && IsGetNextGraph(graph_id, &channel_name)) { + if (ps::PsDataPrefetch::GetInstance().cache_enable() && IsGetNextGraph(kernel_graph, &channel_name)) { ps::ps_cache_instance.IncreaseGraphStep(channel_name); } #endif Execute(kernel_graph); } - // Summary - auto context_ptr = MsContext::GetInstance(); - MS_EXCEPTION_IF_NULL(context_ptr); - if (context_ptr->get_param(MS_CTX_ENABLE_GPU_SUMMARY)) { - Summary(kernel_graph.get()); +} + +void GPUSession::Execute(const std::shared_ptr &kernel_graph) const { + auto runtime_instance = device::KernelRuntimeManager::Instance().GetSingleKernelRuntime(kGPUDevice, device_id_); + MS_EXCEPTION_IF_NULL(runtime_instance); + if (!runtime_instance->Run(kernel_graph.get(), false)) { + MS_LOG(EXCEPTION) << "GPU execute graph failed!"; } - PostIterationDbg(kernel_graph); } void GPUSession::BuildOpImpl(const OpRunInfo &op_run_info, const GraphInfo &graph_info, @@ -519,19 +531,6 @@ bool GPUSession::DumpDataEnabledIteration() const { return runtime_instance->DumpDataEnabledIteration(); } -void GPUSession::PostIterationDbg(const std::shared_ptr &kernel_graph) const { - bool dump_enabled = DumpDataEnabledIteration(); - // debug used for dump - if (debugger_ && dump_enabled) { - Dump(kernel_graph); - } else { - DumpJsonParser::GetInstance().UpdateDumpIter(); - } - if (debugger_) { - debugger_->PostExecute(); - } -} - void GPUSession::SyncStream() { auto runtime_instance = device::KernelRuntimeManager::Instance().GetSingleKernelRuntime(kGPUDevice, device_id_); MS_EXCEPTION_IF_NULL(runtime_instance); diff --git a/mindspore/ccsrc/backend/session/gpu_session.h b/mindspore/ccsrc/backend/session/gpu_session.h index 76bfe454b1..3cd54ec6e4 100644 --- a/mindspore/ccsrc/backend/session/gpu_session.h +++ b/mindspore/ccsrc/backend/session/gpu_session.h @@ -39,7 +39,11 @@ class GPUSession : public SessionBasic { void UnifyMindIR(const KernelGraphPtr &graph) override { return; } GraphId CompileGraphImpl(const AnfNodePtrList &lst, const AnfNodePtrList &outputs) override; GraphId CompileGraphImpl(NotNull func_graph) override; - void RunGraphImpl(const GraphId &graph_id, const std::vector &inputs, VectorRef *outputs) override; + void PreExecuteGraph(const std::shared_ptr &kernel_graph, const std::vector &inputs, + VectorRef *const outputs) override; + void PostExecuteGraph(const std::shared_ptr &kernel_graph, const std::vector &inputs, + VectorRef *const outputs) override; + void ExecuteGraph(const std::shared_ptr &kernel_graph) override; void BuildOpImpl(const OpRunInfo &op_run_info, const GraphInfo &graph_info, const std::vector &input_tensors, const std::vector &tensors_mask) override; @@ -79,8 +83,6 @@ class GPUSession : public SessionBasic { bool DumpDataEnabledIteration() const; - void PostIterationDbg(const std::shared_ptr &kernel_graph) const; - GraphId CompileGraphImpl(KernelGraphPtr kernel_graph); }; using GPUSessionPtr = std::shared_ptr; diff --git a/mindspore/ccsrc/backend/session/session_basic.cc b/mindspore/ccsrc/backend/session/session_basic.cc index 48c79c6353..a7e8dda65a 100644 --- a/mindspore/ccsrc/backend/session/session_basic.cc +++ b/mindspore/ccsrc/backend/session/session_basic.cc @@ -252,52 +252,6 @@ ValueNodePtr CreateNewValueNode(const AnfNodePtr &anf, KernelGraph *graph) { return new_value_node; } -size_t LoadCtrlInputTensor(const std::shared_ptr &graph, std::vector *inputs) { - MS_EXCEPTION_IF_NULL(graph); - MS_LOG(INFO) << "Load kInputCtrlTensors"; - auto inputs_params = graph->input_ctrl_tensors(); - if (inputs_params == nullptr) { - return 0; - } - if (inputs_params->size() < 3) { - MS_LOG(EXCEPTION) << "Illegal inputs_params size"; - } - // update current loop tensor to 0 per iterator - auto cur_loop_tensor = (*inputs_params)[0]; - MS_EXCEPTION_IF_NULL(cur_loop_tensor); - auto *cur_val = static_cast(cur_loop_tensor->data_c()); - MS_EXCEPTION_IF_NULL(cur_val); - *cur_val = 0; - cur_loop_tensor->set_sync_status(kNeedSyncHostToDevice); - // set loop_count to zero - MS_EXCEPTION_IF_NULL(inputs); - inputs->push_back(cur_loop_tensor); - - // update next loop tensor to 0 per iterator - auto next_loop_tensor = (*inputs_params)[1]; - MS_EXCEPTION_IF_NULL(next_loop_tensor); - auto *next_val = static_cast(next_loop_tensor->data_c()); - MS_EXCEPTION_IF_NULL(next_val); - *next_val = 0; - next_loop_tensor->set_sync_status(kNeedSyncHostToDevice); - // set loop_count to zero - MS_EXCEPTION_IF_NULL(inputs); - inputs->push_back(next_loop_tensor); - - auto epoch_tensor = (*inputs_params)[2]; - MS_EXCEPTION_IF_NULL(epoch_tensor); - auto *epoch_val = static_cast(epoch_tensor->data_c()); - MS_EXCEPTION_IF_NULL(epoch_val); - *epoch_val = graph->current_epoch(); - epoch_tensor->set_sync_status(kNeedSyncHostToDevice); - inputs->push_back(epoch_tensor); - MS_LOG(INFO) << "Load epoch_val:" << *epoch_val; - - graph->set_current_epoch(graph->current_epoch() + 1); - - return inputs_params->size(); -} - ValueNodePtr ConstructRunOpValueNode(const std::shared_ptr &graph, const tensor::TensorPtr &input_tensor) { MS_EXCEPTION_IF_NULL(graph); MS_EXCEPTION_IF_NULL(input_tensor); @@ -1544,80 +1498,6 @@ void SessionBasic::AddParameterToGraphInputs(const std::vector ¶ } } -namespace { -bool TensorNeedSync(const AnfNodePtr ¶meter, const tensor::TensorPtr &tensor) { - auto ms_context = MsContext::GetInstance(); - MS_EXCEPTION_IF_NULL(ms_context); - auto device_address = AnfAlgo::GetMutableOutputAddr(parameter, 0); - if (ms_context->get_param(MS_CTX_ENABLE_PYNATIVE_INFER)) { - return tensor->device_address().get() == nullptr || tensor->device_address() != device_address; - } - if (tensor->NeedSyncHostToDevice()) { - return true; - } - auto tensor_address = tensor->device_address(); - if (tensor_address != device_address) { - tensor->data_sync(false); - return true; - } - return false; -} -} // namespace - -// run graph steps -void SessionBasic::LoadInputData(const std::shared_ptr &kernel_graph, - const std::vector &inputs_const) const { - std::vector inputs(inputs_const); - size_t input_ctrl_size = 3; - MS_EXCEPTION_IF_NULL(kernel_graph); - if (kernel_graph->input_ctrl_tensors()) { - input_ctrl_size = LoadCtrlInputTensor(kernel_graph, &inputs); - } - auto &input_nodes = kernel_graph->input_nodes(); - auto extra_param_size = kernel_graph->GetExtraParamAndTensor().size(); - if ((inputs.size() + input_ctrl_size) - 3 != input_nodes.size() - extra_param_size) { - MS_LOG(EXCEPTION) << "Tensor input:" << inputs.size() << " is not equal graph inputs:" << input_nodes.size() - << ", input_ctrl_size:" << input_ctrl_size << ", extra_param_size:" << extra_param_size; - } - auto ms_context = MsContext::GetInstance(); - MS_EXCEPTION_IF_NULL(ms_context); - for (size_t i = 0; i < inputs.size(); ++i) { - auto tensor = inputs[i]; - MS_EXCEPTION_IF_NULL(tensor); - auto input_node = input_nodes[i]; - MS_EXCEPTION_IF_NULL(input_node); - auto size = LongToSize(tensor->data().nbytes()); - if (input_node->isa() && input_node->cast()->is_used_by_dynamic_kernel()) { - auto tensor_shape = tensor->shape(); - std::vector shape_tmp; - (void)std::transform(tensor_shape.begin(), tensor_shape.end(), std::back_inserter(shape_tmp), IntToSize); - AnfAlgo::SetOutputInferTypeAndShape({AnfAlgo::GetOutputInferDataType(input_node, 0)}, {shape_tmp}, - input_node.get()); - size = abstract::ShapeSize(shape_tmp) * abstract::TypeIdSize(tensor->data_type()); - } - if (input_node->isa() && AnfAlgo::OutputAddrExist(input_node, 0) && TensorNeedSync(input_node, tensor)) { -#if (ENABLE_CPU && (ENABLE_D || ENABLE_GPU)) - const std::string ¶m_name = input_node->fullname_with_scope(); - if (ps::ps_cache_instance.IsHashTable(param_name)) { - continue; - } -#endif - auto device_address = AnfAlgo::GetMutableOutputAddr(input_node, 0); - MS_EXCEPTION_IF_NULL(device_address); - if (size != 0 && !device_address->SyncHostToDevice(trans::GetRuntimePaddingShape(input_node, 0), size, - tensor->data_type(), tensor->data_c())) { - MS_LOG(EXCEPTION) << "SyncHostToDevice failed."; - } - - if (ms_context->get_param(MS_CTX_EXECUTION_MODE) == kPynativeMode || - AnfAlgo::IsParameterWeight(input_node->cast())) { - tensor->set_device_address(device_address); - } - } - tensor->set_sync_status(kNoNeedSync); - } -} - void SessionBasic::UpdateOutputs(const std::shared_ptr &kernel_graph, VectorRef *const outputs, const std::vector &input_tensors) const { MS_EXCEPTION_IF_NULL(kernel_graph); @@ -2130,6 +2010,22 @@ void SessionBasic::RunGraphAsync(const GraphId &graph_id, const std::vectorRunGraphAsync(shared_from_this(), graph_id, inputs, outputs); } +void SessionBasic::RunGraphImpl(const GraphId &graph_id, const std::vector &inputs, + VectorRef *const outputs) { + MS_LOG(INFO) << "Run graph start, graph id: " << graph_id; + auto kernel_graph = GetGraph(graph_id); + MS_EXCEPTION_IF_NULL(kernel_graph); + // if none of child graph and no anf output exists + if (!kernel_graph->executable()) { + MS_LOG(INFO) << "No child graph has anf output"; + return; + } + PreExecuteGraph(kernel_graph, inputs, outputs); + ExecuteGraph(kernel_graph); + PostExecuteGraph(kernel_graph, inputs, outputs); + MS_LOG(INFO) << "Run graph end, graph id: " << graph_id; +} + void SessionBasic::RunOpsInGraphImpl(const GraphId &graph_id, const std::vector &inputs, VectorRef *outputs) { MS_LOG(INFO) << "Start!"; @@ -2214,8 +2110,7 @@ void SessionBasic::UpdateGraphDynamicShapeAttr(const NotNull &ro root_graph->UpdateGraphDynamicAttr(); } -bool SessionBasic::IsGetNextGraph(const GraphId &graph_id, std::string *channel_name) { - auto kernel_graph = graphs_[graph_id]; +bool SessionBasic::IsGetNextGraph(const std::shared_ptr &kernel_graph, std::string *channel_name) { MS_EXCEPTION_IF_NULL(kernel_graph); for (const auto &kernel_node : kernel_graph->execution_order()) { auto kernel_name = AnfAlgo::GetCNodeName(kernel_node); diff --git a/mindspore/ccsrc/backend/session/session_basic.h b/mindspore/ccsrc/backend/session/session_basic.h index 945f249d01..a44fa56c2b 100644 --- a/mindspore/ccsrc/backend/session/session_basic.h +++ b/mindspore/ccsrc/backend/session/session_basic.h @@ -114,7 +114,7 @@ class SessionBasic : public std::enable_shared_from_this { virtual GraphId GetFinalRunGraph() const { return kInvalidGraphId; } void AssignParamKey(const KernelGraphPtr &kernel_graph); void InitPSParamAndOptim(const KernelGraphPtr &kernel_graph, const std::vector &inputs_const); - bool IsGetNextGraph(const GraphId &graph_id, std::string *channel_name); + bool IsGetNextGraph(const std::shared_ptr &kernel_graph, std::string *channel_name); virtual bool CheckModelInputs(uint32_t graph_id, const std::vector &inputs, std::string *error_msg) const { return true; @@ -173,8 +173,12 @@ class SessionBasic : public std::enable_shared_from_this { virtual GraphId CompileGraphImpl(const AnfNodePtrList &lst, const AnfNodePtrList &outputs) { return 0; } virtual GraphId CompileGraphImpl(NotNull func_graph) { return kInvalidGraphId; } virtual void BuildGraphImpl(GraphId) {} - virtual void RunGraphImpl(const GraphId &graph_id, const std::vector &inputs, VectorRef *outputs) { - } + virtual void PreExecuteGraph(const std::shared_ptr &kernel_graph, + const std::vector &inputs, VectorRef *const outputs) {} + virtual void PostExecuteGraph(const std::shared_ptr &kernel_graph, + const std::vector &inputs, VectorRef *const outputs) {} + virtual void ExecuteGraph(const std::shared_ptr &kernel_graph) {} + void RunGraphImpl(const GraphId &graph_id, const std::vector &inputs, VectorRef *outputs); virtual void BuildOpImpl(const OpRunInfo &op_run_info, const GraphInfo &graph_info, const std::vector &input_tensors, const std::vector &tensors_mask) {} @@ -195,7 +199,7 @@ class SessionBasic : public std::enable_shared_from_this { } virtual void LoadInputData(const std::shared_ptr &kernel_graph, - const std::vector &inputs_const) const; + const std::vector &inputs_const) const {} void UpdateOutputs(const std::shared_ptr &kernel_graph, VectorRef *const outputs, const std::vector &input_tensors) const; void UpdateOutputAbstract(const std::shared_ptr &kernel_graph, OpRunInfo *op_run_info) const;