From 1b4d5ccb9c8b7a7d93c91aa85e43b017826df2c0 Mon Sep 17 00:00:00 2001
From: "xingjun.wxj" 
Date: Fri, 14 Oct 2022 18:32:38 +0800
Subject: [PATCH] [to #42322933]MsDataset upload and load support directories.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Upload and download support multi-file operations.
---
 modelscope/hub/api.py                         | 34 ++++---
 modelscope/hub/utils/utils.py                 |  8 +-
 modelscope/msdatasets/ms_dataset.py           | 52 +++++++----
 modelscope/msdatasets/utils/dataset_utils.py  | 90 ++++++++++++++++++-
 modelscope/msdatasets/utils/download_utils.py | 18 ++--
 modelscope/msdatasets/utils/oss_utils.py      |  9 +-
 modelscope/msdatasets/utils/upload_utils.py   | 40 ++++++++-
 modelscope/utils/constant.py                  |  7 ++
 tests/msdatasets/test_dataset_upload.py       | 43 ++++++++-
 9 files changed, 250 insertions(+), 51 deletions(-)

diff --git a/modelscope/hub/api.py b/modelscope/hub/api.py
index 214045dd..dc4d0ab2 100644
--- a/modelscope/hub/api.py
+++ b/modelscope/hub/api.py
@@ -26,18 +26,15 @@ from modelscope.utils.logger import get_logger
 from .errors import (InvalidParameter, NotExistError, RequestError,
                      datahub_raise_on_error, handle_http_post_error,
                      handle_http_response, is_ok, raise_on_error)
-from .utils.utils import (get_dataset_hub_endpoint, get_endpoint,
-                          model_id_to_group_owner_name)
+from .utils.utils import get_endpoint, model_id_to_group_owner_name
 
 logger = get_logger()
 
 
 class HubApi:
 
-    def __init__(self, endpoint=None, dataset_endpoint=None):
+    def __init__(self, endpoint=None):
         self.endpoint = endpoint if endpoint is not None else get_endpoint()
-        self.dataset_endpoint = dataset_endpoint if dataset_endpoint is not None else get_dataset_hub_endpoint(
-        )
 
     def login(
         self,
@@ -288,7 +285,7 @@ class HubApi:
         return files
 
     def list_datasets(self):
-        path = f'{self.dataset_endpoint}/api/v1/datasets'
+        path = f'{self.endpoint}/api/v1/datasets'
         headers = None
         params = {}
         r = requests.get(path, params=params, headers=headers)
@@ -315,13 +312,13 @@ class HubApi:
                 cache_dir):
             shutil.rmtree(cache_dir)
         os.makedirs(cache_dir, exist_ok=True)
-        datahub_url = f'{self.dataset_endpoint}/api/v1/datasets/{namespace}/{dataset_name}'
+        datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}'
         r = requests.get(datahub_url)
         resp = r.json()
         datahub_raise_on_error(datahub_url, resp)
         dataset_id = resp['Data']['Id']
         dataset_type = resp['Data']['Type']
-        datahub_url = f'{self.dataset_endpoint}/api/v1/datasets/{dataset_id}/repo/tree?Revision={revision}'
+        datahub_url = f'{self.endpoint}/api/v1/datasets/{dataset_id}/repo/tree?Revision={revision}'
         r = requests.get(datahub_url)
         resp = r.json()
         datahub_raise_on_error(datahub_url, resp)
@@ -339,7 +336,7 @@ class HubApi:
             file_path = file_info['Path']
             extension = os.path.splitext(file_path)[-1]
             if extension in dataset_meta_format:
-                datahub_url = f'{self.dataset_endpoint}/api/v1/datasets/{namespace}/{dataset_name}/repo?' \
+                datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/repo?' \
                               f'Revision={revision}&FilePath={file_path}'
                 r = requests.get(datahub_url)
                 r.raise_for_status()
@@ -363,7 +360,7 @@ class HubApi:
                              namespace: str,
                              revision: Optional[str] = DEFAULT_DATASET_REVISION):
         if file_name.endswith('.csv'):
-            file_name = f'{self.dataset_endpoint}/api/v1/datasets/{namespace}/{dataset_name}/repo?' \
+            file_name = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/repo?' 
\ f'Revision={revision}&FilePath={file_name}' return file_name @@ -372,7 +369,7 @@ class HubApi: dataset_name: str, namespace: str, revision: Optional[str] = DEFAULT_DATASET_REVISION): - datahub_url = f'{self.dataset_endpoint}/api/v1/datasets/{namespace}/{dataset_name}/' \ + datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/' \ f'ststoken?Revision={revision}' return self.datahub_remote_call(datahub_url) @@ -383,7 +380,7 @@ class HubApi: namespace: str, revision: Optional[str] = DEFAULT_DATASET_REVISION): - datahub_url = f'{self.dataset_endpoint}/api/v1/datasets/{namespace}/{dataset_name}/' \ + datahub_url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/' \ f'ststoken?Revision={revision}' cookies = requests.utils.dict_from_cookiejar(cookies) @@ -392,6 +389,19 @@ class HubApi: raise_on_error(resp) return resp['Data'] + def list_oss_dataset_objects(self, dataset_name, namespace, max_limit, + is_recursive, is_filter_dir, revision, + cookies): + url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/oss/tree/?' \ + f'MaxLimit={max_limit}&Revision={revision}&Recursive={is_recursive}&FilterDir={is_filter_dir}' + cookies = requests.utils.dict_from_cookiejar(cookies) + + resp = requests.get(url=url, cookies=cookies) + resp = resp.json() + raise_on_error(resp) + resp = resp['Data'] + return resp + def on_dataset_download(self, dataset_name: str, namespace: str) -> None: url = f'{self.endpoint}/api/v1/datasets/{namespace}/{dataset_name}/download/increase' r = requests.post(url) diff --git a/modelscope/hub/utils/utils.py b/modelscope/hub/utils/utils.py index d84b78ea..7d3c2499 100644 --- a/modelscope/hub/utils/utils.py +++ b/modelscope/hub/utils/utils.py @@ -4,8 +4,7 @@ import hashlib import os from typing import Optional -from modelscope.hub.constants import (DEFAULT_MODELSCOPE_DATA_ENDPOINT, - DEFAULT_MODELSCOPE_DOMAIN, +from modelscope.hub.constants import (DEFAULT_MODELSCOPE_DOMAIN, DEFAULT_MODELSCOPE_GROUP, MODEL_ID_SEPARATOR, MODELSCOPE_URL_SCHEME) @@ -44,11 +43,6 @@ def get_endpoint(): return MODELSCOPE_URL_SCHEME + modelscope_domain -def get_dataset_hub_endpoint(): - return os.environ.get('HUB_DATASET_ENDPOINT', - DEFAULT_MODELSCOPE_DATA_ENDPOINT) - - def compute_hash(file_path): BUFFER_SIZE = 1024 * 64 # 64k buffer size sha256_hash = hashlib.sha256() diff --git a/modelscope/msdatasets/ms_dataset.py b/modelscope/msdatasets/ms_dataset.py index 361b8ae0..cf055d6d 100644 --- a/modelscope/msdatasets/ms_dataset.py +++ b/modelscope/msdatasets/ms_dataset.py @@ -1,6 +1,5 @@ # Copyright (c) Alibaba, Inc. and its affiliates. 
-import math
 import os
 from typing import (Any, Callable, Dict, Iterable, List, Mapping, Optional,
                     Sequence, Union)
 
@@ -17,19 +16,18 @@ from datasets.utils.file_utils import (is_relative_path,
                                        relative_to_absolute_path)
 
 from modelscope.hub.repository import DatasetRepository
+from modelscope.msdatasets.task_datasets.builder import build_task_dataset
+from modelscope.msdatasets.utils.dataset_builder import ExternalDataset
+from modelscope.msdatasets.utils.dataset_utils import (
+    get_dataset_files, get_target_dataset_structure, load_dataset_builder)
+from modelscope.msdatasets.utils.download_utils import DatasetDownloadManager
+from modelscope.msdatasets.utils.upload_utils import DatasetUploadManager
 from modelscope.utils.config import ConfigDict
 from modelscope.utils.config_ds import MS_DATASETS_CACHE
 from modelscope.utils.constant import (DEFAULT_DATASET_NAMESPACE,
                                        DEFAULT_DATASET_REVISION,
                                        DatasetFormations, DownloadMode, Hubs)
 from modelscope.utils.logger import get_logger
-from .task_datasets.builder import build_task_dataset
-from .utils.dataset_builder import ExternalDataset
-from .utils.dataset_utils import (get_dataset_files,
-                                  get_target_dataset_structure,
-                                  load_dataset_builder)
-from .utils.download_utils import DatasetDownloadManager
-from .utils.upload_utils import DatasetUploadManager
 
 logger = get_logger()
 
@@ -234,7 +232,6 @@ class MsDataset:
             # dataset organized to be compatible with hf format
             if dataset_formation == DatasetFormations.hf_compatible:
                 dataset_name = dataset_scripts['.py'][0]
-                download_dataset = dataset_name
             else:
                 raise FileNotFoundError(
                     f"Couldn't find a dataset script at {relative_to_absolute_path(dataset_name)} "
@@ -270,7 +267,8 @@ class MsDataset:
             raise TypeError('path must be a str or a list, but got'
                             f' {type(dataset_name)}')
 
-        if download_dataset:
+        is_ci_test = os.getenv('CI_TEST') == 'True'
+        if download_dataset and not is_ci_test:
             try:
                 api.on_dataset_download(
                     dataset_name=download_dataset, namespace=namespace)
@@ -570,15 +568,26 @@ class MsDataset:
                local_file_path: str,
                dataset_name: str,
                namespace: Optional[str] = DEFAULT_DATASET_NAMESPACE,
-               version: Optional[str] = DEFAULT_DATASET_REVISION) -> None:
-        """Upload dataset file to the ModelScope Hub. Please login to the ModelScope Hub first.
+               version: Optional[str] = DEFAULT_DATASET_REVISION,
+               num_processes: Optional[int] = None,
+               chunksize: Optional[int] = 1,
+               filter_hidden_files: Optional[bool] = True) -> None:
+        """Upload a dataset file or directory to the ModelScope Hub. Please login to the ModelScope Hub first.
 
         Args:
-            object_name (str): The object name on ModelScope, in the form of your-dataset-name.zip
-            local_file_path (str): Local file to upload
+            object_name (str): The object name on ModelScope, in the form of your-dataset-name.zip or your-dataset-name
+            local_file_path (str): Local file or directory to upload
            dataset_name (str): Name of the dataset
            namespace(str, optional): Namespace of the dataset
            version: Optional[str]: Version of the dataset
+            num_processes: Optional[int]: The number of processes used for multi-process uploading.
+                This is only applicable when local_file_path is a directory and multiple files inside it
+                are being uploaded. When None is provided, the number returned by os.cpu_count() is used as the default.
+            chunksize: Optional[int]: The chunksize of objects to upload.
+                For very long iterables, a large chunksize can make the job complete much faster than
+                the default value of 1. Available only if local_file_path is a directory.
+            filter_hidden_files: Optional[bool]: Whether to filter out hidden files.
+                Available only if local_file_path is a directory.
 
         Returns:
             None
 
@@ -586,7 +595,20 @@ class MsDataset:
         """
         _upload_manager = DatasetUploadManager(
             dataset_name=dataset_name, namespace=namespace, version=version)
-        _upload_manager.upload(object_name, local_file_path)
+
+        if os.path.isfile(local_file_path):
+            _upload_manager.upload(
+                object_name=object_name, local_file_path=local_file_path)
+        elif os.path.isdir(local_file_path):
+            _upload_manager.upload_dir(
+                object_dir_name=object_name,
+                local_dir_path=local_file_path,
+                num_processes=num_processes,
+                chunksize=chunksize,
+                filter_hidden_files=filter_hidden_files)
+        else:
+            raise ValueError(
+                f'{local_file_path} is not a valid file path or directory')
 
     @staticmethod
     def clone_meta(dataset_work_dir: str,
diff --git a/modelscope/msdatasets/utils/dataset_utils.py b/modelscope/msdatasets/utils/dataset_utils.py
index ef42f75f..db9d1fee 100644
--- a/modelscope/msdatasets/utils/dataset_utils.py
+++ b/modelscope/msdatasets/utils/dataset_utils.py
@@ -6,7 +6,8 @@ from typing import Any, Mapping, Optional, Sequence, Union
 
 from datasets.builder import DatasetBuilder
 
-from modelscope.utils.constant import DEFAULT_DATASET_REVISION
+from modelscope.hub.api import HubApi
+from modelscope.utils.constant import DEFAULT_DATASET_REVISION, DownloadParams
 from modelscope.utils.logger import get_logger
 from .dataset_builder import MsCsvDatasetBuilder, TaskSpecificDatasetBuilder
 
@@ -77,6 +78,81 @@ def get_target_dataset_structure(dataset_structure: dict,
     return target_subset_name, target_dataset_structure
 
 
+def list_dataset_objects(hub_api: HubApi, max_limit: int, is_recursive: bool,
+                         dataset_name: str, namespace: str,
+                         version: str) -> list:
+    """
+    List all objects for a specific dataset.
+
+    Args:
+        hub_api (class HubApi): HubApi instance.
+        max_limit (int): Max number of objects.
+        is_recursive (bool): Whether to list objects recursively.
+        dataset_name (str): Dataset name.
+        namespace (str): Namespace.
+        version (str): Dataset version.
+    Returns:
+        res (list): List of objects, e.g., ['train/images/001.png', 'train/images/002.png', 'val/images/001.png', ...]
+    """
+    res = []
+    cookies = hub_api.check_cookies_upload_data(use_cookies=True)
+    objects = hub_api.list_oss_dataset_objects(
+        dataset_name=dataset_name,
+        namespace=namespace,
+        max_limit=max_limit,
+        is_recursive=is_recursive,
+        is_filter_dir=True,
+        revision=version,
+        cookies=cookies)
+
+    for item in objects:
+        object_key = item.get('Key')
+        res.append(object_key)
+
+    return res
+
+
+def contains_dir(file_map) -> bool:
+    """
+    Check whether the input contains at least one directory.
+
+    Args:
+        file_map (dict): Structure of data files, e.g., {'train': 'train.zip', 'validation': 'val.zip'}
+    Returns:
+        True if the input contains at least one directory, False otherwise.
+    """
+    res = False
+    for k, v in file_map.items():
+        if isinstance(v, str) and not v.endswith('.zip'):
+            res = True
+            break
+    return res
+
+
+def get_split_objects_map(file_map, objects):
+    """
+    Get the mapping between dataset splits and OSS objects.
+
+    Args:
+        file_map (dict): Structure of data files, e.g., {'train': 'train', 'validation': 'val'}, where both train
+            and val are directories.
+        objects (list): List of OSS objects, e.g., ['train/001/1_123.png', 'train/001/1_124.png', 'val/003/3_38.png']
+    Returns:
+        A map of split-objects.
e.g., {'train': ['train/001/1_123.png', 'train/001/1_124.png'], + 'validation':['val/003/3_38.png']} + """ + res = {} + for k, v in file_map.items(): + res[k] = [] + + for obj_key in objects: + for k, v in file_map.items(): + if obj_key.startswith(v): + res[k].append(obj_key) + + return res + + def get_dataset_files(subset_split_into: dict, dataset_name: str, namespace: str, @@ -95,14 +171,24 @@ def get_dataset_files(subset_split_into: dict, meta_map = defaultdict(dict) file_map = defaultdict(dict) args_map = defaultdict(dict) - from modelscope.hub.api import HubApi modelscope_api = HubApi() + objects = list_dataset_objects( + hub_api=modelscope_api, + max_limit=DownloadParams.MAX_LIST_OBJECTS_NUM.value, + is_recursive=True, + dataset_name=dataset_name, + namespace=namespace, + version=revision) + for split, info in subset_split_into.items(): meta_map[split] = modelscope_api.get_dataset_file_url( info.get('meta', ''), dataset_name, namespace, revision) if info.get('file'): file_map[split] = info['file'] args_map[split] = info.get('args') + + if contains_dir(file_map): + file_map = get_split_objects_map(file_map, objects) return meta_map, file_map, args_map diff --git a/modelscope/msdatasets/utils/download_utils.py b/modelscope/msdatasets/utils/download_utils.py index 2e21bf50..b1c7a5ab 100644 --- a/modelscope/msdatasets/utils/download_utils.py +++ b/modelscope/msdatasets/utils/download_utils.py @@ -10,16 +10,14 @@ from .oss_utils import OssUtilities class DatasetDownloadManager(DownloadManager): - def __init__( - self, - dataset_name: str, - namespace: str, - version: str, - data_dir: Optional[str] = None, - download_config: Optional[DownloadConfig] = None, - base_path: Optional[str] = None, - record_checksums=True, - ): + def __init__(self, + dataset_name: str, + namespace: str, + version: str, + data_dir: Optional[str] = None, + download_config: Optional[DownloadConfig] = None, + base_path: Optional[str] = None, + record_checksums=True): super().__init__(dataset_name, data_dir, download_config, base_path, record_checksums) self._namespace = namespace diff --git a/modelscope/msdatasets/utils/oss_utils.py b/modelscope/msdatasets/utils/oss_utils.py index 4a403876..d7d61e89 100644 --- a/modelscope/msdatasets/utils/oss_utils.py +++ b/modelscope/msdatasets/utils/oss_utils.py @@ -50,11 +50,16 @@ class OssUtilities: progress_callback=self._percentage) return local_path - def upload(self, oss_object_name: str, local_file_path: str) -> str: + def upload(self, oss_object_name: str, local_file_path: str, + indicate_individual_progress: bool) -> str: retry_count = 0 object_key = os.path.join(self.oss_dir, oss_object_name) resumable_store = oss2.ResumableStore( root=self.upload_resumable_tmp_store) + if indicate_individual_progress: + progress_callback = self._percentage + else: + progress_callback = None while True: try: @@ -66,7 +71,7 @@ class OssUtilities: store=resumable_store, multipart_threshold=self.upload_multipart_threshold, part_size=self.upload_part_size, - progress_callback=self._percentage, + progress_callback=progress_callback, num_threads=self.upload_num_threads) break except Exception: diff --git a/modelscope/msdatasets/utils/upload_utils.py b/modelscope/msdatasets/utils/upload_utils.py index 4813b89f..2b4422b2 100644 --- a/modelscope/msdatasets/utils/upload_utils.py +++ b/modelscope/msdatasets/utils/upload_utils.py @@ -1,5 +1,10 @@ # Copyright (c) Alibaba, Inc. and its affiliates. 
+import os
+from multiprocessing.dummy import Pool as ThreadPool
+
+from tqdm import tqdm
+
 from .oss_utils import OssUtilities
 
 
@@ -19,5 +24,38 @@ class DatasetUploadManager(object):
 
     def upload(self, object_name: str, local_file_path: str) -> str:
         object_key = self.oss_utilities.upload(
-            oss_object_name=object_name, local_file_path=local_file_path)
+            oss_object_name=object_name,
+            local_file_path=local_file_path,
+            indicate_individual_progress=True)
         return object_key
+
+    def upload_dir(self, object_dir_name: str, local_dir_path: str,
+                   num_processes: int, chunksize: int,
+                   filter_hidden_files: bool) -> int:
+
+        def run_upload(args):
+            self.oss_utilities.upload(
+                oss_object_name=args[0],
+                local_file_path=args[1],
+                indicate_individual_progress=False)
+
+        files_list = []
+        for root, dirs, files in os.walk(local_dir_path):
+            for file_name in files:
+                if filter_hidden_files and file_name.startswith('.'):
+                    continue
+                # Concatenate the directory name and relative path into an OSS object key, e.g., train/001/1_1230.png
+                object_name = os.path.join(
+                    object_dir_name,
+                    root.replace(local_dir_path, '', 1).strip('/'), file_name)
+
+                local_file_path = os.path.join(root, file_name)
+                files_list.append((object_name, local_file_path))
+
+        with ThreadPool(processes=num_processes) as pool:
+            result = list(
+                tqdm(
+                    pool.imap(run_upload, files_list, chunksize=chunksize),
+                    total=len(files_list)))
+
+        return len(result)
diff --git a/modelscope/utils/constant.py b/modelscope/utils/constant.py
index 5f0532ce..9e10e802 100644
--- a/modelscope/utils/constant.py
+++ b/modelscope/utils/constant.py
@@ -227,6 +227,13 @@ class DownloadMode(enum.Enum):
     FORCE_REDOWNLOAD = 'force_redownload'
 
 
+class DownloadParams(enum.Enum):
+    """
+    Parameters for downloading datasets.
+    """
+    MAX_LIST_OBJECTS_NUM = 50000
+
+
 class DatasetFormations(enum.Enum):
     """ How a dataset is organized and interpreted """
diff --git a/tests/msdatasets/test_dataset_upload.py b/tests/msdatasets/test_dataset_upload.py
index 1179414d..3d35d480 100644
--- a/tests/msdatasets/test_dataset_upload.py
+++ b/tests/msdatasets/test_dataset_upload.py
@@ -6,9 +6,13 @@ import unittest
 import zipfile
 
 from modelscope.msdatasets import MsDataset
-from modelscope.utils.constant import ModelFile
+from modelscope.msdatasets.utils.dataset_utils import list_dataset_objects
+from modelscope.utils import logger as logging
+from modelscope.utils.constant import DEFAULT_DATASET_REVISION, ModelFile
 from modelscope.utils.test_utils import test_level
 
+logger = logging.get_logger(__name__)
+
 KEY_EXTRACTED = 'extracted'
 
 
@@ -39,7 +43,8 @@ class DatasetUploadTest(unittest.TestCase):
     def tearDown(self):
         os.chdir(self.old_dir)
         shutil.rmtree(self.temp_dir, ignore_errors=True)
-        print('The test dir successfully removed!')
+        logger.info(
+            f'The test dir successfully removed!')
 
     @staticmethod
     def get_raw_downloaded_file_path(extracted_path):
@@ -68,6 +73,40 @@ class DatasetUploadTest(unittest.TestCase):
             dataset_name=self.dataset_name,
             namespace=self.namespace)
 
+    @unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
+    def test_ds_upload_dir(self):
+        ms_ds_train = MsDataset.load(self.prepared_dataset_name, split='train')
+        config_train = ms_ds_train._hf_ds.config_kwargs
+        extracted_path_train = config_train.get('split_config').get('train')
+
+        MsDataset.upload(
+            object_name='train',
+            local_file_path=os.path.join(extracted_path_train,
+                                         'Pets/images/train'),
+            dataset_name=self.dataset_name,
+            namespace=self.namespace)
+        MsDataset.upload(
+            
object_name='val', + local_file_path=os.path.join(extracted_path_train, + 'Pets/images/val'), + dataset_name=self.dataset_name, + namespace=self.namespace) + + objects = list_dataset_objects( + hub_api=self.api, + max_limit=-1, + is_recursive=True, + dataset_name=self.dataset_name, + namespace=self.namespace, + version=DEFAULT_DATASET_REVISION) + + logger.info(f'{len(objects)} objects have been uploaded: {objects}') + + @unittest.skipUnless(test_level() >= 1, 'skip test in current test level') + def test_ds_download_dir(self): + test_ds = MsDataset.load(self.dataset_name, self.namespace) + assert test_ds.config_kwargs['split_config'].values() + @unittest.skipUnless(test_level() >= 1, 'skip test in current test level') def test_ds_clone_meta(self): MsDataset.clone_meta(
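
Usage sketch (not part of the patch): a minimal example of the directory upload/load flow introduced above, based on the new MsDataset.upload signature and the test case. It assumes you are already logged in to the ModelScope Hub (see HubApi.login); the dataset name, namespace and local path below are placeholders.

    from modelscope.msdatasets import MsDataset

    # Uploading a directory walks it with os.walk() and pushes every file through
    # a thread pool (multiprocessing.dummy.Pool) under the '<object_name>/...' prefix.
    MsDataset.upload(
        object_name='train',                           # remote prefix, e.g. train/...
        local_file_path='/path/to/Pets/images/train',  # a directory triggers upload_dir()
        dataset_name='your-dataset-name',              # placeholder
        namespace='your-namespace',                    # placeholder
        num_processes=4,           # None defaults to os.cpu_count() workers
        chunksize=1,
        filter_hidden_files=True)  # skip files whose names start with '.'

    # Loading maps non-.zip split entries to the uploaded OSS objects
    # (see contains_dir() and get_split_objects_map() in dataset_utils.py).
    ds = MsDataset.load('your-dataset-name', namespace='your-namespace')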