Browse Source

add gpu step_trace

tags/v1.1.0
gzhcv 5 years ago
parent
commit
6f6b56bfe1
9 changed files with 576 additions and 74 deletions
  1. +49
    -1
      mindspore/ccsrc/profiler/device/gpu/data_saver.cc
  2. +6
    -0
      mindspore/ccsrc/profiler/device/gpu/data_saver.h
  3. +4
    -0
      mindspore/ccsrc/profiler/device/gpu/gpu_profiling.cc
  4. +3
    -0
      mindspore/ccsrc/profiler/device/gpu/gpu_profiling.h
  5. +187
    -0
      mindspore/ccsrc/profiler/device/gpu/gpu_profiling_utils.cc
  6. +79
    -0
      mindspore/ccsrc/profiler/device/gpu/gpu_profiling_utils.h
  7. +12
    -1
      mindspore/ccsrc/runtime/device/gpu/gpu_kernel_runtime.cc
  8. +205
    -58
      mindspore/profiler/parser/step_trace_parser.py
  9. +31
    -14
      mindspore/profiler/profiling.py

+ 49
- 1
mindspore/ccsrc/profiler/device/gpu/data_saver.cc View File

@@ -168,6 +168,7 @@ void DataSaver::WriteFile(std::string out_path_dir) {
WriteOpType(out_path_dir);
WriteActivity(out_path_dir);
WriteOpTimestamp(out_path_dir);
WriteStepTrace(out_path_dir);
}

void DataSaver::WriteOpType(const std::string &saver_base_dir) {
@@ -262,9 +263,56 @@ void DataSaver::WriteOpTimestamp(const std::string &saver_base_dir) {
ChangeFileMode(file_path);
}

void DataSaver::WriteStepTrace(const std::string &saver_base_dir) {
std::string file_path = saver_base_dir + "/step_trace_profiling_" + device_id_ + ".txt";
std::ofstream ofs(file_path);
// check if the file is writable
if (!ofs.is_open()) {
MS_LOG(WARNING) << "Open file '" << file_path << "' failed!";
return;
}

// write step trace time info into file
uint32_t factor = 10;
std::vector<std::string> op_name_arr;
op_name_arr.push_back(step_trace_op_name.trace_iter_start);
op_name_arr.push_back(step_trace_op_name.trace_fp_start);
op_name_arr.push_back(step_trace_op_name.trace_bp_end);
op_name_arr.push_back(step_trace_op_name.trace_iter_end);
if (!step_trace_op_name.trace_custom_node.empty()) {
auto start = step_trace_op_name.trace_custom_node.begin();
auto end = step_trace_op_name.trace_custom_node.end();
std::copy(start, end, std::back_inserter(op_name_arr));
}
for (auto op_name : op_name_arr) {
auto iter_op_timestamp = op_timestamps_map_.find(op_name);
if (iter_op_timestamp != op_timestamps_map_.end()) {
try {
ofs << op_name << " ";
for (auto start_end : iter_op_timestamp->second) {
// convert the time unit from 1ns to 10ns (keep the same with ascend)
uint64_t duration = start_end.duration * kTimeUnit;
uint64_t end_timestamp = (duration + start_end.start_timestamp) / factor;
uint64_t start_timestamp = start_end.start_timestamp / factor;
ofs << start_timestamp << "," << end_timestamp << " ";
}
ofs << std::endl;
} catch (const std::exception &e) {
MS_LOG(ERROR) << "Write " << file_path << "failed:" << e.what();
}
}
}

ofs.close();
ChangeFileMode(file_path);
MS_LOG(INFO) << "Write step trace infos into file: " << file_path;
}

// Cache the step-trace anchor op names (iter start / fp start / bp end /
// iter end and custom communication nodes) for later use by WriteStepTrace.
// NOTE(review): takes ProfilingTraceInfo by value, so each call copies the
// contained strings; acceptable since this runs once per profiling session.
void DataSaver::SetStepTraceOpName(ProfilingTraceInfo trace_op_name) { step_trace_op_name = trace_op_name; }

// Restrict the written profiling file to owner read/write (0600) so results
// are not world-readable.
// Fixed: the diff artifact left BOTH the old MS_LOG(INFO) line and its
// MS_LOG(WARNING) replacement in place, logging twice per failure; only the
// WARNING form (the commit's intent) is kept.
void DataSaver::ChangeFileMode(const std::string &file_path) {
  if (chmod(common::SafeCStr(file_path), S_IRUSR | S_IWUSR) == -1) {
    // Failing to tighten permissions is noteworthy but not fatal.
    MS_LOG(WARNING) << "Modify file:" << file_path << " to rw fail.";
    return;
  }
}


+ 6
- 0
mindspore/ccsrc/profiler/device/gpu/data_saver.h View File

@@ -17,6 +17,7 @@
#ifndef MINDSPORE_DATA_SAVER_H
#define MINDSPORE_DATA_SAVER_H
#include <iostream>
#include <algorithm>
#include <unordered_map>
#include <vector>
#include <string>
@@ -124,6 +125,8 @@ class DataSaver {

void ParseOpInfo(const OpInfoMap &op_info_maps);

void SetStepTraceOpName(ProfilingTraceInfo trace_op_name);

void ParseEvent(const std::vector<Event> &events);

void WriteFile(std::string out_path);
@@ -145,6 +148,8 @@ class DataSaver {

void WriteOpTimestamp(const std::string &saver_base_dir);

void WriteStepTrace(const std::string &saver_base_dir);

void ChangeFileMode(const std::string &file_path);

std::string device_id_;
@@ -152,6 +157,7 @@ class DataSaver {
OpTypeInfos op_type_infos_;
OpDetailInfos op_detail_infos_;
OpTimestampInfo op_timestamps_map_;
ProfilingTraceInfo step_trace_op_name;
};
} // namespace gpu
} // namespace profiler


+ 4
- 0
mindspore/ccsrc/profiler/device/gpu/gpu_profiling.cc View File

@@ -470,6 +470,7 @@ void GPUProfiler::SaveProfileData() {
MS_LOG(WARNING) << "Profile data path is empty, skip save profile data.";
} else {
DataSaver dataSaver;
dataSaver.SetStepTraceOpName(step_trace_op_name);
dataSaver.ParseOpInfo(op_info_map_);
dataSaver.ParseEvent(events_);
dataSaver.WriteFile(profile_data_path_);
@@ -649,6 +650,9 @@ void GPUProfiler::HandleActivityRecord(CUpti_Activity *record) {

AddEvent(std::move(profilingData));
}

void GPUProfiler::SetStepTraceOpName(ProfilingTraceInfo trace_op_name) { step_trace_op_name = trace_op_name; }

void GPUProfiler::RegisterProfilingOp(std::shared_ptr<ProfilingOp> node) {
if (profiling_op_.find(node->Name()) != profiling_op_.end()) {
return;


+ 3
- 0
mindspore/ccsrc/profiler/device/gpu/gpu_profiling.h View File

@@ -27,6 +27,7 @@
#include <unordered_map>
#include <utility>
#include <vector>
#include "profiler/device/gpu/gpu_profiling_utils.h"

namespace mindspore {
namespace profiler {
@@ -144,6 +145,7 @@ class GPUProfiler {
void OpDataProducerEnd();
void ProcessEvents();
void RegisterProfilingOp(std::shared_ptr<ProfilingOp> node);
void SetStepTraceOpName(ProfilingTraceInfo trace_op_name);
std::string ProfileDataPath() const { return profile_data_path_; }

private:
@@ -189,6 +191,7 @@ class GPUProfiler {
uint64_t op_cupti_time_start_;
std::string profile_data_path_;
std::map<std::string, std::shared_ptr<ProfilingOp>> profiling_op_;
ProfilingTraceInfo step_trace_op_name;
};
} // namespace gpu
} // namespace profiler


+ 187
- 0
mindspore/ccsrc/profiler/device/gpu/gpu_profiling_utils.cc View File

@@ -0,0 +1,187 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "profiler/device/gpu/gpu_profiling_utils.h"
#include "backend/kernel_compiler/kernel.h"
#include "backend/session/anf_runtime_algorithm.h"
#include "utils/ms_utils.h"
#include "utils/ms_context.h"
#include "utils/utils.h"

namespace mindspore {
namespace profiler {
namespace gpu {
constexpr char kFpStartNode[] = "PROFILING_FP_START";
constexpr char kBpEndNode[] = "PROFILING_BP_END";
constexpr char kIterEndNode[] = "PROFILING_ITER_END";
constexpr int fpExistGraphId = 2;

uint32_t ProfilingUtils::last_graph_id = 0;
bool ProfilingUtils::have_communication_op = false;
ProfilingTraceInfo ProfilingUtils::profiling_trace = {"", "", "", "", false};

// Scan one subgraph per call and accumulate the step-trace anchor op names
// into the static `profiling_trace`. When the graph id decreases relative to
// the previous call, every subgraph has been visited once (the first step has
// ended): IsFirstStepEnd is set and accumulation stops.
// Returns the current (possibly still partial) trace info.
ProfilingTraceInfo ProfilingUtils::GetProfilingTraceFromEnv(NotNull<const session::KernelGraph *> graph_ptr) {
MS_LOG(INFO) << "get current subgraph op name start.";
auto &cnode_exec_order = graph_ptr->execution_order();
// An empty graph carries no anchor candidates; return what was gathered so far.
if (cnode_exec_order.empty()) {
return profiling_trace;
}
uint32_t current_graph_id = graph_ptr->graph_id();
// current graph id less than last graph id indicates all subgraph have called.
if (current_graph_id < last_graph_id) {
profiling_trace.IsFirstStepEnd = true;
OutputStepTraceOpNameStatus();
return profiling_trace;
}

// Try to resolve each anchor from this subgraph's execution order.
SetTraceIterStart(cnode_exec_order);
SetTraceIterEnd(cnode_exec_order);
SetTraceFpStart(cnode_exec_order, current_graph_id);
SetTraceBpEnd(cnode_exec_order);
GetTraceHccl(cnode_exec_order);

last_graph_id = current_graph_id;
return profiling_trace;
}

// Log whether all four step-trace anchor ops were resolved, then dump their
// names for debugging.
void ProfilingUtils::OutputStepTraceOpNameStatus() {
  if (!profiling_trace.IsValid()) {
    MS_LOG(ERROR) << "Did not get all the step_trace op name.";
  }
  // fixed: separate fields with ", " — the original concatenation ran each
  // value and the next label together in one unreadable line.
  MS_LOG(INFO) << "[profiling]trace_iter_start: " << profiling_trace.trace_iter_start
               << ", trace_fp_start: " << profiling_trace.trace_fp_start
               << ", trace_bp_end: " << profiling_trace.trace_bp_end
               << ", trace_iter_end: " << profiling_trace.trace_iter_end;
  MS_LOG(INFO) << "get step_trace op name end.";
}

// Collect the full-scope names of all communication (hccl) ops in execution
// order; they are appended to the step trace as custom profiling points.
// Duplicate names are skipped.
void ProfilingUtils::GetTraceHccl(const std::vector<CNodePtr> &cnode_exec_order) {
  for (const auto &node : cnode_exec_order) {
    // fixed: null-check BEFORE the node is used — the original called
    // IsCommunicationOp(node) first and only then MS_EXCEPTION_IF_NULL(node).
    MS_EXCEPTION_IF_NULL(node);
    if (AnfAlgo::IsCommunicationOp(node)) {
      if (std::find(profiling_trace.trace_custom_node.begin(), profiling_trace.trace_custom_node.end(),
                    node->fullname_with_scope()) == profiling_trace.trace_custom_node.end()) {
        profiling_trace.trace_custom_node.push_back(node->fullname_with_scope());
      }
      MS_LOG(INFO) << "[profiling]Get hccl node:" << node->fullname_with_scope();
    }
  }
}

// Resolve the iteration-start anchor: the graph's leading GetNext op.
// Does nothing once the anchor has already been resolved by an earlier call,
// or when the first op is not GetNext.
void ProfilingUtils::SetTraceIterStart(const std::vector<CNodePtr> &cnode_exec_order) {
  if (!profiling_trace.trace_iter_start.empty()) {
    return;  // already resolved on a previous subgraph
  }

  const auto &head_node = cnode_exec_order.front();
  MS_EXCEPTION_IF_NULL(head_node);
  const bool starts_with_getnext = AnfAlgo::GetCNodeName(head_node) == kGetNextOpName;
  if (starts_with_getnext) {
    profiling_trace.trace_iter_start = head_node->fullname_with_scope();
  }
}

// Resolve the fp-start anchor. Resolution order:
//   1. the PROFILING_FP_START environment variable, when set;
//   2. the first op of the graph whose id equals fpExistGraphId.
// Does nothing once already resolved.
void ProfilingUtils::SetTraceFpStart(const std::vector<CNodePtr> &cnode_exec_order, uint32_t graph_id) {
  if (!profiling_trace.trace_fp_start.empty()) {
    return;  // already resolved on a previous subgraph
  }

  if (const char *env_fp_start = std::getenv(kFpStartNode)) {
    profiling_trace.trace_fp_start = std::string(env_fp_start);
    MS_LOG(INFO) << "Set the Fp Start Op Name from Environment Variable:" << profiling_trace.trace_fp_start;
    return;
  }

  if (graph_id != fpExistGraphId) {
    return;  // fp start only lives in the graph with this id
  }
  const auto &head_node = cnode_exec_order.front();
  MS_EXCEPTION_IF_NULL(head_node);
  profiling_trace.trace_fp_start = head_node->fullname_with_scope();
}

// Resolve the bp-end anchor op name. Resolution order:
//   1. the PROFILING_BP_END environment variable, when set;
//   2. the producer op that feeds the last communication op in the graph
//      (searched backwards through execution order);
//   3. fallback: the second-to-last kernel of the graph, only when no
//      communication op is recorded.
void ProfilingUtils::SetTraceBpEnd(const std::vector<CNodePtr> &cnode_exec_order) {
const char *trace_bp_end = std::getenv(kBpEndNode);
if (trace_bp_end != nullptr) {
profiling_trace.trace_bp_end = std::string(trace_bp_end);
MS_LOG(INFO) << "Set the Bp End Op Name from Environment Variable:" << profiling_trace.trace_bp_end;
return;
}

std::string bp_end_str;
// Contain hccl kernel (try to find the last communication op)
auto iter = cnode_exec_order.rbegin();
while (iter != cnode_exec_order.rend()) {
if (AnfAlgo::IsCommunicationOp(*iter)) {
break;
}
++iter;
}
// If find the communication op
if (iter != cnode_exec_order.rend()) {
// store communication op input nodes' name
std::set<std::string> ar_input_node_names;
for (size_t i = 0; i < AnfAlgo::GetInputTensorNum(*iter); ++i) {
auto input_node_with_index = AnfAlgo::GetPrevNodeOutput(*iter, i);
auto input_node = input_node_with_index.first;
ar_input_node_names.insert(input_node->fullname_with_scope());
}
// start from previous node
++iter;
// find input names in previous node
// (keep walking backwards until an op whose name is one of the comm op's inputs)
while (iter != cnode_exec_order.rend()) {
if (ar_input_node_names.find((*iter)->fullname_with_scope()) != ar_input_node_names.end()) {
bp_end_str = (*iter)->fullname_with_scope();
break;
}
++iter;
}
}

// NOTE(review): have_communication_op is never assigned in this file —
// confirm a writer exists elsewhere, otherwise this fallback always fires
// whenever no communication op was found above.
if (bp_end_str.empty() && !have_communication_op) {
bp_end_str = GetGraphSecondLastKernelName(cnode_exec_order);
}

if (!bp_end_str.empty()) {
profiling_trace.trace_bp_end = bp_end_str;
}
}

void ProfilingUtils::SetTraceIterEnd(const std::vector<CNodePtr> &cnode_exec_order) {
const char *trace_iter_end = std::getenv(kIterEndNode);
if (trace_iter_end != nullptr) {
profiling_trace.trace_iter_end = std::string(trace_iter_end);
MS_LOG(INFO) << "Set the Iter End Op Name from Environment Variable:" << profiling_trace.trace_iter_end;
return;
}

auto iter_end = cnode_exec_order.rbegin();
profiling_trace.trace_iter_end = (*iter_end)->fullname_with_scope();
}

std::string ProfilingUtils::GetGraphSecondLastKernelName(const std::vector<CNodePtr> &cnode_exec_order) {
std::string second_last_kernel_name;
auto iter = cnode_exec_order.rbegin();
++iter;
if (iter != cnode_exec_order.rend()) {
second_last_kernel_name = (*iter)->fullname_with_scope();
}

return second_last_kernel_name;
}

} // namespace gpu
} // namespace profiler
} // namespace mindspore

+ 79
- 0
mindspore/ccsrc/profiler/device/gpu/gpu_profiling_utils.h View File

@@ -0,0 +1,79 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_PROFILER_DEVICE_GPU_GPU_PROFILING_UTILS_H_
#define MINDSPORE_CCSRC_PROFILER_DEVICE_GPU_GPU_PROFILING_UTILS_H_

#include <map>
#include <memory>
#include <string>
#include <vector>
#include <set>
#include <unordered_map>

#include "backend/session/kernel_graph.h"

namespace mindspore {
namespace profiler {
namespace gpu {
// Names of the step-trace anchor ops discovered from the kernel graph (or
// overridden via environment variables), plus extra communication nodes to
// profile. Produced by ProfilingUtils and consumed by DataSaver.
struct ProfilingTraceInfo {
  // support get all the op name from environment variable
  // iteration start op is GetNext
  std::string trace_iter_start;
  // fp start op is the first op of Graph that ID is 2
  std::string trace_fp_start;
  // bp end op is the input node op of the last communication op (if exist)
  std::string trace_bp_end;
  // iteration end op is the last executed op
  std::string trace_iter_end;

  // True once every subgraph has been visited (the first step has ended).
  // fixed: default-initialize to false — the original left this bool
  // uninitialized, so a default-constructed instance read indeterminate memory.
  bool IsFirstStepEnd = false;

  // profiling specific op, such as AllReduce;
  std::vector<std::string> trace_custom_node;

  // The trace is usable only when all four anchor names are resolved.
  bool IsValid() const {
    return !(trace_iter_start.empty() || trace_fp_start.empty() || trace_bp_end.empty() || trace_iter_end.empty());
  }
};

// Static utility that walks a kernel graph's execution order to discover the
// GPU step-trace anchor ops. All state is static, so anchor discovery
// accumulates across calls (one call per subgraph); not thread-safe.
class ProfilingUtils {
public:
ProfilingUtils() = default;
~ProfilingUtils() = default;

// Get profiling trace point from envs.
// export PROFILING_FP_START='full name of the first cnode to execute'
// export PROFILING_BP_END='full name of the last backpropagation cnode to execute'
// export PROFILING_ITER_END='full name of last cnode in graph to execute'
static ProfilingTraceInfo GetProfilingTraceFromEnv(NotNull<const session::KernelGraph *> graph_ptr);
// Log whether every anchor was resolved, and dump the resolved names.
static void OutputStepTraceOpNameStatus();

// Graph id seen on the previous call; a smaller current id marks step end.
static uint32_t last_graph_id;
// NOTE(review): never assigned in gpu_profiling_utils.cc — confirm a writer
// exists elsewhere before relying on it.
static bool have_communication_op;
// Accumulated anchor op names (shared mutable state).
static ProfilingTraceInfo profiling_trace;

private:
static void SetTraceIterStart(const std::vector<CNodePtr> &cnode_exec_order);
static void SetTraceFpStart(const std::vector<CNodePtr> &cnode_exec_order, uint32_t graph_id);
static void SetTraceBpEnd(const std::vector<CNodePtr> &cnode_exec_order);
static void SetTraceIterEnd(const std::vector<CNodePtr> &cnode_exec_order);
static std::string GetGraphSecondLastKernelName(const std::vector<CNodePtr> &cnode_exec_order);
static void GetTraceHccl(const std::vector<CNodePtr> &cnode_exec_order);
};
} // namespace gpu
} // namespace profiler
} // namespace mindspore
#endif // MINDSPORE_CCSRC_PROFILER_DEVICE_GPU_GPU_PROFILING_UTILS_H_

+ 12
- 1
mindspore/ccsrc/runtime/device/gpu/gpu_kernel_runtime.cc View File

@@ -1,5 +1,5 @@
/**
* Copyright 2019 Huawei Technologies Co., Ltd
* Copyright 2019-2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
#include "common/trans.h"
#include "ir/dtype.h"
#include "profiler/device/gpu/gpu_profiling.h"
#include "profiler/device/gpu/gpu_profiling_utils.h"
#include "utils/shape_utils.h"
#include "debug/data_dump/dump_json_parser.h"
#ifdef ENABLE_DEBUGGER
@@ -604,6 +605,16 @@ bool GPUKernelRuntime::LaunchKernelDynamic(const session::KernelGraph *graph, De
auto profiler_inst = profiler::gpu::GPUProfiler::GetInstance();
MS_EXCEPTION_IF_NULL(profiler_inst);

static bool FlagGetStepTraceOpName = false;
if (!FlagGetStepTraceOpName) {
profiler::gpu::ProfilingTraceInfo profiling_trace =
profiler::gpu::ProfilingUtils::GetProfilingTraceFromEnv(NOT_NULL(graph));
if (profiling_trace.IsFirstStepEnd) {
FlagGetStepTraceOpName = true;
profiler_inst->SetStepTraceOpName(profiling_trace);
}
}

for (const auto &kernel : kernels) {
auto kernel_mod = AnfAlgo::GetKernelMod(kernel);
MS_EXCEPTION_IF_NULL(kernel_mod);


+ 205
- 58
mindspore/profiler/parser/step_trace_parser.py View File

@@ -33,7 +33,7 @@ StepTraceStruct = namedtuple(
)


class StepTraceParser:
class BaseStepTraceParser:
"""
The parser for step trace data.

@@ -43,11 +43,6 @@ class StepTraceParser:
job_id (int): The job id used to define the start of new step. Default: 0.
skip_first_step (bool): Whether skip the first step or not.
"""
_event_size = 20
_fp_tag = 1
_bp_tag = 2
_end_tag = 255

def __init__(self, input_dir, output_file_path, job_id=0, skip_first_step=False):
self._input_dir = input_dir
self._output_path = output_file_path
@@ -98,18 +93,6 @@ class StepTraceParser:
Returns:
dict, parsed point info.
"""
points = {
'fp_start': point_info.get(self._fp_tag, ''),
'bp_end': point_info.get(self._bp_tag, '')
}
try:
with open(output_path, 'w') as json_file:
json.dump(points, json_file)
os.chmod(output_path, stat.S_IREAD)
except (IOError, OSError) as err:
log.warning('Failed to save point info. %s', err)
raise ProfilerIOException
return points

def update_tag_op_type_map(self, point_info):
"""
@@ -153,17 +136,7 @@ class StepTraceParser:

def _get_step_trace_files(self):
"""Get step trace files."""
# step trace files may under $profiler_dir or $profiler_dir/data
profiler_dir = self._input_dir
step_trace_files = self._search_file(profiler_dir)
if not step_trace_files:
# try to find step trace files under $profiler_dir/data
profiler_dir = os.path.join(profiler_dir, 'data')
step_trace_files = self._search_file(profiler_dir)
if not step_trace_files:
raise ProfilerPathErrorException('Training trace file does not exist.')

return step_trace_files
return self._input_dir

@staticmethod
def _search_file(input_dir):
@@ -198,19 +171,6 @@ class StepTraceParser:

def _parse(self, source_files):
"""Parse source step trace files."""
log.info("Start to parse step trace file.")
event_info = {}
for source_file in source_files:
source_file = validate_and_normalize_path(source_file)
with open(source_file, 'rb') as handler:
content = handler.read()
for step_trace in self._get_next_step_trace(content, event_info):
if self._skip_first_step:
self._skip_first_step = False
continue
self._record_trace_event(step_trace)
self._record_average_info()
log.info("Finish to parse step trace file.")

def _get_next_step_trace(self, content, event_info):
"""
@@ -337,22 +297,8 @@ class StepTraceParser:
Returns:
dict, reduce info.
"""
reduce_info = {}
if end_point[0] - start_point[0] != 1 or end_point[0] % 2:
log.warning("Unmatched reduce event <%s, %s>.", start_point, end_point)
return reduce_info
op_type = self._tag_map.get(start_point[0])
# append field name with op type.
if not op_type:
log.warning("Can't recognize the inner type for point tag: %d.", start_point[0])
field_name += '_parallel'
else:
field_name += '_' + op_type
reduce_info[field_name] = end_point[1] - start_point[1]
reduce_info[field_name + '_start_point'] = start_point[1]
reduce_info[field_name + '_end_point'] = end_point[1]

return reduce_info
ret_dict = {}
return ret_dict

def _record_average_info(self):
"""Calculate average info."""
@@ -383,3 +329,204 @@ class StepTraceParser:
for row_data in self._result:
csv_writer.writerow(row_data)
os.chmod(self._output_path, stat.S_IREAD)


class GpuStepTraceParser(BaseStepTraceParser):
    """The parser for gpu step trace data (step_trace_profiling_<dev>.txt)."""

    def record_point_info(self, source_file, output_path):
        """
        Record point info into json.

        Args:
            source_file (str): The file path of step trace original data.
            output_path (str): The output path for saving point info.

        Returns:
            dict, parsed point info.

        Raises:
            ProfilerIOException: If the source or output file cannot be accessed.
        """
        # line 1 holds the fp-start op, line 2 the bp-end op (line 0 is iter start)
        fp_start, bp_end = 1, 2
        try:
            with open(source_file, 'r') as f:
                lines = f.readlines()
                fp_start_name = lines[fp_start].split()[0]
                bp_end_name = lines[bp_end].split()[0]
        except (IOError, OSError) as err:
            # fixed: the original passed an f-string plus an extra positional
            # arg to log.warning, which breaks logging's %-formatting.
            log.warning('Failed to read %s: %s', source_file, err)
            raise ProfilerIOException

        points = {
            'fp_start': fp_start_name,
            'bp_end': bp_end_name
        }
        try:
            with open(output_path, 'w') as json_file:
                json.dump(points, json_file)
            os.chmod(output_path, stat.S_IREAD)
        except (IOError, OSError) as err:
            log.warning('Failed to save point info. %s', err)
            raise ProfilerIOException
        return points

    def _get_step_trace_files(self):
        """Get step trace files (the GPU trace is a single pre-located file)."""
        return self._input_dir

    def _parse(self, source_file):
        """Parse source step trace files.

        Each line of the file is "<op_name> <start>,<end> ...": row 0-3 are the
        iter-start / fp-start / bp-end / iter-end anchors, rows >= 4 are
        communication (reduce) ops.
        """
        log.info("Start to parse step trace file.")
        iter_start, fp_start, bp_end, iter_end = 0, 1, 2, 3
        reduce_start = 4
        start_time, end_time = 0, 1

        source_file = validate_and_normalize_path(source_file)
        try:
            with open(source_file, 'r') as f:
                lines = f.readlines()
                step_trace_info_all = [line.strip().split() for line in lines]
                num_of_step = len(step_trace_info_all[0])
                # in callback mode that set the profiling step range, each op count is not equal
                step_trace_info_all = [line[-num_of_step:] for line in step_trace_info_all]
        except (IOError, OSError) as err:
            # fixed: lazy %-style formatting instead of f-string + extra arg
            log.warning('Failed to read %s: %s', source_file, err)
            raise ProfilerIOException

        # column 0 is the op name, so steps start at column 1
        for step_num in range(1, num_of_step):
            step_trace = {
                'start': int(step_trace_info_all[iter_start][step_num].split(',')[start_time]),
                'fp': int(step_trace_info_all[fp_start][step_num].split(',')[start_time]),
                'bp': int(step_trace_info_all[bp_end][step_num].split(',')[end_time]),
                'end': int(step_trace_info_all[iter_end][step_num].split(',')[end_time]),
                'reduce': {}
            }
            num_of_step_point = len(step_trace_info_all)
            if num_of_step_point > reduce_start:
                # gather all reduce-op start/end times for this step
                reduce_info = {}
                reduce_time_info = []
                for reduce_idx in range(reduce_start, num_of_step_point):
                    cur_reduce_time = step_trace_info_all[reduce_idx][step_num]
                    reduce_time_info += cur_reduce_time.split(',')
                reduce_info['ops'] = reduce_time_info
                step_trace['reduce'] = reduce_info
            self._record_trace_event(step_trace)
        self._record_average_info()
        log.info("Finish to parse step trace file.")

    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
        """
        Get single reduce info.

        Args:
            field_name (str): The field name.
            start_point (str): Start point time.
            end_point (str): End point time.

        Returns:
            dict, reduce info.
        """
        reduce_info = {}

        # GPU reduce events carry no tag, so the op type is always AllReduce.
        op_type = 'AllReduce'
        # append field name with op type.
        field_name += '_' + op_type
        reduce_info[field_name] = int(end_point) - int(start_point)
        reduce_info[field_name + '_start_point'] = start_point
        reduce_info[field_name + '_end_point'] = end_point

        return reduce_info


class AscendStepTraceParser(BaseStepTraceParser):
    """The parser for ascend step trace data (binary event records)."""
    # binary record layout: 20-byte events tagged with fp / bp / step-end ids
    _event_size = 20
    _fp_tag = 1
    _bp_tag = 2
    _end_tag = 255

    def record_point_info(self, point_info, output_path):
        """
        Record point info into json.

        Args:
            point_info (dict): The point info about tag id and relative op name.
            output_path (str): The output path for saving point info.

        Returns:
            dict, parsed point info.

        Raises:
            ProfilerIOException: If the output file cannot be written.
        """
        points = {
            'fp_start': point_info.get(self._fp_tag, ''),
            'bp_end': point_info.get(self._bp_tag, '')
        }
        try:
            with open(output_path, 'w') as json_file:
                json.dump(points, json_file)
            os.chmod(output_path, stat.S_IREAD)
        except (IOError, OSError) as err:
            log.warning('Failed to save point info. %s', err)
            raise ProfilerIOException
        return points

    def _get_step_trace_files(self):
        """Get step trace files."""
        # step trace files may under $profiler_dir or $profiler_dir/data
        profiler_dir = self._input_dir
        step_trace_files = self._search_file(profiler_dir)
        if not step_trace_files:
            # try to find step trace files under $profiler_dir/data
            profiler_dir = os.path.join(profiler_dir, 'data')
            step_trace_files = self._search_file(profiler_dir)
        if not step_trace_files:
            raise ProfilerPathErrorException('Training trace file does not exist.')

        return step_trace_files

    def _parse(self, source_files):
        """Parse source step trace files."""
        log.info("Start to parse step trace file.")
        event_info = {}

        for source_file in source_files:
            source_file = validate_and_normalize_path(source_file)
            try:
                with open(source_file, 'rb') as handler:
                    content = handler.read()
                    for step_trace in self._get_next_step_trace(content, event_info):
                        if self._skip_first_step:
                            # the init step is profiling noise; drop it once
                            self._skip_first_step = False
                            continue
                        self._record_trace_event(step_trace)
            except (IOError, OSError) as err:
                # fixed: the original passed an f-string plus an extra
                # positional arg to log.warning, breaking %-formatting.
                log.warning('Failed to read %s: %s', source_file, err)
                raise ProfilerIOException

        self._record_average_info()
        log.info("Finish to parse step trace file.")

    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
        """
        Get single reduce info.

        Args:
            field_name (str): The field name.
            start_point (Tuple[int, int]): Start point time info, including (tag_id, sys_count).
            end_point (Tuple[int, int]): End point time info, including (tag_id, sys_count).

        Returns:
            dict, reduce info.
        """
        reduce_info = {}
        # reduce events arrive in tag pairs (odd start tag, next even end tag)
        if end_point[0] - start_point[0] != 1 or end_point[0] % 2:
            log.warning("Unmatched reduce event <%s, %s>.", start_point, end_point)
            return reduce_info
        op_type = self._tag_map.get(start_point[0])
        # append field name with op type.
        if not op_type:
            log.warning("Can't recognize the inner type for point tag: %d.", start_point[0])
            field_name += '_parallel'
        else:
            field_name += '_' + op_type
        reduce_info[field_name] = end_point[1] - start_point[1]
        reduce_info[field_name + '_start_point'] = start_point[1]
        reduce_info[field_name + '_end_point'] = end_point[1]

        return reduce_info

+ 31
- 14
mindspore/profiler/profiling.py View File

@@ -33,7 +33,7 @@ from mindspore.profiler.parser.minddata_parser import MinddataParser
from mindspore.profiler.parser.minddata_pipeline_parser import \
MinddataPipelineParser
from mindspore.profiler.parser.optime_parser import OPComputeTimeParser
from mindspore.profiler.parser.step_trace_parser import StepTraceParser
from mindspore.profiler.parser.step_trace_parser import GpuStepTraceParser, AscendStepTraceParser
from mindspore.nn.cell import Cell

PROFILING_LOG_BASE_PATH = "/var/log/npu/profiling"
@@ -154,6 +154,12 @@ class Profiler:
except ProfilerException as err:
logger.warning(err.message)

# analyse step trace info
try:
self._analyse_step_trace()
except ProfilerException as err:
logger.warning(err.message)

os.environ['PROFILING_MODE'] = str("false")

elif self._device_target and self._device_target == "Ascend":
@@ -227,7 +233,7 @@ class Profiler:
os.environ['PROFILING_MODE'] = str("false")
context.set_context(enable_profiling=False)

def _analyse_step_trace(self, source_path, framework_parser):
def _analyse_step_trace(self, source_path=None, framework_parser=None):
"""
Analyse step trace data and save the result.

@@ -247,18 +253,29 @@ class Profiler:
)
step_trace_intermediate_file_path = validate_and_normalize_path(step_trace_intermediate_file_path)
point_info_file_path = validate_and_normalize_path(point_info_file_path)
# whether keep the first step
skip_first_step_flag = framework_parser.check_op_name(INIT_OP_NAME)
point_info = framework_parser.point_info
# parser the step trace files and save the result to disk
source_path = validate_and_normalize_path(source_path)
parser = StepTraceParser(input_dir=source_path,
output_file_path=step_trace_intermediate_file_path,
job_id=self._job_id_env,
skip_first_step=skip_first_step_flag)
parser.update_tag_op_type_map(point_info)
parser.parse_and_save()
point_info = parser.record_point_info(point_info, point_info_file_path)

if self._device_target and self._device_target == 'GPU':
input_file_path = os.path.join(
self._output_path,
f'step_trace_profiling_{self._dev_id}.txt'
)
parser = GpuStepTraceParser(input_dir=input_file_path,
output_file_path=step_trace_intermediate_file_path)
parser.parse_and_save()
point_info = parser.record_point_info(input_file_path, point_info_file_path)
else:
# whether keep the first step
skip_first_step_flag = framework_parser.check_op_name(INIT_OP_NAME)
point_info = framework_parser.point_info
# parser the step trace files and save the result to disk
source_path = validate_and_normalize_path(source_path)
parser = AscendStepTraceParser(input_dir=source_path,
output_file_path=step_trace_intermediate_file_path,
job_id=self._job_id_env,
skip_first_step=skip_first_step_flag)
parser.update_tag_op_type_map(point_info)
parser.parse_and_save()
point_info = parser.record_point_info(point_info, point_info_file_path)
# print parser result
parser.show()
logger.info("Finish saving the intermediate result: %s", step_trace_intermediate_file_path)


Loading…
Cancel
Save