实战|Flink不支持分库分表的改造之路
点击下面图片直达文章合集
大家好,我是威哥,《RocketMQ技术内幕》作者、RocketMQ社区首席布道师、中通快递基础架构资深架构师,越努力越幸运,唯有坚持不懈,与大家共勉。
1、背景
在flink提供的jdbc-connector中只支持单表的数据同步,但随着业务量的增大,单表记录数过多,会导致数据查询效率降低。
为了解决单表存在的性能瓶颈,会采用分库分表。例如将订单表order拆分为1024张分表:order -> order_0000~order_1023。
显然官方默认提供的flink jdbc插件并不适用这种情况,需要我们将会对flink插件进行改造,从而支持分库分表的数据同步。
2、技术方案
2.1 flink-jdbc-connector简介
我们在创建flink jdbc同步作业时,一般是通过下面的来声明一个table。
-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users',
);
并且提供了可选配置,可以针对一个sql在指定数据固定范围内(scan.partition.lower-bound,scan.partition.upper-bound)根据拆分字段(scan.partition.column)和数量(scan.partition.num),将sql进行等步长拆分。
可选配置如下:
scan.partition.column:用于将输入进行分区的列名
scan.partition.num:分区数。
scan.partition.lower-bound:第一个分区的最小值。
scan.partition.upper-bound:最后一个分区的最大值。
例如我们预估需要通过1000条订单数据,如果不做拆分,基于flink sql的同步语句如下:
select id,name from order
如果按照id拆分成两个子任务,则sql语句如下:
select id,name from order where id between 1 and 50
select id,name from order where id between 51 and 100
上面只是为了方便举例,在真实的生产环境,同步订单表都是千万级别,将一条大SQL拆分成小任务,一方面可以减少对数据表的锁操作,降低对源端数据库的压力,另一方面可以结合flink配置的并发度,并发同步数据,增大同步效率。
基于flink-jdbc-connector数据拆分的原理如下图所示:
2.2 数据分库分表原理探究与技术方案
flink-jdbc-connector数据拆分属性原理如下:
在flink-jdbc-connector包中提供了JdbcParameterValuesProvider接口,被JdbcInputFormat用来计算要运行的并行查询列表(即拆分)。
每个查询将使用由每个JdbcParameterValuesProvider实现提供的矩阵行进行参数化。
public interface JdbcParameterValuesProvider {
/** Returns the necessary parameters array to use for query in parallel a table. */
Serializable[][] getParameterValues();
}
其中getParameterValues()
的返回值:Serializable[x][y]
,x
值即为SQL拆分的数据,每个x
对应的y
个元素的一维数组,包含的是每个sql的变量信息,例如上述根据id进行拆分数量为2。
第一个关键点Serializable[][]的二维数组结构为:
//结构 :x=0~1
//Serializable[x] = {{id_min},{id_max}}
Serializable[0] = {1,50}
Serializable[1] = {51,100}
SQL模版语句如下:
select id,name from order where id between ? and ?
那么对于分表来说,其变量相当于又增加了一个table_name,这样我们只需要改动两个地方,就可以实现分表的效果:
在构建Serializable [] [] 时,新增维度:table_name,其结构如下:
//结构 :x=0~2047
//Serializable[x] = {"order_{0000~1023}",{id_min},{id_max}}
Serializable[0] = {"order_0000",1,50}
Serializable[1] = {"order_0000",51,100}
Serializable[2] = {"order_0001",1,50}
Serializable[3] = {"order_0001",51,100}
...
Serializable[2047] = {"order_1023",51,100}
对应SQL的模版为:
select id,name from ${table_name} where id between ? and ?
在分表的基础上继续再推导,例如如果实现2库(10.1.1.2、10.1.1.2),4个schema(order_00~order_03),1024张表,最终拆解如下:
Serializable [] [] 存储数据格式为:
//结构:x=0~2047
//Serializable[x] = {"{db_url}","{schema_name}","order_{0000~1023}",{id_min},{id_max}}
Serializable[0] = {"jdbc://10.1.1.2","order_00","order_0000",1,50}
...
Serializable[2047] = {"jdbc://10.1.1.3","order_03","order_1023",1,50}
对应的SQL模版如下:
select id,name from {table_name} where id between {id_min} and {id_max}
2.3 方案落地
经过上面的分析,主要涉及如下5个改造点。
2.3.1 改造flink-jdbc-connection配置
主要涉及url、table-name的改造:
-
url:支持多个数据库配置,并在schema支持正则表达式动态匹配数据库中的schema -
table-name:表名支持正则匹配,可同时匹配多个表
具体代码如下:
'url' = 'jdbc:mysql://localhost:3306/order_([0-9]{1,}),jdbc:mysql://localhost:3306/order_([0-9]{1,})',
'table-name' = 'order_([0-9]{1,})',
2.3.2 解析URL、Table名称
主要是根据配置的url,table_name表达式,基本的编码步骤如下:
-
查询数据库中所有schema -
通过正则匹配schema -
查询匹配schema下面的table -
通过正则匹配表 -
返回数据库url与table的对应关系: List<TableItem>
2.3.3 实现分库分表JdbcMultiTableProvider
主要基于原有数据分片结果,根据分库分表,对Serializable[][]
进行二次拆分,示例代码如下:
public class JdbcMultiTableProvider implements JdbcParameterValuesProvider {
//匹配的数据库连接与table的对应关系
private List<TableItem> tables;
//原jdbc数据分片配置后的拆分结果
private Serializable[][] partition;
public JdbcMultiTableProvider(List<TableItem> tables) {
this.tables = tables;
}
/**
* @return 返回拆分后的分片和数据块的对应关系,Serializable[partition][parameterValues]
* 启动partition为分片索引,parameterValues为每个分片对应的数据参数。
*/
@Override
public Serializable[][] getParameterValues() {
int tableNum = tables.stream().mapToInt(item -> item.getTable().size()).sum();
int splitCount = partition == null ? tableNum : tableNum * partition.length;
int paramLength = partition == null ? 2 : 4;
Serializable[][] parameters = new Serializable[splitCount][paramLength];
int splitIndex = 0;
for (TableItem tableItem : tables) {
for (String table : tableItem.getTable()) {
if (partition != null) {
for (Serializable[] serializables : partition) {
parameters[splitIndex][0] = tableItem.getUrl();
parameters[splitIndex][1] = table;
//数据分片配置
parameters[splitIndex][2] = serializables[0];
parameters[splitIndex][3] = serializables[1];
splitIndex++;
}
} else {
parameters[splitIndex][0] = tableItem.getUrl();
parameters[splitIndex][1] = table;
splitIndex++;
}
}
}
return parameters;
}
public JdbcParameterValuesProvider withPartition(JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider) {
if (null == jdbcNumericBetweenParametersProvider) {
return this;
}
this.partition = jdbcNumericBetweenParametersProvider.getParameterValues();
return this;
}
public static class TableItem {
private String url;
private List<String> table;
//get/set..
}
}
2.3.4 改造JdbcDynamicTableSource
主要目的生成基于分库分表的JdbcRowDataInputFormat
对象,示例代码如下:
@Override
@SuppressWarnings("unchecked")
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
final JdbcRowDataInputFormat.Builder builder = JdbcRowDataInputFormat.builder()
.setDrivername(options.getDriverName())
.setDBUrl(options.getDbURL())
.setUsername(options.getUsername().orElse(null))
.setPassword(options.getPassword().orElse(null));
if (readOptions.getFetchSize() != 0) {
builder.setFetchSize(readOptions.getFetchSize());
}
final JdbcDialect dialect = options.getDialect();
JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider = null;
//数据分片配置
if (readOptions.getPartitionColumnName().isPresent()) {
long lowerBound = readOptions.getPartitionLowerBound().get();
long upperBound = readOptions.getPartitionUpperBound().get();
int numPartitions = readOptions.getNumPartitions().get();
jdbcNumericBetweenParametersProvider = new JdbcNumericBetweenParametersProvider(lowerBound, upperBound).ofBatchNum(numPartitions);
}
//根据table分片
List<TableItem> tableItems = options.getTables();
builder.setParametersProvider(new JdbcMultiTableProvider(tableItems)
.withPartition(jdbcNumericBetweenParametersProvider, physicalSchema, readOptions.getPartitionColumnName().orElse(null)));
final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
builder.setRowConverter(dialect.getRowConverter(rowType));
builder.setRowDataTypeInfo((TypeInformation<RowData>) runtimeProviderContext
.createTypeInformation(physicalSchema.toRowDataType()));
return InputFormatProvider.of(builder.build());
}
2.3.5 改造JdbcRowDataInputFormat
在JdbcRowDataInputFormat的open(InputSplit inputSplit)中,初始化Connection、statement、以及sql查询模板。
JdbcRowDataInputFormat整个生命周期中,每个并行实例调用一次openInputFormat(),并对应关闭当前并行实例的方法:closeInputFormat())。
每次切换分片,都会调用一次open(InputSplit inputSplit)(对应关闭当前数据分片方法:close()),inputSplit的值对应Serializable[x][y]中x的值递增,并且每个并行实例不会重复执行,比如有1024个分表,每个表2个数据分片,那么inputSplit.getSplitNumber()值的范围是:[0~2047]。JdbcRowDataInputFormat对象持有Serializable[ x ] [y ],并且根据open(InputSplit inputSplit)来定位当前JdbcRowDataInputFormat处理对应分区的数据,从而达到数据分区根据并发度,并发查询的效果。
示例代码如下:
@Override
public void open(InputSplit inputSplit) throws IOException {
try {
//分库,分表逻辑
Object[] params = parameterValues[inputSplit.getSplitNumber()];
//初始化数据库连接,url= params[0].toString();
initConnect(params);
String url = params[0].toString();
final JdbcDialect dialect = RdbsDialects.get(url).get();
//数据查询模板,String table = params[1].toString();
String queryTemplate = queryTemplate(params, dialect);
statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
if (inputSplit != null && parameterValues != null) {
//从index=2 开始为数据分片配置
for (int i = 2; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
Object param = parameterValues[inputSplit.getSplitNumber()][i];
if (param instanceof String) {
statement.setString(i - 1, (String) param);
} else if (param instanceof Long) {
statement.setLong(i - 1, (Long) param);
} else if (param instanceof Integer) {
statement.setInt(i - 1, (Integer) param);
...
//extends with other types if needed
throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet).");
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
}
}
resultSet = statement.executeQuery();
hasNext = resultSet.next();
} catch (SQLException se) {
throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
}
}
基于上述步骤改造后,就可以实现从flink-jdbc-connector source端单库单表,到分库分表的改造。
作者介绍:李大伟,现任中通快递中间件高级架构师,对消息中间件、Flink有着非常丰富的实践经验,原文首发他维护的CSDN:https://blog.csdn.net/u011618288/article/details/121025234
最后说一句(求关注,别白嫖我)
如果这篇文章对您有所帮助,或者有所启发的话,帮忙扫描下发二维码关注一下,您的支持是我坚持写作最大的动力。
求一键三连:点赞、转发、在看。
走进作者
点击查看“阅读原文”,可直接进入[中间件兴趣圈]文章合集。