分库分表方案深入讲解,学不会你捶我
一、背景介绍
移动互联网时代,随着软件用户量的不断增长,由此产生的数据量也在飞速增长,比如,用户表、订单表、聊天消息表等。据统计,MySQL单表
可以存储10亿级
数据,只是这时候性能比较差,业界公认MySQL
单表容量在1KW量级
是最佳状态,因为这时它的BTREE
索引树高在3~5之间
。
既然一张表无法搞定,那么就想办法将数据存放到多个地方,目前比较普遍的方案有3个:
分区
要求数据不是海量(分区数有限,存储能力就有限)
业务并发能力要求不高
分库分表
互联网行业处理海量数据的通用方法
发展几十年的
RDBMS
(关系型数据库)具有生态完善、绝对稳定、事务特性的优点,只要有软件的地方,它都是核心存储的首选NoSQL/NewSQL
NoSQL/NewSQL
宣传的无论多厉害,就现在各大公司对它的定位,都是RDBMS
的补充,而不是取而代之
本文就分库分表
的一些核心流程展开介绍:
二、是单库分表,还是分库分表
单库分表适用场景:
单表数据量太大,查询时需要扫描的行数太多,
SQL
执行效率低下CPU
出现瓶颈分库分表适用场景:
磁盘读
IO
达到了瓶颈:热点数据太多,数据库缓存放不下,每次查询时会产生大量的IO
,导致查询速度低下请求的数据太多、网络带宽不够
数据库连接数超过了最大限定值
分库分表
的复杂度要高于单库分表
,如果数据量不是特别大,且QPS
也不是特别高,首选单库分表
,待某些指标有接近阈值的迹象时,再考虑分库分表。
分库分表
相对单库分表
来说,复杂的地方有:
需要约定多个分片数据源
需要定义多个分片数据源的事务管理器
数据一致性的处理方案(强一致 or 最终一致)
数据迁移、后续扩容
...
三、分布式数据库中间件选型
本文主要针对时下比较流行的两款数据库中间件产品做下介绍:
主要指标 | Sharding-JDBC | MyCat |
---|---|---|
所属 | Apache | 基于阿里 Cobar 二次开发,社区维护 |
活跃度 | 高 | 高 |
ORM支持 | 任意 | 任意 |
基于客户端还是服务端 | 客户端 | 服务端 |
分库 | 支持 | 支持 |
分表 | 支持 | 不支持单库分表 |
事务 | 自带弱XA、最大努力送达型柔性事务 | 自带弱XA |
监控 | 无,可通过其它方式支持 | 自带 |
读写分离 | 支持 | 支持 |
限制 | 部分 JDBC 方法不支持、SQL语句限制 | 部分 JDBC 方法不支持、SQL语句限制 |
数据库连接池 | 任意 | 任意 |
MySQL交互协议 | JDBC Driver | 前后端均用 NIO |
开发 | 开发成本高,代码入侵大 | 开发成本小,代码入侵小 |
运维 | 维护成本低 | 维护成本高 |
配置难度 | 一般 | 复杂 |
Sharding-JDBC
架构图:
简单介绍:
Sharding-JDBC
是一款轻量级的框架,以工程依赖JAR
的形式提供功能,无需额外部署和依赖,可以理解为增强版的JDBC
驱动对于运维同事来说,只需要协助一些简单的配置及后续的扩容工作,无需关注底层代码与分片策略规则,相对
MyCat
,这是Sharding-JDBC
的优势,减少了部署成本以及运维同事的学习成本
MyCat
架构图
简单介绍:
MyCat
并不是业务系统代码里面的配置,而是独立运行的中间件,所有配置都会交给运维同事执行对于运维同事来说,它是在数据库
Server
前增加的一层代理,MyCat
本身不存数据,数据存储在后端的数据库上,因此,数据可靠性以及事务等都是通过数据库保证的MyCat
down 掉的时候,系统不能对数据库进行操作,会对所有用户产生影响MyCat
比较适合大数据工作
通过以上分析,可见
Sharding-JDBC
相对MyCat
来说,更轻量,首选肯定是Sharding-JDBC
,只要代码层面做好防腐层(依赖倒置)的设计,就算以后数据量级达到了百亿、千亿,也可以更加灵活方便的替换其它中间件产品,甚至NewSQL
。
四、分布式ID的生成方式
实现方式
完全依赖数据源的方式:
ID
的生成规则,读取控制完全由数据源控制,常见的如数据库自增长ID
、优雅的Flickr
方案、基于Redis
的原子操作incr/incrBy
产生顺序号、Mongodb
的ObjectId
、美团(Leaf
)的号段模式...半依赖数据源的方式:
ID
的生成规则,有部分生成因子需要由数据源(或配置信息)控制,如百度的uid-generator
、美团(Leaf
)的Snowflake
模式...不依赖数据源的方式:
ID
的生成规则完全由机器信息独立计算,不依赖任何配置信息和数据记录,如常见的UUID
及变种、GUID
...
实践方案
实践方案适用于以上提及的三种实现方式,可作为这三种实现方式的一种补充,旨在提升系统吞吐量,但原有实现方式的局限性依然存在。
实时获取方案:顾名思义,每次要获取
ID
时,实时生成,简单快捷,ID
都是连续不间断的,但吞吐量可能不是最高的。预生成方案:预先生成一批
ID
放在数据池里,可简单自增长生成,也可以设置步长,分批生成,需要将这些预先生成的数据,放在存储容器里(JVM
内存,Redis
,数据库表均可以),可以较大幅度地提升吞吐量,但需要开辟临时的存储空间,断电宕机后可能会丢失已有ID
,ID
也可能出现间断。
选择分布式 ID 的生成方式时,需要特别注意以下几个地方:
全局唯一:必须保证
ID
是全局唯一的高可用:无限接近于
100%
的可用性高性能:低延时,
ID
生成响应要快接入方便:遵循拿来主义原则,在系统设计和实现上要尽可能的简单
长度适中:不要太长,最好
64bit
,使用long
比较好操作。如果是96bit
,需要各种移位,相当的不方便,还有可能有些组件不能支持这么大的ID
分片支持:可以控制
ShardingId
,比如某一个用户的文章要放在同一个分片内,这样查询效率高,修改也容易,实现稍复杂信息安全:如果
ID
是连续的,恶意用户的扒取工作就非常容易做了,直接按照顺序下载指定URL
即可;如果是订单号就更危险了,竞争对手可以直接知道我们一天的订单量,所以要结合自己的业务场景来考虑
如果系统要求的吞吐量不是极高,个人推荐了解下优雅的
Flickr
方案,如果系统要求的吞吐量极高,个人推荐了解下美团的(Leaf
)项目,由于篇幅有限,这里不做过多展开。
五、分片/表键选择
分片键的定义
分片键即分库分表的拆分字段,是在水平拆分过程中用于生成拆分规则的数据表字段,根据分片键的值将数据表水平拆分到每个分库/分表中。
数据表拆分的首要原则
要尽可能的找到数据表中的数据在业务逻辑上的主体,并确定大部分(或核心的)数据库操作(查、删、改)都是围绕这个主体进行的,那么就可以使用该主体对应的字段作为分片键。
业务逻辑上的主体,通常与业务的应用场景相关,下面的一些典型应用场景都有明确的业务逻辑主体:
面向用户的互联网应用,都是围绕用户维度来做各种操作,那么业务逻辑主体就是用户,可使用用户对应的字段作为分片键
侧重于卖家的电商应用,都是围绕卖家维度来进行各种操作,那么业务逻辑主体就是卖家,可使用卖家对应的字段作为分片键
如果实在找不到合适的业务逻辑主体作为分片键,可以考虑使用下面的方式来选择分片键:
根据数据分布和访问的均衡度来考虑分片键,尽量将拆分后的每张分表中的数据相对均匀地分布在不同的物理分表中,这比较适用于大量分析型查询的应用场景
按照数字(字符串)类型与时间类型字段相结合作为分片键,这比较适用于日志检索类的应用场景
确定了业务逻辑的主体,但是还有其它的附属主体
该怎么办?
比如消息表(message
)有以下几个核心字段:
消息表中,两个聊天对象可以唯一的确定一个会话,即一个会话下的消息是一个独立的集合,大部分操作(查、删、改)都是围绕这个集合展开的,故可以拿session_id
当作业务逻辑的主体,但是,通过message_id
和guid
操作的业务场景也比较多,此时可以考虑通过冗余映射表来辅助操作:
即message_message_id_mapping_*
和message_guid_mapping_*
分别是message_*
的冗余映射表(ps,以_*
结尾的表,表示分表,如message_*
表示消息分表;橙色标记的字段代表表的分片键):
当根据
session_id
查询时,根据指定的分片规则
(下文会介绍),可以直接查询message_*
表当根据
message_id
查询时,根据指定的分片规则,先查询message_message_id_mapping_*
,拿到session_id
后,再根据session_id
(根据指定的分片规则)和message_id
查询message_*
表当根据
guid
查询时,根据指定的分片规则,先查询message_guid_mapping_*
,拿到session_id
后,再根据session_id
(根据指定的分片规则)和guid
查询message_*
表
如果对查询的实时性要求很高,可以冗余全量数据,即
冗余全量表 VS 冗余映射表
速度对比:冗余全量表速度更快,冗余映射表需要二次查询,即使有引入缓存,还是多一次网络开销
存储成本:冗余全量表需要几倍于冗余映射表的存储成本
维护代价:冗余全量表维护代价更大,涉及到数据变更时,多张表都要进行修改
或许有人注意到,冗余映射表的后缀也加了
_*
,没错,冗余映射表也要进行分库分表。或许有人又说冗余映射表也要分库分表,复杂度比较高,可以将冗余映射表的数据全部存到
ES
,这也是可以的,网上确实有这样的案例,不过在开始之前,要结合系统的业务定位及使用额外组件的复杂度做一份调研,最终选定最合适的方案。注意: 如果决定将冗余映射分表存储到数据库中,要注意给冗余映射分表的分片键和映射字段(如本例中
session_id
)建立组合索引,保证查询时,可以用到覆盖索引,加快查询性能。
六、分片算法/规则
常用的分片算法有取模分片、范围分片、hash
分片、复合分片等,这里不做过多展开,只介绍最常用的两种:取模分片、范围分片。还是拿五、分片/表键选择
中的消息表(采用冗余映射表的方式)为例,为了方便介绍,只介绍单库分表的分片算法,分库分表无非多了个分库规则,基本同分表规则。
其中,session_id
和message_id
都是分布式的自增ID
,guid
是随机生成的GUID
,故可以进行如下的分片规则:
session_id
采用取模分片:两个聊天对象可以唯一确定一个session_id
,为了保证每张分表数据的相对均匀,message_*
采用取模分片
算法message_id
采用范围分片:message_id
是自增的分布式ID
,大部分根据message_id
的操作(查、删、改)集中于最新的一部分数据,为了兼容操作最近消息的场景及方便扩容,message_message_id_mapping_*
采用范围分片(采用范围分片,大部分操作只会落在最新的几张分表中,操作的分表越少,查询性能越高)guid
采用取模分片:guid
没有规律而言,为了保证分表数据的相对均匀,message_guid_mapping_*
采用取模分片算法
通过上面的举例,可以看出,选择分片算法要综合考虑分片键的业务场景、字段值的规律、分片后分表的数据分布、后续的扩容... 注意: 对于取模分片的分表数量,建议是
2
的N
次幂,好处是方便动态缩容...对于程序员来说,2
的N
次幂,总是有那么一些微妙的地方😂
七、数据迁移方案
由于本人前面经历过两次大的项目迁移工作,均涉及到比较复杂且迁移量比较大的数据迁移,一开始时,就采用Jdbc + 多实例Job + Redis分段 + 多线程
的方式去做数据迁移,故分库分表的数据迁移工作也是通过此种方式实现的,屡试不爽。
由于篇幅有限,这里只介绍下大概的思路及核心伪代码:
通过
JDBC
分别与数据源表和目标表建立数据库连接定义方法内线程池,好处是可以动态的控制
Job
每次执行时的线程数量,防止迁移速度太快把数据库CPU
打满(Job
执行结束时,记得销毁线程池)多实例Job
为了防止重复迁移同一批数据,采用的是Redis
的incr
将数据分段(执行时,每个实例的每个线程会在Redis
中取一个偏移值,然后根据偏移值和事先定义好的的每次的迁移数据量计算出要迁移的数据段)定义迁移
Task
接口,方便后续的扩展,每次有迁移或修复任务时,只需要重点关注querySql
、execSql
定义接口,职能是控制
Job
的开关和迁移的偏移量,这个要与Redis
结合适用场景:线上不停机进行数据迁移、脏数据清理、更换数据库(eg,Qracle >> MySQL)、数据修复等操作,且支持跨数据库实例操作
性能:视具体数据库性能而定(只要数据库性能足够好,迁移性能就可以足够高)
核心伪代码如下:
/**
* Job 处理器
*/
@Scheduled(fixedRate = 1000 * 10)
public void process() {
// 省略 Job 执行的开关
// 下面没有出现声明的变量取的都是 Apollo 的配置
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CountDownLatch latch = new CountDownLatch(threadNum);
try {
for (int i = 0; i < threadNum; i++) {
executorService.submit(() -> {
Long solveCount = qps / threadNum * 60L;
Long offset = redisService.incr(MIGRATION_INCR);
// offset == 1L 表示刚开始迁移,当前ID需要设置为0
Long currId = offset == 0L ? 0L : (offset - 1) * solveCount; // 当前ID
Long idLe = offset * solveCount;
// 执行数据迁移
executeMigrate(commLog, offset, currId, idLe);
latch.countDown();
});
}
latch.await();
} catch (InterruptedException e) {
log.error(commLog + "整体异常【offset: %s】", e);
} finally {
executorService.shutdown();
// 省略一些统计执行消耗时间的日志打印
}
}
/**
* 数据迁移执行器
*/
public void executeMigrate(String target, Long offset, Long currId, Long idLe, Integer everyCommitCount) {
Connection sourceConn = null;
Connection messageConn = null;
Statement stmt = null;
ResultSet res = null;
PreparedStatement pstmt = null;
Long count = 0L; // 迁移数量
try {
IDataMigrateReady ready = transferringService.toReady(target);
Integer sourceFlag = ready.getSourceFlag(target);
sourceConn = sourceFlag == 0 ? conn.getMessageConn() : conn.getXiaoxingConn();
messageConn = conn.getMessageConn();
messageConn.setAutoCommit(false);
String querySql = task.getQuerySql(target, currId, idLe);
stmt = sourceConn.createStatement();
long readStartTime = System.currentTimeMillis();
stmt.execute(querySql); // 执行sql
res = stmt.getResultSet();
String execSql = task.getExecSql();
pstmt = messageConn.prepareStatement(execSql); // 预编译
int num = 0;
while (res.next()) {
num++;
count++;
for (int i = 1; i <= ready.getFieldNum(); i++) {
pstmt.setObject(i, res.getObject("t" + i));
}
pstmt.addBatch();
// 每everyCommitCount条数据提交一次事务
if (num > everyCommitCount) {
long executeStartTime = System.currentTimeMillis();
pstmt.executeBatch();
messageConn.commit();
pstmt.clearBatch();
num = 0;
}
}
pstmt.executeBatch();
messageConn.commit();
} catch (Exception e) {
log.error(String.format("%s ==> 错误,【offset: %s】", target, offset), e);
} finally {
conn.closeConn(sourceConn, null, messageConn, stmt, res, pstmt);
// 省略一些统计执行消耗时间的日志打印
}
}
/**
* 数据迁移 Task 的接口定义
*/
public interface IDataMigrateTask {
/**
* 获取 查询SQL
*/
String getQuerySql(String target, Long currId, Long idLe);
/**
* 获取 插入SQL
*/
String getExecSql();
/**
* 获取涉及到的字段的数目
*/
default Integer getFieldNum() {
return 0;
}
/**
* 获取源标识
* <p>
* 0:sourceConn
* 1:targetConn
*/
default Integer getSourceFlag(String target) {
return 0;
}
}
/**
* 分布式 Long 值自增
*/
@Override
public Long incr(String key) {
RedisAtomicLong entityIdCounter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
return entityIdCounter.getAndIncrement();
}
上面只列出了迁移的核心代码,如果将一张大表的数据同时迁移到各个拆分好的分表及冗余映射/全量表中,还要做一些封装,感兴趣的同学,可以参考下
消息服务
的类DataMigrateAllSubTableExecutingService
。执行效率案例:
环境:
4核16G
的MySQL5.7
实例操作:业务高峰期,
4
个服务实例分别开辟一个含有4
个线程的线程池,同时往76
张分表中一共迁移2.4亿
数据,大约花费5
个小时说明:源表总数据量是
2.4亿
,由于目标表存在映射分表,故迁移的总数据量远超2.4亿
(大约有7.2亿
条数据)缺点: 需要自己实现迁移的代码(核心迁移的执行器代码只需要实现一次,在每次数据迁移时,迁移的
Task
需要自己实现,总之,只要封装的足够好,后续Task
的开发工作就足够简单。重复造轮子,还说出了优越感🙄...)如果您有更好的迁移方案或者市面上有更好的迁移方案,也请分享一下~
八、无限扩容方案
继续拿五、分片/表键选择
中的消息表(采用冗余映射表的方式)为例介绍下最常规的扩容方案:Double 扩容方案
具体操作如下(假设已有2
张分表message_0、message_1
,要double
扩容为message_0、message_1、message_2、message_3
):
运维层面:新增两个分表
message_2、message_3
,通过MySQL
的gh-ost工具
同步数据:message_0
复制到message_2
message_1
复制到message_3
待 步骤1 中的数据完全追平后,开始调整分片规则并使之生效:
原 session_id % 2 = 0 => message_0
改为session_id % 4 = 0 => message_0、session_id % 4 = 2 => message_2
原 session_id % 2 = 1 => message_1
改为session_id % 4 = 1 => message_1、session_id % 4 = 3 => message_3
新的分片规则生效后,关闭
gh-ost工具
同步工作;此时,四个分表的数据都已完整,只是有冗余数据(多存了和自己配对的节点的那部分数据),择机清除即可
优点: 无需停机
缺点: 多个分表之间数据有冗余的情况,需要找机会删除冗余的数据(可以通过上面提到的
Jdbc + 多实例Job + Redis分段 + 多线程
清理)
message_*
和message_guid_mapping_*
都是采用的取模分片算法,都可以采用上述的Double
扩容方案。
message_message_id_mapping_*
采用的是范围分片算法,可以找运维同事写个脚本在最后一张分表的容量快用光时,提前创建分表,也可以通过代码去实现,需要注意的是分片规则也要支持自动感知及修改。
为了保证业务
SQL
的执行效率,每张分表的最大数据容量最好限定在2000万
以内,如果超过了2000万
,就要开始进行扩容操作。首次分表时,要对线上的业务数据做下统计分析,根据事先指定好的分片规则,计算出每张分表的初始数据容量,并根据每天、每月的数据增长量估算出下次扩容的时间,这个时间不宜太短也不宜太长,自己要把握好尺寸。
首次分表,推荐每张分表的初始数据量可以在
1000万
以内。
九、总结
本文主要介绍了下分库分表的几个核心流程,分库分表设计时,只要将以上几个核心流程设计好,基本就已经完成了一半的工作,另一半的工作就是方案的落地,后面有时间,我会继续分享下基于Sharding-JDBC
分库分表的核心流程及注意事项。限于篇幅,一些流程只介绍了下常规操作,大家也可以根据本文的流程标题自行搜索,这样可以更加全面的认识并掌握分库分表。如果有的地方介绍有误或者您有更好的解决办法,也请留言,大家一起交流,谢谢~
全文完
以下文章您可能也会感兴趣:
我们正在招聘 Java 工程师,欢迎有兴趣的同学投递简历到 [email protected] 。