Chinese Text Sentiment Detection with Neural Networks (Part 5): Optimizing the Simple MLP Network and Saving the Model

  • Author: Geticsen
  • Date: 2020-07-08
Overview: This installment briefly explains how a fully connected neural network detects sentiment in text, and optimizes the network from the previous installment.

The simple neural network built previously contained many useless multiplications, so here we simplify them away. In addition, to speed up the computation, each sentence's segmented words are mapped directly to their corresponding input nodes. Below is a brief explanation, not covered before, of how the MLP detects sentiment in Chinese text.
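The core of this optimization fits in a few lines. A minimal sketch with illustrative sizes (not the exact variables of the final code below): since the input vector is mostly zeros, multiplying the whole input layer by the weight matrix wastes work, and summing only the weight rows of the words that actually occur gives the same hidden-layer values.

import numpy as np

# Illustrative sizes: a 5-word vocabulary and 3 hidden nodes.
weights_0_1 = np.random.normal(0.0, 0.1, (5, 3))

# Dense version: a mostly-zero input row times the full weight matrix.
layer_0 = np.zeros((1, 5))
layer_0[0, [1, 3]] = 1                  # words 1 and 3 appear in the sentence
dense = layer_0.dot(weights_0_1)

# Optimized version: sum only the weight rows of the words that appear.
sparse = weights_0_1[1] + weights_0_1[3]

assert np.allclose(dense[0], sparse)    # same result, far fewer multiplications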

Segmenting, removing stop words, and building the vocabulary

Take the sentence 我要去湖南大学玩 ("I want to go to Hunan University for fun") as an example. Segment it, remove the meaningless stop words, and the remaining words form the vocabulary (vocab); mapping this vocabulary in reverse yields index2word.

(figure: segmentation and vocabulary construction)
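A minimal sketch of this step (the tiny inline stop-word set here is illustrative; the real code below loads three stop-word files):

import jieba

sentence = "我要去湖南大学玩"
stopwords = {"我", "要", "去"}  # illustrative stand-in for the real stop-word files

words = [w for w in jieba.cut(sentence, cut_all=False) if w not in stopwords]
vocab = list(set(words))        # e.g. ["湖南大学", "玩"]

word2index = {w: i for i, w in enumerate(vocab)}
index2word = {i: w for w, i in word2index.items()}  # the reverse mapping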

Building the input nodes, hidden layer, and output node

The input layer is simply a set of nodes equal in size to vocab. The network built here has three layers, sketched below:

(figure: three-layer network diagram)
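In terms of array shapes this gives the following (a sketch matching init_network in the code below, with an illustrative vocabulary size):

import numpy as np

vocab_size = 5000  # illustrative; the real value is len(review_vocab)
input_nodes, hidden_nodes, output_nodes = vocab_size, 10, 1

# Input-to-hidden weights start at zero; hidden-to-output weights are drawn
# from a normal distribution, as init_network does below.
weights_0_1 = np.zeros((input_nodes, hidden_nodes))
weights_1_2 = np.random.normal(0.0, output_nodes ** -0.5, (hidden_nodes, output_nodes))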

Training and testing the network

The complete training and testing flow of the network is shown below:

(figure: training and testing flow)

Freezing the weights and exporting the model

Freezing and exporting the model simply means recording the final learned weights of every layer of the network. To summarize the above: a fully connected network classifies sentiment text by automatically updating each word's weights according to the contexts in which that word appears in the training set.

The final code follows; it contains detailed comments for reference:

import csv
import json
import os
import time
import sys
import jieba
import numpy as np
# Counter is used to tally word occurrences
from collections import Counter
# Wrap the network in its own class
class SentimentNetwork:
    ## New for Project 6: added the min_count and polarity_cutoff parameters
    def __init__(self, reviews=[], labels=[], min_count=10, polarity_cutoff=0.1, hidden_nodes=10, learning_rate=0.1):
        """用所给条件创建SentimenNetwork
               参数:
                   reviews(list) - reviews列表用于training
                   labels(list) - POSITIVE/NEGATIVE 标签列表关联reviews
                   min_count(int) - 大于min_count才会被加入单词列表
                   polarity_cutoff(float) - 大于 positive/negative
                                     的比值才会被加入词汇列表
                   hidden_nodes(int) - 节点的数量 用于创建 hidden layer
                   learning_rate(float) - 学习速率用于整个training
        """
        # Seed the random number generator so that results
        # are reproducible during development
        np.random.seed(1)

        # Process the reviews and their associated labels so that
        # everything is ready for training
        ## Project 6: pass min_count and polarity_cutoff into pre-processing
        # Set up the jieba corpus; it is used throughout
        self.set_corpus_to_jieba()
        self.pre_process_data(reviews, labels, polarity_cutoff, min_count)

        # Build a network with hidden_nodes nodes in the hidden layer and the given
        # learning rate. The input layer matches the vocabulary; there is a single output node.
        self.init_network(len(self.review_vocab), hidden_nodes, 1, learning_rate)

    ## Project 6: added the min_count and polarity_cutoff parameters to pre-processing
    def pre_process_data(self, reviews, labels, polarity_cutoff, min_count):

        ## ----------------------------------------
        ## Project 6: compute each word's positive/negative ratio
        ## before building the vocabulary
        #
        positive_counts = Counter()
        negative_counts = Counter()
        total_counts = Counter()

        # Load the stop words
        self.get_stopword()
        # Count word occurrences per class, skipping stop words
        for i in range(len(reviews)):
            if (labels[i] == '1'):
                for word in self.get_zh_split(reviews[i]):
                    if word not in self.stopword:
                        positive_counts[word] += 1
                        total_counts[word] += 1
            else:
                for word in self.get_zh_split(reviews[i]):
                    if word not in self.stopword:
                        negative_counts[word] += 1
                        total_counts[word] += 1

        pos_neg_ratios = Counter()

        # Only compute ratios for reasonably frequent words
        for term, cnt in list(total_counts.most_common()):
            if (cnt >= 50):
                pos_neg_ratio = positive_counts[term] / float(negative_counts[term] + 1)
                pos_neg_ratios[term] = pos_neg_ratio

        # Convert the raw ratios to log-ratios centered around 0
        for word, ratio in pos_neg_ratios.most_common():
            if (ratio > 1):
                pos_neg_ratios[word] = np.log(ratio)
            else:
                pos_neg_ratios[word] = -np.log((1 / (ratio + 0.01)))
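        # Worked example: a word with 100 positive and 10 negative occurrences gets
        # ratio 100 / (10 + 1) ≈ 9.09 and log-ratio ≈ +2.21; with the counts swapped,
        # the ratio is ≈ 0.10 and the log-ratio ≈ -2.22, so the scale is roughly
        # symmetric around 0.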
        #
        ## End of Project 6 changes
        ## ----------------------------------------

        # Build review_vocab from the given reviews, skipping stop words
        review_vocab = set()
        for review in reviews:
            for word in self.get_zh_split(review):
                ## New for Project 6: a word enters the vocabulary only if it occurs
                #                     more than min_count times and, when a pos/neg
                #                     ratio exists for it, that ratio exceeds polarity_cutoff
                #   skip stop words
                if word not in self.stopword:
                    if (total_counts[word] > min_count):
                        if (word in pos_neg_ratios.keys()):
                            if ((pos_neg_ratios[word] >= polarity_cutoff) or (pos_neg_ratios[word] <= -polarity_cutoff)):
                                review_vocab.add(word)
                        else:
                            review_vocab.add(word)

        # Convert the vocabulary set to a list
        self.review_vocab = list(review_vocab)

        # Collect all the labels.
        label_vocab = set()
        for label in labels:
            label_vocab.add(label)

        # Convert the label set to a list
        self.label_vocab = list(label_vocab)

        # Store the vocabulary and label-set sizes.
        self.review_vocab_size = len(self.review_vocab)
        self.label_vocab_size = len(self.label_vocab)

        # Create a dictionary that maps each word to its index
        self.word2index = {}
        for i, word in enumerate(self.review_vocab):
            self.word2index[word] = i

        # Create a dictionary that maps each label to its index
        self.label2index = {}
        for i, label in enumerate(self.label_vocab):
            self.label2index[label] = i

    def init_network(self, input_nodes, hidden_nodes, output_nodes, learning_rate):
        # Set the numbers of input, hidden and output nodes.
        self.input_nodes = input_nodes
        self.hidden_nodes = hidden_nodes
        self.output_nodes = output_nodes

        # Store the learning rate
        self.learning_rate = learning_rate

        # Initialize the weights

        # Weights between the input layer and the hidden layer.
        self.weights_0_1 = np.zeros((self.input_nodes, self.hidden_nodes))

        # Weights between the hidden layer and the output layer,
        # drawn from a normal distribution
        self.weights_1_2 = np.random.normal(0.0, self.output_nodes ** -0.5,
                                            (self.hidden_nodes, self.output_nodes))
        print("weights_1_2 shape at init:", self.weights_1_2.shape)

        ## New for Project 5: removed self.layer_0; added self.layer_1
        # The hidden layer: a 2-D matrix with 1 row and hidden_nodes columns
        self.layer_1 = np.zeros((1, hidden_nodes))

    ## New for Project 5: removed the update_input_layer() function
    # Chinese word segmentation is non-trivial, so we use jieba
    def get_zh_split(self, line):
        return list(jieba.cut(line, cut_all=False))

    def get_target_for_label(self, label):
        if (label == '1'):
            return 1
        else:
            return 0

    # Activation function
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    # Derivative of the activation, expressed in terms of its output
    def sigmoid_output_2_derivative(self, output):
        return output * (1 - output)

    ## New for Project 5: renamed the first parameter from
    #                     'training_reviews' to 'training_reviews_raw'
    def train(self, training_reviews_raw, training_labels):

        ## New for Project 5: pre-process the training reviews so that we can work
        #                     directly with the indices of the non-zero inputs
        training_reviews = list()
        for review in training_reviews_raw:
            indices = set()
            for word in self.get_zh_split(review):
                if (word in self.word2index.keys()):
                    indices.add(self.word2index[word])
            training_reviews.append(list(indices))

        # Make sure the numbers of reviews and labels match
        assert (len(training_reviews) == len(training_labels))

        # Keep track of the number of correct predictions
        correct_so_far = 0

        # Record the start time
        start = time.time()

        # Loop over all the reviews, run a forward and a backward pass,
        # and update the weights for each one
        for i in range(len(training_reviews)):

            # Get the next review (as a list of word indices) and its label
            review = training_reviews[i]
            label = training_labels[i]

            #### Forward pass ####
            ### Forward pass ###

            ## New for Project 5: 'update_input_layer' was removed because
            ## 'layer_0' is no longer used

            # Hidden layer
            ## New for Project 5: sum the weights of the non-zero inputs
            self.layer_1 *= 0
            for index in review:
                self.layer_1 += self.weights_0_1[index]

            # Output layer
            ## New for Project 5: use 'self.layer_1' instead of a local 'layer_1'
            layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))

            #### Backward pass ####
            ### Backpropagation ###

            # Output layer error is the difference
            # between the target and the actual output.
            layer_2_error = layer_2 - self.get_target_for_label(label)
            layer_2_delta = layer_2_error * self.sigmoid_output_2_derivative(layer_2)

            # Propagate the error backwards
            layer_1_error = layer_2_delta.dot(self.weights_1_2.T)  # error propagated to the hidden layer
            layer_1_delta = layer_1_error  # the hidden layer has no activation, so the error is unchanged

            # Update the weights
            ## New for Project 5: use 'self.layer_1' instead of a local 'layer_1'
            self.weights_1_2 -= self.layer_1.T.dot(
                layer_2_delta) * self.learning_rate  # update hidden-to-output weights with gradient descent

            ## New for Project 5: only update the weights that were used in the forward pass
            for index in review:
                self.weights_0_1[index] -= layer_1_delta[0] * self.learning_rate  # update input-to-hidden weights with gradient descent

            # Keep track of the number of correct predictions.
            if (layer_2 >= 0.5 and label == '1'):
                correct_so_far += 1
            elif (layer_2 < 0.5 and label == '0'):
                correct_so_far += 1

            # For debugging: print progress and accuracy
            # throughout the run.
            elapsed_time = float(time.time() - start)
            reviews_per_second = i / elapsed_time if elapsed_time > 0 else 0

            sys.stdout.write("\rProgress:" + str(100 * i / float(len(training_reviews)))[:4]
                             + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5]
                             + " #Correct:" + str(correct_so_far) + " #Trained:" + str(i + 1)
                             + " Training Accuracy:" + str(correct_so_far * 100 / float(i + 1))[:4] + "%")
            if (i % 2500 == 0):
                print("")

    def test(self, testing_reviews, testing_labels):
        """
        Predict the label of each testing review and
        keep track of the accuracy.
        """
        # Keep track of the number of correct predictions
        correct = 0

        # Record the start time
        start = time.time()

        # Loop over the reviews and predict each one
        for i in range(len(testing_reviews)):
            pred = self.run(testing_reviews[i])
            if (pred == testing_labels[i]):
                correct += 1

            # For debugging: print progress and accuracy
            # throughout the run.
            elapsed_time = float(time.time() - start)
            reviews_per_second = i / elapsed_time if elapsed_time > 0 else 0

            sys.stdout.write("\rProgress:" + str(100 * i / float(len(testing_reviews)))[:4]
                             + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5]
                             + " #Correct:" + str(correct) + " #Tested:" + str(i + 1)
                             + " Testing Accuracy:" + str(correct * 100 / float(i + 1))[:4] + "%")

    def run(self, review):
        """
        Return the sentiment prediction for the given review
        """
        # Run a forward pass, just like in the "train" function.

        ## New for Project 5: removed the unused update_input_layer function

        # Hidden layer
        ## New for Project 5: for each index found in the review, add its weights to layer_1
        self.layer_1 *= 0
        unique_indices = set()
        for word in self.get_zh_split(review):
            if word in self.word2index.keys():
                unique_indices.add(self.word2index[word])
        for index in unique_indices:
            self.layer_1 += self.weights_0_1[index]

        # Output layer
        ## New for Project 5: use self.layer_1 instead of a local layer_1
        layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))

        # Output POSITIVE ("1") if the output layer is at least 0.5,
        # otherwise output NEGATIVE ("0")
        if (layer_2[0] >= 0.5):
            return "1"
        else:
            return "0"

    def run_test(self, review):
        """
        Return the sentiment prediction together with diagnostic details
        """
        self.layer_1 *= 0
        unique_indices = set()
        slice_word = []
        slice_word_without_stopword = set()
        for word in self.get_zh_split(review):
            slice_word.append(word)
            if word in self.word2index.keys():
                slice_word_without_stopword.add(word)
                unique_indices.add(self.word2index[word])
        for index in unique_indices:
            self.layer_1 += self.weights_0_1[index]
        # Output layer
        ## New for Project 5: use self.layer_1 instead of a local layer_1
        layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))
        similar_words = {}
        for one_word in slice_word_without_stopword:
            words = self.get_most_similar_words(one_word)
            similar_words[one_word] = words
        # An output of at least 0.5 means POSITIVE,
        # otherwise NEGATIVE
        mood = ""
        if (layer_2[0] >= 0.5):
            mood = "积极"  # positive
        else:
            mood = "消极"  # negative
        return {"mood": mood, "slice_word": slice_word, "slice_word_without_stopword": list(slice_word_without_stopword), "similar_words": similar_words}

    def get_most_similar_words(self, focus="好"):
        # Score every vocabulary word by the dot product of its input-to-hidden
        # weight row with that of the focus word
        most_similar = Counter()
        for word in self.word2index.keys():
            most_similar[word] = np.dot(self.weights_0_1[self.word2index[word]], self.weights_0_1[self.word2index[focus]])
        similar = []
        # Skip past the focus word itself, then return the next 10 words
        index_i = 0
        for item in most_similar.most_common():
            index_i += 1
            if most_similar[focus] >= item[1]:
                break
        for one in most_similar.most_common()[index_i:index_i + 10]:
            similar.append(one[0])
        return similar
    # Restore a saved network model into this instance
    def set_model(self):
        model = self.get_model_if_exist()
        if model:
            self.label2index = model["label2index"]
            self.label_vocab = model["label_vocab"]
            self.review_vocab = model["review_vocab"]
            self.word2index = model["word2index"]
            self.weights_0_1 = model["weights_0_1"]
            self.weights_1_2 = model["weights_1_2"]
            # Set the numbers of input, hidden and output nodes.
            self.input_nodes = len(self.review_vocab)
            self.hidden_nodes = 10
            self.output_nodes = 1
            self.layer_1 = np.zeros((1, self.hidden_nodes))

    # Load the model from disk, if it exists
    def get_model_if_exist(self):
        model = None
        if os.path.isdir("./model") and os.path.isfile("./model/model.txt") and os.path.isfile("./model/weights_0_1.txt") \
                and os.path.isfile("./model/weights_1_2.txt"):
            with open("./model/model.txt", "r", encoding='UTF-8') as f:
                model = json.loads(f.read())
                model["weights_0_1"] = np.loadtxt('./model/weights_0_1.txt', delimiter=',')
                model["weights_1_2"] = np.loadtxt('./model/weights_1_2.txt', delimiter=',').reshape(10, 1)
        return model
    # Save the current model to disk
    def save_model(self):
        model = {
            "label2index": self.label2index,
            "label_vocab": self.label_vocab,
            "review_vocab": self.review_vocab,
            "word2index": self.word2index
        }
        os.makedirs("./model", exist_ok=True)  # make sure the model directory exists
        np.savetxt("./model/weights_0_1.txt", self.weights_0_1, fmt='%s', delimiter=',')
        np.savetxt("./model/weights_1_2.txt", self.weights_1_2, fmt='%s', delimiter=',')
        with open('./model/model.txt', 'w', encoding='UTF-8') as f:
            f.write(json.dumps(model, ensure_ascii=False))
    # Register the sentiment corpus with jieba as a user dictionary
    def set_corpus_to_jieba(self):
        jieba.load_userdict("./corpus/情感语料库.txt")

    # Load the stop words from the three stop-word files
    def get_stopword(self):
        all_stopword = set()
        for path in ("./stopword/中文停用词库.txt",
                     "./stopword/哈工大停用词表.txt",
                     "./stopword/四川大学机器智能实验室停用词库.txt"):
            with open(path, encoding="utf-8") as f:
                for line in f:
                    all_stopword.add(line.rstrip())
        self.stopword = all_stopword

    # Filter stop words out of the vocabulary
    def remove_stopword(self):
        word_list = self.review_vocab
        word_filiter = [word for word in word_list if len(word) >= 2 and word not in self.stopword]
        return word_filiter


# Start training
def get_start():
    # Path to your data set; a different data set needs its own parsing
    with open('./dataSet/online_shopping_10_cats.csv', 'r', encoding="utf-8") as csvfile:
        reader = csv.reader(csvfile)
        all_items = [row for row in reader]
    car_type = []
    label = []
    review = []
    for i in range(1, len(all_items)):  # skip the header row
        car_type.append(all_items[i][0])
        label.append(all_items[i][1])
        review.append(all_items[i][2])
    # A second data set with one "label review" pair per line, separated by a space
    with open("./dataSet/test.txt", encoding="utf-8") as f:
        all_item = [line.rstrip().split(" ") for line in f]
    labels = [item[0] for item in all_item]
    reviews = [item[1] for item in all_item]
    labels.extend(label)
    reviews.extend(review)
    mlp = SentimentNetwork(reviews, labels, learning_rate=0.1)
    # Split the data into three interleaved folds
    train_review1 = []
    train_review2 = []
    train_review3 = []
    train_label1 = []
    train_label2 = []
    train_label3 = []
    for i in range(len(reviews)):
        if i%3==0:
            train_review1.append(reviews[i])
            train_label1.append(labels[i])
        elif i%3==1:
            train_review2.append(reviews[i])
            train_label2.append(labels[i])
        elif i%3==2:
            train_review3.append(reviews[i])
            train_label3.append(labels[i])
    # mlp.set_model()

    # Make several training passes, cycling through the folds in different orders
    mlp.train(train_review2, train_label2)
    mlp.train(train_review3, train_label3)
    mlp.train(train_review1, train_label1)
    mlp.train(reviews, labels)
    mlp.train(train_review3, train_label3)
    mlp.train(train_review1, train_label1)
    mlp.train(train_review2, train_label2)

    # Evaluate on the last 1000 reviews (note that these were also seen during training)
    mlp.test(reviews[-1000:], labels[-1000:])
    mlp.save_model()
# Evaluate a previously saved model on the full data set
def test():
    with open('./dataSet/online_shopping_10_cats.csv', 'r', encoding="utf-8") as csvfile:
        reader = csv.reader(csvfile)
        all_items = [row for row in reader]
    car_type = []
    label = []
    review = []
    for i in range(1, len(all_items)):
        car_type.append(all_items[i][0])
        label.append(all_items[i][1])
        review.append(all_items[i][2])
    mlp = SentimentNetwork()
    mlp.set_model()
    mlp.test(review, label)
# Continue training a previously saved model
def continue_train():
    with open('./dataSet/online_shopping_10_cats.csv', 'r', encoding="utf-8") as csvfile:
        reader = csv.reader(csvfile)
        all_items = [row for row in reader]
    car_type = []
    label = []
    review = []
    for i in range(1, len(all_items)):
        car_type.append(all_items[i][0])
        label.append(all_items[i][1])
        review.append(all_items[i][2])
    mlp = SentimentNetwork(learning_rate=0.1)
    mlp.set_model()
    mlp.train(review[:100], label[:100])
    mlp.test(review[-1000:], label[-1000:])
    #mlp.save_model()
if __name__ == '__main__':
    get_start()
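Once get_start() has run and exported the model, reloading it for prediction takes only a few lines. A minimal usage sketch (the input sentence is illustrative; the corpus, stop-word, and ./model files from this project are assumed to be in place):

mlp = SentimentNetwork()
mlp.set_model()
result = mlp.run_test("这家店的服务态度非常好")  # "The service at this shop is very good"
print(result["mood"], result["slice_word_without_stopword"])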

