diff --git a/mindinsight/explainer/manager/explain_loader.py b/mindinsight/explainer/manager/explain_loader.py index 53c3b2be..5ee9c332 100644 --- a/mindinsight/explainer/manager/explain_loader.py +++ b/mindinsight/explainer/manager/explain_loader.py @@ -29,7 +29,7 @@ from mindinsight.datavisual.data_access.file_handler import FileHandler from mindinsight.explainer.common.enums import ExplainFieldsEnum from mindinsight.explainer.common.log import logger from mindinsight.explainer.manager.explain_parser import ExplainParser -from mindinsight.utils.exceptions import ParamValueError +from mindinsight.utils.exceptions import ParamValueError, UnknownError _NAN_CONSTANT = 'NaN' _NUM_DIGITS = 6 @@ -287,7 +287,10 @@ class ExplainLoader: is_end = False while not is_end and self.status != _LoaderStatus.STOP.value: - file_changed, is_end, event_dict = self._parser.list_events(filenames) + try: + file_changed, is_end, event_dict = self._parser.list_events(filenames) + except UnknownError: + break if file_changed: logger.info('Summary file in %s update, reload the data in the summary.', diff --git a/mindinsight/explainer/manager/explain_manager.py b/mindinsight/explainer/manager/explain_manager.py index 39072316..4d8ea791 100644 --- a/mindinsight/explainer/manager/explain_manager.py +++ b/mindinsight/explainer/manager/explain_manager.py @@ -14,11 +14,10 @@ # ============================================================================ """ExplainManager.""" -from collections import OrderedDict - import os import threading import time +from collections import OrderedDict from datetime import datetime from typing import Optional @@ -29,7 +28,7 @@ from mindinsight.datavisual.data_access.file_handler import FileHandler from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher from mindinsight.explainer.common.log import logger from mindinsight.explainer.manager.explain_loader import ExplainLoader -from mindinsight.utils.exceptions import MindInsightException, ParamValueError, UnknownError +from mindinsight.utils.exceptions import ParamValueError, UnknownError _MAX_LOADERS_NUM = 3 @@ -226,23 +225,17 @@ class ExplainManager: """Execute the data loading.""" # We will load the newest loader first. for loader_id in list(self._loader_pool.keys())[::-1]: - try: - with self._loader_pool_mutex: - loader = self._loader_pool.get(loader_id, None) - if loader is None: - logger.debug('Loader %r has been deleted, will not load data.', loader_id) - continue + with self._loader_pool_mutex: + loader = self._loader_pool.get(loader_id, None) + if loader is None: + logger.debug('Loader %r has been deleted, will not load data.', loader_id) + continue - if self.status == _ExplainManagerStatus.STOPPING.value: - logger.info('Loader %s status is %s, will return.', loader_id, loader.status) - return - - loader.load() + if self.status == _ExplainManagerStatus.STOPPING.value: + logger.info('Loader %s status is %s, will return.', loader_id, loader.status) + return - except MindInsightException as ex: - logger.warning('Data loader %r load data failed. Delete data_loader. Detail: %s.', loader_id, ex) - with self._loader_pool_mutex: - self._delete_loader(loader_id) + loader.load() def _delete_loader(self, loader_id): """Delete loader given loader_id.""" diff --git a/mindinsight/explainer/manager/explain_parser.py b/mindinsight/explainer/manager/explain_parser.py index 00576205..2e7e8e7c 100644 --- a/mindinsight/explainer/manager/explain_parser.py +++ b/mindinsight/explainer/manager/explain_parser.py @@ -19,14 +19,12 @@ This module is used to parse the MindExplain log file. """ from collections import namedtuple -from google.protobuf.message import DecodeError - from mindinsight.datavisual.common import exceptions -from mindinsight.explainer.common.enums import ExplainFieldsEnum -from mindinsight.explainer.common.log import logger from mindinsight.datavisual.data_access.file_handler import FileHandler from mindinsight.datavisual.data_transform.ms_data_loader import _SummaryParser from mindinsight.datavisual.proto_files import mindinsight_summary_pb2 as summary_pb2 +from mindinsight.explainer.common.enums import ExplainFieldsEnum +from mindinsight.explainer.common.log import logger from mindinsight.utils.exceptions import UnknownError HEADER_SIZE = 8 @@ -109,15 +107,20 @@ class ExplainParser(_SummaryParser): except (exceptions.CRCFailedError, exceptions.CRCLengthFailedError) as ex: self._summary_file_handler.reset_offset(start_offset) is_end = True - logger.warning("Check crc failed and ignore this file, file_path=%s, offset=%s. Detail: %r.", + logger.warning("Check crc failed and reset offset, file_path=%s, offset=%s. Detail: %r.", self._summary_file_handler.file_path, self._summary_file_handler.offset, str(ex)) return file_changed, is_end, event_data - except (OSError, DecodeError, exceptions.MindInsightException) as ex: - is_end = True - logger.warning("Parse log file fail, and ignore this file, detail: %r," - "file path: %s.", str(ex), self._summary_file_handler.file_path) - return file_changed, is_end, event_data except Exception as ex: + # Note: If an unknown error occurs, we will set the offset to the end of this file, + # which is equivalent to stopping parsing this file. We do not delete the current job + # and retain the data that has been successfully parsed. + self._summary_file_handler.reset_offset(new_size) + + # Notice: If the current job is the latest one in the loader pool and the job is deleted, + # the job goes into an infinite cycle of load-fail-delete-reload-load-fail-delete. + # We need to prevent this infinite loop. + logger.error("Parse summary file failed, will set offset to the file end. file_path: %s, " + "offset: %d, detail: %s.", file_path, self._summary_file_handler.offset, str(ex)) logger.exception(ex) raise UnknownError(str(ex)) finally: