1. Create a new thread to load detail info: loading detail info takes too much time, so the summary list and lineage cannot be loaded in a timely manner. 2. Add a status to DetailCacheManager to indicate whether it is INIT, LOADING or DONE. 3. Update UT/ST.tags/0.7.0-beta
| @@ -32,6 +32,13 @@ class DataManagerStatus(BaseEnum): | |||
| INVALID = 'INVALID' | |||
class DetailCacheManagerStatus(BaseEnum):
    """Detail cache manager status: INIT, LOADING or DONE."""
    INIT = 'INIT'
    LOADING = 'LOADING'
    DONE = 'DONE'
| class PluginNameEnum(BaseEnum): | |||
| """Plugin Name Enum.""" | |||
| IMAGE = 'image' | |||
| @@ -35,7 +35,7 @@ from mindinsight.conf import settings | |||
| from mindinsight.datavisual.common import exceptions | |||
| from mindinsight.datavisual.common.enums import CacheStatus | |||
| from mindinsight.datavisual.common.log import logger | |||
| from mindinsight.datavisual.common.enums import DataManagerStatus | |||
| from mindinsight.datavisual.common.enums import DataManagerStatus, DetailCacheManagerStatus | |||
| from mindinsight.datavisual.common.enums import PluginNameEnum | |||
| from mindinsight.datavisual.common.exceptions import TrainJobNotExistError | |||
| from mindinsight.datavisual.data_transform.loader_generators.loader_generator import MAX_DATA_LOADER_SIZE | |||
| @@ -44,6 +44,7 @@ from mindinsight.utils.computing_resource_mgr import ComputingResourceManager | |||
| from mindinsight.utils.exceptions import MindInsightException | |||
| from mindinsight.utils.exceptions import ParamValueError | |||
| from mindinsight.utils.exceptions import UnknownError | |||
| from mindinsight.datavisual.utils.tools import exception_wrapper | |||
| class _BasicTrainJob: | |||
| @@ -415,6 +416,13 @@ class _DetailCacheManager(_BaseCacheManager): | |||
| self._loader_pool_mutex = threading.Lock() | |||
| self._max_threads_count = 30 | |||
| self._loader_generators = loader_generators | |||
| self._status = DetailCacheManagerStatus.INIT.value | |||
| self._loading_mutex = threading.Lock() | |||
    @property
    def status(self):
        """Return the detail cache loading status string (INIT, LOADING or DONE)."""
        return self._status
| def has_content(self): | |||
| """Whether this cache manager has train jobs.""" | |||
| @@ -435,6 +443,20 @@ class _DetailCacheManager(_BaseCacheManager): | |||
| """Get loader pool size.""" | |||
| return len(self._loader_pool) | |||
| def _load_in_cache(self): | |||
| """Generate and execute loaders.""" | |||
| def load(): | |||
| self._generate_loaders() | |||
| self._execute_load_data() | |||
| try: | |||
| exception_wrapper(load()) | |||
| except UnknownError as ex: | |||
| logger.warning("Load event data failed. Detail: %s.", str(ex)) | |||
| finally: | |||
| self._status = DetailCacheManagerStatus.DONE.value | |||
| logger.info("Load event data end, status: %r, and loader pool size is %r.", | |||
| self._status, self.loader_pool_size()) | |||
| def update_cache(self, disk_train_jobs: Iterable[_BasicTrainJob]): | |||
| """ | |||
| Update cache. | |||
| @@ -445,8 +467,13 @@ class _DetailCacheManager(_BaseCacheManager): | |||
| disk_train_jobs (Iterable[_BasicTrainJob]): Basic info about train jobs on disk. | |||
| """ | |||
| self._generate_loaders() | |||
| self._execute_load_data() | |||
| with self._loading_mutex: | |||
| if self._status == DetailCacheManagerStatus.LOADING.value: | |||
| logger.debug("Event data is loading, and loader pool size is %r.", self.loader_pool_size()) | |||
| return | |||
| self._status = DetailCacheManagerStatus.LOADING.value | |||
| thread = threading.Thread(target=self._load_in_cache, name="load_detail_in_cache") | |||
| thread.start() | |||
| def cache_train_job(self, train_id): | |||
| """Cache given train job.""" | |||
| @@ -711,8 +738,7 @@ class _DetailCacheManager(_BaseCacheManager): | |||
| loader = self._get_loader(train_id) | |||
| if loader is None: | |||
| logger.warning("No valid summary log in train job %s, " | |||
| "or it is not in the cache.", train_id) | |||
| logger.info("No valid summary log in train job %s, or it is not in the cache.", train_id) | |||
| return None | |||
| train_job = loader.to_dict() | |||
| @@ -897,19 +923,11 @@ class DataManager: | |||
| """Wrapper for load data in thread.""" | |||
| try: | |||
| with self._load_data_lock: | |||
| self._load_data_in_thread() | |||
| except MindInsightException as exc: | |||
| exception_wrapper(self._load_data()) | |||
| except UnknownError as exc: | |||
| # Not raising the exception here to ensure that data reloading does not crash. | |||
| logger.warning(exc.message) | |||
| def _load_data_in_thread(self): | |||
| """Log (but not swallow) exceptions in thread to help debugging.""" | |||
| try: | |||
| self._load_data() | |||
| except Exception as exc: | |||
| logger.exception(exc) | |||
| raise UnknownError('Load data thread error.') | |||
| def _load_data(self): | |||
| """This function will load data once and ignore it if the status is loading.""" | |||
| logger.info("Start to load data, reload interval: %r.", self._reload_interval) | |||
| @@ -939,13 +957,13 @@ class DataManager: | |||
| self._brief_cache.update_cache(basic_train_jobs) | |||
| self._detail_cache.update_cache(basic_train_jobs) | |||
| if not self._brief_cache.has_content() and not self._detail_cache.has_content(): | |||
| if not self._brief_cache.has_content() and not self._detail_cache.has_content() \ | |||
| and self._detail_cache.status == DetailCacheManagerStatus.DONE.value: | |||
| self.status = DataManagerStatus.INVALID.value | |||
| else: | |||
| self.status = DataManagerStatus.DONE.value | |||
| logger.info("Load event data end, status: %r, and loader pool size is %r.", | |||
| self.status, self._detail_cache.loader_pool_size()) | |||
| logger.info("Load brief data end, and loader pool size is %r.", self._detail_cache.loader_pool_size()) | |||
| @staticmethod | |||
| def check_reload_interval(reload_interval): | |||
| @@ -1046,14 +1064,6 @@ class DataManager: | |||
| return TrainJob(brief_train_job, detail_train_job) | |||
| def list_train_jobs(self): | |||
| """ | |||
| List train jobs. | |||
| To be implemented. | |||
| """ | |||
| raise NotImplementedError() | |||
| @property | |||
| def status(self): | |||
| """ | |||
| @@ -1088,5 +1098,9 @@ class DataManager: | |||
| """Get brief train job.""" | |||
| return self._brief_cache.get_train_job(train_id) | |||
    def get_detail_cache_status(self):
        """Return the detail cache manager's status value (INIT/LOADING/DONE); exposed for UT/ST only."""
        return self._detail_cache.status
| DATA_MANAGER = DataManager(settings.SUMMARY_BASE_DIR) | |||
| @@ -21,7 +21,9 @@ from numbers import Number | |||
| from urllib.parse import unquote | |||
| from mindinsight.datavisual.common.exceptions import MaxCountExceededError | |||
| from mindinsight.datavisual.common.log import logger | |||
| from mindinsight.utils import exceptions | |||
| from mindinsight.utils.exceptions import UnknownError | |||
| _IMG_EXT_TO_MIMETYPE = { | |||
| 'bmp': 'image/bmp', | |||
| @@ -216,6 +218,16 @@ def if_nan_inf_to_none(name, value): | |||
| return value | |||
def exception_wrapper(func):
    """Decorator that logs any exception raised by *func* and re-raises it as UnknownError.

    Args:
        func (callable): The callable to wrap.

    Returns:
        callable, a wrapper that passes through *func*'s return value.

    Raises:
        UnknownError: If *func* raises any exception.
    """
    def wrapper(*args, **kwargs):
        try:
            # BUG FIX: propagate the wrapped function's return value; the
            # original discarded it, so wrapped callables always returned None.
            return func(*args, **kwargs)
        except Exception as exc:
            logger.exception(exc)
            # Chain explicitly so the original traceback is preserved as the cause.
            raise UnknownError(str(exc)) from exc
    return wrapper
| class Counter: | |||
| """Count accumulator with limit checking.""" | |||
| @@ -15,28 +15,15 @@ | |||
| """ | |||
| Description: This file is used for some common util. | |||
| """ | |||
| from unittest.mock import Mock | |||
| import pytest | |||
| from flask import Response | |||
| from mindinsight.backend import datavisual | |||
| from mindinsight.datavisual.utils import tools | |||
| from mindinsight.backend.application import APP | |||
| @pytest.fixture | |||
| def client(): | |||
| """This fixture is flask client.""" | |||
| mock_data_manager = Mock() | |||
| mock_data_manager.start_load_data = Mock() | |||
| datavisual.DATA_MANAGER = mock_data_manager | |||
| packages = ["mindinsight.backend.data_visual"] | |||
| mock_obj = Mock(return_value=packages) | |||
| tools.find_app_package = mock_obj | |||
| from mindinsight.backend.application import APP | |||
| APP.response_class = Response | |||
| app_client = APP.test_client() | |||
| @@ -22,12 +22,10 @@ from unittest.mock import patch | |||
| from werkzeug.exceptions import MethodNotAllowed, NotFound | |||
| from mindinsight.datavisual.processors import scalars_processor | |||
| from mindinsight.datavisual.processors.scalars_processor import ScalarsProcessor | |||
| from ....utils.tools import get_url | |||
| from ...backend.datavisual.conftest import TRAIN_ROUTES | |||
| from ..mock import MockLogger | |||
| class TestErrorHandler: | |||
| @@ -36,7 +34,6 @@ class TestErrorHandler: | |||
| @patch.object(ScalarsProcessor, 'get_metadata_list') | |||
| def test_handle_http_exception_error_not_found(self, mock_scalar_processor, client): | |||
| """Test handle http exception error not found.""" | |||
| scalars_processor.logger = MockLogger | |||
| text = 'Test Message' | |||
| # NotFound | |||
| @@ -59,7 +56,6 @@ class TestErrorHandler: | |||
| @patch.object(ScalarsProcessor, 'get_metadata_list') | |||
| def test_handle_http_exception_error_method_not_allowed(self, mock_scalar_processor, client): | |||
| """Test handling http exception error method not allowed.""" | |||
| scalars_processor.logger = MockLogger | |||
| text = 'Test Message' | |||
| # MethodNotAllowed | |||
| @@ -82,7 +78,6 @@ class TestErrorHandler: | |||
| @patch.object(ScalarsProcessor, 'get_metadata_list') | |||
| def test_handle_http_exception_error_method_other_errors(self, mock_scalar_processor, client): | |||
| """Test handling http exception error method other errors.""" | |||
| scalars_processor.logger = MockLogger | |||
| text = 'Test Message' | |||
| # Other errors | |||
| @@ -1,14 +0,0 @@ | |||
| # Copyright 2019 Huawei Technologies Co., Ltd | |||
| # | |||
| # Licensed under the Apache License, Version 2.0 (the "License"); | |||
| # you may not use this file except in compliance with the License. | |||
| # You may obtain a copy of the License at | |||
| # | |||
| # http://www.apache.org/licenses/LICENSE-2.0 | |||
| # | |||
| # Unless required by applicable law or agreed to in writing, software | |||
| # distributed under the License is distributed on an "AS IS" BASIS, | |||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| # See the License for the specific language governing permissions and | |||
| # limitations under the License. | |||
| # ============================================================================ | |||
| @@ -1,45 +0,0 @@ | |||
| # Copyright 2019 Huawei Technologies Co., Ltd | |||
| # | |||
| # Licensed under the Apache License, Version 2.0 (the "License"); | |||
| # you may not use this file except in compliance with the License. | |||
| # You may obtain a copy of the License at | |||
| # | |||
| # http://www.apache.org/licenses/LICENSE-2.0 | |||
| # | |||
| # Unless required by applicable law or agreed to in writing, software | |||
| # distributed under the License is distributed on an "AS IS" BASIS, | |||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| # See the License for the specific language governing permissions and | |||
| # limitations under the License. | |||
| # ============================================================================ | |||
| """ | |||
| Description: This file is used for some common util. | |||
| """ | |||
| from unittest.mock import Mock | |||
| import pytest | |||
| from flask import Response | |||
| from mindinsight.backend import datavisual | |||
| from mindinsight.datavisual import utils | |||
| @pytest.fixture | |||
| def client(): | |||
| """This fixture is flask client.""" | |||
| mock_data_manager = Mock() | |||
| mock_data_manager.start_load_data = Mock() | |||
| datavisual.DATA_MANAGER = mock_data_manager | |||
| packages = ["mindinsight.backend.raw_dataset", | |||
| "mindinsight.backend.train_dataset", | |||
| "mindinsight.backend.data_visual"] | |||
| mock_obj = Mock(return_value=packages) | |||
| utils.find_app_package = mock_obj | |||
| from mindinsight.backend.application import APP | |||
| APP.response_class = Response | |||
| app_client = APP.test_client() | |||
| yield app_client | |||
| @@ -81,8 +81,9 @@ class TestDataManager: | |||
| def test_start_load_data_success(self): | |||
| """Test start_load_data method success.""" | |||
| summary_base_dir = tempfile.mkdtemp() | |||
| dir_num = 3 | |||
| train_ids = [] | |||
| for i in range(3): | |||
| for i in range(dir_num): | |||
| log_path = os.path.join(summary_base_dir, f'dir{i}') | |||
| self._make_path_and_file_list(log_path) | |||
| train_ids.append(f'./dir{i}') | |||
| @@ -215,7 +216,7 @@ class TestDataManager: | |||
| expected_loader_ids = expected_loader_ids[-MAX_DATA_LOADER_SIZE:] | |||
| # Make sure to finish loading, make it init. | |||
| mock_data_manager._status = DataManagerStatus.INIT | |||
| mock_data_manager._detail_cache._status = DataManagerStatus.INIT.value | |||
| mock_generate_loaders.return_value = loader_dict | |||
| mock_data_manager.start_load_data(reload_interval=0) | |||
| check_loading_done(mock_data_manager) | |||
| @@ -26,7 +26,7 @@ from urllib.parse import urlencode | |||
| import numpy as np | |||
| from PIL import Image | |||
| from mindinsight.datavisual.common.enums import DataManagerStatus | |||
| from mindinsight.datavisual.common.enums import DetailCacheManagerStatus | |||
| def get_url(url, params): | |||
def check_loading_done(data_manager, time_limit=15, first_sleep_time=0):
    """Poll until the detail cache finishes loading, or until *time_limit* elapses.

    Args:
        data_manager: DataManager instance under test.
        time_limit (int): Maximum number of seconds to wait.
        first_sleep_time (int): Optional initial sleep (seconds) before polling.

    Returns:
        bool, True if the detail cache reached DONE within the limit.
    """
    if first_sleep_time > 0:
        time.sleep(first_sleep_time)
    start_time = time.time()
    while data_manager.get_detail_cache_status() != DetailCacheManagerStatus.DONE.value:
        time_used = time.time() - start_time
        if time_used > time_limit:
            break
        time.sleep(0.1)
    # BUG FIX: the original compared the bound method object itself
    # (`get_detail_cache_status` without parentheses) to a string, which is
    # always False — the helper could never report success.
    return bool(data_manager.get_detail_cache_status() == DetailCacheManagerStatus.DONE.value)
| def get_image_tensor_from_bytes(image_string): | |||