"""Remove near-duplicate paragraphs from text shards using Simhash.

Each input file is split into paragraphs on blank lines. Every paragraph is
segmented with jieba, stripped of stop words and hashed with a 64-bit
Simhash. Paragraphs whose hashes differ in fewer than 3 bits from another
paragraph in the same chunk of the file are written to a "bad" file; the
remaining paragraphs are written to the de-duplicated output file.

The work can be spread over several machines with ``--rankOfCluster``,
formatted as ``<start>to<end>of<total>`` (default ``0to1of1``).
"""
import argparse
import glob
import multiprocessing
import os
import random
import re
from operator import add

import jieba
from simhash import Simhash, SimhashIndex

stopword_path = 'stopwords.txt'

# Glob patterns for the raw input shards and for the already written outputs.
INPUT_GLOB = '/raid-50/commmon-crawl/common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08/train/*.txt'
PROCESSED_GLOB = '/raid-50/commmon-crawl/common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08-dedup/train/*.txt'

# Directory-name fragment rewritten to turn an input path into an output path.
SOURCE_TAG = 'common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08'
DEDUP_TAG = 'common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08-dedup'


def divideIntoNstrand(listTemp, n):
    """Yield consecutive slices of ``listTemp`` of length ``len(listTemp) // n``.

    When the length is not a multiple of ``n`` a shorter trailing slice is
    yielded as well, so more than ``n`` slices may be produced.

    The slice length is clamped to at least 1 so that a list shorter than
    ``n`` no longer makes ``range`` fail with a zero step.
    """
    k = max(1, len(listTemp) // n)
    for i in range(0, len(listTemp), k):
        yield listTemp[i:i + k]


def distance(value1, value2, dim_bits=64):
    """Return the Hamming distance of two hashes over the low ``dim_bits`` bits."""
    # Masking makes the value non-negative, so bin() carries no sign.
    diff = (value1 ^ value2) & ((1 << dim_bits) - 1)
    return bin(diff).count('1')


def stopwordslist(filepath):
    """Read the stop-word file and return its stripped lines as a list."""
    with open(filepath, 'r', encoding='utf-8') as f:
        return [line.strip() for line in f]


def seg_sentence(sentence, stopwords):
    """Segment ``sentence`` with jieba and drop stop words and tab tokens.

    Returns the kept tokens concatenated, each followed by a single space
    (so a non-empty result ends with a trailing space).
    """
    return ''.join(
        word + ' '
        for word in jieba.cut(sentence.strip())
        if word not in stopwords and word != '\t'
    )


def _ensure_parent_dir(path):
    """Create the directory that will contain ``path`` if it is missing."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def process_oneFile(input, save, save_bad_dir, stopwords, file_id):
    """De-duplicate the paragraphs of one file.

    Args:
        input: path of the UTF-8 text file to read.
        save: path that receives the paragraphs that were kept.
        save_bad_dir: path of the file that receives the removed paragraphs.
        stopwords: iterable of stop words to drop before hashing.
        file_id: label printed next to the progress ratio.

    Every paragraph that has a near-duplicate (Hamming distance < 3) inside
    the same chunk is removed, i.e. all copies are dropped, not all but one.
    """
    with open(input, 'r', encoding='utf-8') as f:
        paras = f.read().split('\n\n')

    # A set gives constant-time stop-word lookups for every token.
    stopword_set = set(stopwords)
    ids_hashValues = [
        (idx, Simhash(seg_sentence(para, stopword_set), f=64).value)
        for idx, para in enumerate(paras)
    ]
    para_len = len(ids_hashValues)

    bad_ids = set()
    # Paragraphs are only compared inside their own chunk to bound the
    # quadratic comparison cost.
    for chunk in divideIntoNstrand(ids_hashValues, 2):
        for pos, (para_id, para_hash) in enumerate(chunk):
            if para_id % 5000 == 0:
                print(para_id / para_len, file_id)
            # The distance is symmetric, so each unordered pair is visited once.
            for other_id, other_hash in chunk[pos + 1:]:
                if distance(para_hash, other_hash) < 3:
                    bad_ids.add(para_id)
                    bad_ids.add(other_id)

    print('num bad paras: ', len(bad_ids))
    print('total paras: ', para_len)

    # Removed paragraphs are emitted in descending index order.
    bad_para = [paras[idx] for idx in sorted(bad_ids, reverse=True)]
    kept_para = [para for idx, para in enumerate(paras) if idx not in bad_ids]

    _ensure_parent_dir(save_bad_dir)
    with open(save_bad_dir, 'w', encoding='utf-8') as f_w_b:
        f_w_b.write('\n\n'.join(bad_para))

    # The kept file is written last because its existence is what marks the
    # input as processed on the next run.
    _ensure_parent_dir(save)
    with open(save, 'w', encoding='utf-8') as f_w:
        f_w.write('\n\n'.join(kept_para))

    print(input)
    print(save)


def _split_round_robin(items, n):
    """Deal ``items`` into ``n`` lists round-robin (item i goes to list i % n)."""
    return [items[i::n] for i in range(n)]


def _parse_rank_spec(spec):
    """Parse ``<start>to<end>of<total>`` into ``(start, end, total)`` ints."""
    parts = spec.split('of')
    bounds = parts[0].split('to')
    return int(bounds[0]), int(bounds[1]), int(parts[1])


def main():
    """Parse the command line and de-duplicate this machine's share of files."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--rankOfCluster', type=str, default='0to1of1')
    parser.add_argument('--numProcesses', type=int, default=200)
    args = parser.parse_args()

    # Read the file names, sort them, then shuffle with a fixed seed so that
    # every machine sees the same order.
    input_paths = sorted(glob.glob(INPUT_GLOB))
    random.seed(123)
    random.shuffle(input_paths)
    print('total files : ', len(input_paths))

    # Read the files that have already been processed.
    processed_files = set(glob.glob(PROCESSED_GLOB))
    print('num of processed : ', len(processed_files))

    # Drop already processed paths while keeping the shuffled order. Going
    # through a plain set here would reorder the list differently in every
    # interpreter (string hashing is randomized), which would give each
    # machine an inconsistent partition below.
    input_paths = [path for path in input_paths if path not in processed_files]

    # Multi-machine cooperation: split into shards and keep this machine's.
    rank_start, rank_end, num_shards = _parse_rank_spec(args.rankOfCluster)
    print(rank_start, ' to ', rank_end, ' of ', num_shards)
    shards = _split_round_robin(input_paths, num_shards)[rank_start:rank_end]
    input_paths = [path for shard in shards for path in shard]
    print('num files in this machine: ', len(input_paths))

    # Output locations. NOTE: replace() rewrites every 'train' in the path,
    # not only the directory component.
    grop = re.compile(SOURCE_TAG)
    save_paths = [grop.sub(DEDUP_TAG, path) for path in input_paths]
    save_paths_bad = [path.replace('train', 'bad') for path in save_paths]

    print('num files need to be processed : ',
          len(set(save_paths) - processed_files))

    stopwords = stopwordslist(stopword_path)
    num_files = len(input_paths)

    with multiprocessing.Pool(processes=args.numProcesses) as pool:
        pending = []
        jobs = zip(input_paths, save_paths, save_paths_bad)
        for i, (src, save, save_bad) in enumerate(jobs, start=1):
            if save in processed_files:
                continue
            result = pool.apply_async(
                process_oneFile,
                (src, save, save_bad, stopwords, i / num_files),
            )
            pending.append((src, result))
        pool.close()
        pool.join()

        # apply_async hides worker exceptions until get() is called; report
        # them instead of dropping them, but keep processing best-effort.
        for src, result in pending:
            try:
                result.get()
            except Exception as exc:
                print('failed : ', src, repr(exc))


if __name__ == '__main__':
    main()