You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number; can include dashes ('-'); and can be up to 35 characters long.

dedup_simhash.py 4.6 kB

4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import argparse
  2. from operator import add
  3. from simhash import Simhash, SimhashIndex
  4. import jieba
  5. import os,random
  6. import re, multiprocessing, glob
  7. stopword_path = 'stopwords.txt'
  8. def divideIntoNstrand(listTemp, n):
  9. k = int(len(listTemp)/n)
  10. for i in range(0, len(listTemp), k):
  11. yield listTemp[i:i + k]
  12. def distance(value1, value2, dim_bits=64):
  13. x = (value1 ^ value2) & ((1 << dim_bits) - 1)
  14. ans = 0
  15. while x:
  16. ans += 1
  17. x &= x - 1
  18. return ans
  19. # 创建停用词list
  20. def stopwordslist(filepath):
  21. stopwords = [line.strip() for line in open(filepath, 'r', encoding='utf-8').readlines()]
  22. return stopwords
  23. # 对句子进行jieba分词
  24. def seg_sentence(sentence, stopwords):
  25. sentence_seged = jieba.cut(sentence.strip())
  26. # stopwords = stopwordslist(stopword_path)
  27. # 这里加载停用词的路径
  28. outstr = ''
  29. for word in sentence_seged:
  30. if word not in stopwords:
  31. if word != '\t':
  32. outstr += word
  33. outstr += " "
  34. #print("seg_instance:{}".format(outstr))
  35. return outstr
  36. def process_oneFile(input,save,save_bad_dir,stopwords,file_id):
  37. f = open(input,'r',encoding='utf-8')
  38. paras = f.read().split('\n\n')
  39. hashValues = map(lambda x: Simhash(seg_sentence(x,stopwords), f=64).value, paras)
  40. ids_hashValues = list(enumerate(hashValues))
  41. bad_ids = set()
  42. para_len = len(ids_hashValues)
  43. for ids_hashValues_tmp in divideIntoNstrand(ids_hashValues, 2):
  44. for sample1 in ids_hashValues_tmp:
  45. if sample1[0]%5000 == 0:
  46. print(sample1[0]/para_len, file_id)
  47. for sample2 in ids_hashValues_tmp:
  48. if distance(sample1[1],sample2[1]) < 3 and sample1[0] != sample2[0]:
  49. bad_ids.add(sample1[0])
  50. bad_ids.add(sample2[0])
  51. bad_ids = list(bad_ids)
  52. bad_ids.sort(reverse=True)
  53. print('num bad paras: ', len(bad_ids))
  54. print('total paras: ', para_len)
  55. bad_para = []
  56. for key in bad_ids:
  57. bad_para.append(paras[key])
  58. paras.pop(key)
  59. f_w_b = open(save_bad_dir,'w',encoding='utf-8')
  60. f_w_b.writelines('\n\n'.join(bad_para))
  61. f_w_b.close()
  62. f_w = open(save,'w',encoding='utf-8')
  63. f_w.writelines('\n\n'.join(paras))
  64. f_w.close()
  65. print(input)
  66. print(save)
  67. parser = argparse.ArgumentParser()
  68. parser.add_argument('--rankOfCluster', type=str, default='0to1of1')
  69. args = parser.parse_args()
  70. num_processes = 200
  71. pool = multiprocessing.Pool(processes=num_processes)
  72. #读取文件名,并排序
  73. input_path = '/raid-50/commmon-crawl/common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08/train/*.txt'
  74. input_paths = list(glob.glob(input_path))
  75. input_paths.sort()
  76. random.seed(123)
  77. random.shuffle(input_paths)
  78. print('total files : ', len(input_paths))
  79. #读取已处理文件
  80. tmp = '/raid-50/commmon-crawl/common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08-dedup/train/*.txt'
  81. processed_files = list(glob.glob(tmp))
  82. print('num of processed : ', len(processed_files))
  83. #删除已处理文件
  84. input_paths = list(set(input_paths)-set(processed_files))
  85. #如果要多机器跑的话要先分块
  86. def divideIntoNstrand(listTemp, n):
  87. twoList = [ [] for i in range(n)]
  88. for i,e in enumerate(listTemp):
  89. twoList[i%n].append(e)
  90. return twoList
  91. #多机器协同
  92. all = int(args.rankOfCluster.split('of')[1])
  93. rank_start = int(args.rankOfCluster.split('of')[0].split('to')[0])
  94. rank_end = int(args.rankOfCluster.split('of')[0].split('to')[1])
  95. print(rank_start,' to ',rank_end,' of ',all)
  96. input_paths = divideIntoNstrand(input_paths,all)[rank_start:rank_end]
  97. input_paths = [i for item in input_paths for i in item]
  98. print('num files in this machine: ', len(input_paths))
  99. #数据保存的目录
  100. save_paths = []
  101. save_paths_bad = []
  102. grop = re.compile('common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08')
  103. for i,input in enumerate(input_paths):
  104. save_paths.append(grop.sub('common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08-dedup',input))
  105. save_paths_bad.append(save_paths[-1].replace('train', 'bad'))
  106. files_rank = list(set(save_paths)-set(processed_files))
  107. print('num files need to be processed : ', len(files_rank))
  108. stopwords = stopwordslist(stopword_path)
  109. # print(save_paths)
  110. i = 0
  111. num_files = len(input_paths)
  112. for input,save,save_bad in zip(input_paths, save_paths, save_paths_bad):
  113. i += 1
  114. if save not in processed_files:
  115. pool.apply_async(process_oneFile, (input, save, save_bad, stopwords, i/num_files,))
  116. else:
  117. # print('Processed : ', save)
  118. pass
  119. pool.close()
  120. pool.join()