vlambda博客
学习文章列表

聚类(五)密度聚类算法

      密度聚类亦称“基于密度的聚类”(density-based clustering),此类算法假设聚类结构能通过样本分布的紧密程度确定。DBSCAN算法将簇定义为密度相连的样本的最大集合,能够将密度足够高的区域划分为簇,不需要给定簇数量,并可在有噪声的空间数据集中发现人任意形状的簇。

       DBSCAN(Density-Based Spatial Clustering of Application With Noise)是一种著名的密度聚类算法,它基于一组“邻域”(neighborhood)参数   来刻画样本分布的紧密程度 。给定数据集   ,定义一下概念:

  •    -邻域:对   ,其   -邻域包含样本集   中与   的距离不大于   的样本,即

       
  • 核心对象(core object):若   的   -邻域至少包含   个样本,即   ,则   是一个核心对象;

  • 密度直达(directly density-reachable):若   位于   的   -邻域中,且   核心对象,则称   由   密度直达;

  • 密度可达(density-reachable):对   和   ,如存在样本序列   ,其中   ,且   由   密度可达,则称   由   密度可达;

  • 密度相连(density-connected):对   与   ,若存在   使得   与   均由   密度可达,则称   与   密度相连。

      DBSCAN定义的基本概念(   =3):虚线显示出   -邻域,   是核心对象,   由   密度直达,   由   密度可达,   由   密度相连。

聚类(五)密度聚类算法

     DBSCAN“簇”定义:由密度可达关系导出的最大密度相连接样本集合,即给定邻域参数   ,簇   是满足以下性质的非空样本子集:

  • 连接性(connectivity):      与   密度相连;

  • 最大性(maximality):   ,      密度可达   。

      如何从数据集   中找出满足以上性质的聚类簇?实际上,若   为核心对象,由   密度可达的所有样本组成的集合记为   ,则不难证明   即为满足连接性与最大性的簇

      于是,DBSCAN算法先任选数据集中的一个核心对象为“种子”(seed),再由此出发确定相应的聚类簇,算法描述如下。在第1-7行中,算法先根据给定的邻域参数   找出所有核心对象;然后在第10-24行中,以任一核心对象为出发点,找出其密度可达的样本生成聚类簇,直到所有核心对象均被访问过为止。

聚类(五)密度聚类算法

聚类(五)密度聚类算法

聚类(五)密度聚类算法

聚类(五)密度聚类算法

  • 优点:不需要事先给定簇的数目   ;适用于稠密的非凸数据集,可以发现任意形状的簇;可以在聚类时发现噪声点、对数据集中的异常点不敏感;对样本输入顺序不敏感。

  • 缺点:对于高维数据效果不好;不适用于数据集中样本密度差异很小的情况;调参复杂,给定   选择过大的   会导致核心对象数量减少,使得一些包含对象较少的自然簇被丢弃;选择过小的   会导致大量对象被标记为核心对象,从而将噪声归入簇;给定   选择过小的   会导致大量的对象被误标为噪声,一个自然簇被误拆为多个簇;选择过大的   则可能有跟多噪声被归入簇,而本应分离的若干自然簇也被合并为一个簇;数据量很大时算法收敛的时间较长,可对搜索最近邻时建立的KD-Tree或Ball-Tree进行规模限制。

import numpy as npimport pandas as pdfrom queue import Queue # 队列,先进先出import matplotlib.pyplot as pltfrom sklearn.datasets import make_blobs
class DensityClustering_DBSCAN: def __init__(self, X, epsilon = 0.5, min_pts = 3, dist_method = 'euclidean'): """ epsilon: 邻域 minpts: 邻域内至少包含的样本量 """ self.X = X self.m = X.shape[0] self.epsilon = epsilon self.min_pts = min_pts self.dist_method = dist_method self.distance_fun = self.distance_function() self.label_ = None # 样本所述簇所属类别列表 def distance_function(self): if self.dist_method == 'euclidean': return lambda x, y: np.sqrt(((x - y) ** 2).sum()) elif self.dist_method == "": return None def fit_dbscan(self): """密度聚类核心算法""" self.label_ = - 1 * np.ones(self.m) # 初始化样本全部为噪声 dist_mat = np.zeros((self.m, self.m)) # 距离矩阵 #计算样本之间的距离,对称矩阵 for i in range(self.m - 1): for j in range(i, self.m): dist_mat[i, j] = self.distance_fun(self.X[i], self.X[j]) dist_mat[j, i] = dist_mat[i, j] # 初始化核心对象集合 core_objects = set() # 空集合 for i in range(self.m): # 对每一个样本,考察其邻域内样本量是否大于等于minpts if np.sum(dist_mat[i] <= self.epsilon) >= self.min_pts: core_objects.add(i) k = 0 # 初始化簇数,即簇标记 unvisited_set = set(range(self.m)) # 为访问的样本集合 while len(core_objects) > 0: unvisited_set_old = unvisited_set.copy() # 记录当前未访问的样本集合 # 随机选取一个核心对象 obj_idx = np.random.choice(list(core_objects)) queue_obj = Queue() # 初始化一个队列 queue_obj.put(obj_idx) # 核心对象加入队列 # 未访问集合中删除核心对象 unvisited_set = unvisited_set - {obj_idx} while not queue_obj.empty(): q = queue_obj.get() # 取队首元素 if q in core_objects: # 判断是否为核心对象 # 获取其邻域内未被访问的样本集合 delta = set(np.argwhere(dist_mat[q] <= self.epsilon).reshape(-1).tolist()) & unvisited_set for d in delta: queue_obj.put(d) # 其邻域内未被访问的样本入列 unvisited_set = unvisited_set - delta #从未访问过的集合中减去q邻域对象 # 获取簇聚类idx cluster_k = unvisited_set_old - unvisited_set # 类型为集合 self.label_[list(cluster_k)] = k # q以及邻域样本标记为同一个簇 k += 1 # 用以标记下一个簇,即下一个样本所属簇编号 core_objects = core_objects - cluster_k # 去掉已考察的核心对象 def predict(self): self.fit_dbscan() return self.label_
X = pd.read_csv(r'../datasets/watermelon4.0.csv').values
dbs = DensityClustering_DBSCAN(X, epsilon = 0.11, min_pts = 5)dbs.fit_dbscan()labels = dbs.predict()plt.figure(figsize = (7, 5))cluster = X[labels == -1]plt.plot(cluster[:, 0], cluster[:, 1], "ko", label = 'noise')
markers = "sp<>*"for i in range(len(np.unique(labels)) - 1): cluster = X[labels == i] plt.plot(cluster[:, 0], cluster[:, 1], markers[i], label = 'cluster' + str(i + 1))plt.xlabel("Feature_1", fontsize = 12)plt.ylabel("Feature_2", fontsize = 12)plt.title("Density Based Clustering of DBSCAN Algorithm", fontsize = 14)plt.legend()plt.grid(ls = ":")plt.show()

聚类(五)密度聚类算法

centers = np.array([[0.2, 2.3], [-1.5, 2.3], [-2.8, 1.8], [-2.8, 2.8], [-2.8, 1.3]])std = np.array([0.4, 0.3, 0.1, 0.1, 0.1])X, y = make_blobs(n_samples=2000, n_features=2, centers=centers, cluster_std=std, random_state=0)
dbs = DensityClustering_DBSCAN(X, epsilon = 0.11, min_pts = 5)dbs.fit_dbscan()labels = dbs.predict()plt.figure(figsize = (12, 8))cluster = X[labels == -1]plt.plot(cluster[:, 0], cluster[:, 1], "ko", label = 'noise')
markers = "sp<>*ov^h1234"for i in range(len(np.unique(labels)) - 1): cluster = X[labels == i] plt.plot(cluster[:, 0], cluster[:, 1], markers[i], label = 'cluster' + str(i + 1))plt.xlabel("Feature_1", fontsize = 12)plt.ylabel("Feature_2", fontsize = 12)plt.title("Density Based Clustering of DBSCAN Algorithm", fontsize = 14)plt.legend()plt.grid(ls = ":")plt.show()

聚类(五)密度聚类算法

from sklearn.preprocessing import StandardScalerimport seaborn as sns
X = pd.read_csv(r'../datasets/consumption_data.csv')X = StandardScaler().fit_transform(X)
dbs = DensityClustering_DBSCAN(X, epsilon = 0.85, min_pts = 5)dbs.fit_dbscan()labels = dbs.predict()
title = ["R index", "F index", "M index"]plt.figure(figsize = (7, 10))cluster_k = np.unique(labels)for f in range(X.shape[1]): # 表示特征 plt.subplot(311 + f) for c in cluster_k: # 表示簇索引 sns.kdeplot(X[labels == c][:, f]) plt.title(title[f])plt.show()

from sklearn.cluster import DBSCANfrom sklearn.preprocessing import StandardScalerimport seaborn as sns
X = pd.read_csv(r'../datasets/consumption_data.csv')X = StandardScaler().fit_transform(X)
dbs = DBSCAN(eps = 0.85, min_samples=5).fit(X)labels = dbs.labels_
title = ["RS index", "FS index", "MS index"]plt.figure(figsize = (7, 10))cluster_k = np.unique(labels)for f in range(X.shape[1]): # 表示特征 plt.subplot(311 + f) for c in cluster_k: # 表示簇索引 sns.kdeplot(X[labels == c][:, f]) plt.title(title[f]) plt.grid()plt.show()