|
|
|
@@ -46,7 +46,7 @@ void AscendStreamAssign::AssignStream(const NotNull<KernelGraphPtr> &graph_ptr) |
|
|
|
GetNeedActiveStreams(graph_ptr); |
|
|
|
graph_ptr->PrintGraphExecuteOrder(); |
|
|
|
CheckResourceAssign(graph_ptr); |
|
|
|
MS_LOG(INFO) << "after finish stream assign"; |
|
|
|
MS_LOG(INFO) << "After finish stream assign"; |
|
|
|
|
|
|
|
// Get info for D Model |
|
|
|
AscendResourceMng &resource_manager = AscendResourceMng::GetInstance(); |
|
|
|
@@ -64,7 +64,7 @@ void AscendStreamAssign::ReorderIndependentOrders(const NotNull<KernelGraphPtr> |
|
|
|
std::vector<CNodePtr> others; |
|
|
|
|
|
|
|
auto cnode_ptr_list = graph_ptr->execution_order(); |
|
|
|
MS_LOG(INFO) << "before reorder, graph orders size:" << cnode_ptr_list.size(); |
|
|
|
MS_LOG(INFO) << "Before reorder, graph orders size:" << cnode_ptr_list.size(); |
|
|
|
for (size_t i = 0; i < cnode_ptr_list.size(); ++i) { |
|
|
|
auto cur_cnode_ptr = cnode_ptr_list[i]; |
|
|
|
MS_EXCEPTION_IF_NULL(cur_cnode_ptr); |
|
|
|
@@ -76,7 +76,7 @@ void AscendStreamAssign::ReorderIndependentOrders(const NotNull<KernelGraphPtr> |
|
|
|
} |
|
|
|
|
|
|
|
if (others.empty() || independents.empty()) { |
|
|
|
MS_LOG(INFO) << "independent or others is empty, no need reorder"; |
|
|
|
MS_LOG(INFO) << "Independent or others is empty, no need reorder"; |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
@@ -107,9 +107,9 @@ void AscendStreamAssign::ReorderIndependentOrders(const NotNull<KernelGraphPtr> |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
MS_LOG(INFO) << "after reorder, graph orders size:" << exe_orders.size(); |
|
|
|
MS_LOG(INFO) << "After reorder, graph orders size:" << exe_orders.size(); |
|
|
|
if (processed.size() != independents.size()) { |
|
|
|
MS_LOG(WARNING) << "processed independent nodes size is not equal to exiting independent nodes size"; |
|
|
|
MS_LOG(WARNING) << "Processed independent nodes size is not equal to exiting independent nodes size"; |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
@@ -142,7 +142,7 @@ void AscendStreamAssign::AssignAllNodesStream(const NotNull<KernelGraphPtr> &gra |
|
|
|
|
|
|
|
AssignCommonStreamId(cur_cnode_ptr); |
|
|
|
} |
|
|
|
MS_LOG(INFO) << "common start from 0, common stream nums:" << resource_manager.get_cur_stream_num(); |
|
|
|
MS_LOG(INFO) << "Common start from 0, common stream nums:" << resource_manager.get_cur_stream_num(); |
|
|
|
|
|
|
|
if (exit_hcom) { |
|
|
|
uint32_t first_hcom_stream_id = resource_manager.ApplyNewStream(); |
|
|
|
@@ -157,7 +157,7 @@ void AscendStreamAssign::AssignAllNodesStream(const NotNull<KernelGraphPtr> &gra |
|
|
|
AssignHcomStreamId(cur_cnode_ptr); |
|
|
|
} |
|
|
|
} |
|
|
|
MS_LOG(INFO) << "hcom start from :" << first_hcom_stream_id << ", hcom stream nums:" << hcom_stream_map_.size(); |
|
|
|
MS_LOG(INFO) << "Hcom start from :" << first_hcom_stream_id << ", hcom stream nums:" << hcom_stream_map_.size(); |
|
|
|
} |
|
|
|
|
|
|
|
if (exit_independent) { |
|
|
|
@@ -171,10 +171,10 @@ void AscendStreamAssign::AssignAllNodesStream(const NotNull<KernelGraphPtr> &gra |
|
|
|
AssignIndependentStreamId(cur_cnode_ptr); |
|
|
|
} |
|
|
|
} |
|
|
|
MS_LOG(INFO) << "independ start from:" << first_independ << ", stream nums:" << independent_stream_map_.size(); |
|
|
|
MS_LOG(INFO) << "Independ start from:" << first_independ << ", stream nums:" << independent_stream_map_.size(); |
|
|
|
} |
|
|
|
|
|
|
|
MS_LOG(INFO) << "after stream assign, total stream nums:" << resource_manager.get_cur_stream_num(); |
|
|
|
MS_LOG(INFO) << "After stream assign, total stream nums:" << resource_manager.get_cur_stream_num(); |
|
|
|
} |
|
|
|
|
|
|
|
void AscendStreamAssign::AssignCommonStreamId(const CNodePtr &cur_cnode_ptr) { |
|
|
|
@@ -257,7 +257,7 @@ bool AscendStreamAssign::IsIndependentNode(const CNodePtr &node_ptr) { |
|
|
|
|
|
|
|
uint32_t input_nums = AnfAlgo::GetInputTensorNum(node_ptr); |
|
|
|
if (input_nums == 0) { |
|
|
|
MS_LOG(INFO) << "node " << node_ptr->fullname_with_scope() << " is independent, as inputs nums is zero"; |
|
|
|
MS_LOG(INFO) << "Node " << node_ptr->fullname_with_scope() << " is independent, as inputs nums is zero"; |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
@@ -267,13 +267,13 @@ bool AscendStreamAssign::IsIndependentNode(const CNodePtr &node_ptr) { |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|
MS_LOG(INFO) << "node " << node_ptr->fullname_with_scope() << " is independent, as inputs is all value node"; |
|
|
|
MS_LOG(INFO) << "Node " << node_ptr->fullname_with_scope() << " is independent, as inputs is all value node"; |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
// section 3: |
|
|
|
void AscendStreamAssign::UpdateAtomicAddrCleanStreamId(const NotNull<KernelGraphPtr> &graph_ptr) { |
|
|
|
MS_LOG(INFO) << "start"; |
|
|
|
MS_LOG(INFO) << "Start"; |
|
|
|
auto cnode_ptr_list = graph_ptr->execution_order(); |
|
|
|
for (size_t i = 0; i < cnode_ptr_list.size(); ++i) { |
|
|
|
CNodePtr cur_cnode_ptr = cnode_ptr_list[i]; |
|
|
|
@@ -283,12 +283,12 @@ void AscendStreamAssign::UpdateAtomicAddrCleanStreamId(const NotNull<KernelGraph |
|
|
|
AnfAlgo::SetStreamId(AnfAlgo::GetStreamId(cur_cnode_ptr), cnode_ptr_list[i - 1].get()); |
|
|
|
} |
|
|
|
} |
|
|
|
MS_LOG(INFO) << "end"; |
|
|
|
MS_LOG(INFO) << "End"; |
|
|
|
} |
|
|
|
|
|
|
|
// section 4 |
|
|
|
void AscendStreamAssign::InsertStreamActive(const NotNull<KernelGraphPtr> &graph_ptr) { |
|
|
|
MS_LOG(INFO) << "start"; |
|
|
|
MS_LOG(INFO) << "Start"; |
|
|
|
GetProcessedStream(graph_ptr); |
|
|
|
std::vector<CNodePtr> update_cnode_list; |
|
|
|
CNodePtr cur_cnode_ptr = nullptr; |
|
|
|
@@ -314,7 +314,7 @@ void AscendStreamAssign::InsertStreamActive(const NotNull<KernelGraphPtr> &graph |
|
|
|
bool processed = IsProcessedStream(cur_stream_id); |
|
|
|
// 1)inner stream assign, need insert active op |
|
|
|
if (!processed) { |
|
|
|
MS_LOG(INFO) << "common stream active info:" << pre_stream_id << "->active" << cur_stream_id; |
|
|
|
MS_LOG(INFO) << "Common stream active info:" << pre_stream_id << "->active" << cur_stream_id; |
|
|
|
CNodePtr active_ptr = KernelAdjust::GetInstance().CreateStreamActiveOp(graph_ptr); |
|
|
|
// 1.set stream id |
|
|
|
AnfAlgo::SetStreamId(pre_stream_id, active_ptr.get()); |
|
|
|
@@ -336,7 +336,7 @@ void AscendStreamAssign::InsertStreamActive(const NotNull<KernelGraphPtr> &graph |
|
|
|
pre_cnode_ptr = cur_cnode_ptr; |
|
|
|
} |
|
|
|
graph_ptr->set_execution_order(update_cnode_list); |
|
|
|
MS_LOG(INFO) << "end"; |
|
|
|
MS_LOG(INFO) << "End"; |
|
|
|
} |
|
|
|
|
|
|
|
void AscendStreamAssign::GetProcessedStream(const NotNull<KernelGraphPtr> &graph_ptr) { |
|
|
|
@@ -364,7 +364,7 @@ void AscendStreamAssign::GetProcessedStream(const NotNull<KernelGraphPtr> &graph |
|
|
|
} |
|
|
|
} |
|
|
|
for (const auto &item : processed_streams_) { |
|
|
|
MS_LOG(INFO) << "before active:" << item << " is been processed"; |
|
|
|
MS_LOG(INFO) << "Before active:" << item << " is been processed"; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@@ -385,7 +385,7 @@ void AscendStreamAssign::UpdateStreamSwitch(const NotNull<KernelGraphPtr> &graph |
|
|
|
|
|
|
|
MS_EXCEPTION_IF_NULL(switch_ptr); |
|
|
|
auto true_stream_id = GetValue<uint32_t>(primitive->GetAttr(kAttrTrueBranchStream)); |
|
|
|
MS_LOG(INFO) << "streamswtich stream id:" << AnfAlgo::GetStreamId(switch_ptr) |
|
|
|
MS_LOG(INFO) << "Streamswtich stream id:" << AnfAlgo::GetStreamId(switch_ptr) |
|
|
|
<< "; active stream id:" << true_stream_id; |
|
|
|
|
|
|
|
CNodePtr active_ptr = KernelAdjust::GetInstance().CreateStreamActiveOp(graph_ptr); |
|
|
|
@@ -425,11 +425,11 @@ bool AscendStreamAssign::IsProcessedStream(uint32_t stream_id) { |
|
|
|
|
|
|
|
// section5 |
|
|
|
void AscendStreamAssign::InsertEventForHcomParallel(const NotNull<KernelGraphPtr> &graph_ptr) { |
|
|
|
MS_LOG(INFO) << "start"; |
|
|
|
MS_LOG(INFO) << "Start"; |
|
|
|
InsertEventCommonDependHcom(graph_ptr); |
|
|
|
InsertEventHcomDependCommon(graph_ptr); |
|
|
|
InsertEventHcomDependHcom(graph_ptr); |
|
|
|
MS_LOG(INFO) << "end"; |
|
|
|
MS_LOG(INFO) << "End"; |
|
|
|
} |
|
|
|
|
|
|
|
void AscendStreamAssign::InsertEventCommonDependHcom(const NotNull<KernelGraphPtr> &graph_ptr) { |
|
|
|
@@ -447,7 +447,7 @@ void AscendStreamAssign::InsertEventCommonDependHcom(const NotNull<KernelGraphPt |
|
|
|
|
|
|
|
auto target = FindTargetOp(it, cnodes.end(), *(it - 1)); |
|
|
|
if (target == cnodes.end()) { |
|
|
|
MS_LOG(WARNING) << "hcom node:" << (*(it - 1))->fullname_with_scope() |
|
|
|
MS_LOG(WARNING) << "Hcom node:" << (*(it - 1))->fullname_with_scope() |
|
|
|
<< ", can't find target for insert recv op, no insert send/recv"; |
|
|
|
it = cnodes.erase(it); |
|
|
|
continue; |
|
|
|
@@ -469,7 +469,7 @@ void AscendStreamAssign::InsertEventCommonDependHcom(const NotNull<KernelGraphPt |
|
|
|
// one event allocated additional, should delete |
|
|
|
resource_manager.DeleteEvent(); |
|
|
|
graph_ptr->set_execution_order(cnodes); |
|
|
|
MS_LOG(INFO) << "after common depend hcom, total event nums:" << resource_manager.get_cur_event_num(); |
|
|
|
MS_LOG(INFO) << "After common depend hcom, total event nums:" << resource_manager.get_cur_event_num(); |
|
|
|
} |
|
|
|
|
|
|
|
void AscendStreamAssign::InsertEventHcomDependCommon(const NotNull<KernelGraphPtr> &graph_ptr) { |
|
|
|
@@ -512,7 +512,7 @@ void AscendStreamAssign::InsertEventHcomDependCommon(const NotNull<KernelGraphPt |
|
|
|
} |
|
|
|
|
|
|
|
graph_ptr->set_execution_order(cnodes); |
|
|
|
MS_LOG(INFO) << "after hcom depend common, total event nums:" << resource_manager.get_cur_event_num(); |
|
|
|
MS_LOG(INFO) << "After hcom depend common, total event nums:" << resource_manager.get_cur_event_num(); |
|
|
|
} |
|
|
|
|
|
|
|
void AscendStreamAssign::InsertEventHcomDependHcom(const NotNull<KernelGraphPtr> &graph_ptr) { |
|
|
|
@@ -547,11 +547,11 @@ void AscendStreamAssign::InsertEventHcomDependHcom(const NotNull<KernelGraphPtr> |
|
|
|
} |
|
|
|
|
|
|
|
if (hcom_index.size() < 2) { |
|
|
|
MS_LOG(INFO) << "different stream hcom size is less than 2, no need insert event between them"; |
|
|
|
MS_LOG(INFO) << "Different stream hcom size is less than 2, no need insert event between them"; |
|
|
|
return; |
|
|
|
} |
|
|
|
InsertEventBetweenHcom(graph_ptr, hcom_index, first_hcom_stream, last_hcom_stream); |
|
|
|
MS_LOG(INFO) << "after hcom depend hcom, total event nums:" << resource_manager.get_cur_event_num(); |
|
|
|
MS_LOG(INFO) << "After hcom depend hcom, total event nums:" << resource_manager.get_cur_event_num(); |
|
|
|
} |
|
|
|
|
|
|
|
void AscendStreamAssign::InsertEventBetweenHcom(const NotNull<KernelGraphPtr> &graph_ptr, |
|
|
|
@@ -630,7 +630,7 @@ bool AscendStreamAssign::IsSatisfiedHcom(const std::map<uint32_t, vector<size_t> |
|
|
|
|
|
|
|
// section6 |
|
|
|
void AscendStreamAssign::InsertEventForIndependentParallel(const NotNull<KernelGraphPtr> &graph_ptr) { |
|
|
|
MS_LOG(INFO) << "start"; |
|
|
|
MS_LOG(INFO) << "Start"; |
|
|
|
AscendResourceMng &resource_manager = AscendResourceMng::GetInstance(); |
|
|
|
auto cnode_ptr_list = graph_ptr->execution_order(); |
|
|
|
vector<CNodePtr> cnodes = cnode_ptr_list; |
|
|
|
@@ -639,13 +639,13 @@ void AscendStreamAssign::InsertEventForIndependentParallel(const NotNull<KernelG |
|
|
|
while (it != cnodes.end()) { |
|
|
|
MS_EXCEPTION_IF_NULL(*it); |
|
|
|
if (IsIndependentNode(*it)) { |
|
|
|
MS_LOG(INFO) << "deal independent op[" << (*it)->DebugString() << "]"; |
|
|
|
MS_LOG(INFO) << "Deal independent op[" << (*it)->DebugString() << "]"; |
|
|
|
CNodePtr send_cnode_ptr = CreateSendApplyKernel(graph_ptr, cur_event_id, AnfAlgo::GetStreamId(*it)); |
|
|
|
it = cnodes.insert(it + 1, send_cnode_ptr); |
|
|
|
|
|
|
|
auto target = FindTargetOp(it, cnodes.end(), *(it - 1)); |
|
|
|
if (target == cnodes.end()) { |
|
|
|
MS_LOG(DEBUG) << "independ node[" << (*(it - 1))->fullname_with_scope() |
|
|
|
MS_LOG(DEBUG) << "Independ node[" << (*(it - 1))->fullname_with_scope() |
|
|
|
<< "] can't find target for insert recv op, no insert send/recv"; |
|
|
|
it = cnodes.erase(it); |
|
|
|
continue; |
|
|
|
@@ -662,8 +662,8 @@ void AscendStreamAssign::InsertEventForIndependentParallel(const NotNull<KernelG |
|
|
|
// one event allocated additional, should delete |
|
|
|
resource_manager.DeleteEvent(); |
|
|
|
graph_ptr->set_execution_order(cnodes); |
|
|
|
MS_LOG(INFO) << "after independent parallel, total event nums:" << resource_manager.get_cur_event_num(); |
|
|
|
MS_LOG(INFO) << "end"; |
|
|
|
MS_LOG(INFO) << "After independent parallel, total event nums:" << resource_manager.get_cur_event_num(); |
|
|
|
MS_LOG(INFO) << "End"; |
|
|
|
} |
|
|
|
|
|
|
|
// section7 |
|
|
|
@@ -687,7 +687,7 @@ void AscendStreamAssign::GetNeedActiveStreams(const NotNull<KernelGraphPtr> &gra |
|
|
|
auto need_active = GetValue<bool>(value_ptr); |
|
|
|
if (need_active) { |
|
|
|
auto stream_id = AnfAlgo::GetStreamId(cur_cnode_ptr); |
|
|
|
MS_LOG(INFO) << "stream id:" << stream_id << " is need actived at first"; |
|
|
|
MS_LOG(INFO) << "Stream id:" << stream_id << " is need actived at first"; |
|
|
|
need_first_active_streams_.push_back(stream_id); |
|
|
|
} |
|
|
|
} |
|
|
|
@@ -724,7 +724,7 @@ void AscendStreamAssign::CheckStreamAssign(const NotNull<KernelGraphPtr> &graph_ |
|
|
|
MS_EXCEPTION_IF_NULL(cur_cnode_ptr); |
|
|
|
uint32_t stream_id = AnfAlgo::GetStreamId(cur_cnode_ptr); |
|
|
|
if (stream_id == kInvalidStreamId) { |
|
|
|
MS_LOG(EXCEPTION) << "node:" << AnfAlgo::GetCNodeName(cur_cnode_ptr) << "had not been assigned stream"; |
|
|
|
MS_LOG(EXCEPTION) << "Node:" << AnfAlgo::GetCNodeName(cur_cnode_ptr) << "had not been assigned stream"; |
|
|
|
} |
|
|
|
|
|
|
|
(void)streams.emplace(stream_id); |
|
|
|
@@ -739,11 +739,11 @@ void AscendStreamAssign::CheckStreamAssign(const NotNull<KernelGraphPtr> &graph_ |
|
|
|
// check stream assign |
|
|
|
if (!streams.empty()) { |
|
|
|
if (min_stream != 0) { |
|
|
|
MS_LOG(EXCEPTION) << "stream should start from 0, now is from " << min_stream; |
|
|
|
MS_LOG(EXCEPTION) << "Stream should start from 0, now is from " << min_stream; |
|
|
|
} |
|
|
|
uint32_t assigned_stream_num = resource_manager.get_cur_stream_num(); |
|
|
|
if ((max_stream != assigned_stream_num - 1) || (streams.size() != assigned_stream_num)) { |
|
|
|
MS_LOG(EXCEPTION) << "stream should be consecutive, max stream id:" << max_stream |
|
|
|
MS_LOG(EXCEPTION) << "Stream should be consecutive, max stream id:" << max_stream |
|
|
|
<< "; alloc stream nums:" << assigned_stream_num << "; streams size:" << streams.size(); |
|
|
|
} |
|
|
|
} |
|
|
|
@@ -779,20 +779,20 @@ void AscendStreamAssign::CheckEventAssign(const NotNull<KernelGraphPtr> &graph_p |
|
|
|
// check event assign |
|
|
|
if (!event_map.empty()) { |
|
|
|
if (min_event_id != 0) { |
|
|
|
MS_LOG(EXCEPTION) << "event should start from 0, now is from " << min_event_id; |
|
|
|
MS_LOG(EXCEPTION) << "Event should start from 0, now is from " << min_event_id; |
|
|
|
} |
|
|
|
uint32_t assigned_event_num = resource_manager.get_cur_event_num(); |
|
|
|
if ((max_event_id != assigned_event_num - 1) || (event_map.size() != assigned_event_num)) { |
|
|
|
MS_LOG(EXCEPTION) << "event should be consecutive"; |
|
|
|
MS_LOG(EXCEPTION) << "Event should be consecutive"; |
|
|
|
} |
|
|
|
for (const auto &item : event_map) { |
|
|
|
if (item.second.size() != 2) { |
|
|
|
MS_LOG(EXCEPTION) << "send/recv should be in pair and share one event id"; |
|
|
|
MS_LOG(EXCEPTION) << "Send/recv should be in pair and share one event id"; |
|
|
|
} |
|
|
|
auto first_name = AnfAlgo::GetCNodeName(item.second[0]); |
|
|
|
auto second_name = AnfAlgo::GetCNodeName(item.second[1]); |
|
|
|
if (!(first_name == kSendOpName && second_name == kRecvOpName)) { |
|
|
|
MS_LOG(EXCEPTION) << "send should be before recv"; |
|
|
|
MS_LOG(EXCEPTION) << "Send should be before recv"; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@@ -858,7 +858,7 @@ vector<CNodePtr>::iterator AscendStreamAssign::FindTargetOp(vector<CNodePtr>::it |
|
|
|
} else { |
|
|
|
auto real_input = AnfAlgo::VisitKernel(input, 0); |
|
|
|
if (node == real_input.first) { |
|
|
|
MS_LOG(INFO) << "find target op[" << (*begin)->DebugString() << "]"; |
|
|
|
MS_LOG(INFO) << "Find target op[" << (*begin)->DebugString() << "]"; |
|
|
|
return begin; |
|
|
|
} |
|
|
|
} |
|
|
|
@@ -872,10 +872,10 @@ bool AscendStreamAssign::IsTaskSink() { |
|
|
|
auto ms_context = MsContext::GetInstance(); |
|
|
|
MS_EXCEPTION_IF_NULL(ms_context); |
|
|
|
if (!ms_context->enable_task_sink()) { |
|
|
|
MS_LOG(INFO) << "task sink mode is not enable"; |
|
|
|
MS_LOG(INFO) << "Task sink mode is not enable"; |
|
|
|
return false; |
|
|
|
} else { |
|
|
|
MS_LOG(INFO) << "task sink mode is enable"; |
|
|
|
MS_LOG(INFO) << "Task sink mode is enable"; |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
@@ -885,7 +885,7 @@ void AscendStreamAssign::GetWaitStreams(vector<uint32_t> *wait_active_stream_lis |
|
|
|
AscendResourceMng &resource_manager = AscendResourceMng::GetInstance(); |
|
|
|
uint32_t total_stream_num = resource_manager.get_cur_stream_num(); |
|
|
|
if (total_stream_num == 0) { |
|
|
|
MS_LOG(INFO) << "total_common_stream_num is zero"; |
|
|
|
MS_LOG(INFO) << "The total_common_stream_num is zero"; |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
@@ -893,7 +893,7 @@ void AscendStreamAssign::GetWaitStreams(vector<uint32_t> *wait_active_stream_lis |
|
|
|
for (uint32_t i = 0; i < total_stream_num; i++) { |
|
|
|
auto it = std::find(need_first_active_streams_.begin(), need_first_active_streams_.end(), i); |
|
|
|
if (it == need_first_active_streams_.end()) { |
|
|
|
MS_LOG(INFO) << "wait common stream id = " << i; |
|
|
|
MS_LOG(INFO) << "Wait common stream id = " << i; |
|
|
|
wait_active_stream_list->push_back(i); |
|
|
|
} |
|
|
|
} |
|
|
|
|