vlambda博客
学习文章列表

HMM(隐马尔可夫)中文分词

一、隐马尔可夫模型

  隐马尔可夫模型(Hidden Markov Model,HMM)是用来描述一个含有隐含未知参数的马尔可夫过程。

1、举例理解

  假设我手里有三个不同的骰子。第一个骰子是我们平常见的骰子(称这个骰子为D6),6个面,每个面(1,2,3,4,5,6)出现的概率是1/6。第二个骰子是个四面体(称这个骰子为D4),每个面(1,2,3,4)出现的概率是1/4。第三个骰子有八个面(称这个骰子为D8),每个面(1,2,3,4,5,6,7,8)出现的概率是1/8。

在这里插入图片描述


  假设我们开始掷骰子,我们先从三个骰子里挑一个,挑到每一个骰子的概率都是1/3。然后我们掷骰子,得到一个数字,1,2,3,4,5,6,7,8中的一个。不停的重复上述过程,我们会得到一串数字,每个数字都是1,2,3,4,5,6,7,8中的一个。例如我们可能得到这么一串数字(掷骰子10次):1 6 3 5 2 7 3 5 2 4

  这串数字叫做可见状态链。但是在隐马尔可夫模型中,我们不仅仅有这么一串可见状态链,还有一串隐含状态链。在这个例子里,这串隐含状态链就是你用的骰子的序列。比如,隐含状态链有可能是:D6 D8 D8 D6 D4 D8 D6 D6 D4 D8

  一般来说,HMM中说到的马尔可夫链其实是指隐含状态链,因为隐含状态(骰子)之间存在转换概率(transition probability)。在我们这个例子里,D6的下一个状态是D4,D6,D8的概率都是1/3。D4,D8的下一个状态是D4,D6,D8的转换概率也都一样是1/3。这样设定是为了最开始容易说清楚,但是我们其实是可以随意设定转换概率的。比如,我们可以这样定义,D6后面不能接D4,D6后面是D6的概率是0.9,是D8的概率是0.1。这样就是一个新的HMM。

  同样的,尽管可见状态之间没有转换概率,但是隐含状态和可见状态之间有一个概率叫做输出概率(emission probability)。就我们的例子来说,六面骰(D6)产生1的输出概率是1/6。产生2,3,4,5,6的概率也都是1/6。我们同样可以对输出概率进行其他定义。比如,我有一个被赌场动过手脚的六面骰子,掷出来是1的概率更大,是1/2,掷出来是2,3,4,5,6的概率是1/10。

HMM(隐马尔可夫)中文分词


HMM(隐马尔可夫)中文分词

2、例子抽象

  3种不同情况的骰子,即为:状态值集合(StatusSet)
所有可能出现的结果值(1、2、3、4、5、6、7、8):观察值集合(ObservedSet)。
  选择不同骰子之间的概率:转移概率矩阵(TransProbMatrix ),状态间转移的概率。
  在拿到某个骰子,投出某个观测值的概率:发射概率矩阵(EmitProbMatrix )-即:拿到D6这个骰子,投出6的概率是1/6。最初一次的状态:初始状态概率分布(InitStatus )。
  所以,很容易得到,计算概率的方法就是,初始状态概率分布(InitStatus )、发射概率矩阵(EmitProbMatrix )、转移概率矩阵(TransProbMatrix )的乘积。
  当某个状态序列的概率值最大,则该状态序列即为,出现该观测值的情况下,最可能出现的状态序列。

二、维特比算法

  维特比算法(Viterbi algorithm)是一种动态规划算法。它用于寻找最有可能产生观测事件序列的维特比路径——隐含状态序列,特别是在马尔可夫信息源上下文和隐马尔可夫模型中。

1、算法原理

  维特比算法就是求所有观测序列中的最优,如下图所示,我们要求从S到E的最优序列,中间有3个时刻,每个时刻都有对应的不同观察的概率,下图中每个时刻不同的观测标签有3个。

HMM(隐马尔可夫)中文分词


  求所有路径中最优路径,最容易想到的就是暴力解法,直接把所有路径全部计算出来,然后找出最优的。这方法理论上是可行,但当序列很长时,时间复杂夫很高。而且进行了大量的重复计算,viterbi算法就是用动态规划的方法就减少这些重复计算。
  viterbi算法是每次记录到当前时刻,每个观察标签的最优序列,如下图,假设在t时刻已经保存了从0到t时刻的最优路径,那么t+1时刻只需要计算从t到t+1的最优就可以了,图中红箭头表示从t时刻到t+1时刻,观测标签为1,2,3的最优。

HMM(隐马尔可夫)中文分词


  每次只需要保存到当前位置最优路径,之后循环向后走。到结束时,从最后一个时刻的最优值回溯到开始位置,回溯完成后,这个从开始到结束的路径就是最优的。

2、代码实现

import numpy as np

def viterbi_decode(nodes, trans):
    """
    Viterbi算法求最优路径
    其中 nodes.shape=[seq_len, num_labels],
        trans.shape=[num_labels, num_labels].
    """

    # 获得输入状态序列的长度,以及观察标签的个数
    seq_len, num_labels = len(nodes), len(trans)
    # 简单起见,先不考虑发射概率,直接用起始0时刻的分数
    scores = nodes[0].reshape((-11))

    paths = []
    # 递推求解上一时刻t-1到当前时刻t的最优
    for t in range(1, seq_len):
        # scores 表示起始0到t-1时刻的每个标签的最优分数
        scores_repeat = np.repeat(scores, num_labels, axis=1)
        # observe当前时刻t的每个标签的观测分数
        observe = nodes[t].reshape((1-1))
        observe_repeat = np.repeat(observe, num_labels, axis=0)
        # 从t-1时刻到t时刻最优分数的计算,这里需要考虑转移分数trans
        M = scores_repeat + trans + observe_repeat
        # 寻找到t时刻的最优路径
        scores = np.max(M, axis=0).reshape((-11))
        idxs = np.argmax(M, axis=0)
        # 路径保存
        paths.append(idxs.tolist())

    best_path = [0] * seq_len
    best_path[-1] = np.argmax(scores)
    # 最优路径回溯
    for i in range(seq_len-2-1-1):
        idx = best_path[i+1]
        best_path[i] = paths[i][idx]

    return best_path

三、HMM模型

HMM的典型模型是一个五元组:
StatusSet: 状态值集合
ObservedSet: 观察值集合
TransProbMatrix: 转移概率矩阵
EmitProbMatrix: 发射概率矩阵
InitStatus: 初始状态分布

基本假设

HMM模型的三个基本假设如下:
有限历史性假设:
P(Status[i]|Status[i-1],Status[i-2],… Status[1]) = P(Status[i]|Status[i-1])
齐次性假设(状态和当前时刻无关):
P(Status[i]|Status[i-1]) = P(Status[j]|Status[j-1])
观察值独立性假设(观察值只取决于当前状态值):
P(Observed[i]|Status[i],Status[i-1],…,Status[1]) = P(Observed[i]|Status[i])

1、状态值集合(StatusSet)

  (B, M, E, S): {B:begin, M:middle, E:end, S:single}。分别代表每个状态代表的是该字在词语中的位置,B代表该字是词语中的起始字,M代表是词语中的中间字,E代表是词语中的结束字,S则代表是单字成词。
如:

给你一个隐马尔科夫链的例子。
可以标注为:
给/S 你/S 一个/BE 隐马尔科夫链/BMMMME 的/S 例子/BE 。/S

2、观察值集合(ObservedSet)

  就是所有汉字(东南西北你我他…),甚至包括标点符号所组成的集合。
  状态值也就是我们要求的值,在HMM模型中文分词中,我们的输入是一个句子(也就是观察值序列),输出是这个句子中每个字的状态值。

3、初始状态概率分布(InitStatus )

如:

B  -0.26268660809250016
E  -3.14e+100
M  -3.14e+100
S  -1.4652633398537678

数值是对概率值取【对数】之后的结果(可以让概率【相乘】的计算变成对数【相加】)。其中-3.14e+100作为负无穷,也就是对应的概率值是0。
也就是句子的第一个字属于{B,E,M,S}这四种状态的概率。

4、转移概率矩阵(TransProbMatrix )

【有限历史性假设】
转移概率是马尔科夫链。Status(i)只和Status(i-1)相关,这个假设能大大简化问题。所以,它其实就是一个4x4(4就是状态值集合的大小)的二维矩阵。矩阵的横坐标和纵坐标顺序是BEMS x BEMS。(数值是概率求对数后的值)

5、发射概率矩阵(EmitProbMatrix )

【观察值独立性假设】
P(Observed[i], Status[j]) = P(Status[j]) * P(Observed[i]|Status[j])
其中,P(Observed[i]|Status[j])这个值就是从EmitProbMatrix中获取。

四、代码实现

import pandas as pd
import codecs
from numpy import *
import numpy as np
import sys
import re
STATES = ['B''M''E''S']
array_A = {}    #状态转移概率矩阵
array_B = {}    #发射概率矩阵
array_E = {}    #测试集存在的字符,但在训练集中不存在,发射概率矩阵
array_Pi = {}   #初始状态分布
word_set = set()    #训练数据集中所有字的集合
count_dic = {}  #‘B,M,E,S’每个状态在训练集中出现的次数
line_num = 0    #训练集语句数量

#初始化所有概率矩阵
def Init_Array():
    for state0 in STATES:
        array_A[state0] = {}
        for state1 in STATES:
            array_A[state0][state1] = 0.0
    for state in STATES:
        array_Pi[state] = 0.0
        array_B[state] = {}
        array_E = {}
        count_dic[state] = 0

#对训练集获取状态标签
def get_tag(word):
    tag = []
    if len(word) == 1:
        tag = ['S']
    elif len(word) == 2:
        tag = ['B''E']
    else:
        num = len(word) - 2
        tag.append('B')
        tag.extend(['M'] * num)
        tag.append('E')
    return tag


#将参数估计的概率取对数,对概率0取无穷小-3.14e+100
def Prob_Array():
    for key in array_Pi:
        if array_Pi[key] == 0:
            array_Pi[key] = -3.14e+100
        else:
            array_Pi[key] = log(array_Pi[key] / line_num)
    for key0 in array_A:
        for key1 in array_A[key0]:
            if array_A[key0][key1] == 0.0:
                array_A[key0][key1] = -3.14e+100
            else:
                array_A[key0][key1] = log(array_A[key0][key1] / count_dic[key0])
    # print(array_A)
    for key in array_B:
        for word in array_B[key]:
            if array_B[key][word] == 0.0:
                array_B[key][word] = -3.14e+100
            else:
                array_B[key][word] = log(array_B[key][word] /count_dic[key])

#将字典转换成数组
def Dic_Array(array_b):
    tmp = np.empty((4,len(array_b['B'])))
    for i in range(4):
        for j in range(len(array_b['B'])):
            tmp[i][j] = array_b[STATES[i]][list(word_set)[j]]
    return tmp

#判断一个字最大发射概率的状态
def dist_tag():
    array_E['B']['begin'] = 0
    array_E['M']['begin'] = -3.14e+100
    array_E['E']['begin'] = -3.14e+100
    array_E['S']['begin'] = -3.14e+100
    array_E['B']['end'] = -3.14e+100
    array_E['M']['end'] = -3.14e+100
    array_E['E']['end'] = 0
    array_E['S']['end'] = -3.14e+100

def dist_word(word0,word1,word2,array_b):
    if dist_tag(word0,array_b) == 'S':
        array_E['B'][word1] = 0
        array_E['M'][word1] = -3.14e+100
        array_E['E'][word1] = -3.14e+100
        array_E['S'][word1] = -3.14e+100
    return

#Viterbi算法求测试集最优状态序列
def Viterbi(sentence,array_pi,array_a,array_b):
    tab = [{}]  #动态规划表
    path = {}

    if sentence[0not in array_b['B']:
        for state in STATES:
            if state == 'S':
                array_b[state][sentence[0]] = 0
            else:
                array_b[state][sentence[0]] = -3.14e+100

    for state in STATES:
        tab[0][state] = array_pi[state] + array_b[state][sentence[0]]
        # print(tab[0][state])
        #tab[t][state]表示时刻t到达state状态的所有路径中,概率最大路径的概率值
        path[state] = [state]
    for i in range(1,len(sentence)):
        tab.append({})
        new_path = {}
        # if sentence[i] not in array_b['B']:
        #     print(sentence[i-1],sentence[i])
        for state in STATES:
            if state == 'B':
                array_b[state]['begin'] = 0
            else:
                array_b[state]['begin'] = -3.14e+100
        for state in STATES:
            if state == 'E':
                array_b[state]['end'] = 0
            else:
                array_b[state]['end'] = -3.14e+100
        for state0 in STATES:
            items = []
            # if sentence[i] not in word_set:
            #     array_b[state0][sentence[i]] = -3.14e+100
            # if sentence[i] not in array_b[state0]:
            #     array_b[state0][sentence[i]] = -3.14e+100
            # print(sentence[i] + state0)
            # print(array_b[state0][sentence[i]])
            for state1 in STATES:
                # if tab[i-1][state1] == -3.14e+100:
                #     continue
                # else:
                if sentence[i] not in array_b[state0]:  #所有在测试集出现但没有在训练集中出现的字符
                    if sentence[i-1not in array_b[state0]:
                        prob = tab[i - 1][state1] + array_a[state1][state0] + array_b[state0]['end']
                    else:
                        prob = tab[i - 1][state1] + array_a[state1][state0] + array_b[state0]['begin']
                    # print(sentence[i])
                    # prob = tab[i-1][state1] + array_a[state1][state0] + array_b[state0]['other']
                else:
                    prob = tab[i-1][state1] + array_a[state1][state0] + array_b[state0][sentence[i]]    #计算每个字符对应STATES的概率
#                     print(prob)
                items.append((prob,state1))
            # print(sentence[i] + state0)
            # print(array_b[state0][sentence[i]])
            # print(sentence[i])
            # print(items)
            best = max(items)   #bset:(prob,state)
            # print(best)
            tab[i][state0] = best[0]
            # print(tab[i][state0])
            new_path[state0] = path[best[1]] + [state0]
        path = new_path

    prob, state = max([(tab[len(sentence) - 1][state], state) for state in STATES])
    return path[state]

#根据状态序列进行分词
def tag_seg(sentence,tag):
    word_list = []
    start = -1
    started = False

    if len(tag) != len(sentence):
        return None

    if len(tag) == 1:
        word_list.append(sentence[0])   #语句只有一个字,直接输出

    else:
        if tag[-1] == 'B' or tag[-1] == 'M':    #最后一个字状态不是'S'或'E'则修改
            if tag[-2] == 'B' or tag[-2] == 'M':
                tag[-1] = 'S'
            else:
                tag[-1] = 'E'


        for i in range(len(tag)):
            if tag[i] == 'S':
                if started:
                    started = False
                    word_list.append(sentence[start:i])
                word_list.append(sentence[i])
            elif tag[i] == 'B':
                if started:
                    word_list.append(sentence[start:i])
                start = i
                started = True
            elif tag[i] == 'E':
                started = False
                word = sentence[start:i + 1]
                word_list.append(word)
            elif tag[i] == 'M':
                continue

    return word_list



if __name__ == '__main__':
    trainset = open('CTBtrainingset.txt', encoding='utf-8')     #读取训练集
    testset = open('testset.txt', encoding='utf-8')       #读取测试集
    # trainlist = []

    Init_Array()

    for line in trainset:
        line = line.strip()
        # trainlist.append(line)
        line_num += 1

        word_list = []
        for k in range(len(line)):
            if line[k] == ' ':continue
            word_list.append(line[k])
        # print(word_list)
        word_set = word_set | set(word_list)    #训练集所有字的集合

        line = line.split(' ')
        # print(line)
        line_state = []     #这句话的状态序列

        for i in line:
            line_state.extend(get_tag(i))
        # print(line_state)
        array_Pi[line_state[0]] += 1  # array_Pi用于计算初始状态分布概率

        for j in range(len(line_state)-1):
            # count_dic[line_state[j]] += 1   #记录每一个状态的出现次数
            array_A[line_state[j]][line_state[j+1]] += 1  #array_A计算状态转移概率

        for p in range(len(line_state)):
            count_dic[line_state[p]] += 1  # 记录每一个状态的出现次数
            for state in STATES:
                if word_list[p] not in array_B[state]:
                    array_B[state][word_list[p]] = 0.0  #保证每个字都在STATES的字典中
            # if word_list[p] not in array_B[line_state[p]]:
            #     # print(word_list[p])
            #     array_B[line_state[p]][word_list[p]] = 0
            # else:
            array_B[line_state[p]][word_list[p]] += 1  # array_B用于计算发射概率

    Prob_Array()    #对概率取对数保证精度

    print('参数估计结果')
    print('初始状态分布')
    print(array_Pi)
    print('状态转移矩阵')
    print(array_A)
    print('发射矩阵')
    print(array_B)


    output = ''

    for line in testset:
        line = line.strip()
        tag = Viterbi(line, array_Pi, array_A, array_B)
        # print(tag)
        seg = tag_seg(line, tag)
        # print(seg)
        list = ''
        for i in range(len(seg)):
            list = list + seg[i] + '/'
        # print(list)
        output = output + list + '\n'
    print(output)
    outputfile = open('output.txt', mode='w', encoding='utf-8')
    outputfile.write(output)

五、输出结果

六、结果分析

待分词文本:
欢迎大家来到文本计算与认知智能实验室
回溯路径是:
EMBEBEBSEBEBSSEBEB
倒序是:
BE/BE/S/S/BE/BE/S/BE/BE/BME
中文分词结果:
欢迎/大家/来/到/文本/计算/与/认知/智能/实验室