""" 此模块可以非常方便的测试模型。 若你的模型属于:文本分类,序列标注,自然语言推理(NLI),可以直接使用此模块测试 若模型不属于上述类别,也可以自己准备假数据,设定loss和metric进行测试 此模块的测试仅保证模型能使用fastNLP进行训练和测试,不测试模型实际性能 Example:: # import 全大写变量... from model_runner import * # 测试一个文本分类模型 init_emb = (VOCAB_SIZE, 50) model = SomeModel(init_emb, num_cls=NUM_CLS) RUNNER.run_model_with_task(TEXT_CLS, model) # 序列标注模型 RUNNER.run_model_with_task(POS_TAGGING, model) # NLI模型 RUNNER.run_model_with_task(NLI, model) # 自定义模型 RUNNER.run_model(model, data=get_mydata(), loss=Myloss(), metrics=Mymetric()) """ from fastNLP import Trainer, Tester, DataSet, Callback from fastNLP import AccuracyMetric from fastNLP import CrossEntropyLoss from fastNLP.core.const import Const as C from random import randrange VOCAB_SIZE = 100 NUM_CLS = 100 MAX_LEN = 10 N_SAMPLES = 100 N_EPOCHS = 1 BATCH_SIZE = 5 TEXT_CLS = 'text_cls' POS_TAGGING = 'pos_tagging' NLI = 'nli' class ModelRunner(): class Checker(Callback): def on_backward_begin(self, loss): assert loss.to('cpu').numpy().isfinate() def gen_seq(self, length, vocab_size): """generate fake sequence indexes with given length""" # reserve 0 for padding return [randrange(1, vocab_size) for _ in range(length)] def gen_var_seq(self, max_len, vocab_size): """generate fake sequence indexes in variant length""" length = randrange(3, max_len) # at least 3 words in a seq return self.gen_seq(length, vocab_size) def prepare_text_classification_data(self): index = 'index' ds = DataSet({index: list(range(N_SAMPLES))}) ds.apply_field(lambda x: self.gen_var_seq(MAX_LEN, VOCAB_SIZE), field_name=index, new_field_name=C.INPUT, is_input=True) ds.apply_field(lambda x: randrange(NUM_CLS), field_name=index, new_field_name=C.TARGET, is_target=True) ds.apply_field(len, C.INPUT, C.INPUT_LEN, is_input=True) return ds def prepare_pos_tagging_data(self): index = 'index' ds = DataSet({index: list(range(N_SAMPLES))}) ds.apply_field(lambda x: self.gen_var_seq(MAX_LEN, VOCAB_SIZE), field_name=index, new_field_name=C.INPUT, is_input=True) ds.apply_field(lambda x: self.gen_seq(len(x), NUM_CLS), field_name=C.INPUT, new_field_name=C.TARGET, is_target=True) ds.apply_field(len, C.INPUT, C.INPUT_LEN, is_input=True, is_target=True) return ds def prepare_nli_data(self): index = 'index' ds = DataSet({index: list(range(N_SAMPLES))}) ds.apply_field(lambda x: self.gen_var_seq(MAX_LEN, VOCAB_SIZE), field_name=index, new_field_name=C.INPUTS(0), is_input=True) ds.apply_field(lambda x: self.gen_var_seq(MAX_LEN, VOCAB_SIZE), field_name=index, new_field_name=C.INPUTS(1), is_input=True) ds.apply_field(lambda x: randrange(NUM_CLS), field_name=index, new_field_name=C.TARGET, is_target=True) ds.apply_field(len, C.INPUTS(0), C.INPUT_LENS(0), is_input=True, is_target=True) ds.apply_field(len, C.INPUTS(1), C.INPUT_LENS(1), is_input = True, is_target = True) ds.set_input(C.INPUTS(0), C.INPUTS(1)) ds.set_target(C.TARGET) return ds def run_text_classification(self, model, data=None): if data is None: data = self.prepare_text_classification_data() loss = CrossEntropyLoss(pred=C.OUTPUT, target=C.TARGET) metric = AccuracyMetric(pred=C.OUTPUT, target=C.TARGET) self.run_model(model, data, loss, metric) def run_pos_tagging(self, model, data=None): if data is None: data = self.prepare_pos_tagging_data() loss = CrossEntropyLoss(pred=C.OUTPUT, target=C.TARGET, padding_idx=0) metric = AccuracyMetric(pred=C.OUTPUT, target=C.TARGET, seq_len=C.INPUT_LEN) self.run_model(model, data, loss, metric) def run_nli(self, model, data=None): if data is None: data = self.prepare_nli_data() loss = CrossEntropyLoss(pred=C.OUTPUT, target=C.TARGET) metric = AccuracyMetric(pred=C.OUTPUT, target=C.TARGET) self.run_model(model, data, loss, metric) def run_model(self, model, data, loss, metrics): """run a model, test if it can run with fastNLP""" print('testing model:', model.__class__.__name__) tester = Tester(data=data, model=model, metrics=metrics, batch_size=BATCH_SIZE, verbose=0) before_train = tester.test() trainer = Trainer(train_data=data, model=model, loss=loss, batch_size=BATCH_SIZE, n_epochs=N_EPOCHS, dev_data=None, save_path=None, use_tqdm=False) trainer.train(load_best_model=False) after_train = tester.test() for metric_name, v1 in before_train.items(): assert metric_name in after_train # # at least we can sure model params changed, even if we don't know performance # v2 = after_train[metric_name] # assert v1 != v2 def run_model_with_task(self, task, model): """run a model with certain task""" TASKS = { TEXT_CLS: self.run_text_classification, POS_TAGGING: self.run_pos_tagging, NLI: self.run_nli, } assert task in TASKS TASKS[task](model) RUNNER = ModelRunner()