# coding: utf-8
#================================================================#
# Copyright (C) 2020 Freecss All rights reserved.
#
# File Name    : data_generator.py
# Author       : freecss
# Email        : karlfreecss@gmail.com
# Created Date : 2020/04/02
# Description  : Generate synthetic datasets from codes and lexicons.
#
#================================================================#

from itertools import product
from multiprocessing import Pool
import os
import pickle as pk
import random

import numpy as np

#def hamming_code_generator(data_len, p_len):
#    ret = []
#    for data in product((0, 1), repeat=data_len):
#        p_idxs = [2 ** i for i in range(p_len)]
#        total_len = data_len + p_len
#        data_idx = 0
#        hamming_code = []
#        for idx in range(total_len):
#            if idx + 1 in p_idxs:
#                hamming_code.append(0)
#            else:
#                hamming_code.append(data[data_idx])
#                data_idx += 1
#
#        for idx in range(total_len):
#            if idx + 1 in p_idxs:
#                for i in range(total_len):
#                    if (i + 1) & (idx + 1) != 0:
#                        hamming_code[idx] ^= hamming_code[i]
#        #hamming_code = "".join([str(x) for x in hamming_code])
#        ret.append(hamming_code)
#    return ret

def code_generator(code_len, code_num, letter_num=2):
    # Sample code_num distinct codes of length code_len over an alphabet
    # of letter_num symbols, uniformly without replacement.
    codes = list(product(range(letter_num), repeat=code_len))
    random.shuffle(codes)
    return codes[:code_num]
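
# Illustrative usage sketch (added for clarity, not part of the original
# module). The fixed seed is an assumption for reproducibility only.
def _demo_code_generator():
    random.seed(0)  # assumption: seed only for the demo
    codes = code_generator(code_len=3, code_num=4)
    print(codes)  # four distinct 0/1 tuples, e.g. (0, 1, 1)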

def hamming_distance_static(codes):
    # Pairwise Hamming-distance statistics over a code set: returns the
    # average distance over all ordered pairs and the average distance of
    # each code to its nearest neighbour. The overall minimum distance
    # (min_dist) is tracked but, as in the original interface, not returned.
    min_dist = len(codes[0])  # upper bound is the code length
    avg_dist = 0.
    avg_min_dist = 0.
    relation_num = 0.
    for code1 in codes:
        tmp_min_dist = len(codes[0])
        for code2 in codes:
            if code1 == code2:
                continue
            relation_num += 1
            dist = sum(1 for c1, c2 in zip(code1, code2) if c1 != c2)
            avg_dist += dist
            if tmp_min_dist > dist:
                tmp_min_dist = dist
        avg_min_dist += tmp_min_dist
        if min_dist > tmp_min_dist:
            min_dist = tmp_min_dist
    return avg_dist / relation_num, avg_min_dist / len(codes)
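
# Illustrative check (added, hypothetical values): for the repetition code
# {(0,0,0), (1,1,1)} every pair differs in all three positions, so both
# statistics equal 3.
def _demo_hamming_distance_static():
    avg_dist, avg_min_dist = hamming_distance_static([(0, 0, 0), (1, 1, 1)])
    print(avg_dist, avg_min_dist)  # -> 3.0 3.0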

def generate_cosin_data(codes, err, repeat, letter_num):
    # Sample 2-D points and bucket them by cosine band: the d-th symbol
    # owns the band cos(x) + 2*d - 2 < y < cos(x) + 2*d.
    Y = np.random.random(100000) * letter_num * 3 - 3
    X = np.random.random(100000) * 20 - 10
    data_X = np.concatenate((X.reshape(-1, 1), Y.reshape(-1, 1)), axis=1)

    samples = {}
    # Distinct symbols used by the codes; sorted so that the
    # symbol-to-band assignment is deterministic.
    all_sign = sorted(set(sum([[c for c in code] for code in codes], [])))

    for d, sign in enumerate(all_sign):
        mask = np.logical_and(Y < np.cos(X) + 2 * d, Y > np.cos(X) + 2 * d - 2)
        samples[sign] = data_X[mask]

    data = []
    labels = []
    count = 0
    for _ in range(repeat):
        # Cap the total number of generated examples at 100000.
        if count > 100000:
            break
        for code in codes:
            tmp = []
            count += 1
            for d in code:
                # With probability err, replace the symbol with a uniformly
                # chosen different symbol.
                if random.random() < err:
                    d = random.choice([s for s in all_sign if s != d])
                idx = random.randint(0, len(samples[d]) - 1)
                tmp.append(samples[d][idx])
            data.append(tmp)
            labels.append(code)
    data = np.array(data)
    labels = np.array(labels)
    return data, labels
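
# Sanity-check sketch for the band construction (added, illustrative): with
# err=0, each sampled point must lie inside its symbol's cosine band.
def _demo_cosin_bands():
    data, labels = generate_cosin_data([(0, 1), (1, 0)], err=0.0, repeat=5,
                                       letter_num=2)
    # data has shape (num_examples, code_len, 2); the first point of the
    # first example encodes symbol 0, i.e. band cos(x) - 2 < y < cos(x).
    x, y = data[0][0]
    assert np.cos(x) - 2 < y < np.cos(x)
    print(data.shape, labels.shape)  # -> (10, 2, 2) (10, 2)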

#codes = """110011001
#100011001
#101101101
#011111001
#100100001
#111111101
#101110001
#111100101
#101000101
#001001101
#111110101
#100101001
#010010101
#110100101
#001111101
#111111001"""
#codes = codes.split()

def generate_data_via_codes(codes, err, letter_num):
    # Build a dataset for the given codes with symbol-error rate err.
    data, labels = generate_cosin_data(codes, err, 100000, letter_num)
    return data, labels
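
# End-to-end sketch (added; the codes here are hypothetical). Note that this
# generates on the order of 100000 examples, matching the module's default.
def _demo_generate_data_via_codes():
    data, labels = generate_data_via_codes(["01", "10", "11"], err=0.05,
                                           letter_num=2)
    print(data.shape, labels.shape)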

def generate_data(params):
    code_len = params["code_len"]
    times = params["times"]
    p = params["p"]
    code_num = params["code_num"]

    err = p / 20.
    codes = code_generator(code_len, code_num)
    # code_generator defaults to a binary alphabet, hence letter_num = 2.
    data, labels = generate_cosin_data(codes, err, 100000, 2)
    data_name = "code_%d_%d" % (code_len, code_num)
    os.makedirs("generated_data", exist_ok=True)
    with open("generated_data/%d_%s_%.2f.pk" % (times, data_name, err), "wb") as fout:
        pk.dump((codes, data, labels), fout)
    return True

def generate_multi_data():
    # Sweep code lengths, repetitions, error levels and code-set sizes;
    # each parameter combination is handled by generate_data in parallel.
    params_list = []
    #for code_len in [7, 9, 11, 13, 15]:
    for code_len in [7, 11, 15]:
        for times in range(20):
            for p in range(0, 11):
                for code_num_power in range(1, code_len):
                    code_num = 2 ** code_num_power
                    params_list.append({"code_len": code_len, "times": times,
                                        "p": p, "code_num": code_num})
    with Pool(64) as pool:
        return list(pool.map(generate_data, params_list))

def read_lexicon(file_path):
    # Read one code (word) per line; also report the alphabet size, i.e.
    # the number of distinct symbols appearing in the lexicon.
    with open(file_path) as fin:
        ret = [s.strip() for s in fin]

    all_sign = list(set(sum([[c for c in s] for s in ret], [])))
    #ret = ["".join(str(all_sign.index(t)) for t in tmp) for tmp in ret]

    return ret, len(all_sign)
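
# Illustrative sketch (added): read_lexicon expects a plain-text file with
# one code per line. The temporary file below is an assumption for the demo.
def _demo_read_lexicon():
    import tempfile
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
        f.write("01\n10\n11\n")
        path = f.name
    codes, letter_num = read_lexicon(path)
    os.remove(path)
    print(codes, letter_num)  # -> ['01', '10', '11'] 2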

if __name__ == "__main__":
    # Build a noise-free (err = 0) dataset from every lexicon file found
    # directly under the "lexicons" directory.
    for root, dirs, files in os.walk("lexicons"):
        if root != "lexicons":
            continue
        for file_name in files:
            file_path = os.path.join(root, file_name)
            codes, letter_num = read_lexicon(file_path)
            data, labels = generate_data_via_codes(codes, 0, letter_num)

            os.makedirs("dataset", exist_ok=True)
            save_path = os.path.join("dataset", file_name.split(".")[0] + ".pk")
            with open(save_path, "wb") as fout:
                pk.dump((data, labels, codes), fout)

    #res = read_lexicon("add2.txt")
    #print(res)

    # Disabled parameter sweep; uncomment to regenerate the synthetic
    # code datasets under generated_data/.
    #generate_multi_data()