# 10 分钟快速上手 fastNLP torch

在这个例子中，我们将使用BERT来解决conll2003数据集中的命名实体识别任务。

In [1]:
# Linux/Mac 下载数据，并解压
import platform
if platform.system() != "Windows":
    !wget https://data.deepai.org/conll2003.zip --no-check-certificate -O conll2003.zip
    !unzip conll2003.zip -d conll2003
# Windows用户请通过复制该url到浏览器下载该数据并解压

--2022-07-07 10:12:29--  https://data.deepai.org/conll2003.zip
Resolving data.deepai.org (data.deepai.org)... 138.201.36.183
Connecting to data.deepai.org (data.deepai.org)|138.201.36.183|:443... connected.
  Issued certificate has expired.
HTTP request sent, awaiting response... 200 OK
Length: 982975 (960K) [application/x-zip-compressed]
Saving to: ‘conll2003.zip’


2022-07-07 10:12:32 (653 KB/s) - ‘conll2003.zip’ saved [982975/982975]

Archive:  conll2003.zip
  inflating: conll2003/metadata      
  inflating: conll2003/test.txt      
  inflating: conll2003/train.txt     
  inflating: conll2003/valid.txt     


## 目录
接下来我们将按照以下的内容介绍在如何通过fastNLP减少工程性代码的撰写 
- 1. 数据加载
- 2. 数据预处理、数据缓存
- 3. DataLoader
- 4. 模型准备
- 5. Trainer的使用
- 6. Evaluator的使用
- 7. 其它【待补充】
    - 7.1 使用多卡进行训练、评测
    - 7.2 使用ZeRO优化
    - 7.3 通过overfit测试快速验证模型
    - 7.4 复杂Monitor的使用
    - 7.5 训练过程中，使用不同的测试函数
    - 7.6 更有效率的Sampler
    - 7.7 保存模型
    - 7.8 断点重训
    - 7.9 使用huggingface datasets
    - 7.10 使用torchmetrics来作为metric
    - 7.11 将预测结果写出到文件
    - 7.12 混合 dataset 训练
    - 7.13 logger的使用
    - 7.14 自定义分布式 Metric 。
    - 7.15 通过batch_step_fn实现R-Drop

#### 1.  数据加载
目前在``conll2003``目录下有``train.txt``, ``test.txt``与``valid.txt``三个文件，文件的格式为[conll格式](https://universaldependencies.org/format.html)，其编码格式为 [BIO](https://blog.csdn.net/HappyRocking/article/details/79716212) 类型。可以通过继承 fastNLP.io.Loader 来简化加载过程，继承了 Loader 函数后，只需要在实现读取单个文件 _load() 函数即可。

In [1]:
import sys
sys.path.append('../..')

In [2]:
from fastNLP import DataSet, Instance
from fastNLP.io import Loader


# 继承Loader之后，我们只需要实现其中_load()方法，_load()方法传入一个文件路径，返回一个fastNLP DataSet对象，其目的是读取一个文件。
class ConllLoader(Loader):
    def _load(self, path):
        ds = DataSet()
        with open(path, 'r') as f:
            segments = []
            for line in f:
                line = line.strip()
                if line == '':  # 如果为空行，说明需要切换到下一句了。
                    if segments:
                        raw_words = [s[0] for s in segments]
                        raw_target = [s[1] for s in segments]
                        # 将一个 sample 插入到 DataSet中
                        ds.append(Instance(raw_words=raw_words, raw_target=raw_target))  
                    segments = []
                else:
                    parts = line.split()
                    assert len(parts)==4
                    segments.append([parts[0], parts[-1]])
        return ds
    

# 直接使用 load() 方法加载数据集, 返回的 data_bundle 是一个 fastNLP.io.DataBundle 对象，该对象相当于将多个 dataset 放置在一起，
#  可以方便之后的预处理，DataBundle 支持的接口可以在 ！！！ 查看。
data_bundle = ConllLoader().load({
    'train': 'conll2003/train.txt',
    'test': 'conll2003/test.txt',
    'dev': 'conll2003/valid.txt'
})
"""
也可以通过 ConllLoader().load('conll2003/') 来读取，其原理是load()函数将尝试从'conll2003/'文件夹下寻找文件名称中包含了
'train'、'test'和'dev'的文件，并分别读取将其命名为'train'、'test'和'dev'（如文件夹中同一个关键字出现在了多个文件名中将导致报错，
此时请通过dict的方式传入路径信息）。但在我们这里的数据里，没有文件包含dev，所以无法直接使用文件夹读取，转而通过dict的方式传入读取的路径，
该dict的key也将作为读取的数据集的名称，value即对应的文件路径。
"""

print(data_bundle)  # 打印 data_bundle 可以查看包含的 DataSet 
# data_bundle.get_dataset('train')  # 可以获取单个 dataset

In total 3 datasets:
	train has 14987 instances.
	test has 3684 instances.
	dev has 3466 instances.



#### 2.  数据预处理
接下来，我们将演示如何通过fastNLP提供的apply函数方便快捷地进行预处理。我们需要进行的预处理操作有：  
（1）使用BertTokenizer将文本转换为index；同时记录每个word被bpe之后第一个bpe的index，用于得到word的hidden state；  
（2）使用[Vocabulary](../fastNLP)来将raw_target转换为序号。  

In [3]:
# fastNLP 中提供了BERT, RoBERTa, GPT, BART 模型，更多的预训练模型请直接使用transformers
from fastNLP.transformers.torch import BertTokenizer
from fastNLP import cache_results, Vocabulary

# 使用cache_results来装饰函数，会将函数的返回结果缓存到'caches/{param_hash_id}_cache.pkl'路径中（其中{param_hash_id}是根据
#   传递给 process_data 函数参数决定的，因此当函数的参数变化时，会再生成新的缓存文件。如果需要重新生成新的缓存，(a) 可以在调用process_data
#   函数时，额外传入一个_refresh=True的参数; 或者（b）删除相应的缓存文件。此外，保存结果时，cache_results默认还会
#   记录 process_data 函数源码的hash值，当其源码发生了变动，直接读取缓存会发出警告，以防止在修改预处理代码之后，忘记刷新缓存。）
@cache_results('caches/cache.pkl')
def process_data(data_bundle, model_name):
    tokenizer = BertTokenizer.from_pretrained(model_name)
    def bpe(raw_words):
        bpes = [tokenizer.cls_token_id]
        first = [0]
        first_index = 1  # 记录第一个bpe的位置
        for word in raw_words:
            bpe = tokenizer.encode(word, add_special_tokens=False)
            bpes.extend(bpe)
            first.append(first_index)
            first_index += len(bpe)
        bpes.append(tokenizer.sep_token_id)
        first.append(first_index)
        return {'input_ids': bpes, 'input_len': len(bpes), 'first': first, 'first_len': len(raw_words)}
    # 对data_bundle中每个dataset的每一条数据中的raw_words使用bpe函数，并且将返回的结果加入到每条数据中。
    data_bundle.apply_field_more(bpe, field_name='raw_words', num_proc=4)
    # 对应我们还有 apply_field() 函数，该函数和 apply_field_more() 的区别在于传入到 apply_field() 中的函数应该返回一个 field 的
    #   内容（即不需要用dict包裹了）。此外，我们还提供了 data_bundle.apply() ，传入 apply() 的函数需要支持传入一个Instance对象，
    #   更多信息可以参考对应的文档。
    
    # tag的词表，由于这是词表，所以不需要有padding和unk
    tag_vocab = Vocabulary(padding=None, unknown=None)
    # 从 train 数据的 raw_target 中获取建立词表
    tag_vocab.from_dataset(data_bundle.get_dataset('train'), field_name='raw_target')
    # 使用词表将每个 dataset 中的raw_target转为数字，并且将写入到target这个field中
    tag_vocab.index_dataset(data_bundle.datasets.values(), field_name='raw_target', new_field_name='target')
    
    # 可以将 vocabulary 绑定到 data_bundle 上，方便之后使用。
    data_bundle.set_vocab(tag_vocab, field_name='target')
    
    return data_bundle, tokenizer

data_bundle, tokenizer = process_data(data_bundle, 'bert-base-cased', _refresh=True)  # 第一次调用耗时较长，第二次调用则会直接读取缓存的文件
# data_bundle = process_data(data_bundle, 'bert-base-uncased')  # 由于参数变化，fastNLP 会再次生成新的缓存文件。    

Output()

Output()

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



### 3. DataLoader  
由于现在的深度学习算法大都基于 mini-batch 进行优化，因此需要将多个 sample 组合成一个 batch 再输入到模型之中。在自然语言处理中，不同的 sample 往往长度不一致，需要进行 padding 操作。在fastNLP中，我们使用 fastNLP.TorchDataLoader 帮助用户快速进行 padding ，我们使用了 !!!fastNLP.Collator!!! 对象来进行 pad ，Collator 会在迭代过程中根据第一个 batch 的数据自动判定每个 field 是否可以进行 pad ，可以通过 Collator.set_pad() 函数修改某个 field 的 pad 行为。

In [4]:
from fastNLP import prepare_dataloader

# 将 data_bundle 中每个 dataset 取出并构造出相应的 DataLoader 对象。返回的 dls 是一个 dict ，包含了 'train', 'test', 'dev' 三个
#   fastNLP.TorchDataLoader 对象。
dls = prepare_dataloader(data_bundle, batch_size=24) 


# fastNLP 将默认尝试对所有 field 都进行 pad ，如果当前 field 是不可 pad 的类型，则不进行pad；如果是可以 pad 的类型
#   默认使用 0 进行 pad 。
for dl in dls.values():
    # 可以通过 set_pad 修改 padding 的行为。
    dl.set_pad('input_ids', pad_val=tokenizer.pad_token_id)
    # 如果希望忽略某个 field ，可以通过 set_ignore 方法。
    dl.set_ignore('raw_target')
    dl.set_pad('target', pad_val=-100)
# 另一种设置的方法是，可以在 dls = prepare_dataloader(data_bundle, batch_size=32) 之前直接调用 
#  data_bundle.set_pad('input_ids', pad_val=tokenizer.pad_token_id); data_bundle.set_ignore('raw_target')来进行设置。
#  DataSet 也支持这两个方法。
# 若此时调用 batch = next(dls['train'])，则 batch 是一个 dict ，其中包含了
#  'input_ids': torch.LongTensor([batch_size, max_len])
#  'input_len': torch.LongTensor([batch_size])
#  'first': torch.LongTensor([batch_size, max_len'])
#  'first_len': torch.LongTensor([batch_size])
#  'target': torch.LongTensor([batch_size, max_len'-2])
#  'raw_words': List[List[str]]  # 因为无法判断，所以 Collator 不会做任何处理

### 4. 模型准备
传入给fastNLP的模型，需要有两个特殊的方法``train_step``、``evaluate_step``，前者默认在 fastNLP.Trainer 中进行调用，后者默认在 fastNLP.Evaluator 中调用。如果模型中没有``train_step``方法，则Trainer会直接使用模型的``forward``函数；如果模型没有``evaluate_step``方法，则Evaluator会直接使用模型的``forward``函数。``train_step``方法（或当其不存在时，``forward``方法）的返回值必须为 dict 类型，并且必须包含``loss``这个 key 。

此外fastNLP会使用形参名匹配的方式进行参数传递，例如以下模型
```python
class Model(nn.Module):
   def train_step(self, x, y):
        return {'loss': (x-y).abs().mean()}
```
fastNLP将尝试从 DataLoader 返回的 batch(假设包含的 key 为 input_ids, target) 中寻找 'x' 和 'y' 这两个 key ，如果没有找到则会报错。有以下的方法可以解决报错
- 修改 train_step 的参数为(input_ids, target)，以保证和 DataLoader 返回的 batch 中的 key 匹配
- 修改 DataLoader 中返回 batch 的 key 的名字为 (x, y)
- 在 Trainer 中传入参数 train_input_mapping={'input_ids': 'x', 'target': 'y'} 将输入进行映射，train_input_mapping 也可以是一个函数，更多 train_input_mapping 的介绍可以参考文档。

``evaluate_step``也是使用同样的匹配方式，前两条解决方法是一致的，第三种解决方案中，需要在 Evaluator 中传入 evaluate_input_mapping={'input_ids': 'x', 'target': 'y'}。

In [5]:
import torch
from torch import nn
from torch.nn.utils.rnn import pad_sequence
from fastNLP.transformers.torch import BertModel
from fastNLP import seq_len_to_mask
import torch.nn.functional as F


class BertNER(nn.Module):
    def __init__(self, model_name, num_class, tag_vocab=None):
        super().__init__()
        self.bert = BertModel.from_pretrained(model_name)
        self.mlp = nn.Sequential(nn.Linear(self.bert.config.hidden_size, self.bert.config.hidden_size),
                                nn.Dropout(0.3),
                                nn.Linear(self.bert.config.hidden_size, num_class))
        self.tag_vocab = tag_vocab  # 这里传入 tag_vocab 的目的是为了演示 constrined_decode 
        if tag_vocab is not None:
            self._init_constrained_transition()
    
    def forward(self, input_ids, input_len, first):
        attention_mask = seq_len_to_mask(input_len)
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = outputs.last_hidden_state
        first = first.unsqueeze(-1).repeat(1, 1, last_hidden_state.size(-1))
        first_bpe_state = last_hidden_state.gather(dim=1, index=first)
        first_bpe_state = first_bpe_state[:, 1:-1]  # 删除 cls 和 sep
        
        pred = self.mlp(first_bpe_state)
        return {'pred': pred}
    
    def train_step(self, input_ids, input_len, first, target):
        pred = self(input_ids, input_len, first)['pred']
        loss = F.cross_entropy(pred.transpose(1, 2), target)
        return {'loss': loss}
    
    def evaluate_step(self, input_ids, input_len, first):
        pred = self(input_ids, input_len, first)['pred'].argmax(dim=-1)
        return {'pred': pred}
    
    def constrained_decode(self, input_ids, input_len, first, first_len):
        # 这个函数在推理时，将保证解码出来的 tag 一定不与前一个 tag 矛盾【例如一定不会出现 B-person 后面接着 I-Location 的情况】
        # 本身这个需求可以在 Metric 中实现，这里在模型中实现的目的是为了方便演示：如何在fastNLP中使用不同的评测函数
        pred = self(input_ids, input_len, first)['pred']
        cons_pred = []
        for _pred, _len in zip(pred, first_len):
            _pred = _pred[:_len]
            tags = [_pred[0].argmax(dim=-1).item()]  # 这里就不考虑第一个位置非法的情况了
            for i in range(1, _len):
                tags.append((_pred[i] + self.transition[tags[-1]]).argmax().item())
            cons_pred.append(torch.LongTensor(tags))
        cons_pred = pad_sequence(cons_pred, batch_first=True)
        return {'pred': cons_pred}
    
    def _init_constrained_transition(self):
        from fastNLP.modules.torch import allowed_transitions
        allowed_trans = allowed_transitions(self.tag_vocab)
        transition = torch.ones((len(self.tag_vocab), len(self.tag_vocab)))*-100000.0
        for s, e in allowed_trans:
            transition[s, e] = 0
        self.register_buffer('transition', transition)

model = BertNER('bert-base-uncased', len(data_bundle.get_vocab('target')), data_bundle.get_vocab('target'))

### Trainer 的使用
fastNLP 的 Trainer 是用于对模型进行训练的部件。

In [8]:
from torch import optim
from fastNLP import Trainer, LoadBestModelCallback, TorchWarmupCallback
from fastNLP import SpanFPreRecMetric

optimizer = optim.AdamW(model.parameters(), lr=2e-5)
callbacks = [
    LoadBestModelCallback(),   # 用于在训练结束之后加载性能最好的model的权重
    TorchWarmupCallback()
]    

trainer = Trainer(model=model, train_dataloader=dls['train'], optimizers=optimizer, 
                  evaluate_dataloaders=dls['dev'], 
                  metrics={'f': SpanFPreRecMetric(tag_vocab=data_bundle.get_vocab('target'))}, 
                  n_epochs=1, callbacks=callbacks, 
                  # 在评测时将 dataloader 中的 first_len 映射 seq_len, 因为 Accuracy.update 接口需要输入一个名为 seq_len 的参数
                  evaluate_input_mapping={'first_len': 'seq_len'}, overfit_batches=0,
                  device=0, monitor='f#f', fp16=False)  # fp16 为 True 的话，将使用 float16 进行训练。
trainer.run()

Output()

Output()

### Evaluator的使用
fastNLP中用于评测数据的对象。

In [9]:
from fastNLP import Evaluator
from fastNLP import SpanFPreRecMetric

evaluator = Evaluator(model=model, dataloaders=dls['test'], 
                      metrics={'f': SpanFPreRecMetric(tag_vocab=data_bundle.get_vocab('target'))}, 
                      evaluate_input_mapping={'first_len': 'seq_len'}, 
                      device=0)
evaluator.run()

Output()

{'f#f': 0.390326, 'pre#f': 0.414741, 'rec#f': 0.368626}

In [None]:
# 如果想评测一下使用 constrained decoding的性能，则可以通过传入 evaluate_fn 指定使用的函数
def input_mapping(x):
    x['seq_len'] = x['first_len']
    return x
evaluator = Evaluator(model=model, dataloaders=dls['test'], device=0,
                      metrics={'f': SpanFPreRecMetric(tag_vocab=data_bundle.get_vocab('target'))},
                      evaluate_fn='constrained_decode',
                      # 如果将 first_len 重新命名为了 seq_len, 将导致 constrained_decode 的输入缺少 first_len 参数，因此
                      #   额外重复一下 'first_len': 'first_len'，使得这个参数不会消失。
                      evaluate_input_mapping=input_mapping)
evaluator.run()

Output()