From f660a119f02cbf767521eea322f96faf2bb883c8 Mon Sep 17 00:00:00 2001 From: "xingjun.wxj" Date: Mon, 5 Sep 2022 16:19:45 +0800 Subject: [PATCH] [to #42322933]Add resumable and large data upload. CR Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/9995250 1. add resumable dataset upload 2. add large data upload (up to 48.8TB) --- modelscope/msdatasets/ms_dataset.py | 8 +------ modelscope/msdatasets/utils/oss_utils.py | 24 +++++++++++++++------ modelscope/msdatasets/utils/upload_utils.py | 22 +++++++++---------- 3 files changed, 29 insertions(+), 25 deletions(-) diff --git a/modelscope/msdatasets/ms_dataset.py b/modelscope/msdatasets/ms_dataset.py index 338c6333..28a95643 100644 --- a/modelscope/msdatasets/ms_dataset.py +++ b/modelscope/msdatasets/ms_dataset.py @@ -574,14 +574,8 @@ class MsDataset: None """ - from modelscope.hub.api import HubApi - _hub_api = HubApi() - cookies = _hub_api.check_cookies_upload_data(use_cookies=True) _upload_manager = DatasetUploadManager( - dataset_name=dataset_name, - namespace=namespace, - version=version, - cookies=cookies) + dataset_name=dataset_name, namespace=namespace, version=version) _upload_manager.upload(object_name, local_file_path) @staticmethod diff --git a/modelscope/msdatasets/utils/oss_utils.py b/modelscope/msdatasets/utils/oss_utils.py index 63a1cf77..9a7040a1 100644 --- a/modelscope/msdatasets/utils/oss_utils.py +++ b/modelscope/msdatasets/utils/oss_utils.py @@ -18,6 +18,12 @@ class OssUtilities: self.oss_dir = oss_config['Dir'] self.oss_backup_dir = oss_config['BackupDir'] + self.upload_resumable_tmp_store = '/tmp/modelscope/tmp_dataset' + self.upload_multipart_threshold = 50 * 1024 * 1024 + self.upload_part_size = 1 * 1024 * 1024 + self.upload_num_threads = 4 + self.upload_max_retries = 3 + @staticmethod def _percentage(consumed_bytes, total_bytes): if total_bytes: @@ -42,21 +48,27 @@ class OssUtilities: progress_callback=self._percentage) return local_path - def upload(self, oss_file_name: str, local_file_path: str) -> str: - max_retries = 3 + def upload(self, oss_object_name: str, local_file_path: str) -> str: retry_count = 0 - object_key = os.path.join(self.oss_dir, oss_file_name) + object_key = os.path.join(self.oss_dir, oss_object_name) + resumable_store = oss2.ResumableStore( + root=self.upload_resumable_tmp_store) while True: try: retry_count += 1 - self.bucket.put_object_from_file( + oss2.resumable_upload( + self.bucket, object_key, local_file_path, - progress_callback=self._percentage) + store=resumable_store, + multipart_threshold=self.upload_multipart_threshold, + part_size=self.upload_part_size, + progress_callback=self._percentage, + num_threads=self.upload_num_threads) break except Exception: - if retry_count >= max_retries: + if retry_count >= self.upload_max_retries: raise return object_key diff --git a/modelscope/msdatasets/utils/upload_utils.py b/modelscope/msdatasets/utils/upload_utils.py index eff3aca0..fbe5c531 100644 --- a/modelscope/msdatasets/utils/upload_utils.py +++ b/modelscope/msdatasets/utils/upload_utils.py @@ -1,23 +1,21 @@ -from http.cookiejar import CookieJar - from .oss_utils import OssUtilities class DatasetUploadManager(object): - def __init__(self, dataset_name: str, namespace: str, version: str, - cookies: CookieJar): + def __init__(self, dataset_name: str, namespace: str, version: str): from modelscope.hub.api import HubApi - api = HubApi() - oss_config = api.get_dataset_access_config_session( - cookies=cookies, + _hub_api = HubApi() + _cookies = _hub_api.check_cookies_upload_data(use_cookies=True) + _oss_config = _hub_api.get_dataset_access_config_session( + cookies=_cookies, dataset_name=dataset_name, namespace=namespace, revision=version) - self.oss_utilities = OssUtilities(oss_config) + self.oss_utilities = OssUtilities(_oss_config) - def upload(self, oss_file_name: str, local_file_path: str) -> str: - oss_object_key = self.oss_utilities.upload( - oss_file_name=oss_file_name, local_file_path=local_file_path) - return oss_object_key + def upload(self, object_name: str, local_file_path: str) -> str: + object_key = self.oss_utilities.upload( + oss_object_name=object_name, local_file_path=local_file_path) + return object_key