From: @xiaoda_zh
Reviewed-by:
Signed-off-by:
Tag: v1.1.0
@@ -43,6 +43,7 @@ int64_t RUN_PHASE = DEFAULT_RUN_PHASE;
bool TRIANGLE_STAR_STRATEGY_OVERWRITE = DEFAULT_TRIANGLE_STAR_STRATEGY_OVERWRITE;
bool DP_ALGO_ENABLE_APPROX = DEFAULT_DP_ALGO_ENABLE_APPROX;
double DP_ALGO_APPROX_EPSILON = DEFAULT_DP_ALGO_APPROX_EPSILON;
bool DP_ALGO_SINGLE_LOOP = DEFAULT_DP_ALGO_SINGLE_LOOP;
void CostGraph::SetDeviceMemoryAndCostParameter() {
  MS_EXCEPTION_IF_NULL(CostModelContext::GetInstance());
@@ -187,6 +188,14 @@ void CostGraph::SetDeviceMemoryAndCostParameter() {
  }
  DP_ALGO_APPROX_EPSILON = epsilon;
  MS_LOG(INFO) << "epsilon: " << epsilon << ".";
  auto single_loop = CostModelContext::GetInstance()->dp_algo_single_loop();
  DP_ALGO_SINGLE_LOOP = single_loop;
  if (single_loop) {
    MS_LOG(INFO) << "dp_algo_single_loop: true.";
  } else {
    MS_LOG(INFO) << "dp_algo_single_loop: false.";
  }
}
void CostGraph::RemoveOperator(const OperatorInfoPtr &op) {
@@ -49,6 +49,7 @@ extern bool DP_ALGO_ENABLE_APPROX;
extern double DP_ALGO_APPROX_EPSILON;
extern int64_t RUN_PHASE;
extern bool TRIANGLE_STAR_STRATEGY_OVERWRITE;
extern bool DP_ALGO_SINGLE_LOOP;
class CostGraph {
  // 'CostGraph' consists of Operators and edges between them. An edge is created between two Operators if they have
@@ -56,6 +56,7 @@ void CostModelContext::ResetCostModel() {
  costmodel_allreduce_fusion_allreduce_bandwidth_ = DEFAULT_COST_MODEL_ALLREDUCE_FUSION_ALLREDUCE_BANDWIDTH;
  costmodel_allreduce_fusion_computation_time_parameter_ =
    DEFAULT_COST_MODEL_ALLREDUCE_FUSION_COMPUTATION_TIME_PARAMETER;
  dp_algo_single_loop_ = DEFAULT_DP_ALGO_SINGLE_LOOP;
}
void CostModelContext::ResetAlgoParameters() {
@@ -146,6 +147,8 @@ void CostModelContext::set_triangle_star_strategy_overwrite(bool overwrite) {
void CostModelContext::set_run_phase(int64_t phase) { run_phase_ = phase; }
void CostModelContext::set_dp_algo_single_loop(bool single_loop) { dp_algo_single_loop_ = single_loop; }
struct CostRegister {
  CostRegister() {
    MsContext::device_seter([](const std::string &device_target) {
@@ -47,6 +47,7 @@ namespace parallel {
#define DEFAULT_TRIANGLE_STAR_STRATEGY_OVERWRITE true;
#define DEFAULT_DP_ALGO_ENABLE_APPROX false
#define DEFAULT_DP_ALGO_APPROX_EPSILON 0.1
#define DEFAULT_DP_ALGO_SINGLE_LOOP true
class CostModelContext {
 public:
@@ -149,6 +150,9 @@ class CostModelContext {
  void set_dp_algo_enable_approxi(bool);
  bool dp_algo_enable_approxi() const { return dp_algo_enable_approxi_; }
  void set_dp_algo_single_loop(bool);
  bool dp_algo_single_loop() const { return dp_algo_single_loop_; }

 private:
  CostModelContext();
  static std::shared_ptr<CostModelContext> cm_context_inst_;
@@ -190,6 +194,9 @@ class CostModelContext {
  // When APPROXIMATION is enabled in the DP algorithm, the 'epsilon' value used in the APPROXIMATION.
  double dp_algo_approxi_epsilon_;
  // Whether to generate a single suite of OperatorInfos for a for-loop.
  bool dp_algo_single_loop_;
  int64_t run_phase_;  // 0: 'training', 1: 'inference'
  int64_t costmodel_allreduce_fusion_algorithm_;
@@ -14,12 +14,14 @@
 * limitations under the License.
 */
#include <regex>
#include "frontend/parallel/graph_util/graph_info.h"
#include "debug/anf_ir_dump.h"
#include "debug/anf_ir_utils.h"
#include "debug/draw.h"
#include "utils/ms_context.h"
#include "ir/graph_utils.h"
#include "pipeline/jit/pipeline.h"
namespace mindspore {
namespace parallel {
@@ -50,5 +52,26 @@ void DumpGraph(const FuncGraphPtr &root, const std::string &name) {
    ExportIR(name + ".dat", "0", root);
  }
}
// Return true and set *loop_index to the loop's index if the cnode is in a for-loop;
// otherwise return false.
bool GetLoopIndexFromCNode(const CNodePtr &cnode, size_t *loop_index) {
  std::regex pattern(CELLLIST_KEYWORD_PATTERN);
  std::smatch result;
  const auto &cnode_fullname = cnode->fullname_with_scope();
  if (std::regex_search(cnode_fullname, result, pattern)) {
    if (result.size() < 2) {
      MS_LOG(EXCEPTION) << "Wrong format of fullname_with_scope: " << cnode_fullname;
    }
    *loop_index = std::stoi(result[1]);
    return true;
  }
  return false;
}
void SetOpsNumToExecutor(size_t num_ops) {
  auto executor = pipeline::ExecutorPy::GetInstance();
  executor->SetNumOpsInfo(num_ops);
}
}  // namespace parallel
}  // namespace mindspore
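For reference, the extraction can be sketched in Python against the CELLLIST_KEYWORD_PATTERN constant added in ops_utils.h below; the example fullname_with_scope is hypothetical, modeled on the nn.CellList-based test networks in this change:

import re

# Mirrors CELLLIST_KEYWORD_PATTERN ("-CellList/(\\d+)-") from ops_utils.h.
CELLLIST_KEYWORD_PATTERN = r"-CellList/(\d+)-"

def get_loop_index_from_fullname(fullname):
    """Return (True, index) if the node name marks a CellList loop body, else (False, None)."""
    match = re.search(CELLLIST_KEYWORD_PATTERN, fullname)
    if match:
        return True, int(match.group(1))
    return False, None

# Hypothetical fullname_with_scope of an operator inside the 2nd CellList element:
assert get_loop_index_from_fullname(
    "Default/network-Net/layers-CellList/2-SubNet/MatMul-op5") == (True, 2)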
@@ -26,6 +26,8 @@ namespace mindspore {
namespace parallel {
std::vector<PrimitivePtr> FindPrimtive(const FuncGraphPtr &graph, const std::string &name);
void DumpGraph(const FuncGraphPtr &root, const std::string &name);
bool GetLoopIndexFromCNode(const CNodePtr &cnode, size_t *loop_index);
void SetOpsNumToExecutor(size_t);
}  // namespace parallel
}  // namespace mindspore
@@ -149,6 +149,7 @@ constexpr char FIELD_SIZE[] = "field_size";
constexpr char OPTIMIZER_SUB_STRING[] = "optimizer";
constexpr char DEVICE[] = "Device";
constexpr char PARALLEL_OPTIMIZER_ALLGATHER[] = "parallel_optimizer_allgather";
constexpr char CELLLIST_KEYWORD_PATTERN[] = "-CellList/(\\d+)-";
// Operator
constexpr char VIRTUAL_DIV[] = "_VirtualDiv";
@@ -38,6 +38,7 @@
#include "frontend/parallel/auto_parallel/rec_core/rec_partition.h"
#include "frontend/parallel/context.h"
#include "frontend/parallel/graph_util/node_info.h"
#include "frontend/parallel/graph_util/graph_info.h"
#include "frontend/parallel/ops_info/reshape_info.h"
#include "frontend/parallel/ops_info/tmp_identity_info.h"
#include "frontend/parallel/step_parallel.h"
@@ -346,6 +347,39 @@ bool IsAutoParallelCareNode(const CNodePtr &cnode) {
  return IsParallelCareNode(cnode) && IsSplittableOperator(prim->name());
}
// Records the operators appearing in a for-loop.
// Currently, we assume that the operators in different for-loops are identical, and that their
// traversal orderings are also identical.
// Therefore, we create OperatorInfo objects for the operators in one loop (say, loop-3), and reuse
// them in the remaining loops (loop-2, loop-1 and loop-0).
std::set<std::string> ops_in_a_loop_;
// Return true if the two operators are in two different loops.
// Return false if at least one of them is not in a loop, or if both are in the same loop.
bool IsOperatorsInTwoSeparateLoops(const CNodePtr &a_cnode, const CNodePtr &b_cnode) {
  auto a_op_info = a_cnode->user_data<OperatorInfo>();
  MS_EXCEPTION_IF_NULL(a_op_info);
  auto b_op_info = b_cnode->user_data<OperatorInfo>();
  MS_EXCEPTION_IF_NULL(b_op_info);
  if ((ops_in_a_loop_.find(a_op_info->name()) == ops_in_a_loop_.end()) ||
      (ops_in_a_loop_.find(b_op_info->name()) == ops_in_a_loop_.end())) {
    return false;
  }
  size_t a_loop_index = 0, b_loop_index = 0;
  const auto &a_fullname = a_cnode->fullname_with_scope();
  if (!GetLoopIndexFromCNode(a_cnode, &a_loop_index)) {
    MS_LOG(EXCEPTION) << "The operator with fullname_with_scope: " << a_fullname << " was not included in the set.";
  }
  const auto &b_fullname = b_cnode->fullname_with_scope();
  if (!GetLoopIndexFromCNode(b_cnode, &b_loop_index)) {
    MS_LOG(EXCEPTION) << "The operator with fullname_with_scope: " << b_fullname << " was not included in the set.";
  }
  if (a_loop_index == b_loop_index) {
    return false;
  }
  return true;
}
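In Python terms, the predicate reduces to the following minimal sketch; ops_in_a_loop stands in for the C++ global, and the loop indices are the values produced by the regex helper sketched earlier (None when an operator is not in a loop):

# A minimal sketch of IsOperatorsInTwoSeparateLoops.
def in_two_separate_loops(a_name, a_loop_index, b_name, b_loop_index, ops_in_a_loop):
    # Keep the edge if either operator was never recorded as a loop operator.
    if a_name not in ops_in_a_loop or b_name not in ops_in_a_loop:
        return False
    if a_loop_index is None or b_loop_index is None:
        raise RuntimeError("an operator recorded in the loop set has no loop index")
    # Operators in the same loop body are not 'separate'; edges between them are kept.
    return a_loop_index != b_loop_index

# Example: loop-0 and loop-1 share the same OperatorInfo names, so an edge between
# them would duplicate an intra-loop edge and is skipped.
assert in_two_separate_loops("MatMulInfo", 0, "MatMulInfo", 1, {"MatMulInfo"})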
OperatorInfoPtr CreateTheOperatorInfo(const PrimitivePtr &prim, const CNodePtr &cnode, StrategyMap *stra_map) {
  MS_EXCEPTION_IF_NULL(prim);
  MS_EXCEPTION_IF_NULL(cnode);
@@ -460,6 +494,10 @@ Status ConstructCostGraphNodesByUniqueId(const std::vector<AnfNodePtr> &all_node
  entire_costgraph->SetDeviceMemoryAndCostParameter();
  // The map from CNode's UniqueId to its operatorInfo
  std::map<std::string, OperatorInfoPtr> from_cnode_to_info;
  // The operator_infos in a loop
  std::vector<OperatorInfoPtr> operators_in_forloop;
  // Key: i-th loop; Value: index of 'operators_in_forloop'
  std::map<size_t, size_t> loop_to_ops;
  // extract strategy from checkpoint for multi-train
  StrategyMap stra_map;
  if (StrategyCheckpoint::GetInstance().LoadCheckPointOn()) {
@@ -491,6 +529,27 @@ Status ConstructCostGraphNodesByUniqueId(const std::vector<AnfNodePtr> &all_node
    auto search_cnode = from_cnode_to_info.find(cnode->UniqueId());
    if (search_cnode == from_cnode_to_info.end()) {
      size_t loop_index = 0;
      bool is_in_loop = GetLoopIndexFromCNode(cnode, &loop_index);
      if (DP_ALGO_SINGLE_LOOP && is_in_loop && (loop_to_ops[loop_index] < operators_in_forloop.size())) {
        const auto &current_op_ptr = operators_in_forloop[loop_to_ops[loop_index]];
        bool is_find_wrong = (current_op_ptr->name().find(VIRTUAL_DATA_SET_INFO) == std::string::npos) &&
                             (current_op_ptr->name().find(BATCH_PARALLEL) == std::string::npos) &&
                             (current_op_ptr->name().find(prim->name()) == std::string::npos);
        if (is_find_wrong) {
          MS_LOG(EXCEPTION) << "The OperatorInfo: " << current_op_ptr->name()
                            << " does not match the Prim: " << prim->name()
                            << ". The fullname_with_scope: " << cnode->fullname_with_scope();
        }
        loop_to_ops[loop_index]++;
        cnode->set_user_data<OperatorInfo>(current_op_ptr);
        MS_LOG(INFO) << "The CNode with UniqueId: " << cnode->UniqueId()
                     << " and UniqueIdThroughCopy: " << cnode->UniqueIdThroughCopy()
                     << ", CNode fullname_with_scope: " << cnode->fullname_with_scope()
                     << " is set OperatorInfo: " << current_op_ptr->name() << ", Primitive: " << prim->name();
        (void)from_cnode_to_info.emplace(std::make_pair(cnode->UniqueId(), current_op_ptr));
        continue;
      }
      auto operator_info = CreateTheOperatorInfo(prim, cnode, &stra_map);
      if (operator_info == nullptr) {
        return FAILED;
@@ -503,8 +562,14 @@ Status ConstructCostGraphNodesByUniqueId(const std::vector<AnfNodePtr> &all_node
      cnode->set_user_data<OperatorInfo>(operator_info);
      MS_LOG(INFO) << "The CNode with UniqueId: " << cnode->UniqueId()
                   << " and UniqueIdThroughCopy: " << cnode->UniqueIdThroughCopy()
                   << ", CNode fullname_with_scope: " << cnode->fullname_with_scope()
                   << " is set OperatorInfo: " << operator_info->name() << ", Primitive: " << prim->name();
      (void)from_cnode_to_info.emplace(std::make_pair(cnode->UniqueIdThroughCopy(), operator_info));
      (void)from_cnode_to_info.emplace(std::make_pair(cnode->UniqueId(), operator_info));
      if (DP_ALGO_SINGLE_LOOP && is_in_loop) {
        operators_in_forloop.push_back(operator_info);
        ops_in_a_loop_.insert(operator_info->name());
        loop_to_ops[loop_index]++;
      }
      // Needed by rec_parser
      entire_costgraph->add_inputs_tensor_name(inputs_tensor_name);
    } else {
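To make the control flow above easier to follow, here is a self-contained Python sketch of the reuse bookkeeping; the OperatorInfo stand-in, the factory, and the names are illustrative only, not part of the MindSpore API:

import re
from collections import namedtuple

OperatorInfo = namedtuple("OperatorInfo", ["name"])
CELLLIST_KEYWORD_PATTERN = r"-CellList/(\d+)-"

def get_loop_index(fullname):
    match = re.search(CELLLIST_KEYWORD_PATTERN, fullname)
    return (True, int(match.group(1))) if match else (False, None)

operators_in_forloop = []   # OperatorInfos created while walking the first loop
loop_to_ops = {}            # loop index -> number of that loop's ops seen so far
ops_in_a_loop = set()       # names of OperatorInfos belonging to a loop

def get_or_create_op_info(prim_name, fullname):
    in_loop, loop_index = get_loop_index(fullname)
    if in_loop and loop_to_ops.setdefault(loop_index, 0) < len(operators_in_forloop):
        # Reuse the OperatorInfo created at the matching position of an earlier loop.
        op_info = operators_in_forloop[loop_to_ops[loop_index]]
        if prim_name not in op_info.name:  # mirrors the 'is_find_wrong' check
            raise RuntimeError("OperatorInfo %s does not match Prim %s" % (op_info.name, prim_name))
        loop_to_ops[loop_index] += 1
        return op_info
    op_info = OperatorInfo(name=prim_name + "Info")  # stand-in for CreateTheOperatorInfo
    if in_loop:
        operators_in_forloop.append(op_info)
        ops_in_a_loop.add(op_info.name)
        loop_to_ops[loop_index] += 1
    return op_info

# Two unrolled loop iterations visit MatMul then ReLU; the second iteration
# reuses the OperatorInfos created by the first.
first = get_or_create_op_info("MatMul", "Default/net/layers-CellList/0-SubNet/MatMul-op0")
_ = get_or_create_op_info("ReLU", "Default/net/layers-CellList/0-SubNet/ReLU-op1")
reused = get_or_create_op_info("MatMul", "Default/net/layers-CellList/1-SubNet/MatMul-op2")
assert first is reused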
@@ -526,6 +591,10 @@ Status ConstructCostGraphNodesByUniqueIdTC(const std::vector<AnfNodePtr> &all_no
  entire_costgraph->SetDeviceMemoryAndCostParameter();
  // The map from CNode's UniqueIdThroughCopy to its operatorInfo
  std::map<std::string, OperatorInfoPtr> from_cnode_to_info;
  // The operator_infos in a loop
  std::vector<OperatorInfoPtr> operators_in_forloop;
  // Key: i-th loop; Value: index of 'operators_in_forloop'
  std::map<size_t, size_t> loop_to_ops;
  // extract strategy from checkpoint for multi-train
  StrategyMap stra_map;
  if (StrategyCheckpoint::GetInstance().LoadCheckPointOn()) {
@@ -556,6 +625,27 @@ Status ConstructCostGraphNodesByUniqueIdTC(const std::vector<AnfNodePtr> &all_no
    // Find the operatorInfo if it exists
    auto search_cnode = from_cnode_to_info.find(cnode->UniqueIdThroughCopy());
    if (search_cnode == from_cnode_to_info.end()) {
      size_t loop_index = 0;
      bool is_in_loop = GetLoopIndexFromCNode(cnode, &loop_index);
      if (DP_ALGO_SINGLE_LOOP && is_in_loop && (loop_to_ops[loop_index] < operators_in_forloop.size())) {
        const auto &current_op_ptr = operators_in_forloop[loop_to_ops[loop_index]];
        bool is_find_wrong = (current_op_ptr->name().find(VIRTUAL_DATA_SET_INFO) == std::string::npos) &&
                             (current_op_ptr->name().find(BATCH_PARALLEL) == std::string::npos) &&
                             (current_op_ptr->name().find(prim->name()) == std::string::npos);
        if (is_find_wrong) {
          MS_LOG(EXCEPTION) << "The OperatorInfo: " << current_op_ptr->name()
                            << " does not match the Prim: " << prim->name()
                            << ". The fullname_with_scope: " << cnode->fullname_with_scope();
        }
        loop_to_ops[loop_index]++;
        cnode->set_user_data<OperatorInfo>(current_op_ptr);
        MS_LOG(INFO) << "The CNode with UniqueId: " << cnode->UniqueId()
                     << " and UniqueIdThroughCopy: " << cnode->UniqueIdThroughCopy()
                     << ", CNode fullname_with_scope: " << cnode->fullname_with_scope()
                     << " is set OperatorInfo: " << current_op_ptr->name() << ", Primitive: " << prim->name();
        (void)from_cnode_to_info.emplace(std::make_pair(cnode->UniqueIdThroughCopy(), current_op_ptr));
        continue;
      }
      // In this case, the corresponding OperatorInfo is not created, create the new one.
      auto operator_info = CreateTheOperatorInfo(prim, cnode, &stra_map);
      if (operator_info == nullptr) {
@@ -569,8 +659,14 @@ Status ConstructCostGraphNodesByUniqueIdTC(const std::vector<AnfNodePtr> &all_no
      cnode->set_user_data<OperatorInfo>(operator_info);
      MS_LOG(INFO) << "The CNode with UniqueId: " << cnode->UniqueId()
                   << " and UniqueIdThroughCopy: " << cnode->UniqueIdThroughCopy()
                   << ", CNode fullname_with_scope: " << cnode->fullname_with_scope()
                   << " is set OperatorInfo: " << operator_info->name() << ", Primitive: " << prim->name();
      (void)from_cnode_to_info.emplace(std::make_pair(cnode->UniqueIdThroughCopy(), operator_info));
      if (DP_ALGO_SINGLE_LOOP && is_in_loop) {
        operators_in_forloop.push_back(operator_info);
        ops_in_a_loop_.insert(operator_info->name());
        loop_to_ops[loop_index]++;
      }
      // Needed by rec_parser
      entire_costgraph->add_inputs_tensor_name(inputs_tensor_name);
    } else {
@@ -642,7 +738,12 @@ void ConstructCostGraphEdges(const std::vector<AnfNodePtr> &all_nodes) {
      }
      EdgePtr edge_ptr;
      MS_LOG(INFO) << "Creating edge: " << edge_name;
      if (IsOperatorsInTwoSeparateLoops(prev_cnode, cnode)) {
        MS_LOG(INFO) << "prev_cnode_fullname: " << prev_cnode->fullname_with_scope()
                     << ", cnode_fullname: " << cnode->fullname_with_scope();
        MS_LOG(INFO) << "The two operators are in two separate for-loops, thus skip the edge.";
        break;
      }
      bool follow_strategy = (prim->name() == RESHAPE) || (prev_prim->name() == RESHAPE) ||
                             (ELEMENTWISE_OP_STRA_FOLLOW && IsElementWiseOperator(prev_prim->name()));
      if (follow_strategy) {
@@ -1044,8 +1145,11 @@ Status ParallelStrategySearch(const std::vector<AnfNodePtr> &all_nodes, const Fu
  // Step 3: Augment the costgraph.
  AugmentCostGraph(all_nodes);
  MS_LOG(INFO) << "After the augmenting procedure, there are " << entire_costgraph->GetOperators().size()
               << " operators, and " << entire_costgraph->GetNumEdges() << " edges.";
  auto num_ops = entire_costgraph->GetOperators().size();
  SetOpsNumToExecutor(num_ops);
  auto num_edges = entire_costgraph->GetNumEdges();
  MS_LOG(INFO) << "After the augmenting procedure, there are " << num_ops << " operators, and " << num_edges
               << " edges.";
  // Step 3.1: Calculate the memory usage
  if (entire_costgraph->CalculateMemoryCost() != SUCCESS) {
@@ -1071,6 +1175,7 @@ Status ParallelStrategySearch(const std::vector<AnfNodePtr> &all_nodes, const Fu
    MS_LOG(INFO) << op->name() << " : The strategy is:";
    PrintStrategy(s_strategy);
  }
  ops_in_a_loop_.clear();
  return SUCCESS;
}
@@ -82,6 +82,8 @@ PYBIND11_MODULE(_c_expression, m) {
       "Get Parameter Tensor Layout Dictionary.")
  .def("get_strategy", &ExecutorPy::GetCNodeStrategy, py::arg("phase") = py::str("train"),
       "Get CNode Strategy Dictionary.")
  .def("get_num_parallel_ops", &ExecutorPy::GetNumOpsInfo, py::arg("phase") = py::str("train"),
       "Get the number of parallel operators.")
  .def("get_allreduce_fusion", &ExecutorPy::GetAllreduceFusion, py::arg("phase") = py::str("train"),
       "Get Allreduce Fusion Dictionary.")
  .def("fetch_info_for_quant_export", &ExecutorPy::FetchInfoForQuantExport, py::arg("phase") = py::str("train"),
@@ -254,6 +256,10 @@ PYBIND11_MODULE(_c_expression, m) {
       "Set the epsilon which is used in the approximation of DP algorithm.")
  .def("get_dp_algo_approxi_epsilon", &CostModelContext::dp_algo_approxi_epsilon,
       "Get the epsilon which is used in the approximation of DP algorithm.")
  .def("set_dp_algo_single_loop", &CostModelContext::set_dp_algo_single_loop,
       "Set the flag of generating a single suite of OperatorInfos in a for-loop.")
  .def("get_dp_algo_single_loop", &CostModelContext::dp_algo_single_loop,
       "Get the flag of whether a single suite of OperatorInfos is generated in a for-loop.")
  .def("reset_cost_model", &CostModelContext::ResetCostModel, "Reset the CostModelContext.")
  .def("reset_algo_parameters", &CostModelContext::ResetAlgoParameters, "Reset the AlgoParameters.");
@@ -252,6 +256,16 @@ void ExecutorPy::SetCNodeStrategy(const std::string &name, const parallel::Strat
  stra_dict_[phase_][py::str(name)] = strategy;
}
size_t ExecutorPy::GetNumOpsInfo(const std::string &phase) {
  MS_LOG(DEBUG) << "GetNumOpsInfo!";
  return phase_to_num_op_info_[phase];
}
void ExecutorPy::SetNumOpsInfo(size_t num_ops) {
  MS_LOG(DEBUG) << "SetNumOpsInfo!";
  phase_to_num_op_info_[phase_] = num_ops;
}
py::dict ExecutorPy::GetAllreduceFusion(const std::string &phase) {
  MS_LOG(INFO) << "GetAllreduceFusion!";
  auto graph = GetFuncGraph(phase);
@@ -93,6 +93,8 @@ class ExecutorPy : public std::enable_shared_from_this<ExecutorPy> {
  py::dict GetParameterLayout(const std::string &phase);
  py::dict GetCNodeStrategy(const std::string &phase);
  void SetCNodeStrategy(const std::string &name, const parallel::Strategys &strategy);
  size_t GetNumOpsInfo(const std::string &phase);
  void SetNumOpsInfo(size_t);
  py::dict GetAllreduceFusion(const std::string &phase);
  void DelNetRes(const std::string &id);
  void ReleaseResource(const py::object &phase);
@@ -117,6 +119,7 @@ class ExecutorPy : public std::enable_shared_from_this<ExecutorPy> {
  static bool debugger_terminate_;
  std::map<std::string, py::dict> stra_dict_;
  std::string phase_ = "";
  std::map<std::string, size_t> phase_to_num_op_info_;
};
using ExecutorPyPtr = std::shared_ptr<ExecutorPy>;
@@ -455,6 +455,10 @@ class _Executor:
        real_phase = self.phase_prefix + obj.phase + '.' + str(obj.create_time)
        return self._executor.get_strategy(real_phase)

    def _get_num_parallel_ops(self, obj):
        real_phase = self.phase_prefix + obj.phase + '.' + str(obj.create_time)
        return self._executor.get_num_parallel_ops(real_phase)

    def _get_allreduce_fusion(self, obj):
        real_phase = self.phase_prefix + obj.phase + '.' + str(obj.create_time)
        return self._executor.get_allreduce_fusion(real_phase)
@@ -266,6 +266,31 @@ class _CostModelContext:
            raise ValueError("Context handle is none in context!!!")
        return self._context_handle.get_run_phase()

    def set_dp_algo_single_loop(self, single_loop):
        """
        Set the flag of generating a single suite of OperatorInfos in a for-loop.

        Args:
            single_loop (bool): The parameter for the single loop flag.

        Raises:
            ValueError: If context handle is none.
        """
        if self._context_handle is None:
            raise ValueError("Context handle is none in context!!!")
        self._context_handle.set_dp_algo_single_loop(single_loop)

    def get_dp_algo_single_loop(self):
        """
        Get the flag of whether a single suite of OperatorInfos is generated in a for-loop.

        Raises:
            ValueError: If context handle is none.
        """
        if self._context_handle is None:
            raise ValueError("Context handle is none in context!!!")
        return self._context_handle.get_dp_algo_single_loop()

    def set_costmodel_allreduce_fusion_algorithm(self, algorithm):
        """
        Set costmodel allreduce fusion algorithm.
@@ -602,4 +627,19 @@ def _get_multi_subgraphs():
    """
    Get the flag of ANF graph containing multiple subgraphs.
    """
    cost_model_context().get_multi_subgraphs()
    return cost_model_context().get_multi_subgraphs()


def _set_algo_single_loop(single_loop=True):
    """
    Set the flag of generating a single suite of OperatorInfos in a for-loop.

    Args:
        single_loop (bool): The parameter for the single loop flag.
    """
    cost_model_context().set_dp_algo_single_loop(single_loop)


def _get_algo_single_loop():
    """
    Get the flag of whether a single suite of OperatorInfos is generated in a for-loop.
    """
    return cost_model_context().get_dp_algo_single_loop()
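As a usage note, the new private helpers can be exercised directly, mirroring the unit test further below; the initial value comes from DEFAULT_DP_ALGO_SINGLE_LOOP, which is true:

from mindspore.parallel._cost_model_context import _set_algo_single_loop, _get_algo_single_loop

# The flag starts out enabled by default.
assert _get_algo_single_loop()

# Disable the optimization so that every for-loop gets its own OperatorInfos.
_set_algo_single_loop(False)
assert not _get_algo_single_loop()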
@@ -78,8 +78,8 @@ def test_auto_parallel_arithmetic():
    b = Tensor(np.ones([64, 128]), dtype=ms.float32)
    compile_net(net, x, y, b, phase='train')
    strategies = _executor._get_shard_strategy(net)
    expected_strategies = {'Default/network-Net/FloorDiv-op1': [[2, 4], [2, 4]],
                           'Default/network-Net/MatMul-op0': [[2, 1], [1, 4]]}
    expected_strategies = {'Default/network-Net/FloorDiv-op0': [[2, 4], [2, 4]],
                           'Default/network-Net/MatMul-op1': [[2, 1], [1, 4]]}
    assert strategies == expected_strategies
@@ -105,8 +105,8 @@ def test_auto_parallel_arithmetic_broadcast_both():
    b = Tensor(np.ones([1, 64]), dtype=ms.float32)
    compile_net(net, x, y, b, phase='train')
    strategies = _executor._get_shard_strategy(net)
    expected_strategies = {'Default/network-Net/FloorDiv-op1': [[8, 1], [1, 1]],
                           'Default/network-Net/MatMul-op0': [[8, 1], [1, 1]]}
    expected_strategies = {'Default/network-Net/FloorDiv-op0': [[8, 1], [1, 1]],
                           'Default/network-Net/MatMul-op1': [[8, 1], [1, 1]]}
    assert strategies == expected_strategies
@@ -132,8 +132,8 @@ def test_auto_parallel_arithmetic_broadcast_right():
    b = Tensor(np.ones([32]), dtype=ms.float32)
    compile_net(net, x, y, b, phase='train')
    strategies = _executor._get_shard_strategy(net)
    expected_strategies = {'Default/network-Net/FloorDiv-op1': [[4, 2], [2]],
                           'Default/network-Net/MatMul-op0': [[4, 1], [1, 2]]}
    expected_strategies = {'Default/network-Net/FloorDiv-op0': [[4, 2], [2]],
                           'Default/network-Net/MatMul-op1': [[4, 1], [1, 2]]}
    assert strategies == expected_strategies
@@ -159,6 +159,6 @@ def test_auto_parallel_arithmetic_broadcast_left():
    b = Tensor(np.ones([128, 64, 32]), dtype=ms.float32)
    compile_net(net, x, y, b, phase="train")
    strategies = _executor._get_shard_strategy(net)
    expected_strategies = {'Default/network-Net/FloorDiv-op1': [[4, 2], [1, 4, 2]],
                           'Default/network-Net/MatMul-op0': [[4, 1], [1, 2]]}
    expected_strategies = {'Default/network-Net/FloorDiv-op0': [[4, 2], [1, 4, 2]],
                           'Default/network-Net/MatMul-op1': [[4, 1], [1, 2]]}
    assert strategies == expected_strategies
@@ -84,9 +84,9 @@ def test_double_star_graph():
    net.set_train()
    _executor.compile(net, x, y, z, w, phase='train')
    strategies = _executor._get_shard_strategy(net)
    expected_strategies = {'Default/network-Net/Cast-op0': [[8, 1]],
                           'Default/network-Net/Cast-op1': [[1, 8]],
                           'Default/network-Net/MatMul-op3': [[8, 1], [1, 1]],
                           'Default/network-Net/MatMul-op2': [[1, 1], [1, 8]],
                           'Default/network-Net/MatMul-op4': [[1, 8], [8, 1]]}
    expected_strategies = {'Default/network-Net/Cast-op1': [[8, 1]],
                           'Default/network-Net/Cast-op3': [[1, 8]],
                           'Default/network-Net/MatMul-op2': [[8, 1], [1, 1]],
                           'Default/network-Net/MatMul-op4': [[1, 1], [1, 8]],
                           'Default/network-Net/MatMul-op0': [[1, 8], [8, 1]]}
    assert strategies == expected_strategies
@@ -0,0 +1,129 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
import numpy as np

import mindspore as ms
from mindspore import context, Tensor, Parameter
from mindspore.nn import Cell
import mindspore.nn as nn
from mindspore.ops import operations as P, functional as F
from mindspore.common.initializer import initializer
import mindspore.common.dtype as mstype
from mindspore.common.api import _executor
from tests.dataset_mock import MindData


class Dataset(MindData):
    def __init__(self, predict, label, length=3):
        super(Dataset, self).__init__(size=length)
        self.predict = predict
        self.label = label
        self.index = 0
        self.length = length

    def __iter__(self):
        return self

    def __next__(self):
        if self.index >= self.length:
            raise StopIteration
        self.index += 1
        return self.predict, self.label

    def reset(self):
        self.index = 0


class LayerNorm(nn.Cell):
    def __init__(self, normalized_shape, eps=1e-5):
        super(LayerNorm, self).__init__()
        self.gamma = Parameter(initializer('ones', normalized_shape), name="gamma")
        self.beta = Parameter(initializer('zeros', normalized_shape), name="beta")
        self.mean = P.ReduceMean(keep_dims=True)
        self.eps = eps
        self.sub = P.Sub()
        self.add = P.TensorAdd()
        self.mul = P.Mul()
        self.div = P.RealDiv()

    def construct(self, x):
        mean = self.mean(x, -1)
        variance = self.mean(F.square(self.sub(x, mean)))
        output = self.div(self.sub(x, mean), F.sqrt(self.add(variance, self.eps)))
        rescaled_output = self.add(self.mul(output, self.gamma), self.beta)
        return rescaled_output


class SubNet(Cell):
    def __init__(self, index):
        super().__init__()
        self.matmul = P.MatMul()
        self.relu = P.ReLU()
        self.weight = Parameter(Tensor(np.ones([128, 128]), dtype=ms.float32), "matmul_w" + str(index))
        self.layernorm1 = LayerNorm((128,)).to_float(mstype.float32)

    def construct(self, x):
        x = self.layernorm1(x)
        out = self.matmul(x, self.weight)
        out = self.relu(out)
        return out


class Net(Cell):
    def __init__(self, mul_weight, num_layers, strategy1=None, strategy2=None):
        super().__init__()
        self.mul = P.Mul().shard(strategy1)
        self.neg = P.Neg().shard(strategy2)
        self.mul_weight = Parameter(mul_weight, "w1")
        self.num_layers = num_layers
        self.layers = nn.CellList()
        for i in range(num_layers):
            self.layers.append(SubNet(i))

    def construct(self, x):
        for i in range(self.num_layers):
            x = self.layers[i](x)
        out = self.mul(x, self.mul_weight)
        out = self.neg(out)
        return out


class Full(Cell):
    def __init__(self, mul_weight, num_layers, strategy1=None, strategy2=None):
        super().__init__()
        self.network = Net(mul_weight, num_layers, strategy1, strategy2)
        self.relu = P.ReLU()

    def construct(self, x):
        out = self.network(x)
        out = self.relu(out)
        return out


_x = Tensor(np.ones([512, 128]), dtype=ms.float32)
_b = Tensor(np.ones([32]), dtype=ms.int32)
_w1 = Tensor(np.ones([512, 128]), dtype=ms.float32)


def test_auto_parallel():
    context.set_context(save_graphs=True)
    context.set_auto_parallel_context(parallel_mode="auto_parallel", device_num=16, global_rank=0)
    net = Full(_w1, 3)
    net.set_auto_parallel()
    net.set_train()
    _executor.compile(net, _x, phase='train')
    num_ops = _executor._get_num_parallel_ops(net)
    expected_num = 16
    assert num_ops == expected_num
@@ -0,0 +1,136 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
import numpy as np

import mindspore as ms
import mindspore.nn as nn
from mindspore import Tensor, Parameter, ParameterTuple
from mindspore import context
from mindspore.common.api import _executor
from mindspore.nn.optim import Adam, FTRL
from mindspore.ops import composite as C
from mindspore.ops import functional as F
from mindspore.ops import operations as P
from mindspore.parallel._cost_model_context import _set_multi_subgraphs
from mindspore.parallel._utils import _reset_op_id as reset_op_id


class SubNet(nn.Cell):
    def __init__(self, index):
        super().__init__()
        self.matmul = P.BatchMatMul()
        self.relu = P.ReLU()
        self.weight = Parameter(Tensor(np.ones([8, 8, 8, 8]), dtype=ms.float32), "matmul_w" + str(index))

    def construct(self, x):
        out = self.matmul(x, self.weight)
        out = self.relu(out)
        return out


class Net(nn.Cell):
    def __init__(self):
        super(Net, self).__init__()
        self.mul = P.Mul()
        self.relu = P.ReLU()
        self.wd = Parameter(Tensor(np.ones([8, 8, 8, 8]).astype(np.float32)), name="wide")
        self.wt = Parameter(Tensor(np.ones([8, 8, 8, 8]).astype(np.float32)), name="l")
        self.layers = nn.CellList()
        for i in range(3):
            self.layers.append(SubNet(i))

    def construct(self, x):
        for i in range(3):
            x = self.layers[i](x)
        out = self.mul(x, self.wd)
        out = self.mul(out, self.wt)
        out = self.relu(out)
        return out


class NetWithLoss(nn.Cell):
    def __init__(self, network):
        super(NetWithLoss, self).__init__()
        self.sum = P.ReduceSum()
        self.mean = P.ReduceMean()
        self.net = network

    def construct(self, x):
        predict = self.net(x)
        loss1 = self.sum(predict, -1)
        loss2 = self.mean(predict, -1)
        return loss1, loss2


class IthOutputCell(nn.Cell):
    def __init__(self, network, output_index):
        super(IthOutputCell, self).__init__()
        self.network = network
        self.output_index = output_index

    def construct(self, x):
        predict = self.network(x)[self.output_index]
        return predict


class TrainStepWarp(nn.Cell):
    def __init__(self, network, sens=1000.0):
        super(TrainStepWarp, self).__init__()
        self.network = network
        self.network.set_train()
        self.trainable_params = network.trainable_params()
        weights_w = []
        weights_d = []
        for params in self.trainable_params:
            weights_w.append(params)
            weights_d.append(params)
        self.weights_w = ParameterTuple(weights_w)
        self.weights_d = ParameterTuple(weights_d)
        self.optimizer_w = FTRL(learning_rate=1e-2, params=self.weights_w, l1=1e-8,
                                l2=1e-8, initial_accum=1.0)
        self.optimizer_d = Adam(self.weights_d, learning_rate=3.5e-4, eps=1e-8,
                                loss_scale=sens)
        self.hyper_map = C.HyperMap()
        self.grad_w = C.GradOperation(get_by_list=True, sens_param=True)
        self.grad_d = C.GradOperation(get_by_list=True, sens_param=True)
        self.sens = sens
        self.loss_net_w = IthOutputCell(network, output_index=0)
        self.loss_net_d = IthOutputCell(network, output_index=1)

    def construct(self, x):
        weights_w = self.weights_w
        weights_d = self.weights_d
        loss_w, loss_d = self.network(x)
        sens_w = P.Fill()(P.DType()(loss_w), P.Shape()(loss_w), self.sens)
        sens_d = P.Fill()(P.DType()(loss_d), P.Shape()(loss_d), self.sens)
        grads_w = self.grad_w(self.loss_net_w, weights_w)(x, sens_w)
        grads_d = self.grad_d(self.loss_net_d, weights_d)(x, sens_d)
        return F.depend(loss_w, self.optimizer_w(grads_w)), F.depend(loss_d, self.optimizer_d(grads_d))


def test_double_subgraphs():
    context.set_context(save_graphs=True)
    context.set_auto_parallel_context(parallel_mode="auto_parallel", device_num=8, global_rank=0)
    net = TrainStepWarp(NetWithLoss(Net()))
    _set_multi_subgraphs()
    net.set_auto_parallel()
    x = Tensor(np.ones([8, 8, 8, 8]), dtype=ms.float32)
    reset_op_id()
    net.set_train()
    _executor.compile(net, x, phase='train')
    num_ops = _executor._get_num_parallel_ops(net)
    expected_num = 7
    assert expected_num == num_ops
@@ -0,0 +1,101 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
import numpy as np

import mindspore as ms
from mindspore import context, Tensor, Parameter
from mindspore.nn import Cell, Momentum
from mindspore.nn.loss import SoftmaxCrossEntropyWithLogits
import mindspore.nn as nn
from mindspore.ops import operations as P
from mindspore.train import Model
from tests.dataset_mock import MindData


class Dataset(MindData):
    def __init__(self, predict, label, length=3):
        super(Dataset, self).__init__(size=length)
        self.predict = predict
        self.label = label
        self.index = 0
        self.length = length

    def __iter__(self):
        return self

    def __next__(self):
        if self.index >= self.length:
            raise StopIteration
        self.index += 1
        return self.predict, self.label

    def reset(self):
        self.index = 0


class SubNet(Cell):
    def __init__(self, index):
        super().__init__()
        self.matmul = P.MatMul()
        self.relu = P.ReLU()
        self.weight = Parameter(Tensor(np.ones([128, 128]), dtype=ms.float32), "matmul_w" + str(index))

    def construct(self, x):
        out = self.matmul(x, self.weight)
        out = self.relu(out)
        return out


class Net(Cell):
    def __init__(self, mul_weight, num_layers, strategy1=None, strategy2=None):
        super().__init__()
        self.mul = P.Mul().shard(strategy1)
        self.neg = P.Neg().shard(strategy2)
        self.mul_weight = Parameter(mul_weight, "w1")
        self.num_layers = num_layers
        self.layers = nn.CellList()
        for i in range(num_layers):
            self.layers.append(SubNet(i))

    def construct(self, x):
        for i in range(self.num_layers):
            x = self.layers[i](x)
        out = self.mul(x, self.mul_weight)
        out = self.neg(out)
        return out


_x = Tensor(np.ones([32, 128]), dtype=ms.float32)
_b = Tensor(np.ones([32]), dtype=ms.int32)
_w1 = Tensor(np.ones([512, 128]), dtype=ms.float32)


def compile_net(net):
    context.set_context(save_graphs=True)
    learning_rate = 0.1
    momentum = 0.9
    epoch_size = 2
    dataset = Dataset(_x, _b)
    loss = SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
    opt = Momentum(net.trainable_params(), learning_rate, momentum)
    model = Model(net, loss, optimizer=opt)
    model.train(epoch_size, dataset, dataset_sink_mode=False)
    context.reset_auto_parallel_context()


def test_auto_parallel():
    context.set_auto_parallel_context(parallel_mode="auto_parallel", device_num=16, global_rank=0)
    net = Net(_w1, 3)
    compile_net(net)
@@ -79,8 +79,8 @@ def test_two_matmul_transpose():
    net.set_train()
    _executor.compile(net, x, y, b, phase='train')
    strategies = _executor._get_shard_strategy(net)
    expected_strategies = {'Default/network-Net/Transpose-op3': [[1, 16]],
                           'Default/network-Net/Transpose-op2': [[16, 1]],
                           'Default/network-Net/MatMul-op0': [[16, 1], [1, 1]],
                           'Default/network-Net/MatMul-op1': [[16, 1], [1, 1]]}
    expected_strategies = {'Default/network-Net/Transpose-op0': [[1, 16]],
                           'Default/network-Net/Transpose-op1': [[16, 1]],
                           'Default/network-Net/MatMul-op2': [[16, 1], [1, 1]],
                           'Default/network-Net/MatMul-op3': [[16, 1], [1, 1]]}
    assert strategies == expected_strategies
@@ -22,6 +22,7 @@ from mindspore.common.api import _executor
from mindspore.ops import composite as C
from mindspore.ops import operations as P
from mindspore.parallel import _cost_model_context as cost_model_context
from mindspore.parallel._cost_model_context import _set_algo_single_loop, _get_algo_single_loop
from mindspore.parallel import set_algo_parameters, get_algo_parameters, reset_algo_parameters
from mindspore.parallel._utils import _reset_op_id as reset_op_id
from tests.ut.python.ops.test_math_ops import VirtualLoss
@@ -120,6 +121,14 @@ def test_two_matmul():
    algo_epsilon = get_algo_parameters("algo_approxi_epsilon")
    assert algo_epsilon == 0.001

    expected_single_loop = True
    single_loop = _get_algo_single_loop()
    assert expected_single_loop == single_loop
    expected_single_loop = False
    _set_algo_single_loop(expected_single_loop)
    single_loop = _get_algo_single_loop()
    assert expected_single_loop == single_loop

    reset_algo_parameters()
    para_slice_align_enable = get_algo_parameters("tensor_slice_align_enable")
    assert not para_slice_align_enable