聚类(五)密度聚类算法
密度聚类亦称“基于密度的聚类”(density-based clustering),此类算法假设聚类结构能通过样本分布的紧密程度确定。DBSCAN算法将簇定义为密度相连的样本的最大集合,能够将密度足够高的区域划分为簇,不需要给定簇数量,并可在有噪声的空间数据集中发现人任意形状的簇。
DBSCAN(Density-Based Spatial Clustering of Application With Noise)是一种著名的密度聚类算法,它基于一组“邻域”(neighborhood)参数
核心对象(core object):若
密度直达(directly density-reachable):若
密度可达(density-reachable):对
密度相连(density-connected):对
DBSCAN定义的基本概念(
DBSCAN“簇”定义:由密度可达关系导出的最大密度相连接样本集合,即给定邻域参数
连接性(connectivity):
最大性(maximality):
如何从数据集
于是,DBSCAN算法先任选数据集中的一个核心对象为“种子”(seed),再由此出发确定相应的聚类簇,算法描述如下。在第1-7行中,算法先根据给定的邻域参数
优点:不需要事先给定簇的数目
缺点:对于高维数据效果不好;不适用于数据集中样本密度差异很小的情况;调参复杂,给定
import numpy as np
import pandas as pd
from queue import Queue # 队列,先进先出
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs
class DensityClustering_DBSCAN:
def __init__(self, X, epsilon = 0.5, min_pts = 3, dist_method = 'euclidean'):
"""
epsilon: 邻域
minpts: 邻域内至少包含的样本量
"""
self.X = X
self.m = X.shape[0]
self.epsilon = epsilon
self.min_pts = min_pts
self.dist_method = dist_method
self.distance_fun = self.distance_function()
self.label_ = None # 样本所述簇所属类别列表
def distance_function(self):
if self.dist_method == 'euclidean':
return lambda x, y: np.sqrt(((x - y) ** 2).sum())
elif self.dist_method == "":
return None
def fit_dbscan(self):
"""密度聚类核心算法"""
self.label_ = - 1 * np.ones(self.m) # 初始化样本全部为噪声
dist_mat = np.zeros((self.m, self.m)) # 距离矩阵
#计算样本之间的距离,对称矩阵
for i in range(self.m - 1):
for j in range(i, self.m):
dist_mat[i, j] = self.distance_fun(self.X[i], self.X[j])
dist_mat[j, i] = dist_mat[i, j]
# 初始化核心对象集合
core_objects = set() # 空集合
for i in range(self.m):
# 对每一个样本,考察其邻域内样本量是否大于等于minpts
if np.sum(dist_mat[i] <= self.epsilon) >= self.min_pts:
core_objects.add(i)
k = 0 # 初始化簇数,即簇标记
unvisited_set = set(range(self.m)) # 为访问的样本集合
while len(core_objects) > 0:
unvisited_set_old = unvisited_set.copy() # 记录当前未访问的样本集合
# 随机选取一个核心对象
obj_idx = np.random.choice(list(core_objects))
queue_obj = Queue() # 初始化一个队列
queue_obj.put(obj_idx) # 核心对象加入队列
# 未访问集合中删除核心对象
unvisited_set = unvisited_set - {obj_idx}
while not queue_obj.empty():
q = queue_obj.get() # 取队首元素
if q in core_objects: # 判断是否为核心对象
# 获取其邻域内未被访问的样本集合
delta = set(np.argwhere(dist_mat[q] <= self.epsilon).reshape(-1).tolist()) & unvisited_set
for d in delta:
queue_obj.put(d) # 其邻域内未被访问的样本入列
unvisited_set = unvisited_set - delta #从未访问过的集合中减去q邻域对象
# 获取簇聚类idx
cluster_k = unvisited_set_old - unvisited_set # 类型为集合
self.label_[list(cluster_k)] = k # q以及邻域样本标记为同一个簇
k += 1 # 用以标记下一个簇,即下一个样本所属簇编号
core_objects = core_objects - cluster_k # 去掉已考察的核心对象
def predict(self):
self.fit_dbscan()
return self.label_
X = pd.read_csv(r'../datasets/watermelon4.0.csv').values
dbs = DensityClustering_DBSCAN(X, epsilon = 0.11, min_pts = 5)
dbs.fit_dbscan()
labels = dbs.predict()
plt.figure(figsize = (7, 5))
cluster = X[labels == -1]
plt.plot(cluster[:, 0], cluster[:, 1], "ko", label = 'noise')
markers = "sp<>*"
for i in range(len(np.unique(labels)) - 1):
cluster = X[labels == i]
plt.plot(cluster[:, 0], cluster[:, 1], markers[i], label = 'cluster' + str(i + 1))
plt.xlabel("Feature_1", fontsize = 12)
plt.ylabel("Feature_2", fontsize = 12)
plt.title("Density Based Clustering of DBSCAN Algorithm", fontsize = 14)
plt.legend()
plt.grid(ls = ":")
plt.show()
centers = np.array([[0.2, 2.3], [-1.5, 2.3], [-2.8, 1.8], [-2.8, 2.8], [-2.8, 1.3]])
std = np.array([0.4, 0.3, 0.1, 0.1, 0.1])
X, y = make_blobs(n_samples=2000, n_features=2, centers=centers, cluster_std=std, random_state=0)
dbs = DensityClustering_DBSCAN(X, epsilon = 0.11, min_pts = 5)
dbs.fit_dbscan()
labels = dbs.predict()
plt.figure(figsize = (12, 8))
cluster = X[labels == -1]
plt.plot(cluster[:, 0], cluster[:, 1], "ko", label = 'noise')
markers = "sp<>*ov^h1234"
for i in range(len(np.unique(labels)) - 1):
cluster = X[labels == i]
plt.plot(cluster[:, 0], cluster[:, 1], markers[i], label = 'cluster' + str(i + 1))
plt.xlabel("Feature_1", fontsize = 12)
plt.ylabel("Feature_2", fontsize = 12)
plt.title("Density Based Clustering of DBSCAN Algorithm", fontsize = 14)
plt.legend()
plt.grid(ls = ":")
plt.show()
from sklearn.preprocessing import StandardScaler
import seaborn as sns
X = pd.read_csv(r'../datasets/consumption_data.csv')
X = StandardScaler().fit_transform(X)
dbs = DensityClustering_DBSCAN(X, epsilon = 0.85, min_pts = 5)
dbs.fit_dbscan()
labels = dbs.predict()
title = ["R index", "F index", "M index"]
plt.figure(figsize = (7, 10))
cluster_k = np.unique(labels)
for f in range(X.shape[1]): # 表示特征
plt.subplot(311 + f)
for c in cluster_k: # 表示簇索引
sns.kdeplot(X[labels == c][:, f])
plt.title(title[f])
plt.show()
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
import seaborn as sns
X = pd.read_csv(r'../datasets/consumption_data.csv')
X = StandardScaler().fit_transform(X)
dbs = DBSCAN(eps = 0.85, min_samples=5).fit(X)
labels = dbs.labels_
title = ["RS index", "FS index", "MS index"]
plt.figure(figsize = (7, 10))
cluster_k = np.unique(labels)
for f in range(X.shape[1]): # 表示特征
plt.subplot(311 + f)
for c in cluster_k: # 表示簇索引
sns.kdeplot(X[labels == c][:, f])
plt.title(title[f])
plt.grid()
plt.show()