|
|
|
@@ -21,10 +21,9 @@ from collections import namedtuple |
|
|
|
from decimal import Decimal |
|
|
|
|
|
|
|
from mindinsight.profiler.common.exceptions.exceptions import ProfilerPathErrorException, \ |
|
|
|
JobIdMismatchException |
|
|
|
JobIdMismatchException, ProfilerIOException |
|
|
|
from mindinsight.profiler.common.log import logger as log |
|
|
|
from mindinsight.profiler.common.util import get_summary_for_step_trace |
|
|
|
from mindinsight.utils.exceptions import MindInsightException |
|
|
|
|
|
|
|
StepTraceStruct = namedtuple( |
|
|
|
'TrainingTraceStruct', ['tag_id', 'task_id', 'stream_id', 'sys_count'] |
|
|
|
@@ -72,25 +71,39 @@ class StepTraceParser: |
|
|
|
def parse_and_save(self):
    """
    Parse step trace files and save the result.

    The step trace files are located with `_get_step_trace_files`, decoded
    with `_parse` and the result is written with `_save`.

    Raises:
        ProfilerIOException: If an IOError occurs while locating, parsing or
            saving the step trace files.
    """
    try:
        source_files = self._get_step_trace_files()
        self._parse(source_files)
        self._save()
    except MindInsightException:
        # Best effort: the failure is logged and not propagated to the caller.
        log.error("Failed to parse and save step trace files.")
    except IOError as err:
        log.exception(err)
        # Chain the original error so its traceback is kept for debugging.
        raise ProfilerIOException() from err
    else:
        log.info("Finish to save intermediate result for step trace file.")
|
|
|
|
|
|
|
def _get_step_trace_file(self): |
|
|
|
"""Get step trace file.""" |
|
|
|
profiling_path = self._input_dir |
|
|
|
def _get_step_trace_files(self):
    """
    Get step trace files.

    The files are searched under the input directory first and, when none
    are found there, under its `data` sub-directory.

    Returns:
        The non-empty search result produced by `_search_file`.

    Raises:
        ProfilerPathErrorException: If no step trace file is found in either
            location.
    """
    # step trace files may be under $profiler_dir or $profiler_dir/data
    profiler_dir = self._input_dir
    step_trace_files = self._search_file(profiler_dir)
    if not step_trace_files:
        # try to find step trace files under $profiler_dir/data
        profiler_dir = os.path.join(profiler_dir, 'data')
        step_trace_files = self._search_file(profiler_dir)
    if not step_trace_files:
        raise ProfilerPathErrorException('Training trace file does not exist.')

    return step_trace_files
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def _search_file(input_dir): |
|
|
|
"""Search step trace file under specific input directory.""" |
|
|
|
# validate input_dir |
|
|
|
if not os.path.isdir(profiling_path): |
|
|
|
if not os.path.isdir(input_dir): |
|
|
|
raise ProfilerPathErrorException( |
|
|
|
'{} does not exist or is not a dir'.format(profiling_path) |
|
|
|
'{} does not exist or is not a dir'.format(input_dir) |
|
|
|
) |
|
|
|
# get step trace files |
|
|
|
files = os.listdir(profiling_path) |
|
|
|
files = os.listdir(input_dir) |
|
|
|
step_trace_files = list( |
|
|
|
filter( |
|
|
|
lambda file: file.startswith('training_trace') and not file.endswith('.done'), |
|
|
|
@@ -98,36 +111,46 @@ class StepTraceParser: |
|
|
|
) |
|
|
|
) |
|
|
|
# validate result |
|
|
|
if not step_trace_files: |
|
|
|
raise ProfilerPathErrorException('training trace file does not exist') |
|
|
|
if len(step_trace_files) > 1: |
|
|
|
log.warning("Not enable to parse multiple step trace files yet.") |
|
|
|
step_trace_file = os.path.join(profiling_path, step_trace_files[0]) |
|
|
|
return step_trace_file |
|
|
|
# the format of file name is like |
|
|
|
# `training_trace.46.dev.profiler_default_tag.$id.slice_$number` |
|
|
|
# use the $number as the sorted key |
|
|
|
try: |
|
|
|
step_trace_files.sort(key=lambda path: int(path.rsplit('_', 1)[-1])) |
|
|
|
except ValueError as err: |
|
|
|
log.warning("Unable to parse file names: %s. %s", step_trace_files, err) |
|
|
|
step_trace_files = [] |
|
|
|
|
|
|
|
file_paths = [os.path.join(input_dir, file) for file in step_trace_files] |
|
|
|
log.info("Find %d step trace files.", len(file_paths)) |
|
|
|
return file_paths |
|
|
|
|
|
|
|
def _parse(self, source_files):
    """
    Parse source step trace files.

    Every file is read in binary mode and decoded with
    `_get_next_step_trace`. One `event_info` dict is shared by all files and
    handed to every `_get_next_step_trace` call. While `_skip_first_step` is
    set, the first decoded step trace is dropped and the flag is cleared.

    Args:
        source_files (list[str]): Paths of the step trace files, iterated in
            the given order.
    """
    log.info("Start to parse step trace file.")
    event_info = {}
    for source_file in source_files:
        with open(source_file, 'rb') as handler:
            content = handler.read()
        # The whole file is already in memory, so the handle is released
        # before the content is decoded.
        for step_trace in self._get_next_step_trace(content, event_info):
            if self._skip_first_step:
                self._skip_first_step = False
                continue
            self._record_trace_event(step_trace)
    self._record_average_info()
    log.info("Finish to parse step trace file.")
|
|
|
|
|
|
|
def _get_next_step_trace(self, content): |
|
|
|
def _get_next_step_trace(self, content, event_info): |
|
|
|
""" |
|
|
|
Get next step trace info. |
|
|
|
|
|
|
|
Args: |
|
|
|
content (bytes): The input step trace info |
|
|
|
content (bytes): The input step trace info. |
|
|
|
event_info (dict): The event info. |
|
|
|
|
|
|
|
Returns: |
|
|
|
Generator, return the step trace one by one. |
|
|
|
""" |
|
|
|
event_info = {} |
|
|
|
for pos in range(0, len(content), 20): |
|
|
|
next_event = self._get_trace_struct(content[pos:pos + self._event_size]) |
|
|
|
self._construct_event_info(next_event, event_info) |
|
|
|
@@ -251,7 +274,7 @@ class StepTraceParser: |
|
|
|
log.info("Finish add average info for step trace.") |
|
|
|
|
|
|
|
def _save(self): |
|
|
|
log.info("Start to save step trace file.") |
|
|
|
log.info("Start to save step trace file.") |
|
|
|
if not self._header: |
|
|
|
return |
|
|
|
with open(self._output_path, 'w') as file_handle: |
|
|
|
|