From f6b0fd301e9a825269ec71f43abfd59cb3b3a85a Mon Sep 17 00:00:00 2001 From: gzhcv Date: Wed, 9 Sep 2020 21:56:28 +0800 Subject: [PATCH] gpu_timeline(python code) update mindspore/profiler/parser/integrator.py. update mindspore/profiler/profiling.py. --- mindspore/profiler/parser/container.py | 2 +- mindspore/profiler/parser/integrator.py | 301 ++++++++++++++++++------ mindspore/profiler/profiling.py | 20 +- 3 files changed, 242 insertions(+), 81 deletions(-) diff --git a/mindspore/profiler/parser/container.py b/mindspore/profiler/parser/container.py index 62f054ea7b..1646b7ed5f 100644 --- a/mindspore/profiler/parser/container.py +++ b/mindspore/profiler/parser/container.py @@ -80,7 +80,7 @@ class TimelineContainer: """ def __init__(self, split_list): self._op_name = split_list[0] - self._stream_id = int(split_list[1]) + self._stream_id = str(split_list[1]) self._start_time = float(split_list[2]) self._duration = float(split_list[3]) self._pid = None diff --git a/mindspore/profiler/parser/integrator.py b/mindspore/profiler/parser/integrator.py index fa8e208586..02f0b71901 100644 --- a/mindspore/profiler/parser/integrator.py +++ b/mindspore/profiler/parser/integrator.py @@ -25,7 +25,7 @@ from mindspore.profiler.common.util import query_latest_trace_time_file, to_int, from mindspore.profiler.common.validator.validate_path import validate_and_normalize_path from mindspore.profiler.parser.container import TimelineContainer -SIZE_LIMIT = 20 * 1024 * 1024 # 20MB +SIZE_LIMIT_DEFAULT = 20 * 1024 * 1024 # 20MB class Integrator: @@ -483,7 +483,7 @@ class Integrator: self._display_col_names_detail.append(self._col_names_detail[5]) -class TimelineAnalyser: +class BaseTimelineGenerator: """ Analyse timeline data from file. """ @@ -504,14 +504,23 @@ class TimelineAnalyser: self._profiling_dir = profiling_dir self._device_id = device_id - def write_timeline(self): + def _load_timeline_data(self): + """Load timeline data from file.""" + + def _parse_timeline_data(self): + """Parse timeline data.""" + + def init_timeline(self): + """Init timeline metadata, adding all collected info.""" + + def write_timeline(self, size_limit=SIZE_LIMIT_DEFAULT): """Load data according to the parsed profiling files.""" # Write timeline to file. logger.info('Writing timeline file...') - self.write_timeline_to_json_by_limitation() + self.write_timeline_to_json_by_limitation(size_limit) logger.info('Finished file writing!') - def write_timeline_to_json_by_limitation(self): + def write_timeline_to_json_by_limitation(self, size_limit): """Write timeline to json by limitation.""" display_filename = self._display_filename.format(self._device_id) display_file_path = os.path.join( @@ -527,7 +536,7 @@ class TimelineAnalyser: for index, item in enumerate(self._timeline_meta): json.dump(item, json_file) file_size = os.path.getsize(display_file_path) - if file_size > SIZE_LIMIT: + if file_size > size_limit: break if index == length - 1: break @@ -553,6 +562,215 @@ class TimelineAnalyser: logger.error('Error occurred when write timeline summary file: %s', err) raise ProfilerIOException + @staticmethod + def _update_num_of_streams(timeline, stream_count_dict): + """Update number of streams.""" + stream_id = timeline[1] + if stream_id not in stream_count_dict.keys(): + stream_count_dict[stream_id] = 1 + else: + stream_count_dict[stream_id] += 1 + + def get_min_cycle_counter(self): + """ + Get minimum cycle counter. + + Returns: + float, the minimum value of the cycle counter. 
+        """
+        file_path = os.path.join(
+            self._profiling_dir,
+            self._min_cycle_counter_file_path.format(self._device_id)
+        )
+
+        file_path = validate_and_normalize_path(file_path)
+
+        if os.path.exists(file_path):
+            try:
+                with open(file_path, 'r') as f_obj:
+                    min_cycle_counter = f_obj.read()
+                    min_cycle_counter = float(min_cycle_counter) \
+                        if not min_cycle_counter == 'inf' else 0
+            except (IOError, OSError) as err:
+                logger.error('Error occurred when reading minimum cycle counter: %s', err)
+                raise ProfilerIOException
+        else:
+            min_cycle_counter = 0
+            logger.info("No min cycle counter recorded.")
+
+        return min_cycle_counter
+
+    def _add_framework_info(self, framework_obj_list):
+        """
+        Add framework info into timeline metadata.
+
+        Args:
+            framework_obj_list (list): The framework metadata.
+        """
+        logger.debug('Start adding framework info into timeline...')
+        # Get the framework info that will be written into timeline.
+        framework_info_dict = {}
+        for framework_obj in framework_obj_list:
+            op_name = framework_obj[0]
+            op_type = framework_obj[1]
+            op_full_name = framework_obj[4]
+            op_info = framework_obj[5]
+            framework_info_dict[op_full_name] = {
+                'name': op_name,
+                'args': {
+                    'type': op_type,
+                    'fullname': op_full_name
+                }
+            }
+            framework_info_dict[op_full_name]['args'].update(op_info)
+
+        # Insert framework info into timeline.
+        for timeline_item in self._timeline_meta:
+            op_full_name = timeline_item.get('name')
+            framework_item = framework_info_dict.get(op_full_name)
+            if framework_item:
+                timeline_item['name'] = framework_item.get('name')
+                timeline_item['args'] = framework_item.get('args')
+        logger.debug('Finished adding framework info into timeline...')
+
+class GpuTimelineGenerator(BaseTimelineGenerator):
+    """Generate gpu Timeline data from file."""
+    _display_filename = 'gpu_timeline_display_{}.json'
+    _timeline_summary_filename = 'gpu_timeline_summary_{}.json'
+    _output_op_execute_time_file_path = "op_execute_timestamp_{}.txt"
+    _output_activity_execute_time_file_path = "activity_execute_timestamp_{}.txt"
+    _output_gpu_activity_info_file_path = "gpu_activity_data_{}.csv"
+    _activity_keys_list = []
+
+    def _get_and_validate_path(self, file_name):
+        """Generate op or activity file path from file name, and validate this path."""
+        file_path = os.path.join(
+            self._profiling_dir,
+            file_name.format(self._device_id)
+        )
+        file_path = validate_and_normalize_path(file_path)
+        if not os.path.exists(file_path):
+            logger.error("Failed to find parsed timeline file.")
+            raise ProfilerFileNotFoundException('parsed timeline file')
+
+        return file_path
+
+    def _parse_timeline_data(self, timeline, min_cycle_counter):
+        """Parse timeline data."""
+        # Factor to convert the time unit of start_time(ts) from 1ns to 1us for timeline display.
+        factor = 1000
+        op_meta = TimelineContainer(timeline)
+        timeline_dict = {}
+        timeline_dict['name'] = op_meta.op_name
+        timeline_dict['ph'] = 'X'
+        timeline_dict['tid'] = op_meta.stream_id
+        timeline_dict['ts'] = (op_meta.start_time - min_cycle_counter) / factor
+        dur = op_meta.duration
+        timeline_dict['dur'] = dur
+        if op_meta.pid is None:
+            timeline_dict['pid'] = int(self._device_id)
+        else:  # AllReduce and AI CPU pid
+            timeline_dict['pid'] = op_meta.pid
+        if len(timeline) > 4:
+            # len(timeline) > 4 refers to activity data, else op data.
+            # Add args for activity data.
+            args_dict = {}
+            for ix, value in enumerate(timeline[4:]):
+                args_dict[self._activity_keys_list[ix]] = value
+            timeline_dict['args'] = args_dict
+        else:
+            # Update total time of operator execution.
+            self._timeline_summary['total_time'] += dur
+            self._timeline_summary['op_exe_times'] += 1
+
+        self._timeline_meta.append(timeline_dict)
+
+    def _load_timeline_data(self):
+        """Load timeline data from file."""
+        op_file_path = self._get_and_validate_path(
+            self._output_op_execute_time_file_path)
+        activity_file_path = self._get_and_validate_path(
+            self._output_activity_execute_time_file_path)
+        activity_args_file_path = self._get_and_validate_path(
+            self._output_gpu_activity_info_file_path)
+
+        timeline_list = self._load_op_data(op_file_path) + \
+            self._load_activity_data(activity_file_path, activity_args_file_path)
+        timeline_list.sort(key=lambda x: float(x[2]))
+
+        return timeline_list
+
+    def _load_op_data(self, op_file_path):
+        """Load operator data from file."""
+        op_timeline_list = []
+        try:
+            with open(op_file_path, 'r') as f_obj:
+                for line in f_obj:
+                    self._timeline_summary['num_of_ops'] += 1
+                    op_list = line.strip('\n').strip().split(';')
+                    time_arr = op_list[-1]
+                    time_arr = time_arr.split(" ")
+                    for time in time_arr:
+                        time = time.split(",")
+                        line_list = op_list[:2] + time
+                        op_timeline_list.append(line_list)
+        except (IOError, OSError) as err:
+            logger.error('Error occurred when loading operator timeline data intermediate file: %s', err)
+            raise ProfilerIOException
+
+        return op_timeline_list
+
+    def _load_activity_data(self, activity_file_path, activity_args_file_path):
+        """Load activity data from file."""
+        activity_timeline_list = []
+        try:
+            args_dict = {}
+            with open(activity_args_file_path, 'r') as args_file:
+                csv_reader = csv.reader(args_file)
+                keys_list = next(csv_reader)
+                # keys_list [name, type, op_full_name, stream_id, block_dim, grid_dim, ...]
+                self._activity_keys_list = keys_list[1:3] + keys_list[4:6]
+                for info in csv_reader:
+                    args_dict[info[0]] = info[1:3] + info[4:6]
+            with open(activity_file_path, 'r') as f_obj:
+                for line in f_obj:
+                    line_list = line.strip('\n').split(';')
+                    # Concat activity args info.
+                    line_list += args_dict[line_list[0]]
+                    activity_timeline_list.append(line_list)
+        except (IOError, OSError) as err:
+            logger.error('Error occurred when loading activity timeline data intermediate file: %s', err)
+            raise ProfilerIOException
+
+        return activity_timeline_list
+
+    def init_timeline(self):
+        """Init timeline metadata, adding all collected info."""
+        timeline_list = self._load_timeline_data()
+
+        # Init a dict for counting the num of streams.
+        stream_count_dict = {}
+        for timeline in timeline_list:
+            self._parse_timeline_data(timeline, 0)
+            # Update the collection of streams.
+ if len(timeline) == 4: + self._update_num_of_streams(timeline, stream_count_dict) + + # Update timeline summary info + self._timeline_summary['num_of_streams'] += len(stream_count_dict.keys()) + +class AscendTimelineGenerator(BaseTimelineGenerator): + """Generate ascend Timeline data from file.""" + def _load_timeline_data(self): """Load timeline data from file.""" file_path = os.path.join( @@ -597,44 +815,6 @@ class TimelineAnalyser: timeline_dict['pid'] = op_meta.pid self._timeline_meta.append(timeline_dict) - @staticmethod - def _update_num_of_streams(timeline, stream_count_dict): - """Update number of streams.""" - stream_id = timeline[1] - if stream_id not in stream_count_dict.keys(): - stream_count_dict[stream_id] = 1 - else: - stream_count_dict[stream_id] += 1 - - def get_min_cycle_counter(self): - """ - Get minimum cycle counter. - - Returns: - float, the minimum value of the cycle counter. - """ - file_path = os.path.join( - self._profiling_dir, - self._min_cycle_counter_file_path.format(self._device_id) - ) - - file_path = validate_and_normalize_path(file_path) - - if os.path.exists(file_path): - try: - with open(file_path, 'r') as f_obj: - min_cycle_counter = f_obj.read() - min_cycle_counter = float(min_cycle_counter) \ - if not min_cycle_counter == 'inf' else 0 - except (IOError, OSError) as err: - logger.error('Error occurred when read minimum cycle counter: %s', err) - raise ProfilerIOException - else: - min_cycle_counter = 0 - logger.info("No min cycle counter recorded.") - - return min_cycle_counter - def init_timeline(self, all_reduce_info, framework_info, aicpu_info, min_cycle_counter): """ Init timeline metadata, adding all collected info. @@ -685,36 +865,3 @@ class TimelineAnalyser: # Update timeline summary info self._timeline_summary['num_of_streams'] += len(stream_count_dict.keys()) - - def _add_framework_info(self, framework_obj_list): - """ - Add framework info into timeline metadata. - - Args: - framework_obj_list (list): The framework metadata. - """ - logger.debug('Start adding framework info into timeline...') - # Get the framework info that will be written into timeline. - framework_info_dict = {} - for framework_obj in framework_obj_list: - op_name = framework_obj[0] - op_type = framework_obj[1] - op_full_name = framework_obj[4] - op_info = framework_obj[5] - framework_info_dict[op_full_name] = { - 'name': op_name, - 'args': { - 'type': op_type, - 'fullname': op_full_name - } - } - framework_info_dict[op_full_name]['args'].update(op_info) - - # Insert framework info into timeline. 
-        for timeline_item in self._timeline_meta:
-            op_full_name = timeline_item.get('name')
-            framework_item = framework_info_dict.get(op_full_name)
-            if framework_item:
-                timeline_item['name'] = framework_item.get('name')
-                timeline_item['args'] = framework_item.get('args')
-        logger.debug('Finished adding framework info into timeline...')
diff --git a/mindspore/profiler/profiling.py b/mindspore/profiler/profiling.py
index 21d56b0888..cdfb943034 100644
--- a/mindspore/profiler/profiling.py
+++ b/mindspore/profiler/profiling.py
@@ -28,7 +28,7 @@ from mindspore.profiler.parser.aicpu_data_parser import DataPreProcessParser
 from mindspore.profiler.parser.framework_parser import FrameworkParser
 from mindspore.profiler.parser.hwts_log_parser import HWTSLogParser
 from mindspore.profiler.parser.integrator import Integrator
-from mindspore.profiler.parser.integrator import TimelineAnalyser
+from mindspore.profiler.parser.integrator import GpuTimelineGenerator, AscendTimelineGenerator
 from mindspore.profiler.parser.minddata_parser import MinddataParser
 from mindspore.profiler.parser.minddata_pipeline_parser import \
     MinddataPipelineParser
@@ -140,6 +140,7 @@ class Profiler:
         """
         if self._device_target and self._device_target == "GPU":
             self._gpu_profiler.stop()
+            self._generate_timeline()
         elif self._device_target and self._device_target == "Ascend":
             release()
 
@@ -254,7 +255,7 @@ class Profiler:
             optime_parser (OPComputeTimeParserParser): The parser instance
                 for AI Core operator execution time calculation.
         """
-        timeline_analyser = TimelineAnalyser(self._output_path, self._dev_id)
+        timeline_analyser = AscendTimelineGenerator(self._output_path, self._dev_id)
         # Get framework info
         integrator = Integrator(self._output_path, self._dev_id)
         aicore_detail_data = integrator.get_aicore_detail_data()
@@ -277,9 +278,22 @@ class Profiler:
         aicpu_info = aicpu_parser.query_aicpu_data()
         min_cycle_counter = min(aicpu_parser.min_cycle_counter, optime_parser.min_cycle_counter)
         timeline_analyser.init_timeline(all_reduce_info, framework_info, aicpu_info, min_cycle_counter)
-        timeline_analyser.write_timeline()
+        size_limit = 20 * 1024 * 1024  # 20MB
+        timeline_analyser.write_timeline(size_limit)
         timeline_analyser.write_timeline_summary()
 
+    def _generate_timeline(self):
+        """Generate timeline info for GPU and write it to a JSON file."""
+        try:
+            size_limit = 100 * 1024 * 1024  # 100MB
+            timeline_generator = GpuTimelineGenerator(self._output_path, self._dev_id)
+            timeline_generator.init_timeline()
+            timeline_generator.write_timeline(size_limit)
+            timeline_generator.write_timeline_summary()
+        except (ProfilerIOException, ProfilerFileNotFoundException, RuntimeError) as err:
+            logger.warning('Failed to write timeline data: %s', err)
+            raise RuntimeError('Failed to write timeline data.')
+
     def _get_profiling_job_id(self):
         """Get profiling job id, which was generated by ada service.