| @@ -15,6 +15,7 @@ | |||
| """Trigger data manager load.""" | |||
| import time | |||
| from mindinsight.conf import settings | |||
| from mindinsight.datavisual.common.log import logger | |||
| from mindinsight.datavisual.data_transform.data_manager import DATA_MANAGER | |||
| from mindinsight.lineagemgr.cache_item_updater import LineageCacheItemUpdater | |||
| @@ -34,4 +35,4 @@ def init_module(app): | |||
| # Let gunicorn load other modules first. | |||
| time.sleep(1) | |||
| DATA_MANAGER.start_load_data(auto_reload=True) | |||
| DATA_MANAGER.start_load_data(reload_interval=settings.RELOAD_INTERVAL) | |||
| @@ -17,9 +17,29 @@ | |||
| import argparse | |||
| import os | |||
| from mindinsight.conf import settings | |||
| from mindinsight.utils.hook import BaseHook | |||
| class ReloadIntervalAction(argparse.Action): | |||
| """Reload interval action class definition.""" | |||
| def __call__(self, parser, namespace, values, option_string=None): | |||
| """ | |||
| Inherited __call__ method from argparse.Action. | |||
| Args: | |||
| parser (ArgumentParser): Passed-in argument parser. | |||
| namespace (Namespace): Namespace object to hold arguments. | |||
| values (object): Argument values with type depending on argument definition. | |||
| option_string (str): Option string for specific argument name. | |||
| """ | |||
| reload_interval = values | |||
| if reload_interval < 0: | |||
| parser.error(f'{option_string} should be greater than or equal to 0') | |||
| setattr(namespace, self.dest, reload_interval) | |||
| class SummaryBaseDirAction(argparse.Action): | |||
| """Summary base dir action class definition.""" | |||
| @@ -47,6 +67,15 @@ class Hook(BaseHook): | |||
| Args: | |||
| parser (ArgumentParser): Specify parser to which arguments are added. | |||
| """ | |||
| parser.add_argument( | |||
| '--reload-interval', | |||
| type=int, | |||
| action=ReloadIntervalAction, | |||
| help=""" | |||
| data reload time(Seconds). It should be greater than 0 or equal to 0. | |||
| If it equals 0, load data only once. Default value is %s seconds. | |||
| """ % settings.RELOAD_INTERVAL) | |||
| parser.add_argument( | |||
| '--summary-base-dir', | |||
| type=str, | |||
| @@ -35,4 +35,5 @@ ENABLE_DEBUGGER = False | |||
| #################################### | |||
| # Datavisual default settings. | |||
| #################################### | |||
| RELOAD_INTERVAL = 3 # Seconds | |||
| SUMMARY_BASE_DIR = os.getcwd() | |||
| @@ -850,23 +850,41 @@ class DataManager: | |||
| """Get summary base dir.""" | |||
| return self._summary_base_dir | |||
| def start_load_data(self, auto_reload=False): | |||
| def start_load_data(self, reload_interval=0): | |||
| """ | |||
| Start threads for loading data. | |||
| Args: | |||
| reload_interval (int): Time to reload data again. | |||
| Returns: | |||
| Thread, the background Thread instance. | |||
| """ | |||
| logger.info("Start to load data") | |||
| DataManager.check_reload_interval(reload_interval) | |||
| thread = threading.Thread(target=self._load_data_in_thread_wrapper, | |||
| name='start_load_data_thread', | |||
| args=(auto_reload,), | |||
| args=(reload_interval,), | |||
| daemon=True) | |||
| thread.daemon = True | |||
| thread.start() | |||
| return thread | |||
| def _load_data_in_thread_wrapper(self, auto_reload): | |||
| @staticmethod | |||
| def check_reload_interval(reload_interval): | |||
| """ | |||
| Check reload interval is valid. | |||
| Args: | |||
| reload_interval (int): Reload interval >= 0. | |||
| """ | |||
| if not isinstance(reload_interval, int): | |||
| raise ParamValueError("The value of reload interval should be integer.") | |||
| if reload_interval < 0: | |||
| raise ParamValueError("The value of reload interval should be >= 0.") | |||
| def _load_data_in_thread_wrapper(self, reload_interval): | |||
| """Wrapper for load data in thread.""" | |||
| if self._load_data_lock.locked(): | |||
| return | |||
| @@ -874,8 +892,9 @@ class DataManager: | |||
| with self._load_data_lock: | |||
| while True: | |||
| exception_wrapper(self._load_data)() | |||
| if not auto_reload: | |||
| if not reload_interval: | |||
| break | |||
| time.sleep(reload_interval) | |||
| except UnknownError as exc: | |||
| # Not raising the exception here to ensure that data reloading does not crash. | |||
| logger.warning(exc.message) | |||
| @@ -94,6 +94,19 @@ class TestDataManager: | |||
| assert MockLogger.log_msg['info'] == "Load brief data end, and loader pool size is '3'." | |||
| shutil.rmtree(summary_base_dir) | |||
| @pytest.mark.parametrize('params', [{ | |||
| 'reload_interval': '30' | |||
| }, { | |||
| 'reload_interval': -1 | |||
| }]) | |||
| def test_start_load_data_with_invalid_params(self, params): | |||
| """Test start_load_data with invalid reload_interval or invalid max_threads_count.""" | |||
| summary_base_dir = tempfile.mkdtemp() | |||
| d_manager = DataManager(summary_base_dir) | |||
| with pytest.raises(ParamValueError): | |||
| d_manager.start_load_data(**params) | |||
| shutil.rmtree(summary_base_dir) | |||
| def test_list_tensors_success(self): | |||
| """Test list_tensors method success.""" | |||
| summary_base_dir = tempfile.mkdtemp() | |||