
dataset.py 6.9 kB

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Data operations used in run_pretrain.py.
"""
import os
import mindspore.common.dtype as mstype
import mindspore.dataset.engine.datasets as de
import mindspore.dataset.transforms.c_transforms as C
from mindspore import log as logger
from .config import bert_net_cfg


def create_bert_dataset(epoch_size=1, device_num=1, rank=0, do_shuffle="true", enable_data_sink="true",
                        data_sink_steps=1, data_dir=None, schema_dir=None):
    """create train dataset"""
    # apply repeat operations
    repeat_count = epoch_size
    # collect every TFRecord file found under data_dir
    files = os.listdir(data_dir)
    data_files = []
    for file_name in files:
        if "tfrecord" in file_name:
            data_files.append(os.path.join(data_dir, file_name))
    ds = de.TFRecordDataset(data_files, schema_dir if schema_dir != "" else None,
                            columns_list=["input_ids", "input_mask", "segment_ids", "next_sentence_labels",
                                          "masked_lm_positions", "masked_lm_ids", "masked_lm_weights"],
                            shuffle=de.Shuffle.FILES if do_shuffle == "true" else False,
                            num_shards=device_num, shard_id=rank, shard_equal_rows=True)
    ori_dataset_size = ds.get_dataset_size()
    print('origin dataset size: ', ori_dataset_size)
    new_size = ori_dataset_size
    if enable_data_sink == "true":
        # with data sinking, one logical "epoch" is data_sink_steps sink steps,
        # each consuming batch_size rows, so shrink the dataset size accordingly
        new_size = data_sink_steps * bert_net_cfg.batch_size
    ds.set_dataset_size(new_size)
    # scale the repeat count so the same total number of rows is still traversed
    new_repeat_count = int(repeat_count * ori_dataset_size // ds.get_dataset_size())
    type_cast_op = C.TypeCast(mstype.int32)
    ds = ds.map(input_columns="masked_lm_ids", operations=type_cast_op)
    ds = ds.map(input_columns="masked_lm_positions", operations=type_cast_op)
    ds = ds.map(input_columns="next_sentence_labels", operations=type_cast_op)
    ds = ds.map(input_columns="segment_ids", operations=type_cast_op)
    ds = ds.map(input_columns="input_mask", operations=type_cast_op)
    ds = ds.map(input_columns="input_ids", operations=type_cast_op)
    # apply batch operations
    ds = ds.batch(bert_net_cfg.batch_size, drop_remainder=True)
    ds = ds.repeat(max(new_repeat_count, repeat_count))
    logger.info("data size: {}".format(ds.get_dataset_size()))
    logger.info("repeatcount: {}".format(ds.get_repeat_count()))
    return ds, new_repeat_count
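
# Illustrative usage (editor's sketch, not part of the original module): how
# create_bert_dataset might be called from a pretraining script such as
# run_pretrain.py. The epoch count, sink steps and data paths below are
# hypothetical placeholders.
#
#     ds, repeat_count = create_bert_dataset(epoch_size=40, device_num=1, rank=0,
#                                            do_shuffle="true", enable_data_sink="true",
#                                            data_sink_steps=100,
#                                            data_dir="/path/to/pretrain/tfrecords",
#                                            schema_dir="")
#     # the returned repeat count is meant to serve as the epoch argument when
#     # training with data sinking, so that each sink call runs data_sink_steps
#     # batches of bert_net_cfg.batch_size rows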


def create_ner_dataset(batch_size=1, repeat_count=1, assessment_method="accuracy",
                       data_file_path=None, schema_file_path=None):
    """create finetune or evaluation dataset"""
    type_cast_op = C.TypeCast(mstype.int32)
    ds = de.TFRecordDataset([data_file_path], schema_file_path if schema_file_path != "" else None,
                            columns_list=["input_ids", "input_mask", "segment_ids", "label_ids"])
    if assessment_method == "Spearman_correlation":
        # Spearman correlation is evaluated on continuous scores, so cast labels to float32
        type_cast_op_float = C.TypeCast(mstype.float32)
        ds = ds.map(input_columns="label_ids", operations=type_cast_op_float)
    else:
        ds = ds.map(input_columns="label_ids", operations=type_cast_op)
    ds = ds.map(input_columns="segment_ids", operations=type_cast_op)
    ds = ds.map(input_columns="input_mask", operations=type_cast_op)
    ds = ds.map(input_columns="input_ids", operations=type_cast_op)
    ds = ds.repeat(repeat_count)
    # apply shuffle operation
    buffer_size = 960
    ds = ds.shuffle(buffer_size=buffer_size)
    # apply batch operations
    ds = ds.batch(batch_size, drop_remainder=True)
    return ds


def create_classification_dataset(batch_size=1, repeat_count=1, assessment_method="accuracy",
                                  data_file_path=None, schema_file_path=None):
    """create finetune or evaluation dataset"""
    type_cast_op = C.TypeCast(mstype.int32)
    ds = de.TFRecordDataset([data_file_path], schema_file_path if schema_file_path != "" else None,
                            columns_list=["input_ids", "input_mask", "segment_ids", "label_ids"])
    if assessment_method == "Spearman_correlation":
        type_cast_op_float = C.TypeCast(mstype.float32)
        ds = ds.map(input_columns="label_ids", operations=type_cast_op_float)
    else:
        ds = ds.map(input_columns="label_ids", operations=type_cast_op)
    ds = ds.map(input_columns="segment_ids", operations=type_cast_op)
    ds = ds.map(input_columns="input_mask", operations=type_cast_op)
    ds = ds.map(input_columns="input_ids", operations=type_cast_op)
    ds = ds.repeat(repeat_count)
    # apply shuffle operation
    buffer_size = 960
    ds = ds.shuffle(buffer_size=buffer_size)
    # apply batch operations
    ds = ds.batch(batch_size, drop_remainder=True)
    return ds


def create_squad_dataset(batch_size=1, repeat_count=1, data_file_path=None, schema_file_path=None, is_training=True):
    """create finetune or evaluation dataset"""
    type_cast_op = C.TypeCast(mstype.int32)
    if is_training:
        ds = de.TFRecordDataset([data_file_path], schema_file_path if schema_file_path != "" else None,
                                columns_list=["input_ids", "input_mask", "segment_ids",
                                              "start_positions", "end_positions",
                                              "unique_ids", "is_impossible"])
        ds = ds.map(input_columns="start_positions", operations=type_cast_op)
        ds = ds.map(input_columns="end_positions", operations=type_cast_op)
    else:
        ds = de.TFRecordDataset([data_file_path], schema_file_path if schema_file_path != "" else None,
                                columns_list=["input_ids", "input_mask", "segment_ids", "unique_ids"])
        ds = ds.map(input_columns="input_ids", operations=type_cast_op)
        ds = ds.map(input_columns="input_mask", operations=type_cast_op)
        ds = ds.map(input_columns="segment_ids", operations=type_cast_op)
    # cast the input columns shared by the training and evaluation datasets
    ds = ds.map(input_columns="segment_ids", operations=type_cast_op)
    ds = ds.map(input_columns="input_mask", operations=type_cast_op)
    ds = ds.map(input_columns="input_ids", operations=type_cast_op)
    ds = ds.repeat(repeat_count)
    # apply shuffle operation
    buffer_size = 960
    ds = ds.shuffle(buffer_size=buffer_size)
    # apply batch operations
    ds = ds.batch(batch_size, drop_remainder=True)
    return ds
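
A minimal sketch of how the fine-tuning factories above might be driven, assuming TFRecord files produced by the usual BERT preprocessing scripts. The import path, file paths, and batch size are hypothetical placeholders, not values taken from this repository.

    from src.dataset import create_classification_dataset, create_squad_dataset  # hypothetical import path

    # classification fine-tuning data for an accuracy-scored task
    cls_ds = create_classification_dataset(batch_size=32, repeat_count=1,
                                           assessment_method="accuracy",
                                           data_file_path="/path/to/train.tf_record",
                                           schema_file_path="")

    # SQuAD training data; evaluation data would pass is_training=False instead
    squad_ds = create_squad_dataset(batch_size=32, repeat_count=1,
                                    data_file_path="/path/to/squad/train.tf_record",
                                    schema_file_path="",
                                    is_training=True)

    print(cls_ds.get_dataset_size(), squad_ds.get_dataset_size())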