vlambda博客
学习文章列表

7 - 教你如何读取MySQL binlog

解析binlog可以做很多的事情,比如在ES做增量索引,可靠的同步消息(本地事物提交,也要成功发送到MQ),缓存一致性(数据库中数据与缓存一致),Delta实时数仓(Delta中数据与数据库在某个时间点之前一致),Hive记录数据变更等等。

Canal是一个很好的解析MySQL数据库增量日志的工具。下面简单介绍下原理:

MySQL主备复制原理:

7 - 教你如何读取MySQL binlog

  1. MySQL master将数据变更写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events,可以通过show binlog events进行查看);

  2. MySQL slave将master的binary log events拷贝到它的中继日志(relay log);

  3. MySQL slave 重放relay log中事件,将数据变更反映它自己的数据。

canal 工作原理:

  1. canal模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议

  2. MySQL master收到dump请求,开始推送binary log给slave(即 canal )

  3. canal解析binary log对象(原始为byte流)

    而且在canal 1.1.1版本之后, 支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的Kafka和RocketMQ。


今天笔者讲一个与canal原理相似的库,也是 通过BINLOG_DUMP(它还支持BINLOG_DUMP_GTID协议)协议网络读取binlog 并提供解析MySQL binlog的方法:https://github.com/shyiko/mysql-binlog-connector-java。 它有两种模式:BinaryLogFileReader日志读取模式(要求部署到MySQL机器,直接读取binlog文件)和BinaryLogClient客户端访问模式(需要授权,支持读取从库binlog)。
它可以根据binlog文件的position(起始位置)解析binlog event,下面通过一个示例了解一下BinaryLogClient,示例中只处理insert、delete和update事件:
准备条件,授权test连接MySQL账号如下权限
CREATE USER test IDENTIFIED BY 'test'; GRANT SELECTREPLICATION SLAVEREPLICATION CLIENT ON *.* TO 'test'@'%' IDENTIFIED BY 'test';  FLUSH PRIVILEGES;
Scala实现代码:
//MySQL连接信息与binlog信息case class MySQLConnectionInfo( host: String, port: Int, userName: String, password: String , binlogFileName: Option[String], recordPos: Option[Long])
//处理时间类型,这里只处理增删改三种object EventInfo { val INSERT_EVENT = "insert" val DELETE_EVENT = "delete" val UPDATE_EVENT = "update"}
//实现Binlog Serverclass MyBinlogServer { def connectMySQL(connect: MySQLConnectionInfo) = { val client = new BinaryLogClient(connect.host, connect.port, connect.userName, connect.password)    //设置binlog文件与偏移量 client.setBinlogFilename(connect.binlogFileName.get) client.setBinlogPosition(connect.recordPos.get)    //注册事件监听器 ,并对相应事件进行处理 client.registerEventListener(new BinaryLogClient.EventListener() { def onEvent(event: Event): Unit = { val header = event.getHeader[EventHeaderV4]() val eventType = header.getEventType
eventType match {         case TABLE_MAP => println(event.getData[TableMapEventData]())          case EXT_WRITE_ROWS | PRE_GA_WRITE_ROWS | WRITE_ROWS => printRecord(event, client.getBinlogFilename, EventInfo.INSERT_EVENT ,header) case EXT_UPDATE_ROWS | PRE_GA_UPDATE_ROWS | UPDATE_ROWS=> printRecord(event, client.getBinlogFilename, EventInfo.UPDATE_EVENT ,header) case EXT_DELETE_ROWS | PRE_GA_DELETE_ROWS | DELETE_ROWS =>            printRecord(event, client.getBinlogFilename, EventInfo.DELETE_EVENT ,header)// case ROTATE =>// val rotateEventData = event.getData[RotateEventData]()// val currentBinlogFile = rotateEventData.getBinlogFilename// val currentBinlogPosition = rotateEventData.getBinlogPosition// case QUERY =>// println("qe: " + event.getData())          case _ => } } })
client.connect() }}
//printRecord为打印信息,具体实现见源码//启动Serverobject MyBinlogServer{ def main(args: Array[String]) { val bls = new MyBinlogServer bls.connectMySQL(MySQLConnectionInfo("127.0.0.1" ,3306 ,"root" ,"mlsql" ,Option("master-bin.000069") ,Option(4))) }}
下面来建一张wow.test表,有id和name两个字段,来看一下输出情况:
1. 执行insert wow.test values(1,1)后,输出:
TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}EXT_WRITE_ROWS ,newValue:1,1, pos:7831, next pos:7877
表示在wow.test中插入id=1 name=1,TableMapEventData可以理解为当前操作的表;
2. 执行insert wow.test values(2,2)后,输出:
TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}EXT_WRITE_ROWS ,newValue:2,2, pos:8093, next pos:8139
表示在wow.test中插入id=2 name=2;
3. 执行update wow.test set name=3后,输出:
TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}EXT_UPDATE_ROWS ,oldValue: 1,1 ,newValue: 1,3 ,pos: 8355 ,next pos: 8435EXT_UPDATE_ROWS ,oldValue: 2,2 ,newValue: 2,3 ,pos: 8355 ,next pos: 8435
可以看到有两条记录被更新,而且可以看到更新前的数据;
4. 执行delete from wow.test where id=2后,输出:
TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}EXT_DELETE_ROWS ,newValue:2,3, pos:8651, next pos:8697
表示在wow.test中删除id=2的数据,而且可以看到原始的整条数据。

到这里可以看到,增删改的所有变更都可以捕获到,而且可以解析出具体变化的情况。

细心的读者可以看到,ROTATE和QUERY事件被注释掉了,ROTATE是记录binlog文件的变更的,比如在做检查点的时候要记录当前读的文件和它的偏移量。QUERY可以解析出执行的sql,比如执行修改表结构的ddl。在实际使用中,可能还需要去解析一些事件。

下面介绍几个关于MySQL binlog的命令:

1. 找到binlog文件的路径:
show VARIABLES like 'log_%';
2. 查询binlog文件:
show master logs;
3. 查询最新binlog的position:
show master status;
4. 根据开始结束时间查binlog变更,用于寻找binlog在指定时间范围内的position:
sh mysqlbinlog --start-datetime="2019-06-19 01:00:00" --stop-datetime="2019-06-20 23:00:00" --base64-output=decode-rows -vv master-bin.000050
在MLSQL中,引用的spark-binlog包就使用了该包,实现了解析binlog的Spark Source,在Source端启动BinlogServer,并实现getOffset、getBatch等方法。详情请参照:https://github.com/allwefantasy/spark-binlog MLSQLBinLogDataSource。
文章最后,通过一个例子演示MLSQL是如何优雅的同步Binlog到Delta:
1. 初始化Delta
connect jdbc where url="jdbc:mysql://172.16.2.120:3308/wow" and driver="com.mysql.jdbc.Driver" and user="root" and password="mlsql"as wow;load jdbc.`wow.test` as wow_test;save overwrite wow_test as delta.`/dw/delta/wow/test`;
2. 启动流
--流的mlsql脚本必须要设置流的名字set streamName="binlog-delta";--加载binlogload binlog.`` where  host="172.16.2.120" and port="3308" and userName="root" and password="mlsql" and bingLogNamePrefix="mysql-bin" and binlogIndex="240" and binlogFileOffset="1613280" and databaseNamePattern="wow" and tableNamePattern="test"as wow_test;
--bingLogNamePrefix:mysql binlog文件前缀--binlogIndex:binlog文件的索引--binlogFileOffset:开始同步binlog的偏移量--databaseNamePattern:要同步的库--tableNamePattern:要同步的表(正则表达式可以匹配多个表,进行同步)
--!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";--流式程序可以配置回调函数,用于监控流的状态
save append wow_test   as binlogRate.`/dw/delta/{db}/{table}`  options mode="Append" and idCols="id" and duration="10"  and checkpointLocation="/chk/delta/binlog";
--mode:append 追加模式 --idCols:数据的唯一键,用于更新和删除数据 --duration:同步时间间隔,单位秒  --checkpointLocation:存储binlog offset等状态信息,流状态可以从这个检查点恢复
可以通过如下脚本查询数据,当然也可以增加时间和版本等条件查询:
load delta.`/dw/delta/wow/test` as wow_test;
3. 如何检查流延迟:
查询MySQL最新binlog的Position:
mysql show master status;
查看流的CheckPoint Offsets:
!kafkaTool streamOffset /chk/delta/binlog;
根据File和Position计算Offsets与Ch ec kP oint的Offsets比较看消息延时状况。

当然,还可以先读取binlog的数据写入到Kafka,然后读取Kafka在写入到Delta。如果需要保存binlog,这种方式比较好。但是要增加一个Kafka到Delta的延时检查。

spark-binlog还有需要完善的地方,比如如何提高写Kafka的并发,目前支持Topic只有一个Partition,如果连接的库挂掉该如何处理如何在原有的Source中动态增加或删除表,对DDL的支持等。优点是使用简单,而且可以读取从库。Canal在并发上做的比较好,可以根据表的名字和主键设置并发,并且支持HA。canal版本>=1.1.4,还有canal-admin提供web管理集群、Server和Instance,使用很方便。





喜欢就点击最上方的[ MLSQL之道 ]关注下吧,后面精彩不断!
更多介绍请访问: http://docs.mlsql.tech/zh/