diff --git a/modelscope/models/multi_modal/ofa_for_all_tasks.py b/modelscope/models/multi_modal/ofa_for_all_tasks.py
index 80471e3c..4528a9da 100644
--- a/modelscope/models/multi_modal/ofa_for_all_tasks.py
+++ b/modelscope/models/multi_modal/ofa_for_all_tasks.py
@@ -287,5 +287,7 @@ class OfaForAllTasks(TorchModel):
 
     def load_ans2label(self):
         if self.cfg.model.get('answer2label', None):
-            filename = osp.join(self.model_dir, self.cfg.model.answer2label)
-            self.ans2label_dict = json.load(open(filename))
+            ans2label_file = osp.join(self.model_dir,
+                                      self.cfg.model.answer2label)
+            with open(ans2label_file, 'r') as reader:
+                self.ans2label_dict = json.load(reader)
diff --git a/modelscope/preprocessors/ofa/base.py b/modelscope/preprocessors/ofa/base.py
index bb47c411..8bbe02d1 100644
--- a/modelscope/preprocessors/ofa/base.py
+++ b/modelscope/preprocessors/ofa/base.py
@@ -61,7 +61,8 @@ class OfaBasePreprocessor:
         self.index2ans = {}
         if self.cfg.model.get('answer2label', False):
             ans2label_file = osp.join(model_dir, self.cfg.model.answer2label)
-            ans2label_dict = json.load(open(ans2label_file, 'r'))
+            with open(ans2label_file, 'r') as reader:
+                ans2label_dict = json.load(reader)
             self.constraint_trie = Trie(tokenizer.eos_token_id)
             for i, answer in enumerate(ans2label_dict.keys()):
                 answer_item = tokenizer(
diff --git a/modelscope/trainers/multi_modal/ofa/ofa_file_dataset.py b/modelscope/trainers/multi_modal/ofa/ofa_file_dataset.py
index 17c9398a..138f1303 100644
--- a/modelscope/trainers/multi_modal/ofa/ofa_file_dataset.py
+++ b/modelscope/trainers/multi_modal/ofa/ofa_file_dataset.py
@@ -79,7 +79,7 @@ class OFAFileDataset:
                 self.total_row_count += 1
                 offset += len(line.encode('utf-8'))
             pickle.dump(self.lineid_to_offset,
-                        open('{}.index'.format(self.file_path), 'rb'))
+                        open('{}.index'.format(self.file_path), 'wb'))
             self._compute_start_pos_and_row_count()
             print(
                 'local datafile {} slice_id {} finished initializing row_count and line_idx-to-offset mapping'
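
The ofa_file_dataset.py hunk above fixes the index cache being pickled through a file handle opened for reading: pickle.dump needs a handle opened with 'wb', and an 'rb' handle cannot be written to. A minimal sketch of the same pattern with an explicit context manager (the helper name save_index is illustrative, not part of the patch):

    import pickle

    def save_index(lineid_to_offset, file_path):
        # Persist the line-id -> byte-offset mapping next to the data file.
        # The cache must be opened for binary writing ('wb'), not reading ('rb').
        with open('{}.index'.format(file_path), 'wb') as writer:
            pickle.dump(lineid_to_offset, writer)
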
diff --git a/modelscope/trainers/multi_modal/ofa/ofa_trainer.py b/modelscope/trainers/multi_modal/ofa/ofa_trainer.py
index af2fca0a..fae79a74 100644
--- a/modelscope/trainers/multi_modal/ofa/ofa_trainer.py
+++ b/modelscope/trainers/multi_modal/ofa/ofa_trainer.py
@@ -1,120 +1,84 @@
 import os
-from os import path as osp
 from typing import Dict, Optional
 
-import torch
-import torch.distributed as dist
-import transformers
-from torch.utils.data import DataLoader
-from torch.utils.data.distributed import DistributedSampler
+from datasets import load_dataset
 
 from modelscope.metainfo import Trainers
 from modelscope.models.base import Model
+from modelscope.msdatasets.ms_dataset import MsDataset
 from modelscope.preprocessors.multi_modal import OfaPreprocessor
 from modelscope.preprocessors.ofa.utils.collate import collate_fn
-from modelscope.trainers.base import BaseTrainer
+from modelscope.trainers import EpochBasedTrainer
 from modelscope.trainers.builder import TRAINERS
+from modelscope.trainers.optimizer.builder import build_optimizer
+from modelscope.utils.config import Config
 from modelscope.utils.constant import ModeKeys, ModelFile
-from modelscope.utils.logger import get_logger
-from modelscope.utils.torch_utils import init_dist
 from .ofa_trainer_utils import (AdjustLabelSmoothedCrossEntropyCriterion,
                                 OFADataset, get_schedule)
 
-logger = get_logger()
-
 
 @TRAINERS.register_module(module_name=Trainers.ofa_tasks)
-class OFATrainer(BaseTrainer):
+class OFATrainer(EpochBasedTrainer):
 
     def __init__(self, model: str, *args, **kwargs):
         model = Model.from_pretrained(model)
-        super().__init__(osp.join(model.model_dir, ModelFile.CONFIGURATION))
-        self.model_dir = model.model_dir
-        self.model = model.model
-        self.device_id = 0
-        self.total_epoch = self.cfg.train.epoch
-        self.train_batch_size = self.cfg.train.batch_size
-        self.val_batch_size = self.cfg.evaluation.batch_size
-        self.save_dir = self.cfg.train.save_dir
-        init_dist(launcher='pytorch')
-        self.train_dataset = OFADataset(
-            file_path=self.cfg.dataset.train_set,
-            selected_id_keys=self.cfg.dataset.selected_id_keys,
-            preprocessor=OfaPreprocessor(
-                model_dir=self.model_dir, split=ModeKeys.TRAIN),
-        )
-        self.val_dataset = OFADataset(
-            file_path=self.cfg.dataset.valid_set,
-            selected_id_keys=self.cfg.dataset.selected_id_keys,
-            preprocessor=OfaPreprocessor(
-                model_dir=self.model_dir, split=ModeKeys.EVAL),
+        model_dir = model.model_dir
+        cfg_file = os.path.join(model_dir, ModelFile.CONFIGURATION)
+        cfg = Config.from_file(cfg_file)
+        dataset = load_dataset(
+            cfg.dataset.script,
+            data_files=cfg.dataset.hf_dataset,
+            sep=cfg.dataset.sep,
         )
-        epoch_steps = len(
-            self.train_dataset) // self.cfg.train.gradient_accumulation_steps
-        self.cfg.train.num_train_steps = epoch_steps * self.cfg.train.epoch
+        ms_dataset = MsDataset.from_hf_dataset(dataset)
+        # train_dataset = OFADataset(
+        #     file_path=cfg.dataset.train_set,
+        #     selected_id_keys=cfg.dataset.selected_id_keys,
+        #     preprocessor=OfaPreprocessor(
+        #         model_dir=model_dir, mode=ModeKeys.TRAIN),
+        # )
+        # val_dataset = OFADataset(
+        #     file_path=cfg.dataset.valid_set,
+        #     selected_id_keys=cfg.dataset.selected_id_keys,
+        #     preprocessor=OfaPreprocessor(
+        #         model_dir=model_dir, mode=ModeKeys.EVAL),
+        # )
+        epoch_steps = len(ms_dataset['train']) // (
+            cfg.train.gradient_accumulation_steps
+            * cfg.train.dataloader.batch_size_per_gpu)
+        cfg.train.lr_scheduler.num_train_steps = epoch_steps * cfg.train.max_epochs
+        cfg.train.criterion.tokenizer = model.tokenizer
         self.criterion = AdjustLabelSmoothedCrossEntropyCriterion(
-            self.cfg.train.criterion)
-
-    def train(self, *args, **kwargs):
-        assert dist.is_initialized()
-
-        self.model.train()
-        self.model.to(self.device_id)
-        ddp_model = torch.nn.parallel.DistributedDataParallel(
-            self.model, device_ids=[
-                self.device_id,
-            ])
-
-        optimizer = transformers.AdamW(
-            self.model.parameters(),
-            lr=self.cfg.train.lr,
-            weight_decay=self.cfg.train.weight_decay,
-            correct_bias=False,
-        )
-        scheduler_class, scheduler_args = get_schedule(self.cfg.train)
+            cfg.train.criterion)
+        optimizer = build_optimizer(model, cfg=cfg.train.optimizer)
+        scheduler_class, scheduler_args = get_schedule(cfg.train.lr_scheduler)
         if scheduler_class is not None:
             lr_scheduler = scheduler_class(**{'optimizer': optimizer},
                                            **scheduler_args)
         else:
             lr_scheduler = None
-        for epoch in range(self.total_epoch):
-            train_sampler = DistributedSampler(
-                dataset=self.train_dataset, shuffle=True)
-            train_sampler.set_epoch(epoch)
-
-            train_params = {
-                'pin_memory': True,
-                'collate_fn': collate_fn,
-                'batch_size': self.train_batch_size,
-                'shuffle': False,
-                'drop_last': True,
-                'sampler': train_sampler,
-                'num_workers': 2,
-            }
-
-            train_loader = DataLoader(self.train_dataset, **train_params)
+        super().__init__(
+            cfg_file=cfg_file,
+            model=model,
+            data_collator=collate_fn,
+            train_dataset=dataset['train'],
+            eval_dataset=dataset['valid'],
+            optimizers=(optimizer, lr_scheduler),
+            work_dir=cfg.train.work_dir,
+            *args,
+            **kwargs,
+        )
 
-            for idx, batch in enumerate(train_loader, start=1):
-                model_outputs = ddp_model(**batch)
-                loss, sample_size, logging_output = self.criterion(
-                    model_outputs, batch)
-                loss.backward()
-                optimizer.zero_grad()
-                if lr_scheduler is not None:
-                    lr_scheduler.step()
-                optimizer.step()
-                optimizer.zero_grad()
-                if idx % 10 == 0:
-                    logger.info(
-                        'epoch: {}, train batch {}/{}, loss={:.5f}'.format(
-                            epoch, idx, len(train_loader), loss.item()))
-            if dist.get_rank() == 0:
-                os.makedirs(self.ckpt_dir, exist_ok=True)
-                torch.save(ddp_model.module.state_dict(),
-                           f'{self.ckpt_dir}/epoch{epoch}.bin')
+    def train(self, *args, **kwargs):
+        pass
 
     def evaluate(self,
                  checkpoint_path: Optional[str] = None,
                  *args,
                  **kwargs) -> Dict[str, float]:
         pass
+
+    def prediction_step(self, model, inputs):
+        pass
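
The rewritten OFATrainer above now subclasses EpochBasedTrainer and is constructed from a model id or local model directory (the string it receives is passed to Model.from_pretrained). A minimal usage sketch mirroring the unit test at the bottom of this diff; the path is a placeholder, and note that train() is still stubbed out with pass in this revision, so the call is a no-op until the inherited training loop is wired in:

    from modelscope.trainers.multi_modal.ofa.ofa_trainer import OFATrainer

    # Placeholder path; the test below points this at a local OFA
    # text-classification checkpoint directory instead.
    trainer = OFATrainer('/path/to/ofa_model_dir')
    trainer.train()
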
diff --git a/modelscope/trainers/multi_modal/ofa/ofa_trainer_old.py b/modelscope/trainers/multi_modal/ofa/ofa_trainer_old.py
new file mode 100644
index 00000000..5e41b49b
--- /dev/null
+++ b/modelscope/trainers/multi_modal/ofa/ofa_trainer_old.py
@@ -0,0 +1,120 @@
+import os
+from os import path as osp
+from typing import Dict, Optional
+
+import torch
+import torch.distributed as dist
+import transformers
+from torch.utils.data import DataLoader
+from torch.utils.data.distributed import DistributedSampler
+
+from modelscope.metainfo import Trainers
+from modelscope.models.base import Model
+from modelscope.preprocessors.multi_modal import OfaPreprocessor
+from modelscope.preprocessors.ofa.utils.collate import collate_fn
+from modelscope.trainers.base import BaseTrainer
+from modelscope.trainers.builder import TRAINERS
+from modelscope.utils.constant import ModeKeys, ModelFile
+from modelscope.utils.logger import get_logger
+from modelscope.utils.torch_utils import init_dist
+from .ofa_trainer_utils import (AdjustLabelSmoothedCrossEntropyCriterion,
+                                OFADataset, get_schedule)
+
+logger = get_logger()
+
+
+@TRAINERS.register_module(module_name=Trainers.ofa_tasks)
+class OFAOldTrainer(BaseTrainer):
+
+    def __init__(self, model: str, *args, **kwargs):
+        model = Model.from_pretrained(model)
+        super().__init__(osp.join(model.model_dir, ModelFile.CONFIGURATION))
+        self.model_dir = model.model_dir
+        self.model = model.model
+        self.device_id = 0
+        self.total_epoch = self.cfg.train.epoch
+        self.train_batch_size = self.cfg.train.batch_size
+        self.val_batch_size = self.cfg.evaluation.batch_size
+        self.save_dir = self.cfg.train.save_dir
+        init_dist(launcher='pytorch')
+        self.train_dataset = OFADataset(
+            file_path=self.cfg.dataset.train_set,
+            selected_id_keys=self.cfg.dataset.selected_id_keys,
+            preprocessor=OfaPreprocessor(
+                model_dir=self.model_dir, split=ModeKeys.TRAIN),
+        )
+        self.val_dataset = OFADataset(
+            file_path=self.cfg.dataset.valid_set,
+            selected_id_keys=self.cfg.dataset.selected_id_keys,
+            preprocessor=OfaPreprocessor(
+                model_dir=self.model_dir, split=ModeKeys.EVAL),
+        )
+        epoch_steps = len(
+            self.train_dataset) // self.cfg.train.gradient_accumulation_steps
+        self.cfg.train.num_train_steps = epoch_steps * self.cfg.train.epoch
+        self.criterion = AdjustLabelSmoothedCrossEntropyCriterion(
+            self.cfg.train.criterion)
+
+    def train(self, *args, **kwargs):
+        assert dist.is_initialized()
+
+        self.model.train()
+        self.model.to(self.device_id)
+        ddp_model = torch.nn.parallel.DistributedDataParallel(
+            self.model, device_ids=[
+                self.device_id,
+            ])
+
+        optimizer = transformers.AdamW(
+            self.model.parameters(),
+            lr=self.cfg.train.lr,
+            weight_decay=self.cfg.train.weight_decay,
+            correct_bias=False,
+        )
+        scheduler_class, scheduler_args = get_schedule(self.cfg.train)
+        if scheduler_class is not None:
+            lr_scheduler = scheduler_class(**{'optimizer': optimizer},
+                                           **scheduler_args)
+        else:
+            lr_scheduler = None
+        for epoch in range(self.total_epoch):
+            train_sampler = DistributedSampler(
+                dataset=self.train_dataset, shuffle=True)
+            train_sampler.set_epoch(epoch)
+
+            train_params = {
+                'pin_memory': True,
+                'collate_fn': collate_fn,
+                'batch_size': self.train_batch_size,
+                'shuffle': False,
+                'drop_last': True,
+                'sampler': train_sampler,
+                'num_workers': 2,
+            }
+
+            train_loader = DataLoader(self.train_dataset, **train_params)
+
+            for idx, batch in enumerate(train_loader, start=1):
+                model_outputs = ddp_model(**batch)
+                loss, sample_size, logging_output = self.criterion(
+                    model_outputs, batch)
+                loss.backward()
+                optimizer.zero_grad()
+                if lr_scheduler is not None:
+                    lr_scheduler.step()
+                optimizer.step()
+                optimizer.zero_grad()
+                if idx % 10 == 0:
+                    logger.info(
+                        'epoch: {}, train batch {}/{}, loss={:.5f}'.format(
+                            epoch, idx, len(train_loader), loss.item()))
+            if dist.get_rank() == 0:
+                os.makedirs(self.ckpt_dir, exist_ok=True)
+                torch.save(ddp_model.module.state_dict(),
+                           f'{self.ckpt_dir}/epoch{epoch}.bin')
+
+    def evaluate(self,
+                 checkpoint_path: Optional[str] = None,
+                 *args,
+                 **kwargs) -> Dict[str, float]:
+        pass
diff --git a/modelscope/trainers/multi_modal/ofa/ofa_trainer_utils.py b/modelscope/trainers/multi_modal/ofa/ofa_trainer_utils.py
index 10acc870..38a13f4d 100644
--- a/modelscope/trainers/multi_modal/ofa/ofa_trainer_utils.py
+++ b/modelscope/trainers/multi_modal/ofa/ofa_trainer_utils.py
@@ -35,7 +35,7 @@ class OFADataset(Dataset):
 
         self.dataset = OFAFileDataset(
             file_path=file_path,
-            selected_col_ids=selected_col_ids,
+            selected_col_ids=','.join(selected_col_ids),
            dtypes=dtypes,
            separator=separator,
            cached_index=cached_index)
@@ -157,7 +157,7 @@ class AdjustLabelSmoothedCrossEntropyCriterion(_Loss):
 
         self.constraint_start = None
         self.constraint_end = None
-        if args.constraint_range is not None:
+        if args.constraint_range:
            constraint_start, constraint_end = args.constraint_range.split(',')
            self.constraint_start = int(constraint_start)
            self.constraint_end = int(constraint_end)
@@ -280,35 +280,39 @@ class AdjustLabelSmoothedCrossEntropyCriterion(_Loss):
         return loss, nll_loss, ntokens
 
 
-def get_schedule(args):
+def get_schedule(scheduler):
 
-    if args.schedule == 'const':
+    if scheduler.name == 'const':
         scheduler_class = transformers.get_constant_schedule_with_warmup
         scheduler_args = {
             'num_warmup_steps':
-            int(args.warmup_proportion * args.num_train_steps)
+            int(scheduler.warmup_proportion * scheduler.num_train_steps)
         }
-    elif args.schedule == 'linear':
+    elif scheduler.name == 'linear':
         scheduler_class = transformers.get_linear_schedule_with_warmup
         scheduler_args = {
             'num_warmup_steps':
-            int(args.warmup_proportion * args.num_train_steps),
-            'num_training_steps': args.num_train_steps
+            int(scheduler.warmup_proportion * scheduler.num_train_steps),
+            'num_training_steps':
+            scheduler.num_train_steps
         }
-    elif args.schedule == 'cosine':
+    elif scheduler.name == 'cosine':
         scheduler_class = transformers.get_cosine_schedule_with_warmup
         scheduler_args = {
             'num_warmup_steps':
-            int(args.warmup_proportion * args.num_train_steps),
-            'num_training_steps': args.num_train_steps
+            int(scheduler.warmup_proportion * scheduler.num_train_steps),
+            'num_training_steps':
+            scheduler.num_train_steps
         }
-    elif args.schedule == 'polynomial_decay':
+    elif scheduler.name == 'polynomial_decay':
         scheduler_class = transformers.get_polynomial_decay_schedule_with_warmup
         scheduler_args = {
             'num_warmup_steps':
-            int(args.warmup_proportion * args.num_train_steps),
-            'num_training_steps': args.num_train_steps,
-            'lr_end': args.lr_end
+            int(scheduler.warmup_proportion * scheduler.num_train_steps),
+            'num_training_steps':
+            scheduler.num_train_steps,
+            'lr_end':
+            scheduler.lr_end
         }
     else:
         raise NotImplementedError
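
get_schedule now reads its settings from the cfg.train.lr_scheduler node and dispatches on its name field instead of args.schedule. A hedged sketch of the fields it expects, with types.SimpleNamespace standing in for the real Config node and purely illustrative values:

    from types import SimpleNamespace

    from modelscope.trainers.multi_modal.ofa.ofa_trainer_utils import get_schedule

    # Stand-in for cfg.train.lr_scheduler; the values are illustrative only.
    lr_scheduler_cfg = SimpleNamespace(
        name='polynomial_decay',  # const | linear | cosine | polynomial_decay
        warmup_proportion=0.01,   # warmup steps = warmup_proportion * num_train_steps
        num_train_steps=10000,    # set by OFATrainer.__init__ from epoch_steps * max_epochs
        lr_end=1e-7,              # only read for polynomial_decay
    )
    scheduler_class, scheduler_args = get_schedule(lr_scheduler_cfg)
    # The trainer then builds the LR scheduler as:
    #     lr_scheduler = scheduler_class(optimizer=optimizer, **scheduler_args)
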
diff --git a/tests/trainers/test_ofa_trainer.py b/tests/trainers/test_ofa_trainer.py
index bfec1b85..af0cf2dc 100644
--- a/tests/trainers/test_ofa_trainer.py
+++ b/tests/trainers/test_ofa_trainer.py
@@ -1,4 +1,5 @@
 # Copyright (c) Alibaba, Inc. and its affiliates.
+import os
 import shutil
 import unittest
 
@@ -13,7 +14,8 @@ class TestOfaTrainer(unittest.TestCase):
         model_id = '/apsarapangu/disk2/yichang.zyc/ckpt/MaaS/ofa_text-classification_mnli_large_en'
         self.trainer = OFATrainer(model_id)
         self.trainer.train()
-        shutil.rmtree(self.trainer.save_dir)
+        if os.path.exists(self.trainer.work_dir):
+            shutil.rmtree(self.trainer.work_dir)
 
 
 if __name__ == '__main__':