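# Near-duplicate paragraph removal for Common Crawl text shards.
#
# Each input file contains paragraphs separated by blank lines. Every paragraph
# is segmented with jieba (minus stopwords), hashed with 64-bit Simhash, and any
# paragraph within Hamming distance 2 of another is moved to a 'bad' file; the
# survivors are written to a parallel '-dedup' directory. The file list can be
# sharded across machines via --rankOfCluster (e.g. '0to1of1', the default).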
import argparse
import glob
import multiprocessing
import random
import re

import jieba
from simhash import Simhash

stopword_path = 'stopwords.txt'

def split_into_chunks(listTemp, n):
    """Yield contiguous chunks of size len(listTemp)//n (so a bit more than
    n chunks when the length is not evenly divisible)."""
    k = max(1, len(listTemp) // n)  # guard against a zero step for short lists
    for i in range(0, len(listTemp), k):
        yield listTemp[i:i + k]

def distance(value1, value2, dim_bits=64):
    """Hamming distance between two simhash values (number of differing bits)."""
    x = (value1 ^ value2) & ((1 << dim_bits) - 1)
    ans = 0
    while x:
        ans += 1
        x &= x - 1  # clear the lowest set bit (Kernighan's bit-count trick)
    return ans
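
# Sanity check: 0b1011 and 0b0010 differ in bits 0 and 3, so their Hamming
# distance is 2 (paragraphs are flagged as near-duplicates below when the
# distance between their simhashes is at most 2).
assert distance(0b1011, 0b0010) == 2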

# Build the stopword list.
def stopwordslist(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        # Use a set for O(1) membership tests during segmentation.
        stopwords = set(line.strip() for line in f)
    return stopwords

# Segment a sentence with jieba, dropping stopwords; returns a space-joined string.
def seg_sentence(sentence, stopwords):
    sentence_seged = jieba.cut(sentence.strip())
    out_words = []
    for word in sentence_seged:
        if word not in stopwords and word != '\t':
            out_words.append(word)
    return ' '.join(out_words)
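
# Note: when Simhash is given a plain string (as in process_oneFile below), the
# simhash package re-tokenizes it internally into character shingles before
# hashing. A minimal alternative sketch, assuming the package's support for
# feature lists: hand the jieba tokens to Simhash directly. The helper name
# simhash_from_tokens is hypothetical, not part of the original pipeline.
def simhash_from_tokens(sentence, stopwords):
    tokens = [w for w in jieba.cut(sentence.strip())
              if w not in stopwords and w != '\t']
    return Simhash(tokens, f=64)  # a list input is hashed as features as-is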

def process_oneFile(input_path, save, save_bad, stopwords, file_progress):
    with open(input_path, 'r', encoding='utf-8') as f:
        paras = f.read().split('\n\n')

    # One 64-bit simhash per segmented paragraph.
    hashValues = map(lambda x: Simhash(seg_sentence(x, stopwords), f=64).value, paras)
    ids_hashValues = list(enumerate(hashValues))

    bad_ids = set()
    para_len = len(ids_hashValues)

    # Pairwise O(n^2) comparison within each half of the file; note that
    # near-duplicates landing in different halves are missed by this split.
    for ids_hashValues_tmp in split_into_chunks(ids_hashValues, 2):
        for sample1 in ids_hashValues_tmp:
            if sample1[0] % 5000 == 0:
                # Crude progress report: paragraph fraction, file fraction.
                print(sample1[0] / para_len, file_progress)
            for sample2 in ids_hashValues_tmp:
                # Hamming distance < 3 (<= 2 differing bits) counts as a near-duplicate.
                if distance(sample1[1], sample2[1]) < 3 and sample1[0] != sample2[0]:
                    bad_ids.add(sample1[0])
                    bad_ids.add(sample2[0])

    # Pop from the end so earlier indices stay valid.
    bad_ids = sorted(bad_ids, reverse=True)
    print('num bad paras: ', len(bad_ids))
    print('total paras: ', para_len)

    bad_para = []
    for key in bad_ids:
        bad_para.append(paras[key])
        paras.pop(key)

    with open(save_bad, 'w', encoding='utf-8') as f_w_b:
        f_w_b.write('\n\n'.join(bad_para))

    with open(save, 'w', encoding='utf-8') as f_w:
        f_w.write('\n\n'.join(paras))

    print(input_path)
    print(save)
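
# --- Optional alternative (not used above): the simhash package's SimhashIndex
# finds near-duplicates without the O(n^2) pairwise scan, at the cost of extra
# memory for the index. A minimal sketch under that assumption; the helper name
# find_near_duplicate_ids is hypothetical.
def find_near_duplicate_ids(paras, stopwords, k=2):
    from simhash import SimhashIndex
    objs = [(str(i), Simhash(seg_sentence(p, stopwords), f=64))
            for i, p in enumerate(paras)]
    index = SimhashIndex(objs, f=64, k=k)  # k = max Hamming distance tolerated
    bad_ids = set()
    for obj_id, sh in objs:
        near = index.get_near_dups(sh)  # ids within k bits, including itself
        if len(near) > 1:
            bad_ids.update(int(x) for x in near)
    return bad_ids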

parser = argparse.ArgumentParser()
parser.add_argument('--rankOfCluster', type=str, default='0to1of1')
args = parser.parse_args()

num_processes = 200
pool = multiprocessing.Pool(processes=num_processes)

# Collect input file names, sort for stability, then shuffle deterministically.
input_path = '/raid-50/commmon-crawl/common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08/train/*.txt'
input_paths = list(glob.glob(input_path))
input_paths.sort()
random.seed(123)
random.shuffle(input_paths)
print('total files : ', len(input_paths))

# Collect files that have already been processed (these are *output* paths).
tmp = '/raid-50/commmon-crawl/common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08-dedup/train/*.txt'
processed_files = list(glob.glob(tmp))
print('num of processed : ', len(processed_files))

# Drop inputs whose deduplicated output already exists. processed_files holds
# output paths, so map them back to the input naming before filtering; a list
# comprehension keeps the shuffled order deterministic.
processed_inputs = set(p.replace('-fasttext200th08-dedup', '-fasttext200th08')
                       for p in processed_files)
input_paths = [p for p in input_paths if p not in processed_inputs]

# For multi-machine runs, shard the file list first (round-robin so shard
# sizes stay balanced).
def round_robin_split(listTemp, n):
    shards = [[] for _ in range(n)]
    for i, e in enumerate(listTemp):
        shards[i % n].append(e)
    return shards
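
# Round-robin keeps shard sizes balanced across machines, e.g.:
assert round_robin_split([0, 1, 2, 3, 4], 2) == [[0, 2, 4], [1, 3]]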

# Multi-machine coordination: parse '<start>to<end>of<world_size>'.
world_size = int(args.rankOfCluster.split('of')[1])
rank_start = int(args.rankOfCluster.split('of')[0].split('to')[0])
rank_end = int(args.rankOfCluster.split('of')[0].split('to')[1])
print(rank_start, ' to ', rank_end, ' of ', world_size)
input_paths = round_robin_split(input_paths, world_size)[rank_start:rank_end]
input_paths = [i for item in input_paths for i in item]  # flatten the selected shards
print('num files in this machine: ', len(input_paths))
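
# Example: --rankOfCluster '2to4of8' makes 8 round-robin shards of the file
# list and assigns shards 2 and 3 (the Python slice [2:4]) to this machine.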

# Build the output paths: mirror each input path into the '-dedup' tree, and
# derive the reject-file path by swapping 'train' for 'bad'.
save_paths = []
save_paths_bad = []
pattern = re.compile('common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08')
for in_path in input_paths:
    save_paths.append(pattern.sub('common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08-dedup', in_path))
    save_paths_bad.append(save_paths[-1].replace('train', 'bad'))

files_rank = list(set(save_paths) - set(processed_files))
print('num files need to be processed : ', len(files_rank))

stopwords = stopwordslist(stopword_path)
i = 0
num_files = len(input_paths)
for in_path, save, save_bad in zip(input_paths, save_paths, save_paths_bad):
    i += 1
    if save not in processed_files:
        # The last argument reports the fraction of files dispatched so far.
        # Note: apply_async swallows worker exceptions unless the AsyncResult
        # objects are kept and .get() is called on them.
        pool.apply_async(process_oneFile, (in_path, save, save_bad, stopwords, i / num_files))
pool.close()
pool.join()