Bayesian Classifiers (Part 1): Naive and Semi-Naive Bayes Classifiers
This chapter introduces a new family of algorithms: Bayesian classifiers.
Bayesian Decision Theory is the basic framework for making decisions under a probabilistic view. For a classification task, in the ideal case where all the relevant probabilities are known, Bayesian decision theory considers how to choose the optimal class label based on these probabilities and the misclassification losses.
Consider a multi-class task with $N$ possible class labels $\mathcal{Y} = \{c_1, c_2, \dots, c_N\}$, and let $\lambda_{ij}$ denote the loss incurred by labeling a sample whose true class is $c_j$ as $c_i$.
Conditional risk: based on the posterior probability $P(c_i \mid \boldsymbol{x})$, the expected loss of labeling sample $\boldsymbol{x}$ as $c_i$ is $R(c_i \mid \boldsymbol{x}) = \sum_{j=1}^{N} \lambda_{ij} P(c_j \mid \boldsymbol{x})$.
Bayes decision rule: to minimize the overall risk, it suffices to choose for every sample the label that minimizes the conditional risk, $h^*(\boldsymbol{x}) = \arg\min_{c \in \mathcal{Y}} R(c \mid \boldsymbol{x})$; $h^*$ is called the Bayes optimal classifier and its overall risk the Bayes risk.
If the goal is to minimize the classification error rate, the misclassification loss is $\lambda_{ij} = 0$ when $i = j$ and $1$ otherwise.
The conditional risk then becomes $R(c \mid \boldsymbol{x}) = 1 - P(c \mid \boldsymbol{x})$,
so for every sample $\boldsymbol{x}$ the Bayes optimal classifier reduces to choosing the class with the largest posterior probability, $h^*(\boldsymbol{x}) = \arg\max_{c \in \mathcal{Y}} P(c \mid \boldsymbol{x})$.
Estimating the posterior $P(c \mid \boldsymbol{x})$ directly is usually difficult. Taking a generative view, Bayes' theorem gives $P(c \mid \boldsymbol{x}) = \dfrac{P(c)\, P(\boldsymbol{x} \mid c)}{P(\boldsymbol{x})}$,
where $P(c)$ is the class prior probability, $P(\boldsymbol{x} \mid c)$ is the class-conditional probability (likelihood) of $\boldsymbol{x}$ with respect to class $c$, and $P(\boldsymbol{x})$ is the evidence factor used for normalization; since $P(\boldsymbol{x})$ is the same for every class, the problem reduces to estimating $P(c)$ and $P(\boldsymbol{x} \mid c)$ from the training data.
The class prior $P(c)$ reflects the proportion of each class in the sample space; by the law of large numbers it can be estimated by class frequencies once the training set contains enough i.i.d. samples.
The class-conditional probability $P(\boldsymbol{x} \mid c)$, however, involves the joint distribution over all attributes of $\boldsymbol{x}$, so estimating it directly from sample frequencies runs into severe difficulty: with $d$ binary attributes there are already $2^d$ possible attribute combinations, typically far more than the number of training samples, and many combinations never appear in the training set at all.
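As a minimal numeric illustration of this decomposition (the priors and likelihoods below are toy numbers chosen purely for demonstration), the posterior and the Bayes optimal decision can be computed as:

import numpy as np

# hypothetical two-class example: priors P(c) and likelihoods P(x|c) for one observed x
prior = np.array([0.6, 0.4])          # P(c1), P(c2)
likelihood = np.array([0.05, 0.20])   # P(x|c1), P(x|c2)

evidence = np.sum(prior * likelihood)        # P(x) = sum_c P(c) P(x|c)
posterior = prior * likelihood / evidence    # P(c|x) by Bayes' theorem
print(posterior, posterior.argmax())         # the Bayes optimal decision picks the largest posterior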
A common strategy for estimating the class-conditional probability is to assume it has a fixed form of distribution that is uniquely determined by a parameter vector $\boldsymbol{\theta}_c$, written $P(\boldsymbol{x} \mid \boldsymbol{\theta}_c)$, and then estimate $\boldsymbol{\theta}_c$ from the training set $D$.
Training a probabilistic model is thus a parameter-estimation problem. The frequentist and Bayesian schools of statistics offer fundamentally different answers. Frequentists regard the parameters as unknown but fixed values that can be obtained by optimizing the likelihood function; Bayesians regard the parameters as unobserved random variables that themselves follow a distribution, so one assumes a prior over the parameters and computes their posterior from the observed data.
Maximum Likelihood Estimation (MLE) is the classical method of estimating distribution parameters from sampled data. Its core idea is to choose the parameter values that make the observed samples most probable, i.e., that maximize the likelihood of the training data.
Let $D_c$ denote the set of training samples of class $c$, and assume these samples are independent and identically distributed. The likelihood of $D_c$ with respect to the parameter $\boldsymbol{\theta}_c$ is $P(D_c \mid \boldsymbol{\theta}_c) = \prod_{\boldsymbol{x} \in D_c} P(\boldsymbol{x} \mid \boldsymbol{\theta}_c)$. Because a product of many probabilities easily underflows, one usually works with the log-likelihood instead.
The log-likelihood is $LL(\boldsymbol{\theta}_c) = \log P(D_c \mid \boldsymbol{\theta}_c) = \sum_{\boldsymbol{x} \in D_c} \log P(\boldsymbol{x} \mid \boldsymbol{\theta}_c)$.
The maximum likelihood estimate of the parameter is $\hat{\boldsymbol{\theta}}_c = \arg\max_{\boldsymbol{\theta}_c} LL(\boldsymbol{\theta}_c)$.
For example, for a continuous attribute, if we assume the density $p(\boldsymbol{x} \mid c) \sim \mathcal{N}(\boldsymbol{\mu}_c, \boldsymbol{\sigma}_c^2)$, the maximum likelihood estimates of the parameters are $\hat{\boldsymbol{\mu}}_c = \frac{1}{|D_c|}\sum_{\boldsymbol{x} \in D_c} \boldsymbol{x}$ and $\hat{\boldsymbol{\sigma}}_c^2 = \frac{1}{|D_c|}\sum_{\boldsymbol{x} \in D_c} (\boldsymbol{x} - \hat{\boldsymbol{\mu}}_c)(\boldsymbol{x} - \hat{\boldsymbol{\mu}}_c)^{\mathrm{T}}$, i.e., the sample mean and the sample variance of the class-$c$ samples.
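A minimal sketch of the Gaussian maximum likelihood estimate (the data here is randomly generated just for illustration; scipy's norm.fit, also used in the implementation below, returns exactly these estimates):

import numpy as np
from scipy.stats import norm

x = np.random.normal(loc=2.0, scale=1.5, size=1000)  # simulated class-c samples of one attribute

mu_hat = x.mean()                          # MLE of the mean: the sample mean
sigma2_hat = ((x - mu_hat) ** 2).mean()    # MLE of the variance (divides by n, not n-1)

mu_fit, sigma_fit = norm.fit(x)            # scipy returns the same MLE (mean, standard deviation)
print(mu_hat, sigma2_hat, mu_fit, sigma_fit ** 2)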
Note that although this parametric approach makes estimating the class-conditional probability relatively simple, the accuracy of the estimate depends heavily on whether the assumed form of distribution matches the true underlying data distribution. In real tasks, making an assumption that comes reasonably close to the true distribution usually requires some domain knowledge about the application; assuming a distributional form by pure guesswork can easily produce misleading results.
The naive Bayes classifier adopts the attribute conditional independence assumption: given the class, all attributes are assumed to be mutually independent, i.e., each attribute is assumed to influence the classification result independently.
Under the attribute conditional independence assumption, the posterior factorizes as $P(c \mid \boldsymbol{x}) = \dfrac{P(c)\, P(\boldsymbol{x} \mid c)}{P(\boldsymbol{x})} = \dfrac{P(c)}{P(\boldsymbol{x})} \prod_{i=1}^{d} P(x_i \mid c)$,
where $d$ is the number of attributes and $x_i$ is the value of $\boldsymbol{x}$ on the $i$-th attribute.
Since $P(\boldsymbol{x})$ is the same for all classes, the Bayes decision rule yields the naive Bayes classifier $h_{nb}(\boldsymbol{x}) = \arg\max_{c \in \mathcal{Y}} P(c) \prod_{i=1}^{d} P(x_i \mid c)$.
Clearly, training a naive Bayes classifier amounts to estimating the class prior $P(c)$ and the conditional probability $P(x_i \mid c)$ of every attribute from the training set $D$.
Let $D_c$ denote the subset of training samples of class $c$. Given enough i.i.d. samples, the class prior is estimated as $P(c) = \dfrac{|D_c|}{|D|}$.
For a discrete attribute, let $D_{c,x_i}$ denote the subset of $D_c$ taking value $x_i$ on the $i$-th attribute; then $P(x_i \mid c) = \dfrac{|D_{c,x_i}|}{|D_c|}$.
For a continuous attribute we use a probability density instead: assuming $p(x_i \mid c) \sim \mathcal{N}(\mu_{c,i}, \sigma_{c,i}^2)$, where $\mu_{c,i}$ and $\sigma_{c,i}^2$ are the mean and variance of the $i$-th attribute over the class-$c$ samples, we have $p(x_i \mid c) = \dfrac{1}{\sqrt{2\pi}\,\sigma_{c,i}} \exp\!\left(-\dfrac{(x_i - \mu_{c,i})^2}{2\sigma_{c,i}^2}\right)$.
Note: if some attribute value never co-occurs with some class in the training set, estimating probabilities directly by frequency and plugging them into the decision rule causes a problem. For example, if an attribute value has never been observed together with the class "好瓜=是", its estimated conditional probability is $0$, and the product of conditional probabilities for that class is forced to $0$.
Then no matter what values the sample takes on its other attributes, even if they clearly indicate a good melon, the classification result will be "好瓜=否", which is obviously unreasonable.
To prevent the information carried by the other attributes from being wiped out by attribute values unseen in the training set, the probability estimates are usually "smoothed"; Laplace correction is the common choice. Let $N$ be the number of possible classes in the training set $D$ and $N_i$ the number of possible values of the $i$-th attribute; the corrected estimates are $\hat{P}(c) = \dfrac{|D_c| + 1}{|D| + N}$ and $\hat{P}(x_i \mid c) = \dfrac{|D_{c,x_i}| + 1}{|D_c| + N_i}$.
Laplace correction clearly avoids zero probability estimates caused by an insufficient training set, and as the training set grows the influence of the prior introduced by the correction gradually becomes negligible, so the estimates converge toward the true probabilities.
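A small sketch of the corrected estimates (the counts below are hypothetical, loosely modeled on a watermelon-style data set):

# hypothetical counts: |D| = 17 training samples, N = 2 classes, |Dc| = 8 samples of class c,
# the i-th attribute has N_i = 3 possible values and the value x_i never co-occurs with class c
D, N, Dc, Ni, Dc_xi = 17, 2, 8, 3, 0

p_c = (Dc + 1) / (D + N)            # smoothed class prior, instead of 8/17
p_xi_c = (Dc_xi + 1) / (Dc + Ni)    # smoothed conditional probability, nonzero instead of 0/8
print(p_c, p_xi_c)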
Naive Bayes classifiers can be deployed in several ways in practice. If the task demands fast prediction, all the probability estimates involved can be precomputed from the training set and stored, so that prediction only requires a table lookup. If the data changes frequently, a lazy learning approach can be used instead: perform no training up front and estimate the probabilities from the current data set only when a prediction request arrives. If data keeps accumulating, incremental learning can be realized by simply updating the counts behind the probability estimates that involve the new samples' attribute values, as sketched below.
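For instance, incremental learning only needs to keep running counters; a minimal sketch (the dictionary layout and update helper here are hypothetical, not the exact structure used in the implementation that follows):

from collections import defaultdict

class_count = defaultdict(int)                       # |Dc| per class
value_count = defaultdict(lambda: defaultdict(int))  # |Dc,xi| per (class, attribute) pair

def update(sample_attrs, label):
    # Incorporate one new training sample by incrementing the relevant counters;
    # all probability estimates can then be recomputed from these counts on demand.
    class_count[label] += 1
    for attr_name, value in sample_attrs.items():
        value_count[(label, attr_name)][value] += 1

update({"敲声": "浊响", "纹理": "清晰"}, "是")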
import time
import pandas as pd
import numpy as np
import collections as cc
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from scipy.stats import norm
class NaiveBayesClassifier:
    def __init__(self, X_train, y_train, X_test, y_test):
        self.X_train, self.y_train = X_train, y_train
        self.X_test, self.y_test = X_test, y_test
        self.class_labels = np.sort(np.unique(self.y_train))  # class labels
        self.class_len = len(self.class_labels)  # number of classes
        self.n_samples, self.n_features = self.X_train.shape
        self.prior_prob = np.zeros(self.class_len)  # prior probability of each class
        self.class_num_value = np.zeros(self.class_len)
        self.feature_names = self.X_train.columns  # feature (attribute) names
        self.classifier_train_data = dict()  # per class: value counts (or Gaussian parameters) for every feature
    def prior_probability(self):
        """
        Compute the (Laplace-corrected) prior probability of each class.
        """
        print("Start calculating prior probability.......")
        start_prior = time.time()
        self.class_num_value = cc.Counter(self.y_train)  # sample count of each class
        for i, label in enumerate(self.class_labels):
            # iterate in class_labels order so prior_prob stays aligned with class_labels
            self.prior_prob[i] = (self.class_num_value[label] + 1) / (self.n_samples + self.class_len)
        end_prior = time.time()
        print("End calculating and take time: %.6f" % (end_prior - start_prior))
        print("Prior probability of each class:")
        for c, label in enumerate(self.class_labels):
            print(label, ":", self.prior_prob[c])
    def feature_probability_statistic(self):
        """
        For each class, count the value frequencies of discrete features and
        fit Gaussian parameters for continuous features.
        """
        print("Start feature frequency statistic......")
        start = time.time()
        for label in self.class_labels:
            train_class = self.X_train.loc[self.y_train == label]
            feature_counter = dict()  # per feature: value counts or Gaussian parameters
            for name in self.feature_names:
                feature_data = train_class.loc[:, name]  # values of this feature within the current class
                if feature_data.dtype == 'object':
                    # discrete feature
                    feature_counter[name] = cc.Counter(feature_data)
                else:
                    # continuous feature: maximum likelihood estimates of mean and standard deviation
                    mu, sigma = norm.fit(feature_data)
                    feature_counter[name] = {"mu": mu, "sigma": sigma}
            self.classifier_train_data[label] = feature_counter
        # print(self.classifier_train_data)
        end = time.time()
        print("End feature frequency statistic and take time: %.6f" % (end - start))
    def predict(self):
        """
        Compute the (log) conditional probabilities and classify the test samples.
        """
        print("Start predict test samples......")
        start_test = time.time()
        y_test_pred = []
        for i in range(self.X_test.shape[0]):
            # score every class for the current test sample, then take the argmax
            test_sample = self.X_test.iloc[i, :]  # a single test sample
            y_hat = []  # log-probability of the current sample under each class
            for k, label in enumerate(self.class_labels):
                prob_ln = np.log(self.prior_prob[k])
                label_frequency = self.classifier_train_data[label]
                for j in range(self.n_features):
                    feat_name = self.feature_names[j]  # name of the current feature
                    test_value = test_sample.iloc[j]  # value of the j-th feature of the test sample
                    if isinstance(test_value, str):
                        # discrete value: Laplace-corrected conditional probability
                        n_values = len(set(self.X_train.iloc[:, j]))
                        prob_ln += np.log((label_frequency[feat_name][test_value] + 1) / (self.class_num_value[label] + n_values))
                    else:
                        # continuous value: Gaussian density (small constant avoids log(0))
                        mu, sigma = label_frequency[feat_name].values()
                        prob_ln += np.log(norm.pdf(test_value, mu, sigma) + 1e-7)
                y_hat.append(prob_ln)
            y_test_pred.append(self.class_labels[np.argmax(y_hat)])
        end_test = time.time()
        print("end and predict test samples and take time: ", end_test - start_test)
        print("-" * 60)
        return y_test_pred
train_data = pd.read_csv(r'../datasets/watermelon.csv')
X_train, y_train = train_data.iloc[:, 1:-1], train_data.iloc[:, -1]
test_data = pd.read_csv(r'../datasets/watermelon.csv')
X_test, y_test = test_data.iloc[:, 1:-1], test_data.iloc[:, -1]
nbc = NaiveBayesClassifier(X_train, y_train, X_test, y_test)
nbc.prior_probability()
nbc.feature_probability_statistic()
y_test_pred = nbc.predict()
print(classification_report(y_test, y_test_pred))
test_data = pd.read_csv(r'../datasets/iris.csv')
X, y = test_data.iloc[:, :-1], test_data.iloc[:, -1]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state=0, stratify = y)
nbc = NaiveBayesClassifier(X_train, y_train, X_test, y_test)
nbc.prior_probability()
nbc.feature_probability_statistic()
y_test_pred = nbc.predict()
print(classification_report(y_test, y_test_pred))
test_data = pd.read_csv(r'../datasets/agaricus-lepiota.data')
X, y = test_data.iloc[:, 1:], test_data.iloc[:, 0]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state=0, stratify = y)
nbc = NaiveBayesClassifier(X_train, y_train, X_test, y_test)
nbc.prior_probability()
nbc.feature_probability_statistic()
y_test_pred = nbc.predict()
print(classification_report(y_test, y_test_pred))
sklearn.naive_bayes also provides ready-made naive Bayes classifiers, mainly the following:
BernoulliNB: naive Bayes classifier for multivariate Bernoulli models. Like MultinomialNB, it is suitable for discrete data; the difference is that MultinomialNB works with occurrence counts, whereas BernoulliNB is designed for binary/boolean features;
CategoricalNB: naive Bayes classifier for categorically distributed discrete features; each feature's values are assumed to be drawn from a categorical distribution;
ComplementNB: the complement naive Bayes classifier was designed to correct the "severe assumptions" made by the standard multinomial naive Bayes classifier; it is particularly suited to imbalanced data sets;
GaussianNB: Gaussian naive Bayes, which models continuous features with a per-class Gaussian likelihood;
MultinomialNB: naive Bayes classifier for multinomial models, suitable for classification with discrete features (e.g., word counts for text classification). The multinomial distribution normally requires integer feature counts, but in practice fractional counts such as tf-idf may also work.
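The examples below exercise CategoricalNB and GaussianNB; as a complementary sketch, MultinomialNB (BernoulliNB has the same interface) is typically applied to count features such as bag-of-words vectors. The toy counts here are made up purely for illustration:

from sklearn.naive_bayes import MultinomialNB
import numpy as np

X_counts = np.array([[2, 1, 0, 0],   # hypothetical word-count vectors for four short documents
                     [3, 0, 1, 0],
                     [0, 0, 2, 3],
                     [0, 1, 1, 2]])
y_label = np.array([0, 0, 1, 1])     # two document categories

mnb = MultinomialNB(alpha=1.0)       # alpha=1.0 corresponds to Laplace smoothing
mnb.fit(X_counts, y_label)
print(mnb.predict(np.array([[1, 1, 0, 0]])))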
from sklearn.naive_bayes import CategoricalNB, GaussianNB
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, plot_precision_recall_curve, average_precision_score,roc_curve, auc
from sklearn.preprocessing import OrdinalEncoder, LabelEncoder, label_binarize
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
def sklearn_cnb_api(X_train, y_train, X_test, y_test):
    cnb = CategoricalNB()
    cnb.fit(X_train, y_train)
    y_hat = cnb.predict(X_test)
    print(classification_report(y_test, y_hat))
    if len(np.unique(y_train)) == 2:
        plt_2_classified(cnb, y_test, y_hat)
    else:
        multi_classifier(y_test, y_hat)

def sklearn_gnb_api(X_train, y_train, X_test, y_test):
    gnb = GaussianNB()
    gnb.fit(X_train, y_train)
    y_hat = gnb.predict(X_test)
    print(classification_report(y_test, y_hat))
    if len(np.unique(y_train)) == 2:
        plt_2_classified(gnb, y_test, y_hat)
    else:
        multi_classifier(y_test, y_hat)

def plt_2_classified(classifier, y_test, y_hat):
    # precision-recall curve for the binary case (uses the global X_test)
    plot_precision_recall_curve(classifier, X_test, y_test)
    aps = average_precision_score(y_test, y_hat)
    plt.title('2-class Precision recall curve and AP: {:.2f}'.format(aps))
    plt.show()

def multi_classifier(y_test, y_hat):
    # micro-averaged ROC curve for the multi-class case (uses the global y_train for the class list)
    y_hat_encode = label_binarize(y_hat, classes=np.unique(y_train))
    y_test_encode = label_binarize(y_test, classes=np.unique(y_train))
    fpr, tpr, _ = roc_curve(y_test_encode.ravel(), y_hat_encode.ravel())
    plt.plot(fpr, tpr, 'r-', label='%.2f' % auc(fpr, tpr))
    plt.plot([0, 1], [0, 1])
    plt.legend(loc='lower right', fontsize=12)
    plt.xlabel("False Positive Rate", fontsize=12)
    plt.ylabel("True Positive Rate", fontsize=12)
    plt.grid()
    plt.title("ROC curve under multi classification model")
    plt.show()
test_data = pd.read_csv(r'../datasets/agaricus-lepiota.data')
X, y = test_data.iloc[:, 1:], test_data.iloc[:, 0]
X = OrdinalEncoder().fit_transform(X)
y = LabelEncoder().fit_transform(y)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state=0, stratify = y)
sklearn_cnb_api(X_train, y_train, X_test, y_test)
test_data = pd.read_csv(r'../datasets/nursery.csv')
X, y = test_data.iloc[:, :-1], test_data.iloc[:, -1]
X = OrdinalEncoder().fit_transform(X)
y = LabelEncoder().fit_transform(y)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state=0)
sklearn_cnb_api(X_train, y_train, X_test, y_test)
test_data = pd.read_csv(r'../datasets/iris.csv')
X, y = test_data.iloc[:, :-1], test_data.iloc[:, -1]
X = OrdinalEncoder().fit_transform(X)
y = LabelEncoder().fit_transform(y)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state=0, stratify = y)
sklearn_gnb_api(X_train, y_train, X_test, y_test)
The basic idea of semi-naive Bayes classifiers is to take a moderate amount of dependence between attributes into account, so that full joint-probability computation is avoided while strong attribute dependencies are not completely ignored. "One-Dependent Estimation" (ODE) is the most common strategy for semi-naive Bayes classifiers. As the name suggests, "one-dependent" means every attribute is assumed to depend on at most one other attribute besides the class, i.e. $P(c \mid \boldsymbol{x}) \propto P(c) \prod_{i=1}^{d} P(x_i \mid c, pa_i)$,
where $pa_i$ is the attribute on which $x_i$ depends, called the parent attribute of $x_i$. If the parent attribute $pa_i$ of every attribute is known, $P(x_i \mid c, pa_i)$ can be estimated just as before; the key question is therefore how to determine each attribute's parent, and different answers lead to different one-dependent classifiers.
The most direct approach is to assume that all attributes depend on the same attribute, called the "super-parent", and to select this super-parent attribute by model selection such as cross-validation; this gives the SPODE (Super-Parent ODE) method.
The conditional mutual information $I(x_i, x_j \mid y) = \sum_{x_i, x_j;\, c \in \mathcal{Y}} P(x_i, x_j \mid c) \log \dfrac{P(x_i, x_j \mid c)}{P(x_i \mid c)\, P(x_j \mid c)}$ measures how correlated attributes $x_i$ and $x_j$ are once the class is known.
TAN (Tree Augmented naive Bayes) builds on the maximum weighted spanning tree algorithm and reduces the dependencies between attributes to the tree structure shown in figure (c) via the following steps:
(1) Compute the conditional mutual information $I(x_i, x_j \mid y)$ (as defined above) between every pair of attributes;
if two attributes are conditionally independent given the class, their conditional mutual information is zero;
if they are strongly dependent given the class, their conditional mutual information is large.
(2) Build a complete graph over the attributes as nodes, and set the weight of the edge between any two nodes $x_i$ and $x_j$ to $I(x_i, x_j \mid y)$;
(3) Build the maximum weighted spanning tree of this complete graph, choose a root variable, and orient the edges outward from the root;
(4) Add the class node $y$, together with directed edges from $y$ to every attribute node.
Taking the watermelon data set as an example, we compute the conditional mutual information between every pair of features, considering only the discrete-valued features:
import numpy as np
import pandas as pd
from sklearn.metrics import mutual_info_score
def conditional_mutual_information(data_train):
    """
    Mutual information between every pair of attributes, conditioned on the class (last column).
    """
    cmi_labels = dict()  # per class: matrix of pairwise mutual information between attributes
    n_features = data_train.shape[1]
    label_name = np.sort(np.unique(data_train.iloc[:, -1]))
    for label in label_name:
        cmi_labels[label] = np.zeros((n_features - 1, n_features - 1))
        data_label = data_train[data_train.iloc[:, -1] == label]
        for i in range(n_features - 1):
            for j in range(i + 1, n_features - 1):
                mis = mutual_info_score(data_label.iloc[:, i], data_label.iloc[:, j])
                cmi_labels[label][i, j] = mis
                cmi_labels[label][j, i] = mis
    # sum the per-class mutual information matrices over all classes
    cmi = np.zeros((n_features - 1, n_features - 1))
    for label in label_name:
        cmi += cmi_labels[label]
    return cmi
data = pd.read_csv(r'../datasets/watermelon.csv').iloc[:, 1:]
train_data = data.iloc[:, :-3]
train_data['label'] = data.iloc[:, -1]
cmi = conditional_mutual_information(train_data)
print(cmi)
To keep the figure readable, edges with very small weights are not drawn, so the plotted graph may not be complete; in any case the maximum weighted spanning tree will never include such low-weight edges. As the figure below shows, thicker edges correspond to larger weights.
import matplotlib.pyplot as plt
import networkx as nx
plt.rcParams['font.sans-serif'] = ['SimHei']  # font that can render the Chinese feature names
plt.rcParams['axes.unicode_minus'] = False
feat_names = train_data.columns
n_features = train_data.shape[1]
plt.figure(figsize=(6, 6))
G = nx.Graph()  # simple undirected graph
for i in range(n_features - 1):
    for j in range(i + 1, n_features - 1):
        # add a weighted edge (node name, node name, weight)
        if cmi[i, j] > 0.1:  # skip edges whose weight is too small
            G.add_edge(feat_names[i], feat_names[j], weight=np.round(cmi[i, j], 2))
pos = nx.spring_layout(G)  # node positions (random layout)
# draw the nodes
nx.draw_networkx_nodes(G, pos, node_size=800, node_color='red')
# edge width proportional to the weight
width = [float(d['weight'] * 2) for (u, v, d) in G.edges(data=True)]
nx.draw_networkx_edges(G, pos, width=width, edge_color='k')  # draw the edges
labels = nx.get_edge_attributes(G, 'weight')  # edge weight labels
# show the edge weights
nx.draw_networkx_edge_labels(G, pos, edge_labels=labels, font_color='b')
# node labels
nx.draw_networkx_labels(G, pos, font_color='k', font_weight='bold')
plt.axis('off')
plt.show()
def select_node(path, path_mst, node_select, node_name):
    # Given a node, find tree edges that connect an already-selected node to a
    # not-yet-selected node and orient them away from the selected node.
    for p in path:
        if (p[0] != node_name) and (p[1] != node_name):
            if p[0] in node_select and p[1] not in node_select:
                path_mst.append((p[0], p[1], p[2]['weight']))
                node_select.append(p[1])
            elif p[1] in node_select and p[0] not in node_select:
                path_mst.append((p[1], p[0], p[2]['weight']))
                node_select.append(p[0])
    return path_mst, node_select
path = nx.maximum_spanning_tree(G)  # maximum weighted spanning tree
# edges of the tree together with their weight attributes
path = sorted(path.edges(data=True))
path = sorted(path, key=lambda x: x[2]['weight'], reverse=True)
path_mst = []  # directed edges of the final tree
node_select = ['纹理']  # root node of the directed tree
for p in path:
    if p[1] == node_select[0]:
        path_mst.append((p[1], p[0], p[2]['weight']))
        node_select.append(p[0])
    elif p[0] == node_select[0]:
        path_mst.append((p[0], p[1], p[2]['weight']))
        node_select.append(p[1])
selected = 0
while len(node_select) < len(feat_names) - 1:
    path_mst, node_select = select_node(path, path_mst, node_select, node_select[selected])
    selected += 1
print(path_mst)
plt.figure(figsize=(6, 5))
G_mst = nx.DiGraph()  # simple directed graph
G_mst.add_weighted_edges_from(path_mst)  # add the directed, weighted edges
pos = nx.spring_layout(G_mst)
nx.draw_networkx_nodes(G_mst, pos, node_size=700, node_color='r')
nx.draw_networkx_edges(G_mst, pos, width=1, edge_color='k', arrowsize=18, connectionstyle='arc3,rad=0.4')
nx.draw_networkx_labels(G_mst, pos, font_color='w')
plt.axis('off')
plt.show()
With Laplace correction, the class priors $\hat{P}(c)$ are estimated first.
Then, for each class, the conditional probability of every attribute given its parent attribute, $\hat{P}(x_i \mid c, pa_i)$, is estimated from the corresponding counts.
The attribute dependencies are defined as follows:
the attribute 大小 (size) depends on 形状 (shape), and when its value is 大 (large) it depends on 形状 = 圆形 (round);
the attribute 颜色 (color) has no parent attribute;
the attribute 形状 (shape) depends on 大小 (size), and when its value is 圆形 (round) it depends on 大小 = 大 (large).
AODE (Averaged One-Dependent Estimator) is a more powerful one-dependent classifier based on an ensemble-learning mechanism. Unlike SPODE, which selects a single super-parent by model selection, AODE tries every attribute as the super-parent to build a SPODE and then averages those SPODEs that are supported by enough training data, i.e. $P(c \mid \boldsymbol{x}) \propto \sum_{\substack{i=1 \\ |D_{x_i}| \ge m'}}^{d} P(c, x_i) \prod_{j=1}^{d} P(x_j \mid c, x_i)$,
where $D_{x_i}$ is the set of training samples taking value $x_i$ on the $i$-th attribute and $m'$ is a threshold constant (commonly set to 30).
With Laplace correction, the estimates are $\hat{P}(c, x_i) = \dfrac{|D_{c,x_i}| + 1}{|D| + N \times N_i}$ and $\hat{P}(x_j \mid c, x_i) = \dfrac{|D_{c,x_i,x_j}| + 1}{|D_{c,x_i}| + N_j}$,
where $N$ is the number of possible classes in $D$, $N_i$ is the number of possible values of the $i$-th attribute, $D_{c,x_i}$ is the set of class-$c$ samples taking value $x_i$ on the $i$-th attribute, and $D_{c,x_i,x_j}$ is the set of class-$c$ samples taking value $x_i$ on the $i$-th attribute and $x_j$ on the $j$-th attribute.
Like the naive Bayes classifier, training AODE is again a matter of "counting", i.e., counting the training samples that satisfy each condition. Also like naive Bayes, AODE needs no model selection; it can save prediction time by precomputing the counts, or adopt lazy learning and count only when a prediction is requested, and it readily supports incremental learning.
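As a rough sketch of the counting-based AODE prediction described above (discrete attributes only; aode_predict is a hypothetical minimal helper that simply recounts subsets with pandas rather than storing count tables, and the small support threshold m is chosen because the toy data sets here are tiny):

import numpy as np
import pandas as pd

def aode_predict(X_train, y_train, x, m=1):
    """Score one sample x (a pandas Series) with AODE using Laplace-corrected counts.

    m is the minimum support |D_{x_i}| required for attribute i to act as a super-parent
    (the text suggests 30 in practice; a small value is used here for the tiny watermelon data).
    """
    D = len(X_train)
    classes = np.unique(y_train)
    N = len(classes)
    n_values = {a: X_train[a].nunique() for a in X_train.columns}  # N_i for every attribute

    scores = {}
    for c in classes:
        Dc = X_train[y_train == c]
        total = 0.0
        for i in X_train.columns:                    # candidate super-parent attribute
            if (X_train[i] == x[i]).sum() < m:
                continue                             # skip super-parents with too little support
            Dc_xi = Dc[Dc[i] == x[i]]
            p_c_xi = (len(Dc_xi) + 1) / (D + N * n_values[i])           # P(c, x_i), Laplace-corrected
            prod = p_c_xi
            for j in X_train.columns:
                Dc_xi_xj = Dc_xi[Dc_xi[j] == x[j]]
                prod *= (len(Dc_xi_xj) + 1) / (len(Dc_xi) + n_values[j])  # P(x_j | c, x_i)
            total += prod
        scores[c] = total
    return max(scores, key=scores.get)

# example usage on the discrete watermelon attributes: classify the first sample
data = pd.read_csv(r'../datasets/watermelon.csv').iloc[:, 1:]
X_disc, y_disc = data.iloc[:, :-3], data.iloc[:, -1]
print(aode_predict(X_disc, y_disc, X_disc.iloc[0]))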