diff --git a/src/dedup_simhash.py b/src/dedup_simhash.py
new file mode 100755
index 00000000..9807b331
--- /dev/null
+++ b/src/dedup_simhash.py
@@ -0,0 +1,148 @@
+import argparse
+import glob
+import multiprocessing
+import os
+import random
+import re
+
+import jieba
+from simhash import Simhash
+
+stopword_path = 'stopwords.txt'
+
+def divideIntoNstrand(listTemp, n):
+    # Split a list into contiguous chunks of roughly len(listTemp)/n items.
+    k = max(1, len(listTemp) // n)
+    for i in range(0, len(listTemp), k):
+        yield listTemp[i:i + k]
+
+def distance(value1, value2, dim_bits=64):
+    # Hamming distance between two SimHash fingerprints.
+    x = (value1 ^ value2) & ((1 << dim_bits) - 1)
+    ans = 0
+    while x:
+        ans += 1
+        x &= x - 1
+    return ans
+
+# Load the stopword list
+def stopwordslist(filepath):
+    with open(filepath, 'r', encoding='utf-8') as f:
+        return set(line.strip() for line in f)
+
+# Segment a sentence with jieba and drop stopwords
+def seg_sentence(sentence, stopwords):
+    sentence_seged = jieba.cut(sentence.strip())
+    # stopwords = stopwordslist(stopword_path)  # path the stopwords are loaded from
+    outstr = ''
+    for word in sentence_seged:
+        if word not in stopwords and word != '\t':
+            outstr += word
+            outstr += " "
+    # print("seg_instance:{}".format(outstr))
+    return outstr
+
+def process_oneFile(input_file, save, save_bad_dir, stopwords, file_progress):
+    # Deduplicate the paragraphs of one file: paragraphs whose SimHash
+    # fingerprints are within Hamming distance 3 of another paragraph in the
+    # same strand go to save_bad_dir, the rest go to save.
+    with open(input_file, 'r', encoding='utf-8') as f:
+        paras = f.read().split('\n\n')
+    hashValues = map(lambda x: Simhash(seg_sentence(x, stopwords), f=64).value, paras)
+    ids_hashValues = list(enumerate(hashValues))
+
+    bad_ids = set()
+    para_len = len(ids_hashValues)
+
+    for ids_hashValues_tmp in divideIntoNstrand(ids_hashValues, 2):
+        for sample1 in ids_hashValues_tmp:
+            if sample1[0] % 5000 == 0:
+                print(sample1[0] / para_len, file_progress)
+            for sample2 in ids_hashValues_tmp:
+                if distance(sample1[1], sample2[1]) < 3 and sample1[0] != sample2[0]:
+                    bad_ids.add(sample1[0])
+                    bad_ids.add(sample2[0])
+
+    bad_ids = sorted(bad_ids, reverse=True)
+    print('num bad paras: ', len(bad_ids))
+    print('total paras: ', para_len)
+
+    # Pop from the end so that earlier indices stay valid.
+    bad_para = []
+    for key in bad_ids:
+        bad_para.append(paras[key])
+        paras.pop(key)
+
+    with open(save_bad_dir, 'w', encoding='utf-8') as f_w_b:
+        f_w_b.write('\n\n'.join(bad_para))
+
+    with open(save, 'w', encoding='utf-8') as f_w:
+        f_w.write('\n\n'.join(paras))
+
+    print(input_file)
+    print(save)
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--rankOfCluster', type=str, default='0to1of1')
+args = parser.parse_args()
+
+num_processes = 200
+pool = multiprocessing.Pool(processes=num_processes)
+
+# Read the input file names, sort them, then shuffle deterministically
+input_path = '/raid-50/commmon-crawl/common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08/train/*.txt'
+input_paths = list(glob.glob(input_path))
+input_paths.sort()
+random.seed(123)
+random.shuffle(input_paths)
+print('total files : ', len(input_paths))
+
+# Read the files that have already been processed
+tmp = '/raid-50/commmon-crawl/common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08-dedup/train/*.txt'
+processed_files = list(glob.glob(tmp))
+print('num of processed : ', len(processed_files))
+
+# Drop inputs whose deduplicated output already exists
+processed_names = set(os.path.basename(p) for p in processed_files)
+input_paths = [p for p in input_paths if os.path.basename(p) not in processed_names]
+
+# When running on several machines, split the file list into shards first
+def divideRoundRobin(listTemp, n):
+    strands = [[] for _ in range(n)]
+    for i, e in enumerate(listTemp):
+        strands[i % n].append(e)
+    return strands
+
+# Multi-machine coordination: --rankOfCluster has the form '<start>to<end>of<total>'
+num_ranks = int(args.rankOfCluster.split('of')[1])
+rank_start = int(args.rankOfCluster.split('of')[0].split('to')[0])
+rank_end = int(args.rankOfCluster.split('of')[0].split('to')[1])
+print(rank_start, ' to ', rank_end, ' of ', num_ranks)
+input_paths = divideRoundRobin(input_paths, num_ranks)[rank_start:rank_end]
+input_paths = [i for item in input_paths for i in item]
+print('num files in this machine: ', len(input_paths))
+
+# Output paths
+save_paths = []
+save_paths_bad = []
+pattern = re.compile('common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08')
+for input_file in input_paths:
+    save_paths.append(pattern.sub('common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08-dedup', input_file))
+    save_paths_bad.append(save_paths[-1].replace('train', 'bad'))
+
+files_rank = list(set(save_paths) - set(processed_files))
+print('num files need to be processed : ', len(files_rank))
+
+stopwords = stopwordslist(stopword_path)
+# print(save_paths)
+i = 0
+num_files = len(input_paths)
+for input_file, save, save_bad in zip(input_paths, save_paths, save_paths_bad):
+    i += 1
+    if save not in processed_files:
+        pool.apply_async(process_oneFile, (input_file, save, save_bad, stopwords, i / num_files,))
+    else:
+        # print('Processed : ', save)
+        pass
+pool.close()
+pool.join()
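For reference, the near-duplicate test the script relies on is: segment a paragraph, fingerprint it with a 64-bit SimHash, and flag two paragraphs as duplicates when the Hamming distance between their fingerprints is below 3. Below is a minimal sketch of that check using the same simhash package; the hamming() helper mirrors distance() in the script, and the example strings are made up for illustration.

    from simhash import Simhash

    def hamming(a, b, bits=64):
        # Count the differing bits between two fingerprints (same as distance() in the script).
        return bin((a ^ b) & ((1 << bits) - 1)).count('1')

    # Two hypothetical near-identical paragraphs.
    h1 = Simhash("the quick brown fox jumps over the lazy dog", f=64).value
    h2 = Simhash("the quick brown fox jumped over the lazy dog", f=64).value
    print(hamming(h1, h2))      # Hamming distance between the two fingerprints
    print(hamming(h1, h2) < 3)  # True when the script's threshold would flag them as duplicates

To shard the file list across machines, --rankOfCluster takes the form '<start>to<end>of<total>'; for example, python src/dedup_simhash.py --rankOfCluster 0to1of4 processes the first of four round-robin shards.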