
first commit

master · yands, 4 years ago · commit ffe13d4e04
1 changed file with 143 additions and 0 deletions

src/dedup_simhash.py

@@ -0,0 +1,143 @@
import argparse
import glob
import multiprocessing
import random
import re

import jieba
from simhash import Simhash

stopword_path = 'stopwords.txt'


def divideIntoNstrand(listTemp, n):
    # Split listTemp into consecutive chunks of roughly len(listTemp) / n items.
    # The max(1, ...) guard avoids a zero step when the list is shorter than n.
    k = max(1, int(len(listTemp) / n))
    for i in range(0, len(listTemp), k):
        yield listTemp[i:i + k]
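# For example, divideIntoNstrand(list(range(5)), 2) yields [0, 1], [2, 3] and [4];
# odd-length inputs produce a short trailing chunk.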

def distance(value1, value2, dim_bits=64):
    # Hamming distance between two simhash fingerprints: XOR the values and
    # count the set bits (Kernighan's popcount trick).
    x = (value1 ^ value2) & ((1 << dim_bits) - 1)
    ans = 0
    while x:
        ans += 1
        x &= x - 1
    return ans
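# For example, distance(0b1011, 0b0001) returns 2: the XOR is 0b1010, which has two
# set bits, so the two 64-bit fingerprints differ in exactly two bit positions.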

# Build the stopword list: one stopword per line in a UTF-8 text file.
def stopwordslist(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        stopwords = [line.strip() for line in f]
    return stopwords

# Segment a sentence with jieba, drop stopwords and tabs, and return the
# remaining tokens joined by single spaces.
def seg_sentence(sentence, stopwords):
    sentence_seged = jieba.cut(sentence.strip())
    # stopwords = stopwordslist(stopword_path)  # the stopword file could also be loaded here
    outstr = ''
    for word in sentence_seged:
        if word not in stopwords:
            if word != '\t':
                outstr += word
                outstr += " "
    # print("seg_instance:{}".format(outstr))
    return outstr
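# For example, with an empty stopword list, seg_sentence('今天天气不错', []) would
# return something like '今天 天气 不错 ' (an illustrative guess; the exact tokens
# depend on jieba's dictionary and segmentation mode).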

def process_oneFile(input, save, save_bad_dir, stopwords, file_id):
    # Deduplicate a single file. Paragraphs are separated by blank lines; a
    # paragraph is flagged as a near-duplicate when its 64-bit simhash is
    # within 3 bits of another paragraph in the same strand.
    with open(input, 'r', encoding='utf-8') as f:
        paras = f.read().split('\n\n')
    hashValues = map(lambda x: Simhash(seg_sentence(x, stopwords), f=64).value, paras)
    ids_hashValues = list(enumerate(hashValues))

    bad_ids = set()
    para_len = len(ids_hashValues)

    # Pairwise comparison within each strand (O(n^2) per strand); splitting the
    # paragraphs into 2 strands roughly halves the number of comparisons.
    for ids_hashValues_tmp in divideIntoNstrand(ids_hashValues, 2):
        for sample1 in ids_hashValues_tmp:
            if sample1[0] % 5000 == 0:
                print(sample1[0] / para_len, file_id)
            for sample2 in ids_hashValues_tmp:
                if distance(sample1[1], sample2[1]) < 3 and sample1[0] != sample2[0]:
                    bad_ids.add(sample1[0])
                    bad_ids.add(sample2[0])

    bad_ids = list(bad_ids)
    bad_ids.sort(reverse=True)
    print('num bad paras: ', len(bad_ids))
    print('total paras: ', para_len)

    # Pop duplicates from the highest index down so earlier indices stay valid.
    bad_para = []
    for key in bad_ids:
        bad_para.append(paras[key])
        paras.pop(key)

    # Near-duplicate paragraphs go to the "bad" file, the rest to the clean file.
    with open(save_bad_dir, 'w', encoding='utf-8') as f_w_b:
        f_w_b.write('\n\n'.join(bad_para))

    with open(save, 'w', encoding='utf-8') as f_w:
        f_w.write('\n\n'.join(paras))

    print(input)
    print(save)

parser = argparse.ArgumentParser()
parser.add_argument('--rankOfCluster', type=str, default='0to1of1')
args = parser.parse_args()

num_processes = 200
pool = multiprocessing.Pool(processes=num_processes)

# Read the input file names, sort them, then shuffle with a fixed seed.
input_path = '/raid-50/commmon-crawl/common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08/train/*.txt'
input_paths = list(glob.glob(input_path))
input_paths.sort()
random.seed(123)
random.shuffle(input_paths)
print('total files : ', len(input_paths))

# Read the list of files that have already been processed.
tmp = '/raid-50/commmon-crawl/common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08-dedup/train/*.txt'
processed_files = list(glob.glob(tmp))
print('num of processed : ', len(processed_files))

# Drop already-processed files from the input list (note: these are source paths
# while processed_files are output paths, so the real skip happens below via
# 'save not in processed_files').
input_paths = list(set(input_paths) - set(processed_files))

# When running on multiple machines, split the file list into strands first.
def divideIntoNstrandRoundRobin(listTemp, n):
    # Distribute items round-robin over n strands; unlike divideIntoNstrand
    # above, every strand gets a similar mix of the shuffled files.
    strands = [[] for i in range(n)]
    for i, e in enumerate(listTemp):
        strands[i % n].append(e)
    return strands
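# For example, divideIntoNstrandRoundRobin(['f1', 'f2', 'f3', 'f4', 'f5'], 2) returns
# [['f1', 'f3', 'f5'], ['f2', 'f4']].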

# Multi-machine coordination: '--rankOfCluster AtoBofN' assigns strands A..B-1
# out of N to this machine.
num_strands = int(args.rankOfCluster.split('of')[1])
rank_start = int(args.rankOfCluster.split('of')[0].split('to')[0])
rank_end = int(args.rankOfCluster.split('of')[0].split('to')[1])
print(rank_start, ' to ', rank_end, ' of ', num_strands)
input_paths = divideIntoNstrandRoundRobin(input_paths, num_strands)[rank_start:rank_end]
input_paths = [i for item in input_paths for i in item]
print('num files in this machine: ', len(input_paths))
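# For example, '--rankOfCluster 2to4of8' splits the file list into 8 strands and keeps
# strands 2 and 3 for this machine; the default '0to1of1' keeps every file.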

# Build the output paths by swapping the dataset directory name; files under
# 'bad' hold the removed near-duplicate paragraphs.
save_paths = []
save_paths_bad = []
grop = re.compile('common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08')
for i, input in enumerate(input_paths):
    save_paths.append(grop.sub('common-crawl-WET-20201124-v1-v2-cleanV2-3-fasttext200th08-dedup', input))
    save_paths_bad.append(save_paths[-1].replace('train', 'bad'))

files_rank = list(set(save_paths) - set(processed_files))
print('num files need to be processed : ', len(files_rank))

stopwords = stopwordslist(stopword_path)
# print(save_paths)
i = 0
num_files = len(input_paths)
for input, save, save_bad in zip(input_paths, save_paths, save_paths_bad):
    i += 1
    if save not in processed_files:
        # The last argument is the fraction of files dispatched so far; the
        # worker prints it as a rough progress indicator.
        pool.apply_async(process_oneFile, (input, save, save_bad, stopwords, i / num_files,))
    else:
        # print('Processed : ', save)
        pass
pool.close()
pool.join()
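# A minimal launch sketch (assumes the hard-coded input/output directories above exist):
#   python src/dedup_simhash.py --rankOfCluster 0to1of1    # single machine, all files
#   python src/dedup_simhash.py --rankOfCluster 0to4of8    # first half of an 8-way split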
