From adb6ff6c78eb3f132e8aa9f4b4f7642e3ea4fa97 Mon Sep 17 00:00:00 2001 From: gukecai Date: Mon, 3 Aug 2020 17:49:56 +0800 Subject: [PATCH] independent stream parallel --- .../backend/session/anf_runtime_algorithm.cc | 26 ++ .../backend/session/anf_runtime_algorithm.h | 1 + .../ccsrc/backend/session/session_basic.cc | 34 ++- .../device/ascend/ascend_stream_assign.cc | 258 +++++++++++++----- .../device/ascend/ascend_stream_assign.h | 11 +- .../runtime/device/ascend/dump/data_dumper.cc | 4 +- .../ccsrc/runtime/device/kernel_adjust.cc | 129 ++++++--- .../ccsrc/runtime/device/kernel_adjust.h | 18 +- .../ccsrc/runtime/device/kernel_runtime.cc | 8 + mindspore/ccsrc/utils/utils.h | 1 + 10 files changed, 369 insertions(+), 121 deletions(-) diff --git a/mindspore/ccsrc/backend/session/anf_runtime_algorithm.cc b/mindspore/ccsrc/backend/session/anf_runtime_algorithm.cc index 34471537db..a0b157eb06 100644 --- a/mindspore/ccsrc/backend/session/anf_runtime_algorithm.cc +++ b/mindspore/ccsrc/backend/session/anf_runtime_algorithm.cc @@ -1164,5 +1164,31 @@ bool AnfRuntimeAlgorithm::IsCondControlKernel(const CNodePtr &node) { auto input = node->input(kAnfPrimitiveIndex); return IsPrimitive(input, prim::kPrimLabelGoto) || IsPrimitive(input, prim::kPrimLabelSwitch); } + +bool AnfRuntimeAlgorithm::IsIndependentNode(const CNodePtr &node) { + MS_EXCEPTION_IF_NULL(node); + if (AnfAlgo::GetKernelType(node) != AICPU_KERNEL) { + return false; + } + + if (AnfAlgo::GetCNodeName(node) == kGetNextOpName) { + MS_LOG(INFO) << "GetNext should not be independent node"; + return false; + } + + uint32_t input_nums = AnfAlgo::GetInputTensorNum(node); + if (input_nums == 0) { + return true; + } + + auto inputs = node->inputs(); + for (size_t i = 1; i < inputs.size(); i++) { + if (!inputs[i]->isa()) { + return false; + } + } + return true; +} + } // namespace session } // namespace mindspore diff --git a/mindspore/ccsrc/backend/session/anf_runtime_algorithm.h 
b/mindspore/ccsrc/backend/session/anf_runtime_algorithm.h index c08819e2dc..b39f6c6b70 100644 --- a/mindspore/ccsrc/backend/session/anf_runtime_algorithm.h +++ b/mindspore/ccsrc/backend/session/anf_runtime_algorithm.h @@ -211,6 +211,7 @@ class AnfRuntimeAlgorithm { // get fix output precision from prev node, input_idx is the input index of current node related to prev node. static TypeId GetPrevNodeOutputPrecision(const AnfNodePtr &node, size_t input_idx); static bool IsCondControlKernel(const CNodePtr &node); + static bool IsIndependentNode(const CNodePtr &node); }; } // namespace session using AnfAlgo = session::AnfRuntimeAlgorithm; diff --git a/mindspore/ccsrc/backend/session/session_basic.cc b/mindspore/ccsrc/backend/session/session_basic.cc index b8e2e40df3..fe55a264fa 100644 --- a/mindspore/ccsrc/backend/session/session_basic.cc +++ b/mindspore/ccsrc/backend/session/session_basic.cc @@ -178,20 +178,32 @@ size_t LoadCtrlInputTensor(const std::shared_ptr &graph, std::vecto if (inputs_params == nullptr) { return 0; } - if (inputs_params->size() < 2) { + if (inputs_params->size() < 3) { MS_LOG(EXCEPTION) << "Illegal inputs_params size"; } - auto tensor = (*inputs_params)[0]; - MS_EXCEPTION_IF_NULL(tensor); - auto *val = static_cast(tensor->data_c()); - MS_EXCEPTION_IF_NULL(val); - *val = 0; - tensor->set_dirty(true); + // update current loop tensor to 0 per iterator + auto cur_loop_tensor = (*inputs_params)[0]; + MS_EXCEPTION_IF_NULL(cur_loop_tensor); + auto *cur_val = static_cast(cur_loop_tensor->data_c()); + MS_EXCEPTION_IF_NULL(cur_val); + *cur_val = 0; + cur_loop_tensor->set_dirty(true); // set loop_count to zero MS_EXCEPTION_IF_NULL(inputs); - inputs->push_back(tensor); + inputs->push_back(cur_loop_tensor); - auto epoch_tensor = (*inputs_params)[1]; + // update next loop tensor to 0 per iterator + auto next_loop_tensor = (*inputs_params)[1]; + MS_EXCEPTION_IF_NULL(next_loop_tensor); + auto *next_val = static_cast(next_loop_tensor->data_c()); + 
MS_EXCEPTION_IF_NULL(next_val); + *next_val = 0; + next_loop_tensor->set_dirty(true); + // set loop_count to zero + MS_EXCEPTION_IF_NULL(inputs); + inputs->push_back(next_loop_tensor); + + auto epoch_tensor = (*inputs_params)[2]; MS_EXCEPTION_IF_NULL(epoch_tensor); auto *epoch_val = static_cast(epoch_tensor->data_c()); MS_EXCEPTION_IF_NULL(epoch_val); @@ -881,7 +893,7 @@ bool TensorNeedSync(const AnfNodePtr &parameter, const tensor::TensorPtr &tensor void SessionBasic::LoadInputData(const std::shared_ptr &kernel_graph, const std::vector &inputs_const) const { std::vector inputs(inputs_const); - size_t input_ctrl_size = 2; + size_t input_ctrl_size = 3; MS_EXCEPTION_IF_NULL(kernel_graph); if (kernel_graph->input_ctrl_tensors()) { input_ctrl_size = LoadCtrlInputTensor(kernel_graph, &inputs); @@ -891,7 +903,7 @@ void SessionBasic::LoadInputData(const std::shared_ptr &kernel_grap auto params = AnfAlgo::GetAllOutput(input_node); std::copy(params.begin(), params.end(), std::back_inserter(input_nodes)); } - if ((inputs.size() + input_ctrl_size) - 2 != input_nodes.size()) { + if ((inputs.size() + input_ctrl_size) - 3 != input_nodes.size()) { MS_LOG(EXCEPTION) << "Tensor input:" << inputs.size() << " is not equal graph inputs:" << input_nodes.size() << ", input_ctrl_size:" << input_ctrl_size; } diff --git a/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.cc b/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.cc index 57fe3090af..e809f94969 100644 --- a/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.cc +++ b/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.cc @@ -42,6 +42,9 @@ void AscendStreamAssign::AssignStream(const NotNull &graph_ptr) InsertStreamActive(graph_ptr); InsertEventForHcomParallel(graph_ptr); InsertEventForIndependentParallel(graph_ptr); + GetIndependentMaxTarget(graph_ptr); + InsertCtrlForIndependentParallel(graph_ptr); + GetNeedActiveStreams(graph_ptr); graph_ptr->PrintGraphExecuteOrder(); CheckResourceAssign(graph_ptr); 
@@ -66,7 +69,7 @@ void AscendStreamAssign::ReorderIndependentOrders(const NotNull for (size_t i = 0; i < cnode_ptr_list.size(); ++i) { auto cur_cnode_ptr = cnode_ptr_list[i]; MS_EXCEPTION_IF_NULL(cur_cnode_ptr); - if (IsIndependentNode(cur_cnode_ptr)) { + if (AnfAlgo::IsIndependentNode(cur_cnode_ptr)) { independents.emplace_back(cur_cnode_ptr); } else { others.emplace_back(cur_cnode_ptr); @@ -133,7 +136,7 @@ void AscendStreamAssign::AssignAllNodesStream(const NotNull &gra continue; } - if (IsIndependentNode(cur_cnode_ptr)) { + if (AnfAlgo::IsIndependentNode(cur_cnode_ptr)) { exit_independent = true; continue; } @@ -165,7 +168,7 @@ void AscendStreamAssign::AssignAllNodesStream(const NotNull &gra if (AnfAlgo::GetStreamId(cur_cnode_ptr) != kInvalidStreamId) { continue; } - if (IsIndependentNode(cur_cnode_ptr)) { + if (AnfAlgo::IsIndependentNode(cur_cnode_ptr)) { AssignIndependentStreamId(cur_cnode_ptr); } } @@ -242,33 +245,6 @@ void AscendStreamAssign::AssignIndependentStreamId(const CNodePtr &cur_cnode_ptr } } -bool AscendStreamAssign::IsIndependentNode(const CNodePtr &node_ptr) { - MS_EXCEPTION_IF_NULL(node_ptr); - if (AnfAlgo::GetKernelType(node_ptr) != AICPU_KERNEL) { - return false; - } - - if (AnfAlgo::GetCNodeName(node_ptr) == kGetNextOpName) { - MS_LOG(INFO) << "GetNext should not be independent node"; - return false; - } - - uint32_t input_nums = AnfAlgo::GetInputTensorNum(node_ptr); - if (input_nums == 0) { - MS_LOG(INFO) << "Node " << node_ptr->fullname_with_scope() << " is independent, as inputs nums is zero"; - return true; - } - - auto inputs = node_ptr->inputs(); - for (size_t i = 1; i < inputs.size(); i++) { - if (!inputs[i]->isa()) { - return false; - } - } - MS_LOG(INFO) << "Node " << node_ptr->fullname_with_scope() << " is independent, as inputs is all value node"; - return true; -} - // section 3: void AscendStreamAssign::UpdateAtomicAddrCleanStreamId(const NotNull &graph_ptr) { MS_LOG(INFO) << "Start"; @@ -293,13 +269,11 @@ void 
AscendStreamAssign::InsertStreamActive(const NotNull &graph CNodePtr pre_cnode_ptr = nullptr; uint32_t pre_stream_id = UINT32_MAX; - bool independent_flag = !(independent_stream_map_.empty()); - bool hcom_flag = !(hcom_stream_map_.empty()); auto cnode_ptr_list = graph_ptr->execution_order(); for (size_t i = 0; i < cnode_ptr_list.size(); ++i) { cur_cnode_ptr = cnode_ptr_list[i]; MS_EXCEPTION_IF_NULL(cur_cnode_ptr); - if (IsIndependentNode(cur_cnode_ptr)) { + if (AnfAlgo::IsIndependentNode(cur_cnode_ptr)) { update_cnode_list.emplace_back(cur_cnode_ptr); continue; } @@ -322,7 +296,7 @@ void AscendStreamAssign::InsertStreamActive(const NotNull &graph update_cnode_list.emplace_back(active_ptr); } - if ((independent_flag || hcom_flag) && (AnfAlgo::GetCNodeName(cur_cnode_ptr) == kStreamSwitchOpName)) { + if (AnfAlgo::GetCNodeName(cur_cnode_ptr) == kStreamSwitchOpName) { MS_LOG(INFO) << "Insert StreamActive op after FP StreamSwitch for stream parallel"; UpdateStreamSwitch(graph_ptr, cur_cnode_ptr, &update_cnode_list); } else { @@ -346,8 +320,10 @@ void AscendStreamAssign::GetProcessedStream(const NotNull &graph uint32_t cur_stream_id = AnfAlgo::GetStreamId(cur_cnode_ptr); if (AnfAlgo::GetCNodeName(cur_cnode_ptr) == kStreamSwitchOpName) { - auto true_stream_id = AnfAlgo::GetNodeAttr(cur_cnode_ptr, kAttrTrueBranchStream); - processed_streams_.emplace(true_stream_id); + if (AnfAlgo::HasNodeAttr(kAttrTrueBranchStream, cur_cnode_ptr)) { + auto true_stream_id = AnfAlgo::GetNodeAttr(cur_cnode_ptr, kAttrTrueBranchStream); + processed_streams_.emplace(true_stream_id); + } if (!AnfAlgo::HasNodeAttr(kStreamNeedActivedFirst, cur_cnode_ptr)) { continue; @@ -365,46 +341,78 @@ void AscendStreamAssign::GetProcessedStream(const NotNull &graph void AscendStreamAssign::UpdateStreamSwitch(const NotNull &graph_ptr, const CNodePtr &switch_ptr, vector *orders) { - orders->emplace_back(switch_ptr); if (!AnfAlgo::HasNodeAttr(kStreamNeedActivedFirst, switch_ptr)) { + 
orders->emplace_back(switch_ptr); return; } - auto need_active = AnfAlgo::GetNodeAttr(switch_ptr, kStreamNeedActivedFirst); if (!need_active) { + orders->emplace_back(switch_ptr); return; } - MS_EXCEPTION_IF_NULL(switch_ptr); - auto true_stream_id = AnfAlgo::GetNodeAttr(switch_ptr, kAttrTrueBranchStream); - MS_LOG(INFO) << "Streamswtich stream id:" << AnfAlgo::GetStreamId(switch_ptr) - << "; active stream id:" << true_stream_id; - - CNodePtr active_ptr = KernelAdjust::GetInstance().CreateStreamActiveOp(graph_ptr); - AnfAlgo::SetStreamId(true_stream_id, active_ptr.get()); - vector active_ids; - // active indepdent stream - for (const auto &item : independent_stream_map_) { - active_ids.emplace_back(item.first); + if (!AnfAlgo::HasNodeAttr(kAttrStreamSwitchKind, switch_ptr)) { + orders->emplace_back(switch_ptr); + return; } - // active hcom stream - for (const auto &item : hcom_stream_map_) { - active_ids.emplace_back(item.first); + auto kind = AnfAlgo::GetNodeAttr(switch_ptr, kAttrStreamSwitchKind); + if (kind == kEosStreamSwitch || kind == kGetNextStreamSwitch) { + orders->emplace_back(switch_ptr); + return; } - AnfAlgo::SetNodeAttr(kAttrActiveStreamList, MakeValue>(active_ids), active_ptr); - // update processed stream - independent_stream_activated_ = true; - for (const auto &item : independent_stream_map_) { - processed_streams_.emplace(item.first); - } + if (kind == kIndependentStreamSwitch) { + bool independent_empty = independent_stream_map_.empty(); + // if independent empty: delete independent streamswitch + if (!independent_empty) { + for (const auto &item : independent_stream_map_) { + // first independent stream id is minimum and order by std map; + auto first_independent_stream = item.first; + AnfAlgo::SetNodeAttr(kAttrTrueBranchStream, MakeValue(first_independent_stream), switch_ptr); + orders->emplace_back(switch_ptr); + break; + } + } else { + MS_LOG(ERROR) << "independent stream switch exit, but independent stream is empty"; + } - 
hcom_stream_activated_ = true; - for (const auto &item : hcom_stream_map_) { - processed_streams_.emplace(item.first); + // update processed stream + independent_stream_activated_ = true; + for (const auto &item : independent_stream_map_) { + processed_streams_.emplace(item.first); + } + return; } - orders->emplace_back(active_ptr); + if (kind == kFpBpStreamSwitch) { + bool hcom_empty = hcom_stream_map_.empty(); + if (hcom_empty) { + orders->emplace_back(switch_ptr); + return; + } + if (!AnfAlgo::HasNodeAttr(kAttrTrueBranchStream, switch_ptr)) { + orders->emplace_back(switch_ptr); + MS_LOG(WARNING) << "FpBp StreamSwitch has no true branch attr"; + return; + } + auto true_stream_id = AnfAlgo::GetNodeAttr(switch_ptr, kAttrTrueBranchStream); + MS_LOG(INFO) << "Streamswtich stream id:" << AnfAlgo::GetStreamId(switch_ptr) + << "; active stream id:" << true_stream_id; + CNodePtr active_ptr = KernelAdjust::GetInstance().CreateStreamActiveOp(graph_ptr); + AnfAlgo::SetStreamId(true_stream_id, active_ptr.get()); + vector active_ids; + // active hcom stream + for (const auto &item : hcom_stream_map_) { + active_ids.emplace_back(item.first); + } + AnfAlgo::SetNodeAttr(kAttrActiveStreamList, MakeValue>(active_ids), active_ptr); + hcom_stream_activated_ = true; + for (const auto &item : hcom_stream_map_) { + processed_streams_.emplace(item.first); + } + orders->emplace_back(switch_ptr); + orders->emplace_back(active_ptr); + } } bool AscendStreamAssign::IsProcessedStream(uint32_t stream_id) { @@ -632,7 +640,7 @@ void AscendStreamAssign::InsertEventForIndependentParallel(const NotNullDebugString() << "]"; CNodePtr send_cnode_ptr = CreateSendApplyKernel(graph_ptr, cur_event_id, AnfAlgo::GetStreamId(*it)); it = cnodes.insert(it + 1, send_cnode_ptr); @@ -660,6 +668,129 @@ void AscendStreamAssign::InsertEventForIndependentParallel(const NotNull &graph_ptr) { + MS_LOG(INFO) << "Start"; + auto cnode_ptr_list = graph_ptr->execution_order(); + for (size_t i = 0; i < cnode_ptr_list.size(); 
i++) { + auto cur_node = cnode_ptr_list[i]; + auto key = cur_node.get(); + if (!AnfAlgo::IsIndependentNode(cur_node)) { + continue; + } + + bool flag = false; + for (size_t j = cnode_ptr_list.size() - 1; j > i; j--) { + auto target_node = cnode_ptr_list[j]; + auto inputs = target_node->inputs(); + for (size_t m = 1; m < inputs.size(); m++) { + auto input = inputs[m]; + if (opt::IsNopNode(input)) { + CNodePtr cnode = input->cast(); + auto new_inputs = cnode->inputs(); + for (size_t k = 1; k < new_inputs.size(); k++) { + auto new_real_input = AnfAlgo::VisitKernel(new_inputs[k], 0); + if (key == new_real_input.first.get()) { + MS_LOG(INFO) << "Nop node find max target op:" << AnfAlgo::GetCNodeName(cur_node); + independent_targets_.emplace(target_node.get()); + flag = true; + break; + } + } + } else { + auto real_input = AnfAlgo::VisitKernel(input, 0); + if (key == real_input.first.get()) { + MS_LOG(INFO) << "Find max target op:" << AnfAlgo::GetCNodeName(cur_node); + independent_targets_.emplace(target_node.get()); + flag = true; + } + } + if (flag) { + break; + } + } + } + } + + MS_LOG(INFO) << "End"; +} + +uint32_t AscendStreamAssign::GetIndexByKey(const NotNull &graph_ptr, const CNodeKey &key) { + auto &exe_orders = graph_ptr->execution_order(); + for (uint32_t i = 0; i < exe_orders.size(); i++) { + CNodeKey node_key = exe_orders[i].get(); + if (node_key == key) { + return i; + } + } + + return UINT32_MAX; +} + +uint32_t AscendStreamAssign::GetMaxIndexTarget(const NotNull &graph_ptr) { + if (independent_targets_.empty()) { + return UINT32_MAX; + } + + std::set indexs; + for (const auto &key : independent_targets_) { + auto index = GetIndexByKey(graph_ptr, key); + if (index == UINT32_MAX) { + MS_LOG(EXCEPTION) << "graph has no correspond key"; + } + indexs.emplace(index); + } + + return *(std::max_element(indexs.begin(), indexs.end())); +} + +uint32_t AscendStreamAssign::GetIndependentStreamSwitchStreamId(const NotNull &graph_ptr) { + auto &exe_orders = 
graph_ptr->execution_order(); + for (const auto &item : exe_orders) { + if (AnfAlgo::GetCNodeName(item) == kStreamSwitchOpName) { + if (!AnfAlgo::HasNodeAttr(kAttrStreamSwitchKind, item)) { + continue; + } + auto kind = AnfAlgo::GetNodeAttr(item, kAttrStreamSwitchKind); + if (kind == kIndependentStreamSwitch) { + return AnfAlgo::GetStreamId(item); + } + } + } + return kInvalidStreamId; +} + +void AscendStreamAssign::InsertCtrlForIndependentParallel(const NotNull &graph_ptr) { + if (independent_targets_.empty()) { + return; + } + + uint32_t independent_switch_stream = GetIndependentStreamSwitchStreamId(graph_ptr); + if (independent_switch_stream == kInvalidStreamId) { + return; + } + + auto max_index = GetMaxIndexTarget(graph_ptr); + auto &exe_orders = graph_ptr->execution_order(); + if (max_index >= exe_orders.size()) { + MS_LOG(EXCEPTION) << "max target index:" << max_index << " is greater than graph orders size:" << exe_orders.size(); + } + + auto max_node_stream = AnfAlgo::GetStreamId(exe_orders[max_index]); + + CNodePtr active_ptr = KernelAdjust::GetInstance().CreateStreamActiveOp(graph_ptr); + // 1.set stream id + AnfAlgo::SetStreamId(max_node_stream, active_ptr.get()); + // 2.set active stream ids + std::vector active_index_list{independent_switch_stream}; + AnfAlgo::SetNodeAttr(kAttrActiveStreamList, MakeValue>(active_index_list), active_ptr); + + std::vector update_cnode_list; + std::copy(exe_orders.begin(), exe_orders.begin() + max_index + 1, std::back_inserter(update_cnode_list)); + update_cnode_list.emplace_back(active_ptr); + std::copy(exe_orders.begin() + max_index + 1, exe_orders.end(), std::back_inserter(update_cnode_list)); + graph_ptr->set_execution_order(update_cnode_list); +} + // section7 void AscendStreamAssign::GetNeedActiveStreams(const NotNull &graph_ptr) { CNodePtr cur_cnode_ptr = nullptr; @@ -917,6 +1048,7 @@ void AscendStreamAssign::Reset() { stream_groups_.clear(); stream_relations_.clear(); event_map_.clear(); + 
independent_targets_.clear(); } // section 10 diff --git a/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.h b/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.h index 1ee3cd6104..5747748dbd 100644 --- a/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.h +++ b/mindspore/ccsrc/runtime/device/ascend/ascend_stream_assign.h @@ -39,6 +39,7 @@ using std::shared_ptr; using std::unordered_map; using std::unordered_set; using std::vector; +using CNodeKey = void *; const uint32_t kInvalidStreamId = UINT32_MAX; const uint32_t kInvalidEventId = UINT32_MAX; class AscendResourceMng { @@ -108,8 +109,6 @@ class AscendStreamAssign { void AssignStream(const NotNull &graph_ptr); void GetHcomStreams(std::vector *streams); void GetWaitStreams(vector *wait_active_stream_list); - CNodePtr CreateSendApplyKernel(const NotNull &graph_ptr, uint32_t event_id, uint32_t stream_id); - CNodePtr CreateRecvApplyKernel(const NotNull &graph_ptr, uint32_t event_id, uint32_t stream_id); const std::vector> &get_stream_group() const { return stream_groups_; } const std::map &get_event_map() const { return event_map_; } @@ -117,6 +116,8 @@ class AscendStreamAssign { AscendStreamAssign() = default; ~AscendStreamAssign() = default; void Reset(); + CNodePtr CreateSendApplyKernel(const NotNull &graph_ptr, uint32_t event_id, uint32_t stream_id); + CNodePtr CreateRecvApplyKernel(const NotNull &graph_ptr, uint32_t event_id, uint32_t stream_id); void CheckResourceAssign(const NotNull &graph_ptr); void CheckStreamAssign(const NotNull &graph_ptr); void CheckEventAssign(const NotNull &graph_ptr); @@ -130,6 +131,7 @@ class AscendStreamAssign { void UpdateStreamSwitch(const NotNull &graph_ptr, const CNodePtr &switch_ptr, vector *orders); void InsertEventForIndependentParallel(const NotNull &graph_ptr); + void InsertCtrlForIndependentParallel(const NotNull &graph_ptr); void InsertEventForHcomParallel(const NotNull &graph_ptr); void InsertEventCommonDependHcom(const NotNull &graph_ptr); 
void InsertEventHcomDependCommon(const NotNull &graph_ptr); @@ -141,6 +143,10 @@ class AscendStreamAssign { void GetProcessedStream(const NotNull &graph_ptr); void GetNeedActiveStreams(const NotNull &graph_ptr); void ReorderIndependentOrders(const NotNull &graph_ptr); + uint32_t GetMaxIndexTarget(const NotNull &graph_ptr); + uint32_t GetIndexByKey(const NotNull &graph_ptr, const CNodeKey &key); + uint32_t GetIndependentStreamSwitchStreamId(const NotNull &graph_ptr); + void GetIndependentMaxTarget(const NotNull &graph_ptr); bool IsTaskSink(); bool IsHcom(const CNodePtr &cur_cnode_ptr); @@ -171,6 +177,7 @@ class AscendStreamAssign { std::map common_stream_map_{}; std::set processed_streams_{}; std::vector need_first_active_streams_{}; + std::set independent_targets_; // attr for memory copy reuse std::map> stream_relations_{}; diff --git a/mindspore/ccsrc/runtime/device/ascend/dump/data_dumper.cc b/mindspore/ccsrc/runtime/device/ascend/dump/data_dumper.cc index 0a6ac52268..24ade6bf9c 100644 --- a/mindspore/ccsrc/runtime/device/ascend/dump/data_dumper.cc +++ b/mindspore/ccsrc/runtime/device/ascend/dump/data_dumper.cc @@ -34,8 +34,8 @@ static constexpr uint32_t kTupleTaskId = 0; static constexpr uint32_t kTupleStreamId = 1; static constexpr uint32_t kTupleArgs = 2; static constexpr uint32_t kCurrentStepTensorIndex = 0; -static constexpr uint32_t kCurrentEpochTensorIndex = 1; -static constexpr uint32_t kStepsPerEpochTensorIndex = 2; +static constexpr uint32_t kCurrentEpochTensorIndex = 2; +static constexpr uint32_t kStepsPerEpochTensorIndex = 3; static constexpr uint64_t kOpDebugShape = 2048; static constexpr uint64_t kOpDebugHostMemSize = 2048; static constexpr uint64_t kOpDebugDevMemSize = sizeof(void *); diff --git a/mindspore/ccsrc/runtime/device/kernel_adjust.cc b/mindspore/ccsrc/runtime/device/kernel_adjust.cc index 513fa68252..2652674fc2 100644 --- a/mindspore/ccsrc/runtime/device/kernel_adjust.cc +++ b/mindspore/ccsrc/runtime/device/kernel_adjust.cc @@ -103,6 
+103,19 @@ CNodePtr KernelAdjust::CreateRecvApplyKernel(const std::shared_ptr &kernel_graph_ptr) { + MS_EXCEPTION_IF_NULL(kernel_graph_ptr); + const auto &exe_orders = kernel_graph_ptr->execution_order(); + for (const auto &node : exe_orders) { + if (AnfAlgo::IsIndependentNode(node)) { + MS_LOG(INFO) << "graph exit independent node"; + return true; + } + } + + return false; +} + void KernelAdjust::InsertSwitchLoop(const std::shared_ptr &kernel_graph_ptr) { device::ascend::AscendResourceMng &resource_manager = device::ascend::AscendResourceMng::GetInstance(); resource_manager.ResetResource(); @@ -117,10 +130,10 @@ void KernelAdjust::InsertSwitchLoop(const std::shared_ptr std::vector *mute_inputs = kernel_graph_ptr->MutableInputs(); MS_EXCEPTION_IF_NULL(mute_inputs); - mute_inputs->push_back(switch_loop_input[kLoopCountParamName]); + mute_inputs->push_back(switch_loop_input[kCurLoopCountParamName]); + mute_inputs->push_back(switch_loop_input[kNextLoopCountParamName]); mute_inputs->push_back(switch_loop_input[kEpochParamName]); mute_inputs->push_back(switch_loop_input[kIterLoopParamName]); - mute_inputs->push_back(switch_loop_input[kZeroParamName]); mute_inputs->push_back(switch_loop_input[kOneParamName]); for (const auto &input : kernel_graph_ptr->inputs()) { MS_EXCEPTION_IF_NULL(input); @@ -145,7 +158,7 @@ void KernelAdjust::InsertSwitchLoop(const std::shared_ptr // getnext loop process // getnext loop stream switch op - CNodePtr getnext_switch_app = CreateStreamSwitchOp(kernel_graph_ptr, switch_loop_input); + CNodePtr getnext_switch_app = CreateStreamSwitchOp(kernel_graph_ptr, switch_loop_input, kGetNextStreamSwitch); MS_EXCEPTION_IF_NULL(getnext_switch_app); uint32_t getnext_switch_stream_id = resource_manager.ApplyNewStream(); AnfAlgo::SetStreamId(getnext_switch_stream_id, getnext_switch_app.get()); @@ -165,7 +178,9 @@ void KernelAdjust::InsertSwitchLoop(const std::shared_ptr } // update getnext loop stream switch true_branch_stream attr + 
AnfAlgo::SetNodeAttr(kStreamNeedActivedFirst, MakeValue(true), getnext_switch_app); AnfAlgo::SetNodeAttr(kAttrTrueBranchStream, MakeValue(getnext_stream_id), getnext_switch_app); + AnfAlgo::SetNodeAttr(kAttrStreamSwitchKind, MakeValue(kGetNextStreamSwitch), getnext_switch_app); // getnext loop fpbp start send uint32_t fpbp_start_event_id = resource_manager.ApplyNewEvent(); @@ -182,7 +197,7 @@ void KernelAdjust::InsertSwitchLoop(const std::shared_ptr // End Of Sequence loop process // eos loop stream switch - CNodePtr eos_switch_app = CreateStreamSwitchOp(kernel_graph_ptr, switch_loop_input); + CNodePtr eos_switch_app = CreateStreamSwitchOp(kernel_graph_ptr, switch_loop_input, kEosStreamSwitch); MS_EXCEPTION_IF_NULL(eos_switch_app); uint32_t eos_switch_stream_id = resource_manager.ApplyNewStream(); AnfAlgo::SetStreamId(eos_switch_stream_id, eos_switch_app.get()); @@ -197,6 +212,7 @@ void KernelAdjust::InsertSwitchLoop(const std::shared_ptr // update eos loop stream switch true_branch_stream attr AnfAlgo::SetNodeAttr(kAttrTrueBranchStream, MakeValue(eos_stream_id), eos_switch_app); + AnfAlgo::SetNodeAttr(kAttrStreamSwitchKind, MakeValue(kEosStreamSwitch), eos_switch_app); // EndOfSequence op CNodePtr end_of_sequence_op = CreateEndOfSequenceOP(kernel_graph_ptr, getnext_cnode); @@ -214,13 +230,27 @@ void KernelAdjust::InsertSwitchLoop(const std::shared_ptr fpbp_active_streams.push_back(eos_switch_stream_id); } + bool exit_independent = ExitIndependent(kernel_graph_ptr); + if (exit_independent) { + // Independent parallel + CNodePtr independent_switch_app = + CreateStreamSwitchOp(kernel_graph_ptr, switch_loop_input, kIndependentStreamSwitch); + MS_EXCEPTION_IF_NULL(independent_switch_app); + uint32_t independent_switch_stream_id = resource_manager.ApplyNewStream(); + AnfAlgo::SetStreamId(independent_switch_stream_id, independent_switch_app.get()); + AnfAlgo::SetNodeAttr(kStreamNeedActivedFirst, MakeValue(true), independent_switch_app); + 
AnfAlgo::SetNodeAttr(kAttrStreamSwitchKind, MakeValue(kIndependentStreamSwitch), independent_switch_app); + exec_order.push_back(independent_switch_app); + } + // fpbp loop process // fpbp loop stream switch - CNodePtr fpbp_switch_app = CreateStreamSwitchOp(kernel_graph_ptr, switch_loop_input); + CNodePtr fpbp_switch_app = CreateStreamSwitchOp(kernel_graph_ptr, switch_loop_input, kFpBpStreamSwitch); MS_EXCEPTION_IF_NULL(fpbp_switch_app); uint32_t fpbp_switch_stream_id = resource_manager.ApplyNewStream(); AnfAlgo::SetStreamId(fpbp_switch_stream_id, fpbp_switch_app.get()); AnfAlgo::SetNodeAttr(kStreamNeedActivedFirst, MakeValue(true), fpbp_switch_app); + exec_order.push_back(fpbp_switch_app); // fpbp loop fpbp start recv @@ -231,9 +261,9 @@ void KernelAdjust::InsertSwitchLoop(const std::shared_ptr // update fpbp loop stream switch true_branch_stream attr AnfAlgo::SetNodeAttr(kAttrTrueBranchStream, MakeValue(fpbp_stream_id), fpbp_switch_app); - - // fpbp loop AssignAdd - CNodePtr assign_add_one = CreateStreamAssignAddnOP(kernel_graph_ptr, switch_loop_input); + AnfAlgo::SetNodeAttr(kAttrStreamSwitchKind, MakeValue(kFpBpStreamSwitch), fpbp_switch_app); + // next loop AssignAdd + CNodePtr assign_add_one = CreateStreamAssignAddnOP(kernel_graph_ptr, switch_loop_input, false); MS_EXCEPTION_IF_NULL(assign_add_one); AnfAlgo::SetStreamId(fpbp_stream_id, assign_add_one.get()); exec_order.push_back(assign_add_one); @@ -271,6 +301,11 @@ void KernelAdjust::InsertSwitchLoop(const std::shared_ptr // fpbp loop other ops (void)std::copy(other_list.begin(), other_list.end(), std::back_inserter(exec_order)); + // current assign add op + CNodePtr cur_assign_add = CreateStreamAssignAddnOP(kernel_graph_ptr, switch_loop_input, true); + MS_EXCEPTION_IF_NULL(cur_assign_add); + exec_order.push_back(cur_assign_add); + // stream active to activate fpbp loop and eos loop CNodePtr fpbp_active_app = CreateStreamActiveOp(kernel_graph_ptr); MS_EXCEPTION_IF_NULL(fpbp_active_app); @@ -293,13 +328,19 @@ 
void KernelAdjust::CreateSwitchOpParameters(const std::shared_ptr(kernel_graph_ptr); - MS_EXCEPTION_IF_NULL(loop_count); - loop_count->set_name(kLoopCountParamName); - loop_count->set_abstract(paremeter_abstract_ptr); - ParameterPtr loop_count_new = kernel_graph_ptr->NewParameter(loop_count); + ParameterPtr cur_loop_count = std::make_shared(kernel_graph_ptr); + MS_EXCEPTION_IF_NULL(cur_loop_count); + cur_loop_count->set_name(kCurLoopCountParamName); + cur_loop_count->set_abstract(paremeter_abstract_ptr); + ParameterPtr loop_count_cur = kernel_graph_ptr->NewParameter(cur_loop_count); + (*switch_loop_input)[kCurLoopCountParamName] = loop_count_cur; - (*switch_loop_input)[kLoopCountParamName] = loop_count_new; + ParameterPtr next_loop_count = std::make_shared(kernel_graph_ptr); + MS_EXCEPTION_IF_NULL(next_loop_count); + next_loop_count->set_name(kNextLoopCountParamName); + next_loop_count->set_abstract(paremeter_abstract_ptr); + ParameterPtr loop_count_next = kernel_graph_ptr->NewParameter(next_loop_count); + (*switch_loop_input)[kNextLoopCountParamName] = loop_count_next; ParameterPtr iter_loop = std::make_shared(kernel_graph_ptr); iter_loop->set_name(kIterLoopParamName); @@ -307,12 +348,6 @@ void KernelAdjust::CreateSwitchOpParameters(const std::shared_ptrNewParameter(iter_loop); (*switch_loop_input)[kIterLoopParamName] = iter_loop_new; - ParameterPtr zero = std::make_shared(kernel_graph_ptr); - zero->set_name(kZeroParamName); - zero->set_abstract(paremeter_abstract_ptr); - ParameterPtr zero_new = kernel_graph_ptr->NewParameter(zero); - (*switch_loop_input)[kZeroParamName] = zero_new; - ParameterPtr one = std::make_shared(kernel_graph_ptr); one->set_name(kOneParamName); one->set_abstract(paremeter_abstract_ptr); @@ -340,14 +375,22 @@ kernel::KernelBuildInfo::KernelBuildInfoBuilder KernelAdjust::CreateMngKernelBui } CNodePtr KernelAdjust::CreateStreamSwitchOp(const std::shared_ptr &kernel_graph_ptr, - const std::map &switch_loop_input) { + const std::map 
&switch_loop_input, + StreamSwitchKind kind) { kernel::KernelBuildInfo::KernelBuildInfoBuilder selected_kernel_builder = CreateMngKernelBuilder( {kOpFormat_DEFAULT, kOpFormat_DEFAULT}, {TypeId::kNumberTypeInt32, TypeId::kNumberTypeInt32}); auto typeNone_abstract = std::make_shared(); auto stream_switch = std::make_shared(kStreamSwitchOpName); std::vector inputs; inputs.push_back(NewValueNode(stream_switch)); - inputs.push_back(switch_loop_input.at(kLoopCountParamName)); + if (kind == kFpBpStreamSwitch || kind == kEosStreamSwitch) { + inputs.push_back(switch_loop_input.at(kCurLoopCountParamName)); + } else if (kind == kGetNextStreamSwitch || kind == kIndependentStreamSwitch) { + inputs.push_back(switch_loop_input.at(kNextLoopCountParamName)); + } else { + MS_LOG(ERROR) << "unknown stream switch kind"; + } + inputs.push_back(switch_loop_input.at(kIterLoopParamName)); MS_EXCEPTION_IF_NULL(kernel_graph_ptr); CNodePtr stream_switch_app = kernel_graph_ptr->NewCNode(inputs); @@ -430,9 +473,9 @@ CNodePtr KernelAdjust::CreateEndOfSequenceOP(const std::shared_ptr &kernel_graph_ptr, - const std::map &switch_loop_input) { +CNodePtr KernelAdjust::CreateStreamAssignAddnOP(const std::shared_ptr &kernel_graph_ptr, + const std::map &switch_loop_input, + bool cur_loop) { MS_EXCEPTION_IF_NULL(kernel_graph_ptr); kernel::KernelBuildInfo::KernelBuildInfoBuilder selected_kernel_builder = CreateMngKernelBuilder( {kOpFormat_DEFAULT, kOpFormat_DEFAULT}, {TypeId::kNumberTypeInt32, TypeId::kNumberTypeInt32}); @@ -442,7 +485,12 @@ CNodePtr KernelAdjust::CreateStreamAssignAddnOP( auto assign_add = std::make_shared(kAssignAddOpName); std::vector inputs; inputs.push_back(NewValueNode(assign_add)); - inputs.push_back(switch_loop_input.at(kLoopCountParamName)); + if (cur_loop) { + inputs.push_back(switch_loop_input.at(kCurLoopCountParamName)); + } else { + inputs.push_back(switch_loop_input.at(kNextLoopCountParamName)); + } + inputs.push_back(switch_loop_input.at(kOneParamName)); CNodePtr 
assign_add_one = kernel_graph_ptr->NewCNode(inputs); MS_EXCEPTION_IF_NULL(assign_add_one); @@ -454,8 +502,8 @@ CNodePtr KernelAdjust::CreateStreamAssignAddnOP( AnfAlgo::SetNodeAttr("input_names", input_names_v, assign_add_one); AnfAlgo::SetNodeAttr("output_names", output_names_v, assign_add_one); selected_kernel_builder.SetKernelType(KernelType::TBE_KERNEL); - MS_EXCEPTION_IF_NULL(switch_loop_input.at(kLoopCountParamName)); - assign_add_one->set_abstract(switch_loop_input.at(kLoopCountParamName)->abstract()); + MS_EXCEPTION_IF_NULL(switch_loop_input.at(kCurLoopCountParamName)); + assign_add_one->set_abstract(switch_loop_input.at(kCurLoopCountParamName)->abstract()); return assign_add_one; } @@ -510,14 +558,23 @@ bool KernelAdjust::StepLoadCtrlInputs(const std::shared_ptr *inputs) { MS_LOG(INFO) << "---------------- LoadSwitchInputs---"; MS_EXCEPTION_IF_NULL(inputs); + // current loop count std::vector shp = {1}; - tensor::TensorPtr loop_count_tensor = std::make_shared(kInt32->type_id(), shp); - MS_EXCEPTION_IF_NULL(loop_count_tensor); + tensor::TensorPtr cur_loop_count = std::make_shared(kInt32->type_id(), shp); + MS_EXCEPTION_IF_NULL(cur_loop_count); int32_t *val = nullptr; - val = static_cast(loop_count_tensor->data_c()); + val = static_cast(cur_loop_count->data_c()); MS_EXCEPTION_IF_NULL(val); *val = 0; - inputs->push_back(loop_count_tensor); + inputs->push_back(cur_loop_count); + + // next loop count + tensor::TensorPtr next_loop_count = std::make_shared(kInt32->type_id(), shp); + MS_EXCEPTION_IF_NULL(next_loop_count); + val = static_cast(next_loop_count->data_c()); + MS_EXCEPTION_IF_NULL(val); + *val = 0; + inputs->push_back(next_loop_count); // Epoch in device tensor::TensorPtr epoch_tensor = std::make_shared(kInt32->type_id(), shp); @@ -527,6 +584,7 @@ void KernelAdjust::LoadSwitchInputs(std::vector *inputs) { *val = 0; inputs->push_back(epoch_tensor); + // total loop count per iter tensor::TensorPtr iter_loop_tensor = std::make_shared(kInt32->type_id(), 
shp); MS_EXCEPTION_IF_NULL(iter_loop_tensor); val = static_cast(iter_loop_tensor->data_c()); @@ -535,13 +593,6 @@ void KernelAdjust::LoadSwitchInputs(std::vector *inputs) { MS_LOG(INFO) << "iter_loop_tensor = " << *val; inputs->push_back(iter_loop_tensor); - tensor::TensorPtr zero_tensor = std::make_shared(kInt32->type_id(), shp); - MS_EXCEPTION_IF_NULL(zero_tensor); - val = static_cast(zero_tensor->data_c()); - MS_EXCEPTION_IF_NULL(val); - *val = 0; - inputs->push_back(zero_tensor); - tensor::TensorPtr one_tensor = std::make_shared(kInt32->type_id(), shp); MS_EXCEPTION_IF_NULL(one_tensor); val = static_cast(one_tensor->data_c()); diff --git a/mindspore/ccsrc/runtime/device/kernel_adjust.h b/mindspore/ccsrc/runtime/device/kernel_adjust.h index b65dcd0121..bb0cbc6146 100644 --- a/mindspore/ccsrc/runtime/device/kernel_adjust.h +++ b/mindspore/ccsrc/runtime/device/kernel_adjust.h @@ -33,13 +33,19 @@ using mindspore::device::ascend::ProfilingTraceInfo; using mindspore::device::ascend::ProfilingUtils; namespace mindspore { -constexpr auto kLoopCountParamName = "loop_count"; +constexpr auto kCurLoopCountParamName = "cur_loop_count"; +constexpr auto kNextLoopCountParamName = "next_loop_count"; constexpr auto kIterLoopParamName = "iter_loop"; -constexpr auto kZeroParamName = "zero"; constexpr auto kOneParamName = "one"; constexpr auto kEpochParamName = "loop_epoch"; constexpr auto kStreamNeedActivedFirst = "stream_need_active_first"; constexpr uint32_t kSecondStreamSwitchLabel = 2; +enum StreamSwitchKind { + kFpBpStreamSwitch = 0, + kGetNextStreamSwitch = 1, + kEosStreamSwitch = 2, + kIndependentStreamSwitch = 3 +}; namespace device { class KernelAdjust { @@ -65,18 +71,22 @@ class KernelAdjust { void CreateSwitchOpParameters(const std::shared_ptr &kernel_graph_ptr, std::map *switch_loop_input); CNodePtr CreateStreamSwitchOp(const std::shared_ptr &kernel_graph_ptr, - const std::map &switch_loop_input); + const std::map &switch_loop_input, + StreamSwitchKind kind); + 
CNodePtr CreatTupleGetItemNode(const std::shared_ptr &kernel_graph_ptr, const CNodePtr &node, size_t output_idx); CNodePtr CreateEndOfSequenceOP(const std::shared_ptr &kernel_graph_ptr, const CNodePtr &getnext_cnode); CNodePtr CreateStreamAssignAddnOP(const std::shared_ptr &kernel_graph_ptr, - const std::map &switch_loop_input); + const std::map &switch_loop_input, + bool cur_loop); kernel::KernelBuildInfo::KernelBuildInfoBuilder CreateMngKernelBuilder(const std::vector &formats, const std::vector &type_ids); void LoadSwitchInputs(std::vector *inputs); void InsertProfilingKernel(const ProfilingTraceInfo &profiling_trace_info, NotNull kernel_graph_ptr); + bool ExitIndependent(const std::shared_ptr &graph_ptr); }; } // namespace device } // namespace mindspore diff --git a/mindspore/ccsrc/runtime/device/kernel_runtime.cc b/mindspore/ccsrc/runtime/device/kernel_runtime.cc index 6173aa4faf..15fa007cfc 100644 --- a/mindspore/ccsrc/runtime/device/kernel_runtime.cc +++ b/mindspore/ccsrc/runtime/device/kernel_runtime.cc @@ -580,6 +580,14 @@ void KernelRuntime::AssignNodeOutputMem(MemType type, const AnfNodePtr &node, in MS_LOG(INFO) << "GetNext disable mem_reuse"; type = kDynamicMem; } + + if (node->isa()) { + bool independent = AnfAlgo::IsIndependentNode(node->cast()); + if (independent && type == kReuseDynamicMem) { + MS_LOG(INFO) << "Independent disable mem_reuse"; + type = kDynamicMem; + } + } auto kernel_mod = AnfAlgo::GetKernelMod(node); MS_EXCEPTION_IF_NULL(kernel_mod); auto output_sizes = kernel_mod->GetOutputSizeList(); diff --git a/mindspore/ccsrc/utils/utils.h b/mindspore/ccsrc/utils/utils.h index c1f551258b..0b72f8f99e 100644 --- a/mindspore/ccsrc/utils/utils.h +++ b/mindspore/ccsrc/utils/utils.h @@ -209,6 +209,7 @@ constexpr auto kAttrDataType = "data_type"; constexpr auto kAttrActiveTarget = "active_target"; constexpr auto kAttrActiveStreamList = "active_stream_list"; constexpr auto kAttrTrueBranchStream = "true_branch_stream"; +constexpr auto 
kAttrStreamSwitchKind = "stream_switch_kind"; constexpr auto kAttrEventId = "event_id"; constexpr auto kAttrDynInput = "dynamic"; constexpr auto kAttrDynInputSizes = "dyn_input_sizes";