| @@ -0,0 +1,146 @@ | |||||
| # -*- coding: utf-8 -*- | |||||
| import glob | |||||
| import jiagu | |||||
| import numpy as np | |||||
| from random import random | |||||
def normalize(vec):
    """
    Normalize *vec* in place so that its entries sum to 1.

    :param vec: mutable sequence of non-negative numbers
    :raises ValueError: if the sum is (near) zero or any entry is negative
    """
    total = sum(vec)
    # `assert` is stripped under `python -O`; raise explicitly instead.
    if abs(total) <= 1e-6:
        raise ValueError("cannot normalize: sum is (near) zero")
    for i, value in enumerate(vec):
        if value < 0:
            raise ValueError("cannot normalize: negative entry at index {}".format(i))
        vec[i] = float(value) / total
def get_prob(vec, prob):
    """
    Draw one element of *vec* with probability proportional to *prob*.

    The weights in *prob* are normalized in place (they sum to 1 on
    return — same side effect as the original), then sampled by walking
    the cumulative distribution with a single random() draw.

    :param vec: indexable collection of candidates
    :param prob: mutable sequence of non-negative weights, len(prob) == len(vec)
    :return: the sampled element of vec
    :raises ValueError: on length mismatch, (near-)zero total, or negative weight
    """
    if len(vec) != len(prob):
        raise ValueError("vec and prob must have the same length")
    # Normalize in place (inlined so this sampler is self-contained).
    total = sum(prob)
    if abs(total) <= 1e-6:
        raise ValueError("cannot sample: weights sum to (near) zero")
    for i in range(len(prob)):
        if prob[i] < 0:
            raise ValueError("cannot sample: negative weight at index {}".format(i))
        prob[i] = float(prob[i]) / total
    r = random()
    index = -1
    last = len(prob) - 1
    # Stop at the last slot: float rounding can leave r marginally > 0
    # after all weights are subtracted (the original raised IndexError).
    while r > 0 and index < last:
        index += 1
        r -= prob[index]
    # random() can return exactly 0.0, leaving index == -1; the original
    # then silently returned vec[-1]. Clamp to the first element instead.
    return vec[max(index, 0)]
class Document(object):
    """A single text document loaded and word-segmented from a *.txt file."""

    def __init__(self, filename):
        # Strip the ".txt" extension to form the display name.
        self.doc_name = filename[:-4]
        self.__load_document(filename)

    def __load_document(self, filename):
        """
        Read one document; by convention one file contains one article.

        :param filename: path to a *.txt file
        :return: sets self.document (full text, tabs removed) and
                 self.words_list (all words segmented by jiagu)
        """
        # Initialize before the try so both attributes exist even when the
        # open itself fails (the original left them unset in that case).
        self.document = ""
        self.words_list = []
        try:
            # "with" guarantees the handle is closed on every path
            # (the original opened the file and never closed it).
            with open(filename, "r", encoding="utf-8") as doc_file:
                for line in doc_file:
                    if line:
                        line = line.strip().replace("\t", "")
                        self.document += line
                        self.words_list.extend(jiagu.seg(line))
        except Exception as e:
            # Best-effort: report and leave the document empty rather than crash.
            print("无法加载文件,错误信息 : {}".format(e))
class Corpus(object):
    """Loads every *.txt file under a directory into Document objects."""

    def __init__(self, filepath):
        self.Documents = []
        self.filepath = filepath
        self._build_corpus()

    def _build_corpus(self):
        """
        Load all documents and build the corpus vocabulary.

        :return: True when at least one *.txt file was found, else False.
                 Always sets self.Documents and self.vocabulary.
        """
        files = glob.glob(self.filepath + "/*.txt")
        if not files:
            # Fix: define self.vocabulary on this path too, so later
            # attribute access cannot raise AttributeError.
            self.vocabulary = []
            print("目标文件夹下没有文件!!!")
            return False
        vocabulary = set()
        for each in files:
            target = Document(each)
            self.Documents.append(target)
            vocabulary.update(target.words_list)
        self.vocabulary = list(vocabulary)
        return True
class LdaModel(object):
    """
    Latent Dirichlet Allocation trained with collapsed Gibbs sampling.

    :param filepath: directory containing the *.txt corpus
    :param number_of_topics: K, the number of latent topics
    :param alpha: symmetric document-topic Dirichlet prior
    :param beta: symmetric topic-word Dirichlet prior
    :param iteration: number of Gibbs sweeps over the corpus
    """

    def __init__(self, filepath, number_of_topics, alpha=50, beta=0.1, iteration=3):
        self.alpha = alpha
        self.beta = beta
        self.iteration = iteration
        self.corpus = Corpus(filepath)
        self.number_of_topics = number_of_topics
        self.__initialize_all()

    def __initialize_all(self):
        """Allocate count tables, randomly initialize topics, then run Gibbs."""
        print("LDA Initializing... \nnumber of topics : {}, iteration : {}".format(self.number_of_topics, self.iteration))
        self.number_of_documents = len(self.corpus.Documents)
        if self.number_of_documents <= self.number_of_topics:
            # Explicit raise instead of assert (asserts vanish under -O).
            raise ValueError("need more documents than topics")
        vocab_size = len(self.corpus.vocabulary)
        # np.int was removed in NumPy 1.24; plain int is the supported dtype.
        self.document_topic_counts = np.zeros(
            [self.number_of_documents, self.number_of_topics], dtype=int)
        self.topic_word_counts = np.zeros(
            [self.number_of_topics, vocab_size], dtype=int)
        self.current_word_topic_assignments = []
        self.topic_counts = np.zeros(self.number_of_topics)
        self.doc_name = dict()
        # O(1) word -> column lookup; list.index() was O(V) per token.
        self._word_index = {w: i for i, w in enumerate(self.corpus.vocabulary)}
        self._random_init()
        self._gibbs_sampling()
        print("LDA Initializing finished !\n")

    def _random_init(self):
        """Give every token a uniformly random topic and fill the count tables."""
        for d_index, document in enumerate(self.corpus.Documents):
            self.doc_name.setdefault(d_index, document.doc_name)
            word_topic_assignments = []
            for word in document.words_list:
                # Vocabulary is built from these very documents, so every
                # word is present; the guard keeps indices aligned anyway.
                if word in self._word_index:
                    w_index = self._word_index[word]
                    topic = np.random.randint(self.number_of_topics)
                    word_topic_assignments.append(topic)
                    self.document_topic_counts[d_index, topic] += 1
                    self.topic_word_counts[topic, w_index] += 1
                    self.topic_counts[topic] += 1
            self.current_word_topic_assignments.append(np.array(word_topic_assignments))

    def _gibbs_sampling(self):
        """Run self.iteration collapsed-Gibbs sweeps, resampling every token."""
        vocab_size = len(self.corpus.vocabulary)
        for iteration in range(self.iteration):
            print("Iteration #" + str(iteration + 1) + "...")
            for d_index, document in enumerate(self.corpus.Documents):
                for w, word in enumerate(document.words_list):
                    if word in self._word_index:
                        w_index = self._word_index[word]
                        current = self.current_word_topic_assignments[d_index][w]
                        # Remove this token from the counts before resampling.
                        self.document_topic_counts[d_index, current] -= 1
                        self.topic_word_counts[current, w_index] -= 1
                        self.topic_counts[current] -= 1
                        # p(z=k) ∝ (n_kw + β) / (n_k + V·β) · (n_dk + α)
                        # Fix: the denominator must include V·β (Griffiths &
                        # Steyvers); the original used n_k + β.
                        topic_distribution = (
                            (self.topic_word_counts[:, w_index] + self.beta)
                            * (self.document_topic_counts[d_index] + self.alpha)
                            / (self.topic_counts + self.beta * vocab_size)
                        )
                        new_topic = get_prob(range(self.number_of_topics), topic_distribution)
                        self.current_word_topic_assignments[d_index][w] = new_topic
                        self.document_topic_counts[d_index, new_topic] += 1
                        self.topic_word_counts[new_topic, w_index] += 1
                        self.topic_counts[new_topic] += 1

    def get_document_topic(self):
        """Print, for every document, its most frequent (argmax) topic."""
        for d_index, topic in enumerate(np.argmax(self.document_topic_counts, axis=1)):
            print("this is file {}, topic : #{}".format(self.doc_name.get(d_index), topic))

    def get_word_topic(self, topN=10):
        """Print the topN highest-count words per topic (ascending by count)."""
        for row in (self.topic_word_counts.argsort(axis=1)[:, -topN:]):
            print(list(map(lambda x: self.corpus.vocabulary[x], row)))
if __name__ == "__main__":
    # Train a small LDA model on the bundled corpus and report its topics.
    corpus_dir = "documents"
    topic_count = 3
    model = LdaModel(corpus_dir, topic_count)
    model.get_document_topic()
    model.get_word_topic()