@@ -348,7 +348,12 @@ void GPUSession::RunGraphImpl(const GraphId &graph_id, const std::vector<tensor:
   InitPSParamAndOptim(kernel_graph, inputs);
 #endif
   MS_EXCEPTION_IF_NULL(kernel_graph);
-  Execute(kernel_graph);
+  // It's InitDataset graph if kernel_num == 1, skip the loop.
+  int kernel_num = kernel_graph->execution_order().size();
+  int64_t loopsize = (kernel_num > 1) ? ConfigManager::GetInstance().gpu_loopsink_size() : 1;
+  for (int64_t i = 0; i < loopsize; i++) {
+    Execute(kernel_graph);
+  }
   PostLoadTensor(kernel_graph);
   // Summary
   auto context_ptr = MsContext::GetInstance();
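
The hunk above is the backend side of GPU loop sink: instead of launching the kernel graph once per `RunGraph` call, the session replays it `gpu_loopsink_size` times, except for the single-kernel InitDataset graph. A minimal Python sketch of that decision, where `kernel_num` and `gpu_loopsink_size` are stand-ins for `kernel_graph->execution_order().size()` and `ConfigManager::GetInstance().gpu_loopsink_size()`:

```python
# Sketch only: mirrors the execution-count decision in RunGraphImpl above.
def execution_count(kernel_num: int, gpu_loopsink_size: int) -> int:
    # A graph with a single kernel is the InitDataset graph; run it once
    # and ignore the configured loop sink size.
    return gpu_loopsink_size if kernel_num > 1 else 1

assert execution_count(1, 100) == 1     # InitDataset graph
assert execution_count(32, 100) == 100  # normal graph: 100 iterations per RunGraph call
```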
@@ -526,7 +526,6 @@ static std::string PrintArgs(const py::tuple &args) {
 bool ExecutorPy::Compile(const py::object &obj, const py::tuple &args, const py::object &phase, bool use_vm) {
   bool ret_value = false;
   try {
     MS_LOG(DEBUG) << PrintArgs(args);
     ret_value = CompileInner(obj, args, phase, use_vm);
@@ -567,7 +566,6 @@ bool ExecutorPy::Compile(const py::object &obj, const py::tuple &args, const py:
     std::string exName(abi::__cxa_current_exception_type()->name());
     MS_LOG(EXCEPTION) << "Error occurred when compile graph. Exception name: " << exName;
   }
   return ret_value;
 }
@@ -648,6 +646,17 @@ void Pipeline::Run() {
 #endif
       MS_LOG(DEBUG) << "Action " << action.first << " end.";
     };
+    if (action.first == "task_emit") {
+      auto func_graph = resource_->func_graph();
+      if (func_graph != nullptr && func_graph->manager() != nullptr) {
+        auto manager = func_graph->manager();
+        size_t graph_nums = manager->func_graphs().size();
+        if (graph_nums == 1) {
+          resource_->set_gpu_loopsink_flag(true);
+          MS_LOG(INFO) << "Change gpu_loopsink_flag_ to true.";
+        }
+      }
+    }
     if (!result) {
       MS_LOG(EXCEPTION) << "Pipeline running to end, failed in step:" << action.first;
     }
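
The `task_emit` check above is the compile-time gate: the loop-sink flag is raised on the `Resource` only when the whole model compiled into a single func graph; more than one graph (for example from control flow) keeps the flag off. A hedged sketch of that predicate, with `num_func_graphs` standing in for `manager->func_graphs().size()`:

```python
# Sketch of the gate above: loop sink is only flagged for single-graph models.
def gpu_loopsink_flag(action: str, num_func_graphs: int) -> bool:
    return action == "task_emit" and num_func_graphs == 1

assert gpu_loopsink_flag("task_emit", 1)
assert not gpu_loopsink_flag("task_emit", 3)  # multiple func graphs: flag stays false
```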
@@ -813,10 +822,22 @@ py::object ExecutorPy::Run(const py::tuple &args, const py::object &phase) {
   if (run == nullptr) {
     MS_LOG(EXCEPTION) << "Can't find run graph func for " << phase_s;
   }
+  // Set loopsink size for each phase.
+  bool is_loopsink = info_[phase_s]->resource->gpu_loopsink_flag();
+  int64_t sinksize = ConfigManager::GetInstance().iter_num();
+  ConfigManager::GetInstance().set_gpu_loopsink_size(is_loopsink ? sinksize : 1);
+  // If target is not gpu or is loopsink, keep vmloop 1.
+  bool g = (MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET) == kGPUDevice);
+  int64_t vm_loop = (!g || is_loopsink) ? 1 : sinksize;
+  MS_LOG(INFO) << "VM loop size " << vm_loop << ", loopsink size " << (is_loopsink ? sinksize : 1);
+  py::object ret;
   MS_LOG(DEBUG) << "Eval run" << backend;
-  BaseRef value = (*run)(execute_info->arg_list);
+  for (int64_t i = 0; i < vm_loop; i++) {
+    BaseRef value = (*run)(execute_info->arg_list);
+    ret = BaseRefToPyData(value);
+  }
   MS_LOG(DEBUG) << "Run end";
-  return BaseRefToPyData(value);
+  return ret;
 }
 
 FuncGraphPtr ExecutorPy::BuildGraph(const py::dict &init_params, const std::string &phase,
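
Taken together with the backend loop in `RunGraphImpl`, the hunk above splits the configured iteration count between two places: on GPU with loop sink the graph itself runs `iter_num` times per `(*run)` call and the VM loop collapses to 1, while on GPU without loop sink the VM loops `iter_num` times around a single-shot graph. A hedged Python sketch of that split, where `device_target`, `is_loopsink`, and `iter_num` stand in for the values read from `MsContext`, the `Resource` flag, and `ConfigManager::iter_num()`:

```python
# Sketch of the loop split in ExecutorPy::Run above.
def loop_split(device_target: str, is_loopsink: bool, iter_num: int):
    loopsink_size = iter_num if is_loopsink else 1
    on_gpu = device_target == "GPU"
    vm_loop = 1 if (not on_gpu or is_loopsink) else iter_num
    return vm_loop, loopsink_size

assert loop_split("GPU", True, 100) == (1, 100)    # iterations sink into the graph
assert loop_split("GPU", False, 100) == (100, 1)   # iterations stay in the VM loop
assert loop_split("Ascend", False, 100) == (1, 1)  # non-GPU targets keep the old path
```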
@@ -74,6 +74,9 @@ class Resource : public ResourceBase {
   const abstract::AbstractBasePtrList &args_spec() const { return args_spec_; }
   void set_args_spec(const abstract::AbstractBasePtrList &args_spec) { args_spec_ = args_spec; }
+  void set_gpu_loopsink_flag(const bool &flag) { gpu_loopsink_flag_ = flag; }
+  bool gpu_loopsink_flag() { return gpu_loopsink_flag_; }
   // Reclaim resource and clear the cache.
   // ExecutorPy::Compile() can be called multiple times, so cache
   // should be cleared.
@@ -85,6 +88,7 @@ class Resource : public ResourceBase {
   abstract::AbstractBasePtrList args_spec_;
   py::object input_;
   bool is_cleaned_;
+  bool gpu_loopsink_flag_{false};
 };
 using ResourcePtr = std::shared_ptr<pipeline::Resource>;
@@ -106,6 +106,10 @@ class ConfigManager {
   std::map<std::string, std::string> ge_initialize_options_;
+  int64_t gpu_loopsink_size() const { return gpu_loopsink_size_; }
+  void set_gpu_loopsink_size(const int64_t size) { gpu_loopsink_size_ = size; }
  private:
   ConfigManager() = default;
   ~ConfigManager() = default;
@@ -115,6 +119,7 @@ class ConfigManager {
   DatasetGraphParam dataset_param_{"", 0, 0, {}, {}, {}};
   int64_t iter_num_{1};
   std::string dataset_phase_{""};
+  int64_t gpu_loopsink_size_{1};
 };
 }  // namespace mindspore
@@ -45,7 +45,7 @@ def connect_network_with_dataset(network, dataset_helper):
     data channel corresponding to the 'queue_name' and passed to the input network during forward computation.
     Note:
-        In the case of running the network on Ascend in graph mode, this function will wrap the input network with
+        In the case of running the network on Ascend/GPU in graph mode, this function will wrap the input network with
         'GetNext', in other cases, the input network will be returned with no change.
         The 'GetNext' is required to get data only in sink mode, so this function is not applicable to no-sink mode.
@@ -88,8 +88,8 @@ def connect_network_with_dataset(network, dataset_helper):
     if isinstance(dataset_iter, _DatasetIterNormal):
         raise RuntimeError("Dataset should be connected with network only in sink mode.")
-    if not hasattr(dataset, '__ME_INITED__') and context.get_context("device_target") == "Ascend" and \
-            not context.get_context("enable_ge"):
+    if not hasattr(dataset, '__ME_INITED__') and (context.get_context("device_target") == "Ascend" \
+            or context.get_context("device_target") == "GPU") and not context.get_context("enable_ge"):
         dataset.__ME_INITED__ = True
         dataset_types, dataset_shapes = dataset_helper.types_shapes()
         queue_name = dataset.__TRANSFER_DATASET__.queue_name
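
With the GPU branch added above, a sink-mode script on GPU now takes the same `GetNext` wrapping path as Ascend. A hedged usage sketch; `make_dataset()` and `MyNet` are hypothetical placeholders, and the `DatasetHelper` keyword arguments are assumed from this file's docstrings and call sites:

```python
# Hedged usage sketch of GPU dataset sink mode reaching the branch above.
from mindspore import context
from mindspore.train.dataset_helper import DatasetHelper, connect_network_with_dataset

context.set_context(mode=context.GRAPH_MODE, device_target="GPU")

dataset = make_dataset()    # hypothetical: any MindSpore dataset pipeline
helper = DatasetHelper(dataset, dataset_sink_mode=True, sink_size=100)
net = connect_network_with_dataset(MyNet(), helper)  # MyNet is a hypothetical Cell

for inputs in helper:       # sink mode: data arrives through the device queue
    outputs = net(*inputs)
```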
@@ -139,7 +139,7 @@ class DatasetHelper:
                 if ms_role in ("MS_PSERVER", "MS_SCHED"):
                     iterclass = _DatasetIterPSLite
                 else:
-                    iterclass = _DatasetIterMS
+                    iterclass = _DatasetIterMSLoopSink
             elif context.get_context("device_target") == "CPU":
                 raise RuntimeError("Currently dataset sink mode is not supported when the device target is CPU.")
         self.iter = iterclass(dataset, sink_size, epoch_num)
@@ -218,7 +218,8 @@ class _DatasetIter:
         if hasattr(self.dataset, '__loop_size__'):
             sink_size = self.dataset.__loop_size__
         else:
-            if context.get_context("enable_ge") or context.get_context("device_target") == "Ascend":
+            if context.get_context("enable_ge") or context.get_context("device_target") == "Ascend" \
+                    or context.get_context("device_target") == "GPU":
                 if self.sink_size > 0:
                     sink_size = self.sink_size
                 else:
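
The extended condition above means a user-supplied `sink_size` is now honoured on GPU as well, not only with GE or on Ascend. A small sketch of that branch; `fallback` stands in for whatever the trailing `else:` branch, cut off at the end of this hunk, computes:

```python
# Sketch of the sink_size branch above; `fallback` stands in for the value
# computed by the else: branch that is cut off in this hunk.
def sink_size_for(device_target: str, user_sink_size: int, fallback: int,
                  enable_ge: bool = False) -> int:
    if enable_ge or device_target in ("Ascend", "GPU"):
        if user_sink_size > 0:
            return user_sink_size
    return fallback

assert sink_size_for("GPU", 100, fallback=1875) == 100  # explicit sink size now used on GPU
assert sink_size_for("GPU", -1, fallback=1875) == 1875
```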