7 - 教你如何读取MySQL binlog
解析binlog可以做很多的事情,比如在ES做增量索引,可靠的同步消息(本地事物提交,也要成功发送到MQ),缓存一致性(数据库中数据与缓存一致),Delta实时数仓(Delta中数据与数据库在某个时间点之前一致),Hive记录数据变更等等。
MySQL主备复制原理:
MySQL master将数据变更写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events,可以通过show binlog events进行查看);
MySQL slave将master的binary log events拷贝到它的中继日志(relay log);
MySQL slave 重放relay log中事件,将数据变更反映它自己的数据。
canal 工作原理:
canal模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议
MySQL master收到dump请求,开始推送binary log给slave(即 canal )
canal解析binary log对象(原始为byte流)
而且在canal 1.1.1版本之后, 支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的Kafka和RocketMQ。
CREATE USER test IDENTIFIED BY 'test';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'test'@'%' IDENTIFIED BY 'test';FLUSH PRIVILEGES;
//MySQL连接信息与binlog信息case class MySQLConnectionInfo(host: String, port: Int, userName: String, password: String, binlogFileName: Option[String], recordPos: Option[Long])//处理时间类型,这里只处理增删改三种object EventInfo {val INSERT_EVENT = "insert"val DELETE_EVENT = "delete"val UPDATE_EVENT = "update"}//实现Binlog Serverclass MyBinlogServer {def connectMySQL(connect: MySQLConnectionInfo) = {val client = new BinaryLogClient(connect.host, connect.port, connect.userName, connect.password)//设置binlog文件与偏移量client.setBinlogFilename(connect.binlogFileName.get)client.setBinlogPosition(connect.recordPos.get)//注册事件监听器 ,并对相应事件进行处理client.registerEventListener(new BinaryLogClient.EventListener() {def onEvent(event: Event): Unit = {val header = event.getHeader[EventHeaderV4]()val eventType = header.getEventTypeeventType match {case TABLE_MAP => println(event.getData[TableMapEventData]())case EXT_WRITE_ROWS | PRE_GA_WRITE_ROWS | WRITE_ROWS =>printRecord(event, client.getBinlogFilename, EventInfo.INSERT_EVENT ,header)case EXT_UPDATE_ROWS | PRE_GA_UPDATE_ROWS | UPDATE_ROWS=>printRecord(event, client.getBinlogFilename, EventInfo.UPDATE_EVENT ,header)case EXT_DELETE_ROWS | PRE_GA_DELETE_ROWS | DELETE_ROWS =>printRecord(event, client.getBinlogFilename, EventInfo.DELETE_EVENT ,header)// case ROTATE =>// val rotateEventData = event.getData[RotateEventData]()// val currentBinlogFile = rotateEventData.getBinlogFilename// val currentBinlogPosition = rotateEventData.getBinlogPosition// case QUERY =>// println("qe: " + event.getData())case _ =>}}})client.connect()}}//printRecord为打印信息,具体实现见源码//启动Serverobject MyBinlogServer{def main(args: Array[String]) {val bls = new MyBinlogServerbls.connectMySQL(MySQLConnectionInfo("127.0.0.1" ,3306 ,"root" ,"mlsql" ,Option("master-bin.000069") ,Option(4)))}}
TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}EXT_WRITE_ROWS ,newValue:1,1, pos:7831, next pos:7877
TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}EXT_WRITE_ROWS ,newValue:2,2, pos:8093, next pos:8139
TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}EXT_UPDATE_ROWS ,oldValue: 1,1 ,newValue: 1,3 ,pos: 8355 ,next pos: 8435EXT_UPDATE_ROWS ,oldValue: 2,2 ,newValue: 2,3 ,pos: 8355 ,next pos: 8435
TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}EXT_DELETE_ROWS ,newValue:2,3, pos:8651, next pos:8697
到这里可以看到,增删改的所有变更都可以捕获到,而且可以解析出具体变化的情况。
下面介绍几个关于MySQL binlog的命令:
show VARIABLES like 'log_%';
show master logs;
show master status;
sh mysqlbinlog --start-datetime="2019-06-19 01:00:00" --stop-datetime="2019-06-20 23:00:00" --base64-output=decode-rows -vv master-bin.000050
connect jdbc whereurl="jdbc:mysql://172.16.2.120:3308/wow"and driver="com.mysql.jdbc.Driver"and user="root"and password="mlsql"as wow;load jdbc.`wow.test` as wow_test;save overwrite wow_test as delta.`/dw/delta/wow/test`;
--流的mlsql脚本必须要设置流的名字set streamName="binlog-delta";--加载binlogload binlog.`` wherehost="172.16.2.120"and port="3308"and userName="root"and password="mlsql"and bingLogNamePrefix="mysql-bin"and binlogIndex="240"and binlogFileOffset="1613280"and databaseNamePattern="wow"and tableNamePattern="test"as wow_test;--bingLogNamePrefix:mysql binlog文件前缀--binlogIndex:binlog文件的索引--binlogFileOffset:开始同步binlog的偏移量--databaseNamePattern:要同步的库--tableNamePattern:要同步的表(正则表达式可以匹配多个表,进行同步)--!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";--流式程序可以配置回调函数,用于监控流的状态save append wow_testas binlogRate.`/dw/delta/{db}/{table}`options mode="Append"and idCols="id"and duration="10"and checkpointLocation="/chk/delta/binlog";--mode:append 追加模式--idCols:数据的唯一键,用于更新和删除数据--duration:同步时间间隔,单位秒--checkpointLocation:存储binlog offset等状态信息,流状态可以从这个检查点恢复
load delta.`/dw/delta/wow/test` as wow_test;
mysql show master status;
!kafkaTool streamOffset /chk/delta/binlog;
当然,还可以先读取binlog的数据写入到Kafka,然后读取Kafka在写入到Delta。如果需要保存binlog,这种方式比较好。但是要增加一个Kafka到Delta的延时检查。
spark-binlog还有需要完善的地方,比如如何提高写Kafka的并发,目前支持Topic只有一个Partition,如果连接的库挂掉该如何处理,如何在原有的Source中动态增加或删除表,对DDL的支持等。优点是使用简单,而且可以读取从库。Canal在并发上做的比较好,可以根据表的名字和主键设置并发,并且支持HA。在canal版本>=1.1.4,还有canal-admin提供web管理集群、Server和Instance,使用很方便。
