From 2cabcbcf7e36e7551d92ba6e9cac9f3a9e02ebd0 Mon Sep 17 00:00:00 2001 From: VectorSL Date: Thu, 5 Nov 2020 20:22:20 +0800 Subject: [PATCH] gpu loopsink --- .../ccsrc/backend/session/gpu_session.cc | 7 ++++- mindspore/ccsrc/pipeline/jit/pipeline.cc | 29 ++++++++++++++++--- mindspore/ccsrc/pipeline/jit/resource.h | 4 +++ mindspore/ccsrc/utils/config_manager.h | 5 ++++ mindspore/train/dataset_helper.py | 11 +++---- 5 files changed, 46 insertions(+), 10 deletions(-) diff --git a/mindspore/ccsrc/backend/session/gpu_session.cc b/mindspore/ccsrc/backend/session/gpu_session.cc index 7ee81a0046..c54e103e7a 100644 --- a/mindspore/ccsrc/backend/session/gpu_session.cc +++ b/mindspore/ccsrc/backend/session/gpu_session.cc @@ -348,7 +348,12 @@ void GPUSession::RunGraphImpl(const GraphId &graph_id, const std::vectorexecution_order().size(); + int64_t loopsize = (kernel_num > 1) ? ConfigManager::GetInstance().gpu_loopsink_size() : 1; + for (int64_t i = 0; i < loopsize; i++) { + Execute(kernel_graph); + } PostLoadTensor(kernel_graph); // Summary auto context_ptr = MsContext::GetInstance(); diff --git a/mindspore/ccsrc/pipeline/jit/pipeline.cc b/mindspore/ccsrc/pipeline/jit/pipeline.cc index 118ac4016c..c33d3686ae 100644 --- a/mindspore/ccsrc/pipeline/jit/pipeline.cc +++ b/mindspore/ccsrc/pipeline/jit/pipeline.cc @@ -526,7 +526,6 @@ static std::string PrintArgs(const py::tuple &args) { bool ExecutorPy::Compile(const py::object &obj, const py::tuple &args, const py::object &phase, bool use_vm) { bool ret_value = false; - try { MS_LOG(DEBUG) << PrintArgs(args); ret_value = CompileInner(obj, args, phase, use_vm); @@ -567,7 +566,6 @@ bool ExecutorPy::Compile(const py::object &obj, const py::tuple &args, const py: std::string exName(abi::__cxa_current_exception_type()->name()); MS_LOG(EXCEPTION) << "Error occurred when compile graph. Exception name: " << exName; } - return ret_value; } @@ -648,6 +646,17 @@ void Pipeline::Run() { #endif MS_LOG(DEBUG) << "Action " << action.first << " end."; }; + if (action.first == "task_emit") { + auto func_graph = resource_->func_graph(); + if (func_graph != nullptr && func_graph->manager() != nullptr) { + auto manager = func_graph->manager(); + size_t graph_nums = manager->func_graphs().size(); + if (graph_nums == 1) { + resource_->set_gpu_loopsink_flag(true); + MS_LOG(INFO) << "Change gpu_loopsink_flag_ to true."; + } + } + } if (!result) { MS_LOG(EXCEPTION) << "Pipeline running to end, failed in step:" << action.first; } @@ -813,10 +822,22 @@ py::object ExecutorPy::Run(const py::tuple &args, const py::object &phase) { if (run == nullptr) { MS_LOG(EXCEPTION) << "Can't find run graph func for " << phase_s; } + // Set loopsink size for each phase. + bool is_loopsink = info_[phase_s]->resource->gpu_loopsink_flag(); + int64_t sinksize = ConfigManager::GetInstance().iter_num(); + ConfigManager::GetInstance().set_gpu_loopsink_size(is_loopsink ? sinksize : 1); + // If target is not gpu or is loopsink, keep vmloop 1. + bool g = (MsContext::GetInstance()->get_param(MS_CTX_DEVICE_TARGET) == kGPUDevice); + int64_t vm_loop = (!g || is_loopsink) ? 1 : sinksize; + MS_LOG(INFO) << "VM loop size " << vm_loop << ", loopsink size " << (is_loopsink ? sinksize : 1); + py::object ret; MS_LOG(DEBUG) << "Eval run" << backend; - BaseRef value = (*run)(execute_info->arg_list); + for (int64_t i = 0; i < vm_loop; i++) { + BaseRef value = (*run)(execute_info->arg_list); + ret = BaseRefToPyData(value); + } MS_LOG(DEBUG) << "Run end"; - return BaseRefToPyData(value); + return ret; } FuncGraphPtr ExecutorPy::BuildGraph(const py::dict &init_params, const std::string &phase, diff --git a/mindspore/ccsrc/pipeline/jit/resource.h b/mindspore/ccsrc/pipeline/jit/resource.h index 4a20758802..4e1cda1c0c 100644 --- a/mindspore/ccsrc/pipeline/jit/resource.h +++ b/mindspore/ccsrc/pipeline/jit/resource.h @@ -74,6 +74,9 @@ class Resource : public ResourceBase { const abstract::AbstractBasePtrList &args_spec() const { return args_spec_; } void set_args_spec(const abstract::AbstractBasePtrList &args_spec) { args_spec_ = args_spec; } + void set_gpu_loopsink_flag(const bool &flag) { gpu_loopsink_flag_ = flag; } + bool gpu_loopsink_flag() { return gpu_loopsink_flag_; } + // Reclaim resource and clear the cache. // ExecutorPy::Compile() can be called multiple times, so cache // should be cleared. @@ -85,6 +88,7 @@ class Resource : public ResourceBase { abstract::AbstractBasePtrList args_spec_; py::object input_; bool is_cleaned_; + bool gpu_loopsink_flag_{false}; }; using ResourcePtr = std::shared_ptr; diff --git a/mindspore/ccsrc/utils/config_manager.h b/mindspore/ccsrc/utils/config_manager.h index 1960797f6f..94d7c65f1a 100644 --- a/mindspore/ccsrc/utils/config_manager.h +++ b/mindspore/ccsrc/utils/config_manager.h @@ -106,6 +106,10 @@ class ConfigManager { std::map ge_initialize_options_; + int64_t gpu_loopsink_size() const { return gpu_loopsink_size_; } + + void set_gpu_loopsink_size(const int64_t size) { gpu_loopsink_size_ = size; } + private: ConfigManager() = default; ~ConfigManager() = default; @@ -115,6 +119,7 @@ class ConfigManager { DatasetGraphParam dataset_param_{"", 0, 0, {}, {}, {}}; int64_t iter_num_{1}; std::string dataset_phase_{""}; + int64_t gpu_loopsink_size_{1}; }; } // namespace mindspore diff --git a/mindspore/train/dataset_helper.py b/mindspore/train/dataset_helper.py index 153cf05cea..386836e5ac 100644 --- a/mindspore/train/dataset_helper.py +++ b/mindspore/train/dataset_helper.py @@ -45,7 +45,7 @@ def connect_network_with_dataset(network, dataset_helper): data channel corresponding to the 'queue_name' and passed to the input network during forward computation. Note: - In the case of running the network on Ascend in graph mode, this function will wrap the input network with + In the case of running the network on Ascend/GPU in graph mode, this function will wrap the input network with 'GetNext', in other cases, the input network will be returned with no change. The 'GetNext' is required to get data only in sink mode, so this function is not applicable to no-sink mode. @@ -88,8 +88,8 @@ def connect_network_with_dataset(network, dataset_helper): if isinstance(dataset_iter, _DatasetIterNormal): raise RuntimeError("Dataset should be connected with network only in sink mode.") - if not hasattr(dataset, '__ME_INITED__') and context.get_context("device_target") == "Ascend" and \ - not context.get_context("enable_ge"): + if not hasattr(dataset, '__ME_INITED__') and (context.get_context("device_target") == "Ascend" \ + or context.get_context("device_target") == "GPU") and not context.get_context("enable_ge"): dataset.__ME_INITED__ = True dataset_types, dataset_shapes = dataset_helper.types_shapes() queue_name = dataset.__TRANSFER_DATASET__.queue_name @@ -139,7 +139,7 @@ class DatasetHelper: if ms_role in ("MS_PSERVER", "MS_SCHED"): iterclass = _DatasetIterPSLite else: - iterclass = _DatasetIterMS + iterclass = _DatasetIterMSLoopSink elif context.get_context("device_target") == "CPU": raise RuntimeError("Currently dataset sink mode is not supported when the device target is CPU.") self.iter = iterclass(dataset, sink_size, epoch_num) @@ -218,7 +218,8 @@ class _DatasetIter: if hasattr(self.dataset, '__loop_size__'): sink_size = self.dataset.__loop_size__ else: - if context.get_context("enable_ge") or context.get_context("device_target") == "Ascend": + if context.get_context("enable_ge") or context.get_context("device_target") == "Ascend" \ + or context.get_context("device_target") == "GPU": if self.sink_size > 0: sink_size = self.sink_size else: