Canal - MySQL 增量订阅、消费
之前在JD供职的时候有个需求是监控MySQL数据的变化触发消费任务,于是发现了这个玩意,不得不说这玩意用起来还是挺香的~
概述
—
Canal是阿里提供的一个针对MySQL数据库的增量日志解析,提供增量数据订阅和消费的框架(组件)。
基于日志增量订阅和消费的业务包括:
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
MySQL主备复制原理
MySQL master 将数据变更写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
Canal 工作原理
Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 Canal )
Canal 解析 binary log 对象(原始为 byte 流)
至于更多的介绍大家可以移步GitHub看看:
https://github.com/alibaba/canal
如何订阅
—
订阅的方式主要是基于MySQL的配置变更实现的,无需代码经验,比较简单。
配置MySQL的配置文件(my.ini),增加如下配置:
[mysqld]
log-bin=mysql-bin #开启日志监控
binlog-format=ROW #监控模式为ROW
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
连接到数据库,执行如下语句:
[mysqld]
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
备注:Canal的原理是模拟自己为MySQL Slave,所以这里一定需要做为MySQL Slave的相关权限。
配置Canal的配置文件(conf/example/instance.properties),增加如下配置:
#数据库配置
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#主数据库
canal.instance.dbUsername = canal #帐号
canal.instance.dbPassword = canal #密码
canal.instance.defaultDatabaseName = #监听的数据库名称
canal.instance.connectionCharset = UTF-8 #字符集
#备份数据库(可以不配置)
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
执行启动文件(bin/startup.bat 或者 bin/startup.sh),检查日志:
logs/canal/canal.log
com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[127.0.0.1:11111]
com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
logs/example/example.log
c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
开始消费
—
这里可能需要一些编码经验了,毕竟是消费数据做业务,首先假设你已经有了一个Project了。
增加Maven依赖:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
建立测试类:
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;
public class TestCanal {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmtryCount = 1200;
while (emptyCount < totalEmtryCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
运行测试类,可以看到控制台如下输出:
empty count : 1
empty count : 2
empty count : 3
此时通过SQL对数据进行操作:
INSERT TABLEA (ID, NAME) VALUES (1, 'A');
INSERT TABLEA (ID, NAME) VALUES (2, 'B');
INSERT TABLEA (ID, NAME) VALUES (3, 'C');
UPDATE TABLEA SET NAME = 'AA' WHERE ID = 1;
DELETE FROM TABLEA WHERE ID = 1;
注意:这里是在MySQL的GUI直接执行SQL。
在控制台中可以查看到数据库操作的推送信息:
================> binlog[mysql-bin.000001:13290] , name[test,tablea] , eventType : INSERT
id : 1 update=true
name : A update=true
id : 2 update=true
name : B update=true
id : 3 update=true
name : C update=true
================> binlog[mysql-bin.000001:13466] , name[test,tablea] , eventType : UPDATE
-------> before
id : 1 update=false
name : A update=false
-------> after
id : 1 update=false
name : AA update=true
================> binlog[mysql-bin.000001:13874] , name[test,tablea] , eventType : DELETE
id : 1 update=false
name : AA update=false
id : 2 update=false
name : B update=false
id : 3 update=false
name : C update=false
到此,最简单的监听数据变更就实现了,在业务中比较适用的场景就是异步监听数据库某张表的数据变化,比如在没有MQ的情况下实现弹幕。
实际上最现实的场景是你需要对方配合你提供数据变更通知但是对方不鸟你的时候,这时候只需要一个镜像的同步数据库基本上就可以实现无入侵监听了。这种场景在多部门合作下还是比较常见的~
中年男子王二蛋
—
吐槽、闲聊,偶尔说点儿正经事
支持一下,点击下方【在看】