| @@ -27,6 +27,8 @@ from mindinsight.datavisual.common.exceptions import MaxCountExceededError | |||
| from mindinsight.utils.exceptions import FileSystemPermissionError | |||
| LINEAGE_SUMMARY_SUFFIX = '_lineage' | |||
| EXPLAIN_SUMMARY_SUFFIX = '_explain' | |||
| class SummaryWatcher: | |||
| """SummaryWatcher class.""" | |||
| @@ -41,13 +43,15 @@ class SummaryWatcher: | |||
| # to avoid long-time blocking | |||
| MAX_SCAN_COUNT = 20000 | |||
| def list_summary_directories(self, summary_base_dir, overall=True): | |||
| def list_summary_directories(self, summary_base_dir, overall=True, list_explain=False): | |||
| """ | |||
| List summary directories within base directory. | |||
| Args: | |||
| summary_base_dir (str): Path of summary base directory. | |||
| overall (bool): Limit the total num of scanning if overall is False. | |||
| list_explain (bool): Indicates whether to list only the mindexplain folder. | |||
| Default is False, means not to list mindexplain folder. | |||
| Returns: | |||
| list, list of summary directory info, each of which including the following attributes. | |||
| @@ -91,10 +95,10 @@ class SummaryWatcher: | |||
| if entry.is_symlink(): | |||
| pass | |||
| elif entry.is_file(): | |||
| self._update_summary_dict(summary_dict, summary_base_dir, relative_path, entry) | |||
| self._update_summary_dict(summary_dict, summary_base_dir, relative_path, entry, list_explain) | |||
| elif entry.is_dir(): | |||
| entry_path = os.path.realpath(os.path.join(summary_base_dir, entry.name)) | |||
| self._scan_subdir_entries(summary_dict, summary_base_dir, entry_path, entry.name, counter) | |||
| self._scan_subdir_entries(summary_dict, summary_base_dir, entry_path, entry.name, counter, list_explain) | |||
| directories = [] | |||
| for key, value in summary_dict.items(): | |||
| @@ -109,7 +113,7 @@ class SummaryWatcher: | |||
| return directories | |||
| def _scan_subdir_entries(self, summary_dict, summary_base_dir, entry_path, entry_name, counter): | |||
| def _scan_subdir_entries(self, summary_dict, summary_base_dir, entry_path, entry_name, counter, list_explain): | |||
| """ | |||
| Scan subdir entries. | |||
| @@ -119,6 +123,7 @@ class SummaryWatcher: | |||
| entry_path(str): Path entry. | |||
| entry_name (str): Name of entry. | |||
| counter (Counter): An instance of CountLimiter. | |||
| list_explain (bool): Indicates whether to list only the mindexplain folder. | |||
| """ | |||
| try: | |||
| @@ -139,7 +144,7 @@ class SummaryWatcher: | |||
| subdir_relative_path = os.path.join('.', entry_name) | |||
| if subdir_entry.is_symlink(): | |||
| pass | |||
| self._update_summary_dict(summary_dict, summary_base_dir, subdir_relative_path, subdir_entry) | |||
| self._update_summary_dict(summary_dict, summary_base_dir, subdir_relative_path, subdir_entry, list_explain) | |||
| def _is_valid_summary_directory(self, summary_base_dir, relative_path): | |||
| """ | |||
| @@ -172,7 +177,7 @@ class SummaryWatcher: | |||
| return True | |||
| def _update_summary_dict(self, summary_dict, summary_base_dir, relative_path, entry): | |||
| def _update_summary_dict(self, summary_dict, summary_base_dir, relative_path, entry, list_explain): | |||
| """ | |||
| Update summary_dict with ctime and mtime. | |||
| @@ -182,6 +187,7 @@ class SummaryWatcher: | |||
| relative_path (str): Relative path of summary directory, referring to summary base directory, | |||
| starting with "./" . | |||
| entry (DirEntry): Directory entry instance needed to check with regular expression. | |||
| list_explain (bool): Indicates whether to list only the mindexplain folder. | |||
| """ | |||
| try: | |||
| stat = entry.stat() | |||
| @@ -204,6 +210,12 @@ class SummaryWatcher: | |||
| ctime = datetime.datetime.fromtimestamp(timestamp).astimezone() | |||
| except OverflowError: | |||
| return | |||
| if list_explain and not entry.name.endswith(EXPLAIN_SUMMARY_SUFFIX): | |||
| return | |||
| if not list_explain and entry.name.endswith(EXPLAIN_SUMMARY_SUFFIX): | |||
| return | |||
| if relative_path not in summary_dict: | |||
| summary_dict[relative_path] = _new_entry(ctime, mtime) | |||
| if summary_dict[relative_path]['create_time'] < ctime: | |||
| @@ -215,6 +227,8 @@ class SummaryWatcher: | |||
| summary_dict[relative_path]['graph_files'] += 1 | |||
| elif entry.name.endswith(LINEAGE_SUMMARY_SUFFIX): | |||
| summary_dict[relative_path]['lineage_files'] += 1 | |||
| elif entry.name.endswith(EXPLAIN_SUMMARY_SUFFIX): | |||
| summary_dict[relative_path]['explain_files'] += 1 | |||
| else: | |||
| summary_dict[relative_path]['summary_files'] += 1 | |||
| elif entry.is_dir(): | |||
| @@ -397,6 +411,41 @@ class SummaryWatcher: | |||
| return summaries | |||
| def list_explain_directories(self, summary_base_dir, offset=0, limit=None): | |||
| """ | |||
| List explain directories within base directory. | |||
| Args: | |||
| summary_base_dir (str): Path of summary base directory. | |||
| offset (int): An offset for page. Ex, offset is 0, mean current page is 1. Default value is 0. | |||
| limit (int): The max data items for per page. Default value is 10. | |||
| Returns: | |||
| tuple[total, directories], total indicates the overall number of explain directories and directories | |||
| indicate list of summary directory info including the following attributes. | |||
| - relative_path (str): Relative path of summary directory, referring to settings.SUMMARY_BASE_DIR, | |||
| starting with "./". | |||
| - create_time (datetime): Creation time of summary file. | |||
| - update_time (datetime): Modification time of summary file. | |||
| Raises: | |||
| ParamValueError, if offset < 0 or limit is out of valid value range. | |||
| ParamTypeError, if offset or limit is not valid integer. | |||
| Examples: | |||
| >>> from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher | |||
| >>> summary_watcher = SummaryWatcher() | |||
| >>> total, directories = summary_watcher.list_explain_directories('/summary/base/dir', offset=0, limit=10) | |||
| """ | |||
| offset = Validation.check_offset(offset=offset) | |||
| limit = Validation.check_limit(limit, min_value=1, max_value=999, default_value=None) | |||
| directories = self.list_summary_directories(summary_base_dir, overall=False, list_explain=True) | |||
| if limit is None: | |||
| return len(directories), directories | |||
| return len(directories), directories[offset * limit:(offset + 1) * limit] | |||
| def _new_entry(ctime, mtime, profiler=None): | |||
| """Create a new entry.""" | |||
| @@ -405,6 +454,7 @@ def _new_entry(ctime, mtime, profiler=None): | |||
| 'update_time': mtime, | |||
| 'summary_files': 0, | |||
| 'lineage_files': 0, | |||
| 'explain_files': 0, | |||
| 'graph_files': 0, | |||
| 'profiler': profiler | |||
| } | |||