CR Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/9995250 1. add resumable dataset upload 2. add large data upload (up to 48.8TB)master
| @@ -574,14 +574,8 @@ class MsDataset: | |||||
| None | None | ||||
| """ | """ | ||||
| from modelscope.hub.api import HubApi | |||||
| _hub_api = HubApi() | |||||
| cookies = _hub_api.check_cookies_upload_data(use_cookies=True) | |||||
| _upload_manager = DatasetUploadManager( | _upload_manager = DatasetUploadManager( | ||||
| dataset_name=dataset_name, | |||||
| namespace=namespace, | |||||
| version=version, | |||||
| cookies=cookies) | |||||
| dataset_name=dataset_name, namespace=namespace, version=version) | |||||
| _upload_manager.upload(object_name, local_file_path) | _upload_manager.upload(object_name, local_file_path) | ||||
| @staticmethod | @staticmethod | ||||
| @@ -18,6 +18,12 @@ class OssUtilities: | |||||
| self.oss_dir = oss_config['Dir'] | self.oss_dir = oss_config['Dir'] | ||||
| self.oss_backup_dir = oss_config['BackupDir'] | self.oss_backup_dir = oss_config['BackupDir'] | ||||
| self.upload_resumable_tmp_store = '/tmp/modelscope/tmp_dataset' | |||||
| self.upload_multipart_threshold = 50 * 1024 * 1024 | |||||
| self.upload_part_size = 1 * 1024 * 1024 | |||||
| self.upload_num_threads = 4 | |||||
| self.upload_max_retries = 3 | |||||
| @staticmethod | @staticmethod | ||||
| def _percentage(consumed_bytes, total_bytes): | def _percentage(consumed_bytes, total_bytes): | ||||
| if total_bytes: | if total_bytes: | ||||
| @@ -42,21 +48,27 @@ class OssUtilities: | |||||
| progress_callback=self._percentage) | progress_callback=self._percentage) | ||||
| return local_path | return local_path | ||||
| def upload(self, oss_file_name: str, local_file_path: str) -> str: | |||||
| max_retries = 3 | |||||
| def upload(self, oss_object_name: str, local_file_path: str) -> str: | |||||
| retry_count = 0 | retry_count = 0 | ||||
| object_key = os.path.join(self.oss_dir, oss_file_name) | |||||
| object_key = os.path.join(self.oss_dir, oss_object_name) | |||||
| resumable_store = oss2.ResumableStore( | |||||
| root=self.upload_resumable_tmp_store) | |||||
| while True: | while True: | ||||
| try: | try: | ||||
| retry_count += 1 | retry_count += 1 | ||||
| self.bucket.put_object_from_file( | |||||
| oss2.resumable_upload( | |||||
| self.bucket, | |||||
| object_key, | object_key, | ||||
| local_file_path, | local_file_path, | ||||
| progress_callback=self._percentage) | |||||
| store=resumable_store, | |||||
| multipart_threshold=self.upload_multipart_threshold, | |||||
| part_size=self.upload_part_size, | |||||
| progress_callback=self._percentage, | |||||
| num_threads=self.upload_num_threads) | |||||
| break | break | ||||
| except Exception: | except Exception: | ||||
| if retry_count >= max_retries: | |||||
| if retry_count >= self.upload_max_retries: | |||||
| raise | raise | ||||
| return object_key | return object_key | ||||
| @@ -1,23 +1,21 @@ | |||||
| from http.cookiejar import CookieJar | |||||
| from .oss_utils import OssUtilities | from .oss_utils import OssUtilities | ||||
| class DatasetUploadManager(object): | class DatasetUploadManager(object): | ||||
| def __init__(self, dataset_name: str, namespace: str, version: str, | |||||
| cookies: CookieJar): | |||||
| def __init__(self, dataset_name: str, namespace: str, version: str): | |||||
| from modelscope.hub.api import HubApi | from modelscope.hub.api import HubApi | ||||
| api = HubApi() | |||||
| oss_config = api.get_dataset_access_config_session( | |||||
| cookies=cookies, | |||||
| _hub_api = HubApi() | |||||
| _cookies = _hub_api.check_cookies_upload_data(use_cookies=True) | |||||
| _oss_config = _hub_api.get_dataset_access_config_session( | |||||
| cookies=_cookies, | |||||
| dataset_name=dataset_name, | dataset_name=dataset_name, | ||||
| namespace=namespace, | namespace=namespace, | ||||
| revision=version) | revision=version) | ||||
| self.oss_utilities = OssUtilities(oss_config) | |||||
| self.oss_utilities = OssUtilities(_oss_config) | |||||
| def upload(self, oss_file_name: str, local_file_path: str) -> str: | |||||
| oss_object_key = self.oss_utilities.upload( | |||||
| oss_file_name=oss_file_name, local_file_path=local_file_path) | |||||
| return oss_object_key | |||||
| def upload(self, object_name: str, local_file_path: str) -> str: | |||||
| object_key = self.oss_utilities.upload( | |||||
| oss_object_name=object_name, local_file_path=local_file_path) | |||||
| return object_key | |||||